@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
127127 if err != nil {
128128 return fmt .Errorf ("failed to start external snapshot stream for group %d: %w" , groupId , err )
129129 }
130-
131130 defer func () {
132- if _ , err := out .CloseAndRecv (); err != nil {
133- glog .Errorf ("failed to close the stream for group [%v]: %v" , groupId , err )
134- }
135-
136- glog .Infof ("[import] Group [%v]: Received ACK " , groupId )
131+ _ = out .CloseSend ()
137132 }()
138133
139134 // Open the BadgerDB instance at the specified directory
140135 opt := badger .DefaultOptions (pdir )
136+ opt .ReadOnly = true
141137 ps , err := badger .OpenManaged (opt )
142138 if err != nil {
143139 glog .Errorf ("failed to open BadgerDB at [%s]: %v" , pdir , err )
144140 return fmt .Errorf ("failed to open BadgerDB at [%v]: %v" , pdir , err )
145141 }
146-
147142 defer func () {
148143 if err := ps .Close (); err != nil {
149144 glog .Warningf ("[import] Error closing BadgerDB: %v" , err )
@@ -154,17 +149,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
154149 glog .Infof ("[import] Sending request for streaming external snapshot for group ID [%v]" , groupId )
155150 groupReq := & apiv2.StreamExtSnapshotRequest {GroupId : groupId }
156151 if err := out .Send (groupReq ); err != nil {
157- return fmt .Errorf ("failed to send request for streaming external snapshot for group ID [%v] to the server: %w" ,
158- groupId , err )
152+ return fmt .Errorf ("failed to send request for group ID [%v] to the server: %w" , groupId , err )
153+ }
154+ if _ , err := out .Recv (); err != nil {
155+ return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
159156 }
157+ glog .Infof ("[import] Group [%v]: Received ACK for sending group request" , groupId )
160158
161159 // Configure and start the BadgerDB stream
162160 glog .Infof ("[import] Starting BadgerDB stream for group [%v]" , groupId )
163-
164161 if err := streamBadger (ctx , ps , out , groupId ); err != nil {
165162 return fmt .Errorf ("badger streaming failed for group [%v]: %v" , groupId , err )
166163 }
167-
168164 return nil
169165}
170166
@@ -180,6 +176,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
180176 if err := out .Send (& apiv2.StreamExtSnapshotRequest {Pkt : p }); err != nil && ! errors .Is (err , io .EOF ) {
181177 return fmt .Errorf ("failed to send data chunk: %w" , err )
182178 }
179+ if _ , err := out .Recv (); err != nil {
180+ return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
181+ }
182+ glog .Infof ("[import] Group [%v]: Received ACK for sending data chunk" , groupId )
183+
183184 return nil
184185 }
185186
@@ -196,5 +197,10 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
196197 return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
197198 }
198199
200+ if _ , err := out .Recv (); err != nil {
201+ return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
202+ }
203+ glog .Infof ("[import] Group [%v]: Received ACK for sending completion signal" , groupId )
204+
199205 return nil
200206}
0 commit comments