@@ -7,6 +7,7 @@ package dgraphimport
77
88import (
99 "context"
10+ "errors"
1011 "fmt"
1112 "io"
1213 "math"
@@ -22,11 +23,11 @@ import (
2223 "google.golang.org/grpc"
2324)
2425
25- // NewClient creates a new import client with the specified endpoint and gRPC options.
26+ // newClient creates a new import client with the specified endpoint and gRPC options.
2627func newClient (endpoint string , opts grpc.DialOption ) (apiv25.DgraphClient , error ) {
2728 conn , err := grpc .NewClient (endpoint , opts )
2829 if err != nil {
29- return nil , fmt .Errorf ("failed to connect to endpoint [%s]: %w" , endpoint , err )
30+ return nil , fmt .Errorf ("Failed to connect to endpoint [%s]: %w" , endpoint , err )
3031 }
3132
3233 glog .Infof ("Successfully connected to Dgraph endpoint: %s" , endpoint )
@@ -46,20 +47,20 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
4647 return sendSnapshot (ctx , dg , bulkOutDir , resp .Groups )
4748}
4849
49- // StartSnapshotStream initiates a snapshot stream session with the Dgraph server.
50+ // startSnapshotStream initiates a snapshot stream session with the Dgraph server.
5051func startSnapshotStream (ctx context.Context , dc apiv25.DgraphClient ) (* apiv25.InitiateSnapshotStreamResponse , error ) {
51- glog .V ( 2 ). Infof ("Initiating snapshot stream" )
52+ glog .Info ("Initiating snapshot stream" )
5253 req := & apiv25.InitiateSnapshotStreamRequest {}
5354 resp , err := dc .InitiateSnapshotStream (ctx , req )
5455 if err != nil {
5556 glog .Errorf ("Failed to initiate snapshot stream: %v" , err )
56- return nil , err
57+ return nil , fmt . Errorf ( "Failed to initiate snapshot stream: %v" , err )
5758 }
58- glog .Infof ("Snapshot stream initiated successfully" )
59+ glog .Info ("Snapshot stream initiated successfully" )
5960 return resp , nil
6061}
6162
62- // SendSnapshot takes a p directory and a set of group IDs and streams the data from the
63+ // sendSnapshot takes a p directory and a set of group IDs and streams the data from the
6364// p directory to the corresponding group IDs. It first scans the provided directory for
6465// subdirectories named with numeric group IDs.
6566func sendSnapshot (ctx context.Context , dg apiv25.DgraphClient , baseDir string , groups []uint32 ) error {
@@ -71,11 +72,12 @@ func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, baseDir string, g
7172 errG .Go (func () error {
7273 pDir := filepath .Join (baseDir , fmt .Sprintf ("%d" , group - 1 ), "p" )
7374 if _ , err := os .Stat (pDir ); err != nil {
74- return fmt .Errorf ("p directory does not exist for group %d: %s " , group , pDir )
75+ return fmt .Errorf ("p directory does not exist for group [%d]: [%s] " , group , pDir )
7576 }
7677
77- glog .Infof ("Streaming data for group %d from directory: %s " , group , pDir )
78+ glog .Infof ("Streaming data for group [%d] from directory: [%s] " , group , pDir )
7879 if err := streamData (ctx , dg , pDir , group ); err != nil {
80+ glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
7981 return err
8082 }
8183
@@ -98,55 +100,59 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
98100 // Initialize stream with the server
99101 out , err := dg .StreamSnapshot (ctx )
100102 if err != nil {
101- return fmt .Errorf ("failed to start snapshot stream: %w" , err )
103+ return fmt .Errorf ("failed to start snapshot stream for group %d : %w" , groupId , err )
102104 }
103105
104106 // Open the BadgerDB instance at the specified directory
105107 opt := badger .DefaultOptions (pdir )
106108 ps , err := badger .OpenManaged (opt )
107109 if err != nil {
108- return fmt .Errorf ("failed to open BadgerDB at %s : %w" , pdir , err )
110+ return fmt .Errorf ("failed to open BadgerDB at [%s] : %w" , pdir , err )
109111 }
110- defer ps .Close ()
112+
113+ defer func () {
114+ if err := ps .Close (); err != nil {
115+ glog .Warningf ("Error closing BadgerDB: %v" , err )
116+ }
117+ }()
111118
112119 // Send group ID as the first message in the stream
113- glog .V (2 ).Infof ("Sending group ID %d to server" , groupId )
120+ glog .V (2 ).Infof ("Sending group ID [%d] to server" , groupId )
114121 groupReq := & apiv25.StreamSnapshotRequest {GroupId : groupId }
115122 if err := out .Send (groupReq ); err != nil {
116- return fmt .Errorf ("failed to send group ID %d : %w" , groupId , err )
123+ return fmt .Errorf ("failed to send group ID [%d] : %w" , groupId , err )
117124 }
118125
119126 // Configure and start the BadgerDB stream
120- glog .V (2 ).Infof ("Starting BadgerDB stream for group %d " , groupId )
127+ glog .V (2 ).Infof ("Starting BadgerDB stream for group [%d] " , groupId )
121128 stream := ps .NewStreamAt (math .MaxUint64 )
122- stream .LogPrefix = fmt .Sprintf ("Sending P dir ( group %d) " , groupId )
129+ stream .LogPrefix = fmt .Sprintf ("Sending P dir for group [%d] " , groupId )
123130 stream .KeyToList = nil
124131 stream .Send = func (buf * z.Buffer ) error {
125132 kvs := & apiv25.KVS {Data : buf .Bytes ()}
126- if err := out .Send (& apiv25.StreamSnapshotRequest {Pairs : kvs }); err != nil && err != io .EOF {
133+ if err := out .Send (& apiv25.StreamSnapshotRequest {Pairs : kvs }); err != nil && ! errors . Is ( err , io .EOF ) {
127134 return fmt .Errorf ("failed to send data chunk: %w" , err )
128135 }
129136 return nil
130137 }
131138
132139 // Execute the stream process
133140 if err := stream .Orchestrate (ctx ); err != nil {
134- return fmt .Errorf ("stream orchestration failed for group %d : %w" , groupId , err )
141+ return fmt .Errorf ("stream orchestration failed for group [%d] : %w" , groupId , err )
135142 }
136143
137144 // Send the final 'done' signal to mark completion
138- glog .V (2 ).Infof ("Sending completion signal for group %d " , groupId )
145+ glog .V (2 ).Infof ("Sending completion signal for group [%d] " , groupId )
139146 done := & apiv25.KVS {Done : true }
140147
141- if err := out .Send (& apiv25.StreamSnapshotRequest {Pairs : done }); err != nil && err != io .EOF {
142- return fmt .Errorf ("failed to send 'done' signal for group %d : %w" , groupId , err )
148+ if err := out .Send (& apiv25.StreamSnapshotRequest {Pairs : done }); err != nil && ! errors . Is ( err , io .EOF ) {
149+ return fmt .Errorf ("failed to send 'done' signal for group [%d] : %w" , groupId , err )
143150 }
144151 // Wait for acknowledgment from the server
145- ack , err := out .CloseAndRecv ()
146- if err != nil {
147- return fmt .Errorf ("failed to receive ACK for group %d: %w" , groupId , err )
152+ if _ , err := out .CloseAndRecv (); err != nil {
153+ return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
148154 }
149- glog .Infof ("Group %d : Received ACK with message: %v " , groupId , ack . Done )
155+ glog .Infof ("Group [%d] : Received ACK " , groupId )
150156
151157 return nil
152158}
0 commit comments