@@ -7,16 +7,13 @@ package dgraphimport
77
88import (
99 "context"
10- "errors"
1110 "fmt"
12- "io"
13- "math"
1411 "os"
1512 "path/filepath"
1613
1714 "github.com/dgraph-io/badger/v4"
1815 apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
19- "github.com/dgraph-io/ristretto/v2/z "
16+ "github.com/hypermodeinc/ dgraph/v25/worker "
2017
2118 "github.com/golang/glog"
2219 "golang.org/x/sync/errgroup"
@@ -48,10 +45,13 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
4845}
4946
5047// startPDirStream switches the Dgraph cluster into drain mode (without requesting a data drop) as the first step of a pdir stream.
51- func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.InitiatePDirStreamResponse , error ) {
48+ func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.DrainModeResponse , error ) {
5249 glog .Info ("Initiating pdir stream" )
53- req := & apiv2.InitiatePDirStreamRequest {}
54- resp , err := dc .InitiatePDirStream (ctx , req )
50+ req := & apiv2.DrainModeRequest {
51+ DrainMode : true ,
52+ DropData : false ,
53+ }
54+ resp , err := dc .UpdateClusterDrainMode (ctx , req )
5555 if err != nil {
5656 glog .Errorf ("failed to initiate pdir stream: %v" , err )
5757 return nil , fmt .Errorf ("failed to initiate pdir stream: %v" , err )
@@ -63,7 +63,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat
6363// sendPDir takes a p directory and a set of group IDs and streams the data from the
6464// p directory to the corresponding group IDs. It first scans the provided directory for
6565// subdirectories named with numeric group IDs.
66- func sendPDir (ctx context.Context , dg apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
66+ func sendPDir (ctx context.Context , dc apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
6767 glog .Infof ("Starting to stream pdir from directory: %s" , baseDir )
6868
6969 errG , ctx := errgroup .WithContext (ctx )
@@ -74,20 +74,39 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups
7474 if _ , err := os .Stat (pDir ); err != nil {
7575 return fmt .Errorf ("p directory does not exist for group [%d]: [%s]" , group , pDir )
7676 }
77-
7877 glog .Infof ("Streaming data for group [%d] from directory: [%s]" , group , pDir )
79- if err := streamData (ctx , dg , pDir , group ); err != nil {
78+ if err := streamData (ctx , dc , pDir , group ); err != nil {
8079 glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
8180 return err
8281 }
8382
8483 return nil
8584 })
8685 }
87- if err := errG .Wait (); err != nil {
88- return err
86+ if err1 := errG .Wait (); err1 != nil {
87+ // Streaming failed for at least one group (for example, its p directory is
88+ // missing), while other groups may already have received data. Turn drain mode
89+ // off and request a data drop so that no partially streamed data is left behind.
90+ req := & apiv2.DrainModeRequest {
91+ DrainMode : false ,
92+ DropData : true ,
93+ }
94+ if _ , err := dc .UpdateClusterDrainMode (context .Background (), req ); err != nil {
95+ return fmt .Errorf ("failed to stream data :%v failed to off drain mode: %v" , err1 , err )
96+ }
97+
98+ glog .Info ("success fully disabled drain mode" )
99+ return err1
89100 }
90101
102+ glog .Info ("Completed streaming all pdirs" )
103+ req := & apiv2.DrainModeRequest {
104+ DrainMode : false ,
105+ DropData : false ,
106+ }
107+ if _ , err := dc .UpdateClusterDrainMode (context .Background (), req ); err != nil {
108+ return fmt .Errorf ("failed to off drain mode: %v" , err )
109+ }
91110 glog .Infof ("Completed streaming all pdirs" )
92111 return nil
93112}
@@ -125,34 +144,16 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId
125144
126145 // Configure and start the BadgerDB stream
127146 glog .Infof ("Starting BadgerDB stream for group [%d]" , groupId )
128- stream := ps .NewStreamAt (math .MaxUint64 )
129- stream .LogPrefix = fmt .Sprintf ("Sending P dir for group [%d]" , groupId )
130- stream .KeyToList = nil
131- stream .Send = func (buf * z.Buffer ) error {
132- p := & apiv2.StreamPacket {Data : buf .Bytes ()}
133- if err := out .Send (& apiv2.StreamPDirRequest {StreamPacket : p }); err != nil && ! errors .Is (err , io .EOF ) {
134- return fmt .Errorf ("failed to send data chunk: %w" , err )
135- }
136- return nil
137- }
147+ // if err := RunBadgerStream(ctx, ps, out, groupId); err != nil {
148+ // return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
149+ // }
138150
139- // Execute the stream process
140- if err := stream .Orchestrate (ctx ); err != nil {
141- return fmt .Errorf ("stream orchestration failed for group [%d]: %w" , groupId , err )
151+ if err := worker .RunBadgerStream (ctx , ps , out , groupId ); err != nil {
152+ return fmt .Errorf ("badger stream failed for group [%d]: %w" , groupId , err )
142153 }
143-
144- // Send the final 'done' signal to mark completion
145- glog .Infof ("Sending completion signal for group [%d]" , groupId )
146- done := & apiv2.StreamPacket {Done : true }
147-
148- if err := out .Send (& apiv2.StreamPDirRequest {StreamPacket : done }); err != nil && ! errors .Is (err , io .EOF ) {
149- return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
150- }
151- // Wait for acknowledgment from the server
152154 if _ , err := out .CloseAndRecv (); err != nil {
153155 return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
154156 }
155157 glog .Infof ("Group [%d]: Received ACK " , groupId )
156-
157158 return nil
158159}
0 commit comments