@@ -103,7 +103,7 @@ func TestImportApis(t *testing.T) {
103103 negativeTestCase bool // True if this is an expected failure case
104104 description string
105105 err string
106- sleep bool
106+ waitForSnapshot bool
107107 }{
108108 {
109109 name : "SingleGroupShutTwoAlphasPerGroup" ,
@@ -154,7 +154,7 @@ func TestImportApis(t *testing.T) {
154154 negativeTestCase : false ,
155155 description : "Single group with multiple alphas, shutdown 1 alpha" ,
156156 err : "" ,
157- sleep : true ,
157+ waitForSnapshot : true ,
158158 },
159159 {
160160 name : "TwoGroupShutOneAlphaPerGroup" ,
@@ -165,7 +165,7 @@ func TestImportApis(t *testing.T) {
165165 negativeTestCase : false ,
166166 description : "Multiple groups with multiple alphas, shutdown 2 alphas per group" ,
167167 err : "" ,
168- sleep : true ,
168+ waitForSnapshot : true ,
169169 },
170170 {
171171 name : "ThreeGroupShutOneAlphaPerGroup" ,
@@ -176,7 +176,7 @@ func TestImportApis(t *testing.T) {
176176 negativeTestCase : false ,
177177 description : "Three groups with 3 alphas each, shutdown 1 alpha per group" ,
178178 err : "" ,
179- sleep : true ,
179+ waitForSnapshot : true ,
180180 },
181181 {
182182 name : "SingleGroupAllAlphasOnline" ,
@@ -218,18 +218,18 @@ func TestImportApis(t *testing.T) {
218218 t .Logf ("Running test case: %s" , tt .description )
219219 }
220220 runImportTest (t , tt .bulkAlphas , tt .targetAlphas , tt .replicasFactor , tt .downAlphas , tt .negativeTestCase ,
221- tt .err , tt .sleep )
221+ tt .err , tt .waitForSnapshot )
222222 })
223223 }
224224}
225225
226226func runImportTest (t * testing.T , bulkAlphas , targetAlphas , replicasFactor , numDownAlphas int , negative bool ,
227- errStr string , sleep bool ) {
227+ errStr string , waitForSnapshot bool ) {
228228 bulkCluster , baseDir := setupBulkCluster (t , bulkAlphas )
229229 defer func () { bulkCluster .Cleanup (t .Failed ()) }()
230230
231231 targetCluster , gc , gcCleanup := setupTargetCluster (t , targetAlphas , replicasFactor )
232- defer func () { targetCluster .Cleanup (t .Failed ()) }()
232+ // defer func() { targetCluster.Cleanup(t.Failed()) }()
233233 defer gcCleanup ()
234234
235235 _ , err := gc .Query ("schema{}" )
@@ -310,8 +310,20 @@ func runImportTest(t *testing.T, bulkAlphas, targetAlphas, replicasFactor, numDo
310310
311311 require .NoError (t , targetCluster .HealthCheck (false ))
312312
313- if sleep {
314- time .Sleep (1 * time .Minute )
313+ if waitForSnapshot {
314+ grp := 1
315+ for _ , alphas := range alphaGroups {
316+ for i := 0 ; i < numDownAlphas ; i ++ {
317+ hc , err := targetCluster .GetAlphaHttpClient (alphas [i ])
318+ require .NoError (t , err )
319+
320+ prevTs , err := hc .GetCurrentSnapshotTs (uint64 (grp ))
321+ require .NoError (t , err )
322+ _ , err = hc .WaitForSnapshot (uint64 (grp ), prevTs )
323+ require .NoError (t , err )
324+ }
325+ grp ++
326+ }
315327 }
316328
317329 t .Log ("Import completed" )
@@ -368,24 +380,49 @@ func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtes
368380 return cluster , gc , cleanup
369381}
370382
371- // verifyImportResults validates the result of an import operation
383+ // verifyImportResults validates the result of an import operation with retry logic
372384func verifyImportResults (t * testing.T , gc * dgraphapi.GrpcClient ) {
373- // Check schema after streaming process
374- schemaResp , err := gc .Query ("schema{}" )
375- require .NoError (t , err )
376- // Compare the schema response with the expected schema
377- var actualSchema , expectedSchemaObj map [string ]interface {}
378- require .NoError (t , json .Unmarshal (schemaResp .Json , & actualSchema ))
379- require .NoError (t , json .Unmarshal ([]byte (expectedSchema ), & expectedSchemaObj ))
385+ maxRetries := 10
386+ retryDelay := 500 * time .Millisecond
387+ hasAllPredicates := true
380388
381- // Check if the actual schema contains all the predicates from expected schema
382- actualPredicates := getPredicateMap (actualSchema )
389+ // Get expected predicates first
390+ var expectedSchemaObj map [string ]interface {}
391+ require .NoError (t , json .Unmarshal ([]byte (expectedSchema ), & expectedSchemaObj ))
383392 expectedPredicates := getPredicateMap (expectedSchemaObj )
384393
385- for predName , predDetails := range expectedPredicates {
386- actualPred , exists := actualPredicates [predName ]
387- require .True (t , exists , "Predicate '%s' not found in actual schema" , predName )
388- require .Equal (t , predDetails , actualPred , "Predicate '%s' details don't match" , predName )
394+ for i := 0 ; i < maxRetries ; i ++ {
395+ schemaResp , err := gc .Query ("schema{}" )
396+ require .NoError (t , err )
397+
398+ // Parse schema response
399+ var actualSchema map [string ]interface {}
400+ require .NoError (t , json .Unmarshal (schemaResp .Json , & actualSchema ))
401+
402+ // Get actual predicates
403+ actualPredicates := getPredicateMap (actualSchema )
404+
405+ // Check if all expected predicates are present
406+ for predName := range expectedPredicates {
407+ if _ , exists := actualPredicates [predName ]; ! exists {
408+ hasAllPredicates = false
409+ break
410+ }
411+ }
412+
413+ if hasAllPredicates {
414+ break
415+ }
416+
417+ if i < maxRetries - 1 {
418+ t .Logf ("Not all predicates found yet, retrying in %v" , retryDelay )
419+ time .Sleep (retryDelay )
420+ retryDelay *= 2
421+ }
422+ }
423+
424+ if ! hasAllPredicates {
425+ t .Fatalf ("Not all predicates found in schema" )
389426 }
390427
391428 for _ , tt := range common .OneMillionTCs {
0 commit comments