Skip to content

Commit c801bc7

Browse files
resolve review comments
1 parent 50eb712 commit c801bc7

9 files changed

Lines changed: 359 additions & 425 deletions

File tree

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,32 +39,32 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
3939
if err != nil {
4040
return err
4141
}
42-
resp, err := startSnapshotStream(ctx, dg)
42+
resp, err := startPDirStream(ctx, dg)
4343
if err != nil {
4444
return err
4545
}
4646

47-
return sendSnapshot(ctx, dg, bulkOutDir, resp.Groups)
47+
return sendPDir(ctx, dg, bulkOutDir, resp.Groups)
4848
}
4949

50-
// startSnapshotStream initiates a snapshot stream session with the Dgraph server.
51-
func startSnapshotStream(ctx context.Context, dc apiv25.DgraphClient) (*apiv25.InitiateSnapshotStreamResponse, error) {
52-
glog.Info("Initiating snapshot stream")
53-
req := &apiv25.InitiateSnapshotStreamRequest{}
54-
resp, err := dc.InitiateSnapshotStream(ctx, req)
50+
// startPDirStream initiates a snapshot stream session with the Dgraph server.
51+
func startPDirStream(ctx context.Context, dc apiv25.DgraphClient) (*apiv25.InitiatePDirStreamResponse, error) {
52+
glog.Info("Initiating pdir stream")
53+
req := &apiv25.InitiatePDirStreamRequest{}
54+
resp, err := dc.InitiatePDirStream(ctx, req)
5555
if err != nil {
56-
glog.Errorf("failed to initiate snapshot stream: %v", err)
57-
return nil, fmt.Errorf("failed to initiate snapshot stream: %v", err)
56+
glog.Errorf("failed to initiate pdir stream: %v", err)
57+
return nil, fmt.Errorf("failed to initiate pdir stream: %v", err)
5858
}
59-
glog.Info("Snapshot stream initiated successfully")
59+
glog.Info("Pdir stream initiated successfully")
6060
return resp, nil
6161
}
6262

63-
// sendSnapshot takes a p directory and a set of group IDs and streams the data from the
63+
// sendPDir takes a p directory and a set of group IDs and streams the data from the
6464
// p directory to the corresponding group IDs. It first scans the provided directory for
6565
// subdirectories named with numeric group IDs.
66-
func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, baseDir string, groups []uint32) error {
67-
glog.Infof("Starting to stream snapshots from directory: %s", baseDir)
66+
func sendPDir(ctx context.Context, dg apiv25.DgraphClient, baseDir string, groups []uint32) error {
67+
glog.Infof("Starting to stream pdir from directory: %s", baseDir)
6868

6969
errG, ctx := errgroup.WithContext(ctx)
7070
for _, group := range groups {
@@ -88,7 +88,7 @@ func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, baseDir string, g
8888
return err
8989
}
9090

91-
glog.Infof("Completed streaming all snapshots")
91+
glog.Infof("Completed streaming all pdirs")
9292
return nil
9393
}
9494

@@ -98,9 +98,9 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
9898
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)
9999

100100
// Initialize stream with the server
101-
out, err := dg.StreamSnapshot(ctx)
101+
out, err := dg.StreamPDir(ctx)
102102
if err != nil {
103-
return fmt.Errorf("failed to start snapshot stream for group %d: %w", groupId, err)
103+
return fmt.Errorf("failed to start pdir stream for group %d: %w", groupId, err)
104104
}
105105

106106
// Open the BadgerDB instance at the specified directory
@@ -118,7 +118,7 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
118118

