@@ -127,23 +127,18 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
127127 if err != nil {
128128 return fmt .Errorf ("failed to start external snapshot stream for group %d: %w" , groupId , err )
129129 }
130-
131130 defer func () {
132- if _ , err := out .CloseAndRecv (); err != nil {
133- glog .Errorf ("failed to close the stream for group [%v]: %v" , groupId , err )
134- }
135-
136- glog .Infof ("[import] Group [%v]: Received ACK " , groupId )
131+ _ = out .CloseSend ()
137132 }()
138133
139134 // Open the BadgerDB instance at the specified directory
140135 opt := badger .DefaultOptions (pdir )
136+ opt .ReadOnly = true
141137 ps , err := badger .OpenManaged (opt )
142138 if err != nil {
143139 glog .Errorf ("failed to open BadgerDB at [%s]: %v" , pdir , err )
144140 return fmt .Errorf ("failed to open BadgerDB at [%v]: %v" , pdir , err )
145141 }
146-
147142 defer func () {
148143 if err := ps .Close (); err != nil {
149144 glog .Warningf ("[import] Error closing BadgerDB: %v" , err )
@@ -154,17 +149,14 @@ func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir str
154149 glog .Infof ("[import] Sending request for streaming external snapshot for group ID [%v]" , groupId )
155150 groupReq := & apiv2.StreamExtSnapshotRequest {GroupId : groupId }
156151 if err := out .Send (groupReq ); err != nil {
157- return fmt .Errorf ("failed to send request for streaming external snapshot for group ID [%v] to the server: %w" ,
158- groupId , err )
152+ return fmt .Errorf ("failed to send request for group ID [%v] to the server: %w" , groupId , err )
159153 }
160154
161155 // Configure and start the BadgerDB stream
162156 glog .Infof ("[import] Starting BadgerDB stream for group [%v]" , groupId )
163-
164157 if err := streamBadger (ctx , ps , out , groupId ); err != nil {
165158 return fmt .Errorf ("badger streaming failed for group [%v]: %v" , groupId , err )
166159 }
167-
168160 return nil
169161}
170162
@@ -180,6 +172,11 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
180172 if err := out .Send (& apiv2.StreamExtSnapshotRequest {Pkt : p }); err != nil && ! errors .Is (err , io .EOF ) {
181173 return fmt .Errorf ("failed to send data chunk: %w" , err )
182174 }
175+ if _ , err := out .Recv (); err != nil {
176+ return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
177+ }
178+ glog .Infof ("[import] Group [%v]: Received ACK for sending data chunk" , groupId )
179+
183180 return nil
184181 }
185182
@@ -196,5 +193,10 @@ func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExt
196193 return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
197194 }
198195
196+ if _ , err := out .Recv (); err != nil {
197+ return fmt .Errorf ("failed to receive response for group ID [%v] from the server: %w" , groupId , err )
198+ }
199+ glog .Infof ("[import] Group [%v]: Received ACK for sending completion signal" , groupId )
200+
199201 return nil
200202}
0 commit comments