
Commit c245e02

Add support for HA and multishard functionality in import APIs
1 parent 684cbb2 · commit c245e02

10 files changed

Lines changed: 1389 additions & 899 deletions

File tree

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 36 additions & 35 deletions
@@ -7,16 +7,13 @@ package dgraphimport
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"io"
-	"math"
 	"os"
 	"path/filepath"
 
 	"github.com/dgraph-io/badger/v4"
 	apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
-	"github.com/dgraph-io/ristretto/v2/z"
+	"github.com/hypermodeinc/dgraph/v25/worker"
 
 	"github.com/golang/glog"
 	"golang.org/x/sync/errgroup"
@@ -48,10 +45,13 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
 }
 
 // startPDirStream initiates a snapshot stream session with the Dgraph server.
-func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.InitiatePDirStreamResponse, error) {
+func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.DrainModeResponse, error) {
 	glog.Info("Initiating pdir stream")
-	req := &apiv2.InitiatePDirStreamRequest{}
-	resp, err := dc.InitiatePDirStream(ctx, req)
+	req := &apiv2.DrainModeRequest{
+		DrainMode: true,
+		DropData:  false,
+	}
+	resp, err := dc.UpdateClusterDrainMode(ctx, req)
 	if err != nil {
 		glog.Errorf("failed to initiate pdir stream: %v", err)
 		return nil, fmt.Errorf("failed to initiate pdir stream: %v", err)
@@ -63,7 +63,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat
 // sendPDir takes a p directory and a set of group IDs and streams the data from the
 // p directory to the corresponding group IDs. It first scans the provided directory for
 // subdirectories named with numeric group IDs.
-func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups []uint32) error {
+func sendPDir(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error {
 	glog.Infof("Starting to stream pdir from directory: %s", baseDir)
 
 	errG, ctx := errgroup.WithContext(ctx)
@@ -74,20 +74,39 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups
 			if _, err := os.Stat(pDir); err != nil {
 				return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
 			}
-
 			glog.Infof("Streaming data for group [%d] from directory: [%s]", group, pDir)
-			if err := streamData(ctx, dg, pDir, group); err != nil {
+			if err := streamData(ctx, dc, pDir, group); err != nil {
 				glog.Errorf("Failed to stream data for groups [%v] from directory: [%s]: %v", group, pDir, err)
 				return err
 			}
 
 			return nil
 		})
 	}
-	if err := errG.Wait(); err != nil {
-		return err
+	if err1 := errG.Wait(); err1 != nil {
+		// If streaming fails for any group, other groups may still be mid-stream.
+		// Disable drain mode so the cluster can serve requests again, and drop
+		// any partially streamed data to leave the cluster in a clean state.
+		req := &apiv2.DrainModeRequest{
+			DrainMode: false,
+			DropData:  true,
+		}
+		if _, err := dc.UpdateClusterDrainMode(context.Background(), req); err != nil {
+			return fmt.Errorf("failed to stream data: %v; also failed to disable drain mode: %v", err1, err)
+		}
+
+		glog.Info("Successfully disabled drain mode")
+		return err1
 	}
 
+	// All groups streamed successfully; take the cluster out of drain mode.
+	req := &apiv2.DrainModeRequest{
+		DrainMode: false,
+		DropData:  false,
+	}
+	if _, err := dc.UpdateClusterDrainMode(context.Background(), req); err != nil {
+		return fmt.Errorf("failed to disable drain mode: %v", err)
+	}
 	glog.Infof("Completed streaming all pdirs")
 	return nil
 }
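The new flow toggles drain mode at three points: on (with DropData false) in startPDirStream before any streaming starts, off with DropData true when any group fails, and off with DropData false once every group succeeds. A small helper could consolidate the repeated request construction; the sketch below is a hypothetical refactor rather than part of this commit, and `setDrainMode` is an invented name (it relies on the same `context`, `fmt`, and `apiv2` imports already present in import_client.go):

```go
// setDrainMode wraps the repeated UpdateClusterDrainMode call sites.
// Hypothetical helper; not part of this commit.
func setDrainMode(ctx context.Context, dc apiv2.DgraphClient, enable, dropData bool) error {
	req := &apiv2.DrainModeRequest{DrainMode: enable, DropData: dropData}
	if _, err := dc.UpdateClusterDrainMode(ctx, req); err != nil {
		return fmt.Errorf("failed to update drain mode (drain=%v, drop=%v): %w", enable, dropData, err)
	}
	return nil
}
```

With it, the failure path in sendPDir would reduce to `setDrainMode(context.Background(), dc, false, true)` and the success path to `setDrainMode(context.Background(), dc, false, false)`.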
@@ -125,34 +144,16 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId
 
 	// Configure and start the BadgerDB stream
 	glog.Infof("Starting BadgerDB stream for group [%d]", groupId)
-	stream := ps.NewStreamAt(math.MaxUint64)
-	stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
-	stream.KeyToList = nil
-	stream.Send = func(buf *z.Buffer) error {
-		p := &apiv2.StreamPacket{Data: buf.Bytes()}
-		if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
-			return fmt.Errorf("failed to send data chunk: %w", err)
-		}
-		return nil
-	}
-
-	// Execute the stream process
-	if err := stream.Orchestrate(ctx); err != nil {
-		return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
+	if err := worker.RunBadgerStream(ctx, ps, out, groupId); err != nil {
+		return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
 	}
-
-	// Send the final 'done' signal to mark completion
-	glog.Infof("Sending completion signal for group [%d]", groupId)
-	done := &apiv2.StreamPacket{Done: true}
-
-	if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
-		return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
-	}
-	// Wait for acknowledgment from the server
 	if _, err := out.CloseAndRecv(); err != nil {
 		return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err)
 	}
 	glog.Infof("Group [%d]: Received ACK", groupId)
-
 	return nil
 }
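The deleted inline code above shows what `worker.RunBadgerStream` now encapsulates: a BadgerDB stream read at the maximum timestamp whose chunks are forwarded as `StreamPacket`s, followed by a final `Done` packet. Below is a sketch reconstructed from the removed lines, using the imports this commit drops (`errors`, `io`, `math`, and ristretto's `z`); the actual implementation in the `worker` package may differ, and the stream-client type name follows the usual protoc-generated convention:

```go
import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"

	"github.com/dgraph-io/badger/v4"
	apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
	"github.com/dgraph-io/ristretto/v2/z"
)

// runBadgerStream sketches the logic that moved into worker.RunBadgerStream,
// reconstructed from the code this commit deletes.
func runBadgerStream(ctx context.Context, ps *badger.DB,
	out apiv2.Dgraph_StreamPDirClient, groupId uint32) error {
	// Read the whole p directory at the maximum timestamp.
	stream := ps.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
	// Forward each chunk produced by the stream as a StreamPacket.
	stream.Send = func(buf *z.Buffer) error {
		p := &apiv2.StreamPacket{Data: buf.Bytes()}
		if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
			return fmt.Errorf("failed to send data chunk: %w", err)
		}
		return nil
	}
	if err := stream.Orchestrate(ctx); err != nil {
		return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
	}
	// Signal completion so the server can finalize this group.
	done := &apiv2.StreamPacket{Done: true}
	if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
		return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
	}
	return nil
}
```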

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 198 additions & 11 deletions
@@ -9,7 +9,10 @@ package dgraphimport
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"path/filepath"
+	"strconv"
+	"strings"
 	"testing"
 
 	"github.com/hypermodeinc/dgraph/v25/dgraphapi"
@@ -91,39 +94,223 @@ func TestDrainModeAfterStartSnapshotStream(t *testing.T) {
 // TestImportApis tests import functionality with different cluster configurations
 func TestImportApis(t *testing.T) {
 	tests := []struct {
-		name           string
-		bulkAlphas     int
-		targetAlphas   int
-		replicasFactor int
+		name             string
+		bulkAlphas       int  // Number of alphas in the source cluster
+		targetAlphas     int  // Number of alphas in the target cluster
+		replicasFactor   int  // Number of replicas for each group
+		downAlphas       int  // Number of alphas to shut down
+		negativeTestCase bool // True if this is an expected-failure case
+		description      string
+		err              string
 	}{
-		{"SingleGroupSingleAlpha", 1, 1, 1},
-		{"TwoGroupsSingleAlpha", 2, 2, 1},
-		{"ThreeGroupsSingleAlpha", 3, 3, 1},
+		{
+			name:             "SingleGroupShutTwoAlphasPerGroup",
+			bulkAlphas:       1,
+			targetAlphas:     3,
+			replicasFactor:   3,
+			downAlphas:       2,
+			negativeTestCase: true,
+			description:      "Single group with 3 alphas, shutdown 2 alphas",
+			err:              "failed to initiate pdir stream",
+		},
+		{
+			name:             "TwoGroupShutTwoAlphasPerGroup",
+			bulkAlphas:       2,
+			targetAlphas:     6,
+			replicasFactor:   3,
+			downAlphas:       2,
+			negativeTestCase: true,
+			description:      "Two groups with 3 alphas each, shutdown 2 alphas per group",
+			err:              "failed to initiate pdir stream",
+		},
+		{
+			name:             "TwoGroupShutTwoAlphasPerGroupNoPDir",
+			bulkAlphas:       1,
+			targetAlphas:     6,
+			replicasFactor:   3,
+			downAlphas:       0,
+			negativeTestCase: true,
+			description:      "Two groups with 3 alphas each, 1 p directory is not available",
+			err:              "p directory does not exist for group [2]",
+		},
+		{
+			name:             "ThreeGroupShutTwoAlphasPerGroup",
+			bulkAlphas:       3,
+			targetAlphas:     9,
+			replicasFactor:   3,
+			downAlphas:       2,
+			negativeTestCase: true,
+			description:      "Three groups with 3 alphas each, shutdown 2 alphas per group",
+			err:              "failed to initiate pdir stream",
+		},
+		{
+			name:             "SingleGroupShutOneAlpha",
+			bulkAlphas:       1,
+			targetAlphas:     3,
+			replicasFactor:   3,
+			downAlphas:       1,
+			negativeTestCase: false,
+			description:      "Single group with multiple alphas, shutdown 1 alpha",
+			err:              "",
+		},
+		{
+			name:             "TwoGroupShutOneAlphaPerGroup",
+			bulkAlphas:       2,
+			targetAlphas:     6,
+			replicasFactor:   3,
+			downAlphas:       1,
+			negativeTestCase: false,
+			description:      "Two groups with 3 alphas each, shutdown 1 alpha per group",
+			err:              "",
+		},
+		{
+			name:             "ThreeGroupShutOneAlphaPerGroup",
+			bulkAlphas:       3,
+			targetAlphas:     9,
+			replicasFactor:   3,
+			downAlphas:       1,
+			negativeTestCase: false,
+			description:      "Three groups with 3 alphas each, shutdown 1 alpha per group",
+			err:              "",
+		},
+		{
+			name:             "SingleGroupAllAlphasOnline",
+			bulkAlphas:       1,
+			targetAlphas:     3,
+			replicasFactor:   3,
+			downAlphas:       0,
+			negativeTestCase: false,
+			description:      "Single group with multiple alphas, all alphas are online",
+			err:              "",
+		},
+		{
+			name:             "TwoGroupAllAlphasOnline",
+			bulkAlphas:       2,
+			targetAlphas:     6,
+			replicasFactor:   3,
+			downAlphas:       0,
+			negativeTestCase: false,
+			description:      "Two groups with 3 alphas each, all alphas are online",
+			err:              "",
+		},
+		{
+			name:             "ThreeGroupAllAlphasOnline",
+			bulkAlphas:       3,
+			targetAlphas:     9,
+			replicasFactor:   3,
+			downAlphas:       0,
+			negativeTestCase: false,
+			description:      "Three groups with 3 alphas each, all alphas are online",
+			err:              "",
+		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			runImportTest(t, tt.bulkAlphas, tt.targetAlphas, tt.replicasFactor)
+			if tt.negativeTestCase {
+				t.Logf("Running negative test case: %s", tt.description)
+			} else {
+				t.Logf("Running test case: %s", tt.description)
+			}
+			runImportTest(t, tt.bulkAlphas, tt.targetAlphas, tt.replicasFactor, tt.downAlphas, tt.negativeTestCase, tt.err)
 		})
 	}
 }
 
-func runImportTest(t *testing.T, bulkAlphas, targetAlphas, replicasFactor int) {
+func runImportTest(t *testing.T, bulkAlphas, targetAlphas, replicasFactor, numDownAlphas int, negative bool, errStr string) {
 	bulkCluster, baseDir := setupBulkCluster(t, bulkAlphas)
 	defer func() { bulkCluster.Cleanup(t.Failed()) }()
 
 	targetCluster, gc, gcCleanup := setupTargetCluster(t, targetAlphas, replicasFactor)
 	defer func() { targetCluster.Cleanup(t.Failed()) }()
 	defer gcCleanup()
 
+	_, err := gc.Query("schema{}")
+	require.NoError(t, err)
+
 	url, err := targetCluster.GetAlphaGrpcEndpoint(0)
 	require.NoError(t, err)
 	outDir := filepath.Join(baseDir, "out")
 
+	// Ensure a client connection to the target cluster can be established
+	_, cleanup, err := targetCluster.Client()
+	require.NoError(t, err)
+	defer cleanup()
+
+	// Get health status for all instances
+	hc, err := targetCluster.HTTPClient()
+	require.NoError(t, err)
+
+	healthResp, err := hc.HealthForInstance()
+	require.NoError(t, err)
+
+	// Parse the health response to get group information
+	var guardianResp struct {
+		Health []struct {
+			Instance string
+			Address  string
+			LastEcho int64
+			Status   string
+			Version  string
+			UpTime   int64
+			Group    string
+		}
+	}
+	require.NoError(t, json.Unmarshal(healthResp, &guardianResp))
+
+	// Group alphas by their group number
+	alphaGroups := make(map[string][]int)
+	for _, h := range guardianResp.Health {
+		if strings.Contains(h.Instance, "zero") || strings.Contains(h.Address, "alpha0") {
+			continue
+		}
+		// Extract the alpha number from an address like "alpha2:7080"
+		alphaNum := strings.TrimPrefix(h.Address, "alpha")
+		alphaNum = strings.TrimSuffix(alphaNum, ":7080")
+
+		alphaID, err := strconv.Atoi(alphaNum)
+		require.NoError(t, err)
+		alphaGroups[h.Group] = append(alphaGroups[h.Group], alphaID)
+	}
+
+	// Shut down the specified number of alphas in each group
+	for group, alphas := range alphaGroups {
+		for i := 0; i < numDownAlphas; i++ {
+			alphaID := alphas[i]
+			t.Logf("Shutting down alpha %d from group %s", alphaID, group)
+			require.NoError(t, targetCluster.StopAlpha(alphaID))
+		}
+	}
+
+	if negative {
+		err := Import(context.Background(), url, grpc.WithTransportCredentials(insecure.NewCredentials()), outDir)
+		require.Error(t, err)
+		fmt.Println("Error: ", err)
+		require.ErrorContains(t, err, errStr)
+		return
+	}
+
 	require.NoError(t, Import(context.Background(), url,
 		grpc.WithTransportCredentials(insecure.NewCredentials()), outDir))
 
-	verifyImportResults(t, gc)
+	// Restart the alphas that were shut down
+	for group, alphas := range alphaGroups {
+		for i := 0; i < numDownAlphas; i++ {
+			alphaID := alphas[i]
+			t.Logf("Restarting alpha %d from group %s", alphaID, group)
+			require.NoError(t, targetCluster.StartAlpha(alphaID))
+		}
+	}
+
+	require.NoError(t, targetCluster.HealthCheck(false))
+
+	fmt.Println("Import completed")
+
+	// Verify the imported data on every alpha in the target cluster
+	for i := 0; i < targetAlphas; i++ {
+		gc, cleanup, err := targetCluster.AlphaClient(i)
+		require.NoError(t, err)
+		defer cleanup()
+		verifyImportResults(t, gc)
+	}
 }
 
 // setupBulkCluster creates and configures a cluster for bulk loading data
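The health-based grouping in runImportTest is the core of the new multi-group awareness: it reads the cluster health endpoint, skips zero instances and `alpha0`, and derives each alpha's ID from addresses like "alpha2:7080". Here is a self-contained sketch of just that parsing step, with an illustrative payload (the field values are invented for the example, not taken from a real cluster):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Illustrative payload shaped like the health response the test unmarshals.
	payload := []byte(`{"health": [
		{"instance": "zero", "address": "zero1:5080", "group": ""},
		{"instance": "alpha", "address": "alpha1:7080", "group": "1"},
		{"instance": "alpha", "address": "alpha2:7080", "group": "1"}
	]}`)

	var resp struct {
		Health []struct {
			Instance string
			Address  string
			Group    string
		}
	}
	if err := json.Unmarshal(payload, &resp); err != nil {
		panic(err)
	}

	// Group alpha IDs by their group number, as the test does.
	alphaGroups := make(map[string][]int)
	for _, h := range resp.Health {
		if strings.Contains(h.Instance, "zero") || strings.Contains(h.Address, "alpha0") {
			continue
		}
		// "alpha2:7080" -> 2
		n, err := strconv.Atoi(strings.TrimSuffix(strings.TrimPrefix(h.Address, "alpha"), ":7080"))
		if err != nil {
			panic(err)
		}
		alphaGroups[h.Group] = append(alphaGroups[h.Group], n)
	}
	fmt.Println(alphaGroups) // map[1:[1 2]]
}
```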
@@ -156,7 +343,7 @@ func setupBulkCluster(t *testing.T, numAlphas int) (*dgraphtest.LocalCluster, st
 func setupTargetCluster(t *testing.T, numAlphas, replicasFactor int) (*dgraphtest.LocalCluster, *dgraphapi.GrpcClient, func()) {
 	conf := dgraphtest.NewClusterConfig().
 		WithNumAlphas(numAlphas).
-		WithNumZeros(1).
+		WithNumZeros(3).
 		WithReplicas(replicasFactor)
 
 	cluster, err := dgraphtest.NewLocalCluster(conf)
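End to end, a caller drives the whole import through the exported `Import` entry point, pointing it at any alpha in the target cluster and at the bulk-loader output directory, exactly as the tests above do. A minimal sketch, assuming the package import path follows the repo layout, an alpha is reachable at `localhost:9080`, and `./out` contains the bulk loader's per-group p directories:

```go
package main

import (
	"context"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/hypermodeinc/dgraph/v25/dgraph/cmd/dgraphimport"
)

func main() {
	// Endpoint and output directory are assumptions for this sketch.
	endpoint := "localhost:9080"
	bulkOutDir := "./out" // bulk loader output with per-group p directories

	// Import puts the cluster in drain mode, streams each group's p
	// directory, and lifts drain mode again (dropping data on failure).
	err := dgraphimport.Import(context.Background(), endpoint,
		grpc.WithTransportCredentials(insecure.NewCredentials()), bulkOutDir)
	if err != nil {
		log.Fatalf("import failed: %v", err)
	}
	log.Println("import completed")
}
```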
