@@ -7,16 +7,13 @@ package dgraphimport
77
88import (
99 "context"
10- "errors"
1110 "fmt"
12- "io"
13- "math"
1411 "os"
1512 "path/filepath"
1613
1714 "github.com/dgraph-io/badger/v4"
1815 apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
19- "github.com/dgraph-io/ristretto/v2/z "
16+ "github.com/hypermodeinc/ dgraph/v25/worker "
2017
2118 "github.com/golang/glog"
2219 "golang.org/x/sync/errgroup"
@@ -48,10 +45,12 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
4845}
4946
5047// startPDirStream initiates a snapshot stream session with the Dgraph server.
51- func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.InitiatePDirStreamResponse , error ) {
48+ func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.UpdateExtSnapshotStreamingStateResponse , error ) {
5249 glog .Info ("Initiating pdir stream" )
53- req := & apiv2.InitiatePDirStreamRequest {}
54- resp , err := dc .InitiatePDirStream (ctx , req )
50+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
51+ Start : true ,
52+ }
53+ resp , err := dc .UpdateExtSnapshotStreamingState (ctx , req )
5554 if err != nil {
5655 glog .Errorf ("failed to initiate pdir stream: %v" , err )
5756 return nil , fmt .Errorf ("failed to initiate pdir stream: %v" , err )
@@ -63,7 +62,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat
6362// sendPDir takes a p directory and a set of group IDs and streams the data from the
6463// p directory to the corresponding group IDs. It first scans the provided directory for
6564// subdirectories named with numeric group IDs.
66- func sendPDir (ctx context.Context , dg apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
65+ func sendPDir (ctx context.Context , dc apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
6766 glog .Infof ("Starting to stream pdir from directory: %s" , baseDir )
6867
6968 errG , ctx := errgroup .WithContext (ctx )
@@ -74,31 +73,43 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups
7473 if _ , err := os .Stat (pDir ); err != nil {
7574 return fmt .Errorf ("p directory does not exist for group [%d]: [%s]" , group , pDir )
7675 }
77-
7876 glog .Infof ("Streaming data for group [%d] from directory: [%s]" , group , pDir )
79- if err := streamData (ctx , dg , pDir , group ); err != nil {
77+ if err := streamData (ctx , dc , pDir , group ); err != nil {
8078 glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
8179 return err
8280 }
8381
8482 return nil
8583 })
8684 }
87- if err := errG .Wait (); err != nil {
88- return err
85+ if err1 := errG .Wait (); err1 != nil {
86+ // If the p directory doesn't exist for this group, it indicates that
87+ // streaming might be in progress to other groups. We disable drain mode
88+ // to prevent interference and drop any streamed data to ensure a clean state.
89+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
90+ Start : false ,
91+ Finish : true ,
92+ DropData : true ,
93+ }
94+ if _ , err := dc .UpdateExtSnapshotStreamingState (context .Background (), req ); err != nil {
95+ return fmt .Errorf ("failed to stream data :%v failed to off drain mode: %v" , err1 , err )
96+ }
97+
98+ glog .Info ("successfully disabled drain mode" )
99+ return err1
89100 }
90101
91- glog .Infof ("Completed streaming all pdirs" )
102+ glog .Info ("Completed streaming all pdirs" )
92103 return nil
93104}
94105
95106// streamData handles the actual data streaming process for a single group.
96107// It opens the BadgerDB at the specified directory and streams all data to the server.
97- func streamData (ctx context.Context , dg apiv2.DgraphClient , pdir string , groupId uint32 ) error {
108+ func streamData (ctx context.Context , dc apiv2.DgraphClient , pdir string , groupId uint32 ) error {
98109 glog .Infof ("Opening stream for group %d from directory %s" , groupId , pdir )
99110
100111 // Initialize stream with the server
101- out , err := dg . StreamPDir (ctx )
112+ out , err := dc . StreamExtSnapshot (ctx )
102113 if err != nil {
103114 return fmt .Errorf ("failed to start pdir stream for group %d: %w" , groupId , err )
104115 }
@@ -118,41 +129,23 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId
118129
119130 // Send group ID as the first message in the stream
120131 glog .Infof ("Sending group ID [%d] to server" , groupId )
121- groupReq := & apiv2.StreamPDirRequest {GroupId : groupId }
132+ groupReq := & apiv2.StreamExtSnapshotRequest {GroupId : groupId }
122133 if err := out .Send (groupReq ); err != nil {
123134 return fmt .Errorf ("failed to send group ID [%d]: %w" , groupId , err )
124135 }
125136
126137 // Configure and start the BadgerDB stream
127138 glog .Infof ("Starting BadgerDB stream for group [%d]" , groupId )
128- stream := ps .NewStreamAt (math .MaxUint64 )
129- stream .LogPrefix = fmt .Sprintf ("Sending P dir for group [%d]" , groupId )
130- stream .KeyToList = nil
131- stream .Send = func (buf * z.Buffer ) error {
132- p := & apiv2.StreamPacket {Data : buf .Bytes ()}
133- if err := out .Send (& apiv2.StreamPDirRequest {StreamPacket : p }); err != nil && ! errors .Is (err , io .EOF ) {
134- return fmt .Errorf ("failed to send data chunk: %w" , err )
135- }
136- return nil
137- }
138-
139- // Execute the stream process
140- if err := stream .Orchestrate (ctx ); err != nil {
141- return fmt .Errorf ("stream orchestration failed for group [%d]: %w" , groupId , err )
142- }
143-
144- // Send the final 'done' signal to mark completion
145- glog .Infof ("Sending completion signal for group [%d]" , groupId )
146- done := & apiv2.StreamPacket {Done : true }
139+ // if err := RunBadgerStream(ctx, ps, out, groupId); err != nil {
140+ // return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
141+ // }
147142
148- if err := out . Send ( & apiv2. StreamPDirRequest { StreamPacket : done } ); err != nil && ! errors . Is ( err , io . EOF ) {
149- return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
143+ if err := worker . RunBadgerStream ( ctx , ps , out , groupId ); err != nil {
144+ return fmt .Errorf ("badger stream failed for group [%d]: %w" , groupId , err )
150145 }
151- // Wait for acknowledgment from the server
152146 if _ , err := out .CloseAndRecv (); err != nil {
153147 return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
154148 }
155149 glog .Infof ("Group [%d]: Received ACK " , groupId )
156-
157150 return nil
158151}
0 commit comments