@@ -30,7 +30,7 @@ func newClient(endpoint string, opts grpc.DialOption) (apiv2.DgraphClient, error
3030 return nil , fmt .Errorf ("failed to connect to endpoint [%s]: %w" , endpoint , err )
3131 }
3232
33- glog .Infof ("Successfully connected to Dgraph endpoint: %s" , endpoint )
33+ glog .Infof ("[import] Successfully connected to Dgraph endpoint: %s" , endpoint )
3434 return apiv2 .NewDgraphClient (conn ), nil
3535}
3636
@@ -39,120 +39,160 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
3939 if err != nil {
4040 return err
4141 }
42- resp , err := startPDirStream (ctx , dg )
42+ resp , err := initiateSnapshotStream (ctx , dg )
4343 if err != nil {
4444 return err
4545 }
4646
47- return sendPDir (ctx , dg , bulkOutDir , resp .Groups )
47+ return streamSnapshot (ctx , dg , bulkOutDir , resp .Groups )
4848}
4949
50- // startPDirStream initiates a snapshot stream session with the Dgraph server.
51- func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.InitiatePDirStreamResponse , error ) {
52- glog .Info ("Initiating pdir stream" )
53- req := & apiv2.InitiatePDirStreamRequest {}
54- resp , err := dc .InitiatePDirStream (ctx , req )
50+ // initiateSnapshotStream initiates a snapshot stream session with the Dgraph server.
51+ func initiateSnapshotStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.UpdateExtSnapshotStreamingStateResponse , error ) {
52+ glog .Info ("[import] Initiating external snapshot stream" )
53+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
54+ Start : true ,
55+ }
56+ resp , err := dc .UpdateExtSnapshotStreamingState (ctx , req )
5557 if err != nil {
56- glog .Errorf ("failed to initiate pdir stream: %v" , err )
57- return nil , fmt .Errorf ("failed to initiate pdir stream: %v" , err )
58+ glog .Errorf ("[import] failed to initiate external snapshot stream: %v" , err )
59+ return nil , fmt .Errorf ("failed to initiate external snapshot stream: %v" , err )
5860 }
59- glog .Info ("Pdir stream initiated successfully" )
61+ glog .Info ("[import] External snapshot stream initiated successfully" )
6062 return resp , nil
6163}
6264
63- // sendPDir takes a p directory and a set of group IDs and streams the data from the
65+ // streamSnapshot takes a p directory and a set of group IDs and streams the data from the
6466// p directory to the corresponding group IDs. It first scans the provided directory for
6567// subdirectories named with numeric group IDs.
66- func sendPDir (ctx context.Context , dg apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
67- glog .Infof ("Starting to stream pdir from directory: %s" , baseDir )
68+ func streamSnapshot (ctx context.Context , dc apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
69+ glog .Infof ("[import] Starting to stream snapshot from directory: %s" , baseDir )
6870
69- errG , ctx := errgroup .WithContext (ctx )
71+ errG , errGrpCtx := errgroup .WithContext (ctx )
7072 for _ , group := range groups {
71- group := group
7273 errG .Go (func () error {
7374 pDir := filepath .Join (baseDir , fmt .Sprintf ("%d" , group - 1 ), "p" )
7475 if _ , err := os .Stat (pDir ); err != nil {
7576 return fmt .Errorf ("p directory does not exist for group [%d]: [%s]" , group , pDir )
7677 }
77-
78- glog .Infof ("Streaming data for group [%d] from directory: [%s]" , group , pDir )
79- if err := streamData (ctx , dg , pDir , group ); err != nil {
80- glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
78+ glog .Infof ("[import] Streaming data for group [%d] from directory: [%s]" , group , pDir )
79+ if err := streamSnapshotForGroup (errGrpCtx , dc , pDir , group ); err != nil {
80+ glog .Errorf ("[import] Failed to stream data for group [%v] from directory: [%s]: %v" , group , pDir , err )
8181 return err
8282 }
8383
8484 return nil
8585 })
8686 }
87+
8788 if err := errG .Wait (); err != nil {
89+ glog .Errorf ("[import] failed to stream external snapshot: %v" , err )
90+ // If errors occurs during streaming of the external snapshot, we drop all the data and
91+ // go back to ensure a clean slate and the cluster remains in working state.
92+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
93+ Start : false ,
94+ Finish : true ,
95+ DropData : true ,
96+ }
97+ if _ , err := dc .UpdateExtSnapshotStreamingState (ctx , req ); err != nil {
98+ return fmt .Errorf ("failed to turn off drain mode: %v" , err )
99+ }
100+
101+ glog .Info ("[import] successfully disabled drain mode" )
88102 return err
89103 }
90104
91- glog .Infof ("Completed streaming all pdirs" )
105+ glog .Info ("[import] Completed streaming external snapshot" )
106+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
107+ Start : false ,
108+ Finish : true ,
109+ DropData : false ,
110+ }
111+ if _ , err := dc .UpdateExtSnapshotStreamingState (ctx , req ); err != nil {
112+ glog .Errorf ("[import] failed to disable drain mode: %v" , err )
113+ return fmt .Errorf ("failed to disable drain mode: %v" , err )
114+ }
115+ glog .Info ("[import] successfully disable drain mode" )
92116 return nil
93117}
94118
95- // streamData handles the actual data streaming process for a single group.
119+ // streamSnapshotForGroup handles the actual data streaming process for a single group.
96120// It opens the BadgerDB at the specified directory and streams all data to the server.
97- func streamData (ctx context.Context , dg apiv2.DgraphClient , pdir string , groupId uint32 ) error {
121+ func streamSnapshotForGroup (ctx context.Context , dc apiv2.DgraphClient , pdir string , groupId uint32 ) error {
98122 glog .Infof ("Opening stream for group %d from directory %s" , groupId , pdir )
99123
100124 // Initialize stream with the server
101- out , err := dg . StreamPDir (ctx )
125+ out , err := dc . StreamExtSnapshot (ctx )
102126 if err != nil {
103- return fmt .Errorf ("failed to start pdir stream for group %d: %w" , groupId , err )
127+ return fmt .Errorf ("failed to start external snapshot stream for group %d: %w" , groupId , err )
104128 }
105129
130+ defer func () {
131+ if _ , err := out .CloseAndRecv (); err != nil {
132+ glog .Errorf ("[import] failed to receive ACK for group [%v]: %v" , groupId , err )
133+ }
134+
135+ glog .Infof ("Group [%v]: Received ACK " , groupId )
136+ }()
137+
106138 // Open the BadgerDB instance at the specified directory
107139 opt := badger .DefaultOptions (pdir )
108140 ps , err := badger .OpenManaged (opt )
109141 if err != nil {
110- return fmt .Errorf ("failed to open BadgerDB at [%s]: %w" , pdir , err )
142+ glog .Errorf ("failed to open BadgerDB at [%s]: %v" , pdir , err )
143+ return fmt .Errorf ("failed to open BadgerDB at [%v]: %v" , pdir , err )
111144 }
112145
113146 defer func () {
114147 if err := ps .Close (); err != nil {
115- glog .Warningf ("Error closing BadgerDB: %v" , err )
148+ glog .Warningf ("[import] Error closing BadgerDB: %v" , err )
116149 }
117150 }()
118151
119152 // Send group ID as the first message in the stream
120- glog .Infof ("Sending group ID [%d ] to server" , groupId )
121- groupReq := & apiv2.StreamPDirRequest {GroupId : groupId }
153+ glog .Infof ("[import] Sending group ID [%v ] to server" , groupId )
154+ groupReq := & apiv2.StreamExtSnapshotRequest {GroupId : groupId }
122155 if err := out .Send (groupReq ); err != nil {
123- return fmt .Errorf ("failed to send group ID [%d ]: %w " , groupId , err )
156+ return fmt .Errorf ("failed to send group ID [%v ]: %v " , groupId , err )
124157 }
125158
126159 // Configure and start the BadgerDB stream
127- glog .Infof ("Starting BadgerDB stream for group [%d]" , groupId )
160+ glog .Infof ("[import] Starting BadgerDB stream for group [%v]" , groupId )
161+
162+ if err := streamBadger (ctx , ps , out , groupId ); err != nil {
163+ return fmt .Errorf ("badger streaming failed for group [%v]: %v" , groupId , err )
164+ }
165+
166+ return nil
167+ }
168+
169+ // streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
170+ // It creates a new stream at the maximum sequence number and sends the data to the specified group.
171+ // It also sends a final 'done' signal to mark completion.
172+ func streamBadger (ctx context.Context , ps * badger.DB , out apiv2.Dgraph_StreamExtSnapshotClient , groupId uint32 ) error {
128173 stream := ps .NewStreamAt (math .MaxUint64 )
129- stream .LogPrefix = fmt . Sprintf ( " Sending P dir for group [%d] " , groupId )
174+ stream .LogPrefix = "[import] Sending external snapshot to group [" + fmt . Sprintf ( "%d " , groupId ) + "]"
130175 stream .KeyToList = nil
131176 stream .Send = func (buf * z.Buffer ) error {
132177 p := & apiv2.StreamPacket {Data : buf .Bytes ()}
133- if err := out .Send (& apiv2.StreamPDirRequest { StreamPacket : p }); err != nil && ! errors .Is (err , io .EOF ) {
178+ if err := out .Send (& apiv2.StreamExtSnapshotRequest { Pkt : p }); err != nil && ! errors .Is (err , io .EOF ) {
134179 return fmt .Errorf ("failed to send data chunk: %w" , err )
135180 }
136181 return nil
137182 }
138183
139184 // Execute the stream process
140185 if err := stream .Orchestrate (ctx ); err != nil {
141- return fmt .Errorf ("stream orchestration failed for group [%d] : %w" , groupId , err )
186+ return fmt .Errorf ("stream orchestration failed: %w" , err )
142187 }
143188
144189 // Send the final 'done' signal to mark completion
145- glog .Infof ("Sending completion signal for group [%d]" , groupId )
190+ glog .Infof ("[import] Sending completion signal for group [%d]" , groupId )
146191 done := & apiv2.StreamPacket {Done : true }
147192
148- if err := out .Send (& apiv2.StreamPDirRequest { StreamPacket : done }); err != nil && ! errors .Is (err , io .EOF ) {
193+ if err := out .Send (& apiv2.StreamExtSnapshotRequest { Pkt : done }); err != nil && ! errors .Is (err , io .EOF ) {
149194 return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
150195 }
151- // Wait for acknowledgment from the server
152- if _ , err := out .CloseAndRecv (); err != nil {
153- return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
154- }
155- glog .Infof ("Group [%d]: Received ACK " , groupId )
156196
157197 return nil
158198}
0 commit comments