@@ -16,11 +16,11 @@ import (
1616 "github.com/hypermodeinc/dgraph/v25/posting"
1717 "github.com/hypermodeinc/dgraph/v25/protos/pb"
1818 "github.com/hypermodeinc/dgraph/v25/schema"
19- "google.golang.org/grpc"
2019
2120 "github.com/dustin/go-humanize"
2221 "github.com/golang/glog"
2322 "github.com/pkg/errors"
23+ "google.golang.org/grpc"
2424)
2525
2626// streamProcessor defines the common interface for stream processing
@@ -44,19 +44,19 @@ func ProposeDrain(ctx context.Context, drainMode *pb.DrainModeRequest) ([]uint32
4444 }
4545 continue
4646 }
47- glog .Infof ("Connecting to the leader of the group [%v] from alpha addr [%v]" , gid , groups ().Node .MyAddr )
47+ glog .Infof ("[import] Connecting to the leader of the group [%v] from alpha addr [%v]" , gid , groups ().Node .MyAddr )
4848
4949 pl := groups ().Leader (gid )
5050 if pl == nil {
51- glog .Errorf ("Unable to connect to the leader of group [%v]" , gid )
51+ glog .Errorf ("[import] unable to connect to the leader of group [%v]" , gid )
5252 return nil , fmt .Errorf ("unable to connect to the leader of group [%v] : %v" , gid , conn .ErrNoConnection )
5353 }
5454 con := pl .Get ()
5555 c := pb .NewWorkerClient (con )
56- glog .Infof ("Successfully connected to leader of group [%v]" , gid )
56+ glog .Infof ("[import] Successfully connected to leader of group [%v]" , gid )
5757
5858 if _ , err := c .ApplyDrainmode (ctx , drainMode ); err != nil {
59- glog .Errorf ("Unable to apply drainmode : %v" , err )
59+ glog .Errorf ("[import] unable to apply drainmode : %v" , err )
6060 return nil , err
6161 }
6262 }
@@ -73,7 +73,7 @@ func ProposeDrain(ctx context.Context, drainMode *pb.DrainModeRequest) ([]uint32
7373func InStream (stream apiv25.Dgraph_StreamSnapshotServer ) error {
7474 req , err := stream .Recv ()
7575 if err != nil {
76- return fmt .Errorf ("Failed to receive initial stream message: %v" , err )
76+ return fmt .Errorf ("failed to receive initial stream message: %v" , err )
7777 }
7878
7979 groupId := req .GroupId
@@ -83,15 +83,15 @@ func InStream(stream apiv25.Dgraph_StreamSnapshotServer) error {
8383
8484 pl := groups ().Leader (groupId )
8585 if pl == nil {
86- glog .Errorf ("Unable to connect to the leader of group [%v]" , groupId )
86+ glog .Errorf ("[import] Unable to connect to the leader of group [%v]" , groupId )
8787 return fmt .Errorf ("unable to connect to the leader of group [%v] : %v" , groupId , conn .ErrNoConnection )
8888 }
8989
9090 con := pl .Get ()
9191 c := pb .NewWorkerClient (con )
9292 alphaStream , err := c .InternalStreamSnapshot (stream .Context ())
9393 if err != nil {
94- return fmt .Errorf ("Failed to establish stream with leader: %v" , err )
94+ return fmt .Errorf ("failed to establish stream with leader: %v" , err )
9595 }
9696
9797 return pipeTwoStream (stream , alphaStream )
@@ -107,14 +107,14 @@ func pipeTwoStream(in apiv25.Dgraph_StreamSnapshotServer, out pb.Worker_Internal
107107 for {
108108 select {
109109 case <- ctx .Done ():
110- glog .Info ("Context cancelled, stopping receive goroutine." )
111- errCh <- fmt .Errorf ("Context deadline exceeded" )
110+ glog .Info ("[import] Context cancelled, stopping receive goroutine." )
111+ errCh <- fmt .Errorf ("context deadline exceeded" )
112112 return
113113 default :
114114 msg , err := in .Recv ()
115115 if err != nil {
116116 if ! errors .Is (err , io .EOF ) {
117- glog .Errorf ("Error receiving from in stream: %v" , err )
117+ glog .Errorf ("[import] Error receiving from in stream: %v" , err )
118118 errCh <- err
119119 }
120120 return
@@ -145,33 +145,33 @@ Loop:
145145 glog .Errorf ("Error sending 'done' to out stream: %v" , err )
146146 return err
147147 }
148- glog .Infoln ("All key-values have been transferred." )
148+ glog .Infoln ("[import] All key-values have been transferred." )
149149 break Loop
150150 }
151151
152152 if err := out .Send (data ); err != nil {
153- glog .Errorf ("Error sending to outstream: %v" , err )
154- return err
153+ glog .Errorf ("[import] Error sending to outstream: %v" , err )
154+ return fmt . Errorf ( "error sending to outstream: %v" , err )
155155 }
156156
157157 size += len (msg .Pairs .Data )
158- glog .Infof ("Sent batch of size: %s. Total so far: %s\n " ,
158+ glog .Infof ("[import] Sent batch of size: %s. Total so far: %s\n " ,
159159 humanize .IBytes (uint64 (len (msg .Pairs .Data ))), humanize .IBytes (uint64 (size )))
160160 }
161161 }
162162
163163 // Close the incoming stream properly
164164 if err := in .SendAndClose (& apiv25.StreamSnapshotResponse {Done : true }); err != nil {
165- return fmt .Errorf ("Failed to send close on in: %v" , err )
165+ return fmt .Errorf ("failed to send close on in: %v" , err )
166166 }
167167
168168 // Wait for ACK from the out stream
169169 _ , err := out .CloseAndRecv ()
170170 if err != nil {
171- return fmt .Errorf ("Failed to receive ACK from out stream: %w" , err )
171+ return fmt .Errorf ("failed to receive ACK from out stream: %w" , err )
172172 }
173173
174- glog .Info ("Received ACK" )
174+ glog .Info ("[import] Received ACK" )
175175 return nil
176176}
177177
@@ -225,13 +225,13 @@ func processStreamData(stream streamProcessor) error {
225225 kvs := req .GetPairs ()
226226 // Check if all key-value pairs have been received.
227227 if kvs != nil && kvs .Done {
228- glog .Info ("All key-values have been received." )
228+ glog .Info ("[import] All key-values have been received." )
229229 break
230230 }
231231
232232 // Increment the total size and log the batch size received.
233233 size += len (kvs .Data )
234- glog .Infof ("Received batch of size: %s. Total so far: %s\n " ,
234+ glog .Infof ("[import] Received batch of size: %s. Total so far: %s\n " ,
235235 humanize .IBytes (uint64 (len (kvs .Data ))), humanize .IBytes (uint64 (size )))
236236
237237 // Write the received data to BadgerDB.
@@ -246,18 +246,18 @@ func processStreamData(stream streamProcessor) error {
246246 return err
247247 }
248248
249- glog .Info ("P dir writes DONE. Sending ACK" )
249+ glog .Info ("[import] P dir writes DONE. Sending ACK" )
250250
251251 // Send an acknowledgment to the leader indicating completion.
252252 return stream .SendAndClose (& apiv25.StreamSnapshotResponse {Done : true })
253253}
254254
255255func postStreamProcessing (ctx context.Context ) error {
256256 if err := schema .LoadFromDb (ctx ); err != nil {
257- return errors .Wrapf (err , "Cannot load schema after streaming data" )
257+ return errors .Wrapf (err , "cannot load schema after streaming data" )
258258 }
259259 if err := UpdateMembershipState (ctx ); err != nil {
260- return errors .Wrapf (err , "Cannot update membership state after streaming data" )
260+ return errors .Wrapf (err , "cannot update membership state after streaming data" )
261261 }
262262
263263 gr .informZeroAboutTablets ()
0 commit comments