Skip to content

Commit 24cb3f3

Browse files
feat: add import API support for multiple groups with a single Alpha (#9381)
This PR adds import APIs for multiple shard groups of Dgraph Alpha, but it does not support HA clusters or multi-shard HA clusters. It only works with a single Alpha or a multi-shard single Alpha cluster.
1 parent b74a1ce commit 24cb3f3

19 files changed

Lines changed: 2150 additions & 1167 deletions

File tree

check_upgrade/check_upgrade_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestCheckUpgrade(t *testing.T) {
7070
require.NoError(t, err)
7171
defer func() { c1.Cleanup(t.Failed()) }()
7272
require.NoError(t, c1.Start())
73-
alphaHttp, err := c.GetAlphaHttpPublicPort()
73+
alphaHttp, err := c.GetAlphaHttpPublicPort(0)
7474
require.NoError(t, err)
7575

7676
args := []string{

dgraph/cmd/alpha/run_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,14 +1640,14 @@ func TestMain(m *testing.M) {
16401640

16411641
x.Panic(dg.Login(context.Background(), dgraphapi.DefaultUser, dgraphapi.DefaultPassword))
16421642

1643-
alphaGrpcPort, err := c.GetAlphaGrpcPublicPort()
1643+
alphaGrpcPort, err := c.GetAlphaGrpcPublicPort(0)
16441644
x.Panic(err)
16451645

16461646
alphaSockAdd = "0.0.0.0:" + alphaGrpcPort
1647-
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort()
1647+
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort(0)
16481648
x.Panic(err)
16491649
addr = "http://0.0.0.0:" + alphaSockAddHttp
1650-
zeroSockAdd, err := c.GetZeroGrpcPublicPort()
1650+
zeroSockAdd, err := c.GetZeroGrpcPublicPort(0)
16511651
x.Panic(err)
16521652

16531653
// Increment lease, so that mutations work.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package dgraphimport
7+
8+
import (
9+
"context"
10+
"errors"
11+
"fmt"
12+
"io"
13+
"math"
14+
"os"
15+
"path/filepath"
16+
17+
"github.com/dgraph-io/badger/v4"
18+
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
19+
"github.com/dgraph-io/ristretto/v2/z"
20+
21+
"github.com/golang/glog"
22+
"golang.org/x/sync/errgroup"
23+
"google.golang.org/grpc"
24+
)
25+
26+
// newClient creates a new import client with the specified endpoint and gRPC options.
27+
func newClient(endpoint string, opts grpc.DialOption) (apiv25.DgraphClient, error) {
28+
conn, err := grpc.NewClient(endpoint, opts)
29+
if err != nil {
30+
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
31+
}
32+
33+
glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
34+
return apiv25.NewDgraphClient(conn), nil
35+
}
36+
37+
func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutDir string) error {
38+
dg, err := newClient(endpoint, opts)
39+
if err != nil {
40+
return err
41+
}
42+
resp, err := startPDirStream(ctx, dg)
43+
if err != nil {
44+
return err
45+
}
46+
47+
return sendPDir(ctx, dg, bulkOutDir, resp.Groups)
48+
}
49+
50+
// startPDirStream initiates a snapshot stream session with the Dgraph server.
51+
func startPDirStream(ctx context.Context, dc apiv25.DgraphClient) (*apiv25.InitiatePDirStreamResponse, error) {
52+
glog.Info("Initiating pdir stream")
53+
req := &apiv25.InitiatePDirStreamRequest{}
54+
resp, err := dc.InitiatePDirStream(ctx, req)
55+
if err != nil {
56+
glog.Errorf("failed to initiate pdir stream: %v", err)
57+
return nil, fmt.Errorf("failed to initiate pdir stream: %v", err)
58+
}
59+
glog.Info("Pdir stream initiated successfully")
60+
return resp, nil
61+
}
62+
63+
// sendPDir takes a p directory and a set of group IDs and streams the data from the
64+
// p directory to the corresponding group IDs. It first scans the provided directory for
65+
// subdirectories named with numeric group IDs.
66+
func sendPDir(ctx context.Context, dg apiv25.DgraphClient, baseDir string, groups []uint32) error {
67+
glog.Infof("Starting to stream pdir from directory: %s", baseDir)
68+
69+
errG, ctx := errgroup.WithContext(ctx)
70+
for _, group := range groups {
71+
group := group
72+
errG.Go(func() error {
73+
pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p")
74+
if _, err := os.Stat(pDir); err != nil {
75+
return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
76+
}
77+
78+
glog.Infof("Streaming data for group [%d] from directory: [%s]", group, pDir)
79+
if err := streamData(ctx, dg, pDir, group); err != nil {
80+
glog.Errorf("Failed to stream data for groups [%v] from directory: [%s]: %v", group, pDir, err)
81+
return err
82+
}
83+
84+
return nil
85+
})
86+
}
87+
if err := errG.Wait(); err != nil {
88+
return err
89+
}
90+
91+
glog.Infof("Completed streaming all pdirs")
92+
return nil
93+
}
94+
95+
// streamData handles the actual data streaming process for a single group.
96+
// It opens the BadgerDB at the specified directory and streams all data to the server.
97+
func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupId uint32) error {
98+
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)
99+
100+
// Initialize stream with the server
101+
out, err := dg.StreamPDir(ctx)
102+
if err != nil {
103+
return fmt.Errorf("failed to start pdir stream for group %d: %w", groupId, err)
104+
}
105+
106+
// Open the BadgerDB instance at the specified directory
107+
opt := badger.DefaultOptions(pdir)
108+
ps, err := badger.OpenManaged(opt)
109+
if err != nil {
110+
return fmt.Errorf("failed to open BadgerDB at [%s]: %w", pdir, err)
111+
}
112+
113+
defer func() {
114+
if err := ps.Close(); err != nil {
115+
glog.Warningf("Error closing BadgerDB: %v", err)
116+
}
117+
}()
118+
119+
// Send group ID as the first message in the stream
120+
glog.Infof("Sending group ID [%d] to server", groupId)
121+
groupReq := &apiv25.StreamPDirRequest{GroupId: groupId}
122+
if err := out.Send(groupReq); err != nil {
123+
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err)
124+
}
125+
126+
// Configure and start the BadgerDB stream
127+
glog.Infof("Starting BadgerDB stream for group [%d]", groupId)
128+
stream := ps.NewStreamAt(math.MaxUint64)
129+
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
130+
stream.KeyToList = nil
131+
stream.Send = func(buf *z.Buffer) error {
132+
p := &apiv25.StreamPacket{Data: buf.Bytes()}
133+
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
134+
return fmt.Errorf("failed to send data chunk: %w", err)
135+
}
136+
return nil
137+
}
138+
139+
// Execute the stream process
140+
if err := stream.Orchestrate(ctx); err != nil {
141+
return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
142+
}
143+
144+
// Send the final 'done' signal to mark completion
145+
glog.Infof("Sending completion signal for group [%d]", groupId)
146+
done := &apiv25.StreamPacket{Done: true}
147+
148+
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
149+
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
150+
}
151+
// Wait for acknowledgment from the server
152+
if _, err := out.CloseAndRecv(); err != nil {
153+
return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err)
154+
}
155+
glog.Infof("Group [%d]: Received ACK ", groupId)
156+
157+
return nil
158+
}

0 commit comments

Comments
 (0)