Skip to content

Commit d425167

Browse files
ShivaShiva
authored andcommitted
dont send group id when forwarding stream to alpha leader from proxy alpha
1 parent f98cfaf commit d425167

1 file changed

Lines changed: 2 additions & 8 deletions

File tree

worker/import.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error {
298298
return fmt.Errorf("failed to establish stream with leader: %v", err)
299299
}
300300
glog.Infof("[import] [forward %d -> %d] start", groups().Node.gid, groupId)
301-
glog.Infof("[import] [forward %d -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr)
301+
glog.Infof("[import] [forward %v -> %d] start", groups().Node.MyAddr, groups().Leader(groupId).Addr)
302302

303303
glog.Infof("[import] sending forward true to leader of group [%v]", groupId)
304304
forwardReq := &api.StreamExtSnapshotRequest{Forward: true}
@@ -313,12 +313,6 @@ func InStream(stream api.Dgraph_StreamExtSnapshotServer) error {
313313
func pipeTwoStream(in api.Dgraph_StreamExtSnapshotServer, out pb.Worker_StreamExtSnapshotClient, groupId uint32) error {
314314
currentGroup := groups().Node.gid
315315
ctx := in.Context()
316-
if err := out.Send(&api.StreamExtSnapshotRequest{GroupId: groupId}); err != nil {
317-
return fmt.Errorf("send groupId downstream(%d): %w", groupId, err)
318-
}
319-
if _, err := out.Recv(); err != nil {
320-
return fmt.Errorf("ack groupId downstream(%d): %w", groupId, err)
321-
}
322316

323317
for {
324318
if err := ctx.Err(); err != nil {
@@ -487,7 +481,7 @@ func streamInGroup(stream api.Dgraph_StreamExtSnapshotServer, forward bool) erro
487481
if forward {
488482
// We are not going to return any error from here because we care about the majority of nodes.
489483
// If the majority of nodes are able to receive the data, the remaining ones can catch up later.
490-
glog.Infof("[import] Streaming external snapshot to [%v] from [%v] forward [%v]", member.Addr, node.MyAddr)
484+
glog.Infof("[import] Streaming external snapshot to [%v] from [%v]", member.Addr, node.MyAddr)
491485
eg.Go(func() error {
492486
glog.Infof(`[import:forward] streaming external snapshot to [%v] from [%v]`, member.Addr, node.MyAddr)
493487
if member.AmDead {

0 commit comments

Comments
 (0)