Skip to content

Commit d4235b1

Browse files
resolve review comments
1 parent 6457b26 commit d4235b1

18 files changed

Lines changed: 255 additions & 296 deletions

File tree

dgraph/cmd/alpha/run_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,14 +1640,14 @@ func TestMain(m *testing.M) {
16401640

16411641
x.Panic(dg.Login(context.Background(), dgraphapi.DefaultUser, dgraphapi.DefaultPassword))
16421642

1643-
alphaGrpcPort, err := c.GetAlphaGrpcPublicPort()
1643+
alphaGrpcPort, err := c.GetAlphaGrpcPublicPort(0)
16441644
x.Panic(err)
16451645

16461646
alphaSockAdd = "0.0.0.0:" + alphaGrpcPort
16471647
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort(0)
16481648
x.Panic(err)
16491649
addr = "http://0.0.0.0:" + alphaSockAddHttp
1650-
zeroSockAdd, err := c.GetZeroGrpcPublicPort()
1650+
zeroSockAdd, err := c.GetZeroGrpcPublicPort(0)
16511651
x.Panic(err)
16521652

16531653
// Increment lease, so that mutations work.

dgraph/cmd/bulk/loader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type options struct {
4343
DataFormat string
4444
SchemaFile string
4545
GqlSchemaFile string
46-
OutDir string
46+
outDirNoMount string
4747
ReplaceOutDir bool
4848
TmpDir string
4949
NumGoroutines int

dgraph/cmd/bulk/run.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func run() {
141141
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
142142
Encrypted: Bulk.Conf.GetBool("encrypted"),
143143
EncryptedOut: Bulk.Conf.GetBool("encrypted_out"),
144-
OutDir: Bulk.Conf.GetString("out"),
144+
outDirNoMount: Bulk.Conf.GetString("out"),
145145
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
146146
TmpDir: Bulk.Conf.GetString("tmp"),
147147
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
@@ -259,8 +259,8 @@ func run() {
259259

260260
// Make sure it's OK to create or replace the directory specified with the --out option.
261261
// It is always OK to create or replace the default output directory.
262-
if opt.OutDir != defaultOutDir && !opt.ReplaceOutDir {
263-
err := x.IsMissingOrEmptyDir(opt.OutDir)
262+
if opt.outDirNoMount != defaultOutDir && !opt.ReplaceOutDir {
263+
err := x.IsMissingOrEmptyDir(opt.outDirNoMount)
264264
if err == nil {
265265
fmt.Fprintf(os.Stderr, "Output directory exists and is not empty."+
266266
" Use --replace_out to overwrite it.\n")
@@ -271,9 +271,9 @@ func run() {
271271
}
272272

273273
// Delete and recreate the output dirs to ensure they are empty.
274-
x.Check(os.RemoveAll(opt.OutDir))
274+
x.Check(os.RemoveAll(opt.outDirNoMount))
275275
for i := range opt.ReduceShards {
276-
dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
276+
dir := filepath.Join(opt.outDirNoMount, strconv.Itoa(i), "p")
277277
x.Check(os.MkdirAll(dir, 0700))
278278
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)
279279

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,35 @@ import (
2020
"google.golang.org/grpc"
2121
)
2222

23-
// Client represents a Dgraph import client that handles snapshot streaming.
24-
type Client struct {
25-
opts grpc.DialOption
26-
dg apiv25.DgraphClient
27-
}
28-
2923
// NewClient creates a new import client with the specified endpoint and gRPC options.
30-
func NewClient(endpoint string, opts grpc.DialOption) (*Client, error) {
24+
func newClient(endpoint string, opts grpc.DialOption) (apiv25.DgraphClient, error) {
3125
conn, err := grpc.NewClient(endpoint, opts)
3226
if err != nil {
3327
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
3428
}
3529

3630
glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
37-
return &Client{dg: apiv25.NewDgraphClient(conn), opts: opts}, nil
31+
return apiv25.NewDgraphClient(conn), nil
32+
}
33+
34+
func Import(ctx context.Context, endpoint string, opts grpc.DialOption, outDir string) error {
35+
dg, err := newClient(endpoint, opts)
36+
if err != nil {
37+
return err
38+
}
39+
resp, err := startSnapshotStream(ctx, dg)
40+
if err != nil {
41+
return err
42+
}
43+
44+
return sendSnapshot(ctx, dg, outDir, resp.Groups)
3845
}
3946

4047
// StartSnapshotStream initiates a snapshot stream session with the Dgraph server.
41-
func (c *Client) StartSnapshotStream(ctx context.Context) (*apiv25.InitiateSnapshotStreamResponse, error) {
48+
func startSnapshotStream(ctx context.Context, dg apiv25.DgraphClient) (*apiv25.InitiateSnapshotStreamResponse, error) {
4249
glog.V(2).Infof("Initiating snapshot stream")
4350
req := &apiv25.InitiateSnapshotStreamRequest{}
44-
resp, err := c.dg.InitiateSnapshotStream(ctx, req)
51+
resp, err := dg.InitiateSnapshotStream(ctx, req)
4552
if err != nil {
4653
glog.Errorf("Failed to initiate snapshot stream: %v", err)
4754
return nil, err
@@ -53,7 +60,7 @@ func (c *Client) StartSnapshotStream(ctx context.Context) (*apiv25.InitiateSnaps
5360
// SendSnapshot takes a p directory and a set of group IDs and streams the data from the
5461
// p directory to the corresponding group IDs. The function will skip any groups that do not
5562
// have a corresponding p directory.
56-
func (c *Client) SendSnapshot(ctx context.Context, pDir string, groups []uint32) error {
63+
func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, pDir string, groups []uint32) error {
5764
glog.Infof("Starting to stream snapshots from directory: %s", pDir)
5865

5966
// Get mapping of group IDs to their respective p directories
@@ -77,7 +84,7 @@ func (c *Client) SendSnapshot(ctx context.Context, pDir string, groups []uint32)
7784
}
7885

7986
glog.Infof("Streaming data for group %d from directory: %s", group, pDir)
80-
err = streamData(ctx, c.dg, pDir, group)
87+
err = streamData(ctx, dg, pDir, group)
8188
if err != nil {
8289
glog.Errorf("Failed to stream snapshot for group %d: %v", group, err)
8390
return err

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,14 @@ func TestStartSnapshotStreamForSingleNode(t *testing.T) {
117117
require.NoError(t, gc.Login(context.Background(),
118118
dgraphapi.DefaultUser, dgraphapi.DefaultPassword))
119119

120-
pubPort, err := c.GetAlphaGrpcPublicPort()
120+
url, err := c.GetAlphaGrpcEndpoint(0)
121+
require.NoError(t, err)
122+
dg, err := newClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
121123
require.NoError(t, err)
122-
client, err := NewClient(dgraphtest.GetLocalHostUrl(pubPort, ""), grpc.WithTransportCredentials(insecure.NewCredentials()))
123124

124125
require.NoError(t, err)
125126

126-
resp, err := client.StartSnapshotStream(context.Background())
127+
resp, err := startSnapshotStream(context.Background(), dg)
127128
require.NoError(t, err)
128129

129130
require.Equal(t, len(resp.Groups), 1)
@@ -141,12 +142,12 @@ func TestStartSnapshotStreamForHA(t *testing.T) {
141142
defer func() { c.Cleanup(t.Failed()) }()
142143
require.NoError(t, c.Start())
143144

144-
pubPort, err := c.GetAlphaGrpcPublicPort()
145+
url, err := c.GetAlphaGrpcEndpoint(0)
145146
require.NoError(t, err)
146-
client, err := NewClient(dgraphtest.GetLocalHostUrl(pubPort, ""), grpc.WithTransportCredentials(insecure.NewCredentials()))
147+
dg, err := newClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
147148
require.NoError(t, err)
148149

149-
resp, err := client.StartSnapshotStream(context.Background())
150+
resp, err := startSnapshotStream(context.Background(), dg)
150151
require.NoError(t, err)
151152

152153
require.Equal(t, len(resp.Groups), 1)
@@ -169,13 +170,12 @@ func TestStartSnapshotStreamForHASharded(t *testing.T) {
169170
defer func() { c.Cleanup(t.Failed()) }()
170171
require.NoError(t, c.Start())
171172

172-
pubPort, err := c.GetAlphaGrpcPublicPort()
173+
url, err := c.GetAlphaGrpcEndpoint(0)
173174
require.NoError(t, err)
174-
client, err := NewClient(dgraphtest.GetLocalHostUrl(pubPort, ""),
175-
grpc.WithTransportCredentials(insecure.NewCredentials()))
175+
dg, err := newClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
176176
require.NoError(t, err)
177177

178-
resp, err := client.StartSnapshotStream(context.Background())
178+
resp, err := startSnapshotStream(context.Background(), dg)
179179
require.NoError(t, err)
180180
require.Equal(t, len(resp.Groups), 2)
181181

@@ -217,18 +217,11 @@ func runImportTest(t *testing.T, bulkAlphas, targetAlphas, replicasFactor int) {
217217
defer func() { targetCluster.Cleanup(t.Failed()) }()
218218
defer gcCleanup()
219219

220-
importClient, err := createImportClient(t, targetCluster)
221-
require.NoError(t, err)
222-
223-
resp, err := importClient.StartSnapshotStream(context.Background())
220+
url, err := targetCluster.GetAlphaGrpcEndpoint(0)
224221
require.NoError(t, err)
225-
226-
_, err = gc.Query("schema{}")
227-
require.Error(t, err)
228-
require.ErrorContains(t, err, "the server is in draining mode")
229-
230222
outDir := filepath.Join(baseDir, "out")
231-
err = importClient.SendSnapshot(context.Background(), outDir, resp.Groups)
223+
224+
err = Import(context.Background(), url, grpc.WithTransportCredentials(insecure.NewCredentials()), outDir)
232225
require.NoError(t, err)
233226

234227
verifyImportResults(t, gc)
@@ -246,17 +239,13 @@ func setupBulkCluster(t *testing.T, numAlphas int) (*dgraphtest.LocalCluster, st
246239
cluster, err := dgraphtest.NewLocalCluster(bulkConf)
247240
require.NoError(t, err)
248241

249-
// Download and prepare data files
250-
dataPath, err := dgraphtest.DownloadDataFiles()
251-
require.NoError(t, err)
252242
require.NoError(t, cluster.StartZero(0))
253243

254244
// Perform bulk load
255-
schemaFile := filepath.Join(dataPath, "1million.schema")
256-
rdfFile := filepath.Join(dataPath, "1million.rdf.gz")
245+
oneMillion := dgraphtest.GetDataset(dgraphtest.OneMillionDataset)
257246
opts := dgraphtest.BulkOpts{
258-
DataFiles: []string{rdfFile},
259-
SchemaFiles: []string{schemaFile},
247+
DataFiles: []string{oneMillion.DataFilePath()},
248+
SchemaFiles: []string{oneMillion.SchemaPath()},
260249
OutDir: filepath.Join(baseDir, "out"),
261250
}
262251
require.NoError(t, cluster.BulkLoad(opts))
@@ -282,15 +271,6 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes
282271
return cluster, gc, cleanup
283272
}
284273

285-
// createImportClient creates a new client for import operations
286-
func createImportClient(t *testing.T, cluster *dgraphtest.LocalCluster) (*Client, error) {
287-
pubPort, err := cluster.GetAlphaGrpcPublicPort()
288-
require.NoError(t, err)
289-
290-
return NewClient(dgraphtest.GetLocalHostUrl(pubPort, ""),
291-
grpc.WithTransportCredentials(insecure.NewCredentials()))
292-
}
293-
294274
// verifyImportResults validates the result of an import operation
295275
func verifyImportResults(t *testing.T, gc *dgraphapi.GrpcClient) {
296276
// Check schema after streaming process

dgraph/cmd/zero/zero.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -860,18 +860,3 @@ func (s *Server) latestMembershipState(ctx context.Context) (*pb.MembershipState
860860
}
861861
return ms, nil
862862
}
863-
864-
func (s *Server) ApplyDrainmode(ctx context.Context, req *pb.Drainmode) (*pb.Status, error) {
865-
knownGroups := s.KnownGroups()
866-
867-
for _, grp := range knownGroups {
868-
pl := s.Leader(grp)
869-
wc := pb.NewWorkerClient(pl.Get())
870-
in := &pb.Drainmode{State: req.State}
871-
872-
if status, err := wc.ApplyDrainmode(ctx, in); err != nil {
873-
return status, errors.Wrapf(err, "while applying drainmode")
874-
}
875-
}
876-
return nil, nil
877-
}

dgraphapi/cluster.go

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ import (
1717
"strings"
1818
"time"
1919

20-
"github.com/hypermodeinc/dgraph/v25/protos/pb"
2120
"github.com/pkg/errors"
22-
"google.golang.org/protobuf/encoding/protojson"
2321

2422
"github.com/dgraph-io/dgo/v250"
2523
"github.com/dgraph-io/dgo/v250/protos/api"
@@ -650,27 +648,6 @@ func (hc *HTTPClient) GetZeroState() (*LicenseResponse, error) {
650648
return &stateResponse, nil
651649
}
652650

653-
func (hc *HTTPClient) GetAlphaState(url string) (*pb.MembershipState, error) {
654-
if url == "" {
655-
url = hc.alphaStateURL
656-
}
657-
resp, err := http.Get(url)
658-
if err != nil {
659-
return nil, err
660-
}
661-
662-
body, err := io.ReadAll(resp.Body)
663-
if err != nil {
664-
return nil, errors.Wrapf(err, "error reading zero state response body")
665-
}
666-
var state pb.MembershipState
667-
if err := protojson.Unmarshal(body, &state); err != nil {
668-
return nil, err
669-
}
670-
671-
return &state, err
672-
}
673-
674651
func (hc *HTTPClient) PostDqlQuery(query string) ([]byte, error) {
675652
req, err := http.NewRequest(http.MethodPost, hc.dqlURL, bytes.NewBufferString(query))
676653
if err != nil {

dgraphtest/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ type ClusterConfig struct {
102102
refillInterval time.Duration
103103
uidLease int
104104
portOffset int // exposed port offset for grpc/http port for both alpha/zero
105-
bulkOutDir string
105+
bulkOutDirForMount string
106106
lambdaURL string
107107
featureFlags []string
108108
customPlugins bool
@@ -224,7 +224,7 @@ func (cc ClusterConfig) WithExposedPortOffset(offset uint64) ClusterConfig {
224224
// WithBulkLoadOutDir sets the out dir for the bulk loader. This ensures
225225
// that the same p directory is used while setting up alphas.
226226
func (cc ClusterConfig) WithBulkLoadOutDir(dir string) ClusterConfig {
227-
cc.bulkOutDir = dir
227+
cc.bulkOutDirForMount = dir
228228
return cc
229229
}
230230

dgraphtest/dgraph.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ func (a *alpha) mounts(c *LocalCluster) ([]mount.Mount, error) {
311311
})
312312
}
313313

314-
if c.conf.bulkOutDir != "" {
315-
pDir := filepath.Join(c.conf.bulkOutDir, strconv.Itoa(a.id/c.conf.replicas), "p")
314+
if c.conf.bulkOutDirForMount != "" {
315+
pDir := filepath.Join(c.conf.bulkOutDirForMount, strconv.Itoa(a.id/c.conf.replicas), "p")
316316
if err := os.MkdirAll(pDir, os.ModePerm); err != nil {
317317
return nil, errors.Wrap(err, "erorr creating bulk dir")
318318
}

0 commit comments

Comments
 (0)