@@ -7,31 +7,28 @@ package dgraphimport
77
88import (
99 "context"
10- "errors"
1110 "fmt"
12- "io"
13- "math"
1411 "os"
1512 "path/filepath"
1613
1714 "github.com/dgraph-io/badger/v4"
18- apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
19- "github.com/dgraph-io/ristretto/v2/z "
15+ apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v2"
16+ "github.com/hypermodeinc/ dgraph/v25/worker "
2017
2118 "github.com/golang/glog"
2219 "golang.org/x/sync/errgroup"
2320 "google.golang.org/grpc"
2421)
2522
2623// newClient creates a new import client with the specified endpoint and gRPC options.
27- func newClient (endpoint string , opts grpc.DialOption ) (apiv2 .DgraphClient , error ) {
24+ func newClient (endpoint string , opts grpc.DialOption ) (apiv25 .DgraphClient , error ) {
2825 conn , err := grpc .NewClient (endpoint , opts )
2926 if err != nil {
3027 return nil , fmt .Errorf ("failed to connect to endpoint [%s]: %w" , endpoint , err )
3128 }
3229
3330 glog .Infof ("Successfully connected to Dgraph endpoint: %s" , endpoint )
34- return apiv2 .NewDgraphClient (conn ), nil
31+ return apiv25 .NewDgraphClient (conn ), nil
3532}
3633
3734func Import (ctx context.Context , endpoint string , opts grpc.DialOption , bulkOutDir string ) error {
@@ -48,10 +45,13 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
4845}
4946
5047// startPDirStream initiates a snapshot stream session with the Dgraph server.
51- func startPDirStream (ctx context.Context , dc apiv2 .DgraphClient ) (* apiv2. InitiatePDirStreamResponse , error ) {
48+ func startPDirStream (ctx context.Context , dc apiv25 .DgraphClient ) (* apiv25. DrainModeResponse , error ) {
5249 glog .Info ("Initiating pdir stream" )
53- req := & apiv2.InitiatePDirStreamRequest {}
54- resp , err := dc .InitiatePDirStream (ctx , req )
50+ req := & apiv25.DrainModeRequest {
51+ DrainMode : true ,
52+ DropData : false ,
53+ }
54+ resp , err := dc .UpdateClusterDrainMode (ctx , req )
5555 if err != nil {
5656 glog .Errorf ("failed to initiate pdir stream: %v" , err )
5757 return nil , fmt .Errorf ("failed to initiate pdir stream: %v" , err )
@@ -63,7 +63,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat
6363// sendPDir takes a p directory and a set of group IDs and streams the data from the
6464// p directory to the corresponding group IDs. It first scans the provided directory for
6565// subdirectories named with numeric group IDs.
66- func sendPDir (ctx context.Context , dg apiv2 .DgraphClient , baseDir string , groups []uint32 ) error {
66+ func sendPDir (ctx context.Context , dc apiv25 .DgraphClient , baseDir string , groups []uint32 ) error {
6767 glog .Infof ("Starting to stream pdir from directory: %s" , baseDir )
6868
6969 errG , ctx := errgroup .WithContext (ctx )
@@ -74,27 +74,46 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups
7474 if _ , err := os .Stat (pDir ); err != nil {
7575 return fmt .Errorf ("p directory does not exist for group [%d]: [%s]" , group , pDir )
7676 }
77-
7877 glog .Infof ("Streaming data for group [%d] from directory: [%s]" , group , pDir )
79- if err := streamData (ctx , dg , pDir , group ); err != nil {
78+ if err := streamData (ctx , dc , pDir , group ); err != nil {
8079 glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
8180 return err
8281 }
8382
8483 return nil
8584 })
8685 }
87- if err := errG .Wait (); err != nil {
88- return err
86+ if err1 := errG .Wait (); err1 != nil {
87+ // If the p directory doesn't exist for this group, it indicates that
88+ // streaming might be in progress to other groups. We disable drain mode
89+ // to prevent interference and drop any streamed data to ensure a clean state.
90+ req := & apiv25.DrainModeRequest {
91+ DrainMode : false ,
92+ DropData : true ,
93+ }
94+ if _ , err := dc .UpdateClusterDrainMode (context .Background (), req ); err != nil {
95+ return fmt .Errorf ("failed to stream data :%v failed to off drain mode: %v" , err1 , err )
96+ }
97+
98+ glog .Info ("success fully disabled drain mode" )
99+ return err1
89100 }
90101
102+ glog .Info ("Completed streaming all pdirs" )
103+ req := & apiv25.DrainModeRequest {
104+ DrainMode : false ,
105+ DropData : false ,
106+ }
107+ if _ , err := dc .UpdateClusterDrainMode (context .Background (), req ); err != nil {
108+ return fmt .Errorf ("failed to off drain mode: %v" , err )
109+ }
91110 glog .Infof ("Completed streaming all pdirs" )
92111 return nil
93112}
94113
95114// streamData handles the actual data streaming process for a single group.
96115// It opens the BadgerDB at the specified directory and streams all data to the server.
97- func streamData (ctx context.Context , dg apiv2 .DgraphClient , pdir string , groupId uint32 ) error {
116+ func streamData (ctx context.Context , dg apiv25 .DgraphClient , pdir string , groupId uint32 ) error {
98117 glog .Infof ("Opening stream for group %d from directory %s" , groupId , pdir )
99118
100119 // Initialize stream with the server
@@ -118,41 +137,23 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId
118137
119138 // Send group ID as the first message in the stream
120139 glog .Infof ("Sending group ID [%d] to server" , groupId )
121- groupReq := & apiv2 .StreamPDirRequest {GroupId : groupId }
140+ groupReq := & apiv25 .StreamPDirRequest {GroupId : groupId }
122141 if err := out .Send (groupReq ); err != nil {
123142 return fmt .Errorf ("failed to send group ID [%d]: %w" , groupId , err )
124143 }
125144
126145 // Configure and start the BadgerDB stream
127146 glog .Infof ("Starting BadgerDB stream for group [%d]" , groupId )
128- stream := ps .NewStreamAt (math .MaxUint64 )
129- stream .LogPrefix = fmt .Sprintf ("Sending P dir for group [%d]" , groupId )
130- stream .KeyToList = nil
131- stream .Send = func (buf * z.Buffer ) error {
132- p := & apiv2.StreamPacket {Data : buf .Bytes ()}
133- if err := out .Send (& apiv2.StreamPDirRequest {StreamPacket : p }); err != nil && ! errors .Is (err , io .EOF ) {
134- return fmt .Errorf ("failed to send data chunk: %w" , err )
135- }
136- return nil
137- }
147+ // if err := RunBadgerStream(ctx, ps, out, groupId); err != nil {
148+ // return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
149+ // }
138150
139- // Execute the stream process
140- if err := stream .Orchestrate (ctx ); err != nil {
141- return fmt .Errorf ("stream orchestration failed for group [%d]: %w" , groupId , err )
151+ if err := worker .RunBadgerStream (ctx , ps , out , groupId ); err != nil {
152+ return fmt .Errorf ("badger stream failed for group [%d]: %w" , groupId , err )
142153 }
143-
144- // Send the final 'done' signal to mark completion
145- glog .Infof ("Sending completion signal for group [%d]" , groupId )
146- done := & apiv2.StreamPacket {Done : true }
147-
148- if err := out .Send (& apiv2.StreamPDirRequest {StreamPacket : done }); err != nil && ! errors .Is (err , io .EOF ) {
149- return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
150- }
151- // Wait for acknowledgment from the server
152154 if _ , err := out .CloseAndRecv (); err != nil {
153155 return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
154156 }
155157 glog .Infof ("Group [%d]: Received ACK " , groupId )
156-
157158 return nil
158159}
0 commit comments