119119
// Send group ID as the first message in the stream
120120
glog.Infof("Sending group ID [%d] to server", groupId)
121-
groupReq := &apiv25.StreamSnapshotRequest{GroupId: groupId}
121+
groupReq := &apiv25.StreamPDirRequest{GroupId: groupId}
122122
if err := out.Send(groupReq); err != nil {
123123
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err)
124124
}
@@ -129,8 +129,8 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
129129
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
130130
stream.KeyToList = nil
131131
stream.Send = func(buf *z.Buffer) error {
132-
kvs := &apiv25.KVS{Data: buf.Bytes()}
133-
if err := out.Send(&apiv25.StreamSnapshotRequest{Pairs: kvs}); err != nil && !errors.Is(err, io.EOF) {
132+
p := &apiv25.StreamPacket{Data: buf.Bytes()}
133+
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
134134
return fmt.Errorf("failed to send data chunk: %w", err)
135135
}
136136
return nil
@@ -143,9 +143,9 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
143143

144144
// Send the final 'done' signal to mark completion
145145
glog.Infof("Sending completion signal for group [%d]", groupId)
146-
done := &apiv25.KVS{Done: true}
146+
done := &apiv25.StreamPacket{Done: true}
147147

148-
if err := out.Send(&apiv25.StreamSnapshotRequest{Pairs: done}); err != nil && !errors.Is(err, io.EOF) {
148+
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
149149
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
150150
}
151151
// Wait for acknowledgment from the server

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
7171
dc, err := newClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
7272
require.NoError(t, err)
7373

74-
resp, err := startSnapshotStream(context.Background(), dc)
74+
resp, err := startPDirStream(context.Background(), dc)
7575
require.NoError(t, err)
7676

7777
require.Equal(t, tt.expectedNum, len(resp.Groups))

edgraph/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,21 +1810,21 @@ func validateNamespace(ctx context.Context, tc *api.TxnContext) error {
18101810
return nil
18111811
}
18121812

1813-
func (s *ServerV25) InitiateSnapshotStream(ctx context.Context,
1814-
c *apiv25.InitiateSnapshotStreamRequest) (v *apiv25.InitiateSnapshotStreamResponse, err error) {
1813+
func (s *ServerV25) InitiatePDirStream(ctx context.Context,
1814+
c *apiv25.InitiatePDirStreamRequest) (v *apiv25.InitiatePDirStreamResponse, err error) {
18151815

18161816
drainMode := &pb.DrainModeRequest{State: true}
18171817
groups, err := worker.ProposeDrain(ctx, drainMode)
18181818
if err != nil {
18191819
return nil, err
18201820
}
18211821

1822-
resp := &apiv25.InitiateSnapshotStreamResponse{Groups: groups}
1822+
resp := &apiv25.InitiatePDirStreamResponse{Groups: groups}
18231823

18241824
return resp, nil
18251825
}
18261826

1827-
func (s *ServerV25) StreamSnapshot(stream apiv25.Dgraph_StreamSnapshotServer) error {
1827+
func (s *ServerV25) StreamPDir(stream apiv25.Dgraph_StreamPDirServer) error {
18281828
if err := worker.InStream(stream); err != nil {
18291829
return err
18301830
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/Masterminds/semver/v3 v3.3.1
1010
github.com/blevesearch/bleve/v2 v2.5.0
1111
github.com/dgraph-io/badger/v4 v4.7.0
12-
github.com/dgraph-io/dgo/v250 v250.0.0-20250422095606-337c8b78908a
12+
github.com/dgraph-io/dgo/v250 v250.0.0-20250511125955-0567006c46cc
1313
github.com/dgraph-io/gqlgen v0.13.2
1414
github.com/dgraph-io/gqlparser/v2 v2.2.2
1515
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
122122
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
123123
github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y=
124124
github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA=
125-
github.com/dgraph-io/dgo/v250 v250.0.0-20250422095606-337c8b78908a h1:RhE9o87JCsfvNnZmpLQ3KFQ2LRXX/93B32zue9KxHm0=
126-
github.com/dgraph-io/dgo/v250 v250.0.0-20250422095606-337c8b78908a/go.mod h1:h1cPCTmI0GZWc770q608FkWEdQfC4iiWYAgvQGnvQ9A=
125+
github.com/dgraph-io/dgo/v250 v250.0.0-20250511125955-0567006c46cc h1:TACB/WzfNGQxgxWQelzg6ur/NWoxOmeesrjJwVZ+Zjk=
126+
github.com/dgraph-io/dgo/v250 v250.0.0-20250511125955-0567006c46cc/go.mod h1:m5zwXW3OyxYK4asn97VxPRcQBn+dYj0uzT2MIu2R4is=
127127
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
128128
github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis=
129129
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=

protos/pb.proto

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -597,11 +597,7 @@ service Worker {
597597
rpc DeleteNamespace(DeleteNsRequest) returns (Status) {}
598598
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
599599
rpc ApplyDrainmode(DrainModeRequest) returns (Status) {}
600-
rpc InternalStreamSnapshot(stream api.v25.StreamSnapshotRequest) returns (api.v25.StreamSnapshotResponse) {}
601-
}
602-
603-
message ReceiveSnapshotKVRequest {
604-
bool done = 1;
600+
rpc InternalStreamPDir(stream api.v25.StreamPDirRequest) returns (api.v25.StreamPDirResponse) {}
605601
}
606602

607603
message DrainModeRequest {

0 commit comments

Comments
 (0)