Skip to content

Commit 96c5e77

Browse files
add import api support for multiple groups with a single alphs
1 parent e648e77 commit 96c5e77

18 files changed

Lines changed: 1853 additions & 784 deletions

File tree

check_upgrade/check_upgrade_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestCheckUpgrade(t *testing.T) {
7070
require.NoError(t, err)
7171
defer func() { c1.Cleanup(t.Failed()) }()
7272
require.NoError(t, c1.Start())
73-
alphaHttp, err := c.GetAlphaHttpPublicPort()
73+
alphaHttp, err := c.GetAlphaHttpPublicPort(0)
7474
require.NoError(t, err)
7575

7676
args := []string{

dgraph/cmd/alpha/run_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,14 +1640,14 @@ func TestMain(m *testing.M) {
16401640

16411641
x.Panic(dg.Login(context.Background(), dgraphapi.DefaultUser, dgraphapi.DefaultPassword))
16421642

1643-
alphaGrpcPort, err := c.GetAlphaGrpcPublicPort()
1643+
alphaGrpcPort, err := c.GetAlphaGrpcPublicPort(0)
16441644
x.Panic(err)
16451645

16461646
alphaSockAdd = "0.0.0.0:" + alphaGrpcPort
1647-
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort()
1647+
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort(0)
16481648
x.Panic(err)
16491649
addr = "http://0.0.0.0:" + alphaSockAddHttp
1650-
zeroSockAdd, err := c.GetZeroGrpcPublicPort()
1650+
zeroSockAdd, err := c.GetZeroGrpcPublicPort(0)
16511651
x.Panic(err)
16521652

16531653
// Increment lease, so that mutations work.
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package dgraphimport
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"io"
12+
"math"
13+
"os"
14+
"path/filepath"
15+
16+
"github.com/dgraph-io/badger/v4"
17+
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
18+
"github.com/dgraph-io/ristretto/v2/z"
19+
20+
"github.com/golang/glog"
21+
"golang.org/x/sync/errgroup"
22+
"google.golang.org/grpc"
23+
)
24+
25+
// NewClient creates a new import client with the specified endpoint and gRPC options.
26+
func newClient(endpoint string, opts grpc.DialOption) (apiv25.DgraphClient, error) {
27+
conn, err := grpc.NewClient(endpoint, opts)
28+
if err != nil {
29+
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
30+
}
31+
32+
glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
33+
return apiv25.NewDgraphClient(conn), nil
34+
}
35+
36+
func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutDir string) error {
37+
dg, err := newClient(endpoint, opts)
38+
if err != nil {
39+
return err
40+
}
41+
resp, err := startSnapshotStream(ctx, dg)
42+
if err != nil {
43+
return err
44+
}
45+
46+
return sendSnapshot(ctx, dg, bulkOutDir, resp.Groups)
47+
}
48+
49+
// StartSnapshotStream initiates a snapshot stream session with the Dgraph server.
50+
func startSnapshotStream(ctx context.Context, dc apiv25.DgraphClient) (*apiv25.InitiateSnapshotStreamResponse, error) {
51+
glog.V(2).Infof("Initiating snapshot stream")
52+
req := &apiv25.InitiateSnapshotStreamRequest{}
53+
resp, err := dc.InitiateSnapshotStream(ctx, req)
54+
if err != nil {
55+
glog.Errorf("Failed to initiate snapshot stream: %v", err)
56+
return nil, err
57+
}
58+
glog.Infof("Snapshot stream initiated successfully")
59+
return resp, nil
60+
}
61+
62+
// SendSnapshot takes a p directory and a set of group IDs and streams the data from the
63+
// p directory to the corresponding group IDs. It first scans the provided directory for
64+
// subdirectories named with numeric group IDs.
65+
func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, baseDir string, groups []uint32) error {
66+
glog.Infof("Starting to stream snapshots from directory: %s", baseDir)
67+
68+
errG, ctx := errgroup.WithContext(ctx)
69+
for _, group := range groups {
70+
group := group
71+
errG.Go(func() error {
72+
pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p")
73+
if _, err := os.Stat(pDir); err != nil {
74+
return fmt.Errorf("p directory does not exist for group %d: %s", group, pDir)
75+
}
76+
77+
glog.Infof("Streaming data for group %d from directory: %s", group, pDir)
78+
if err := streamData(ctx, dg, pDir, group); err != nil {
79+
return err
80+
}
81+
82+
return nil
83+
})
84+
}
85+
if err := errG.Wait(); err != nil {
86+
return err
87+
}
88+
89+
glog.Infof("Completed streaming all snapshots")
90+
return nil
91+
}
92+
93+
// streamData handles the actual data streaming process for a single group.
94+
// It opens the BadgerDB at the specified directory and streams all data to the server.
95+
func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupId uint32) error {
96+
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)
97+
98+
// Initialize stream with the server
99+
out, err := dg.StreamSnapshot(ctx)
100+
if err != nil {
101+
return fmt.Errorf("failed to start snapshot stream: %w", err)
102+
}
103+
104+
// Open the BadgerDB instance at the specified directory
105+
opt := badger.DefaultOptions(pdir)
106+
ps, err := badger.OpenManaged(opt)
107+
if err != nil {
108+
return fmt.Errorf("failed to open BadgerDB at %s: %w", pdir, err)
109+
}
110+
defer ps.Close()
111+
112+
// Send group ID as the first message in the stream
113+
glog.V(2).Infof("Sending group ID %d to server", groupId)
114+
groupReq := &apiv25.StreamSnapshotRequest{GroupId: groupId}
115+
if err := out.Send(groupReq); err != nil {
116+
return fmt.Errorf("failed to send group ID %d: %w", groupId, err)
117+
}
118+
119+
// Configure and start the BadgerDB stream
120+
glog.V(2).Infof("Starting BadgerDB stream for group %d", groupId)
121+
stream := ps.NewStreamAt(math.MaxUint64)
122+
stream.LogPrefix = fmt.Sprintf("Sending P dir (group %d)", groupId)
123+
stream.KeyToList = nil
124+
stream.Send = func(buf *z.Buffer) error {
125+
kvs := &apiv25.KVS{Data: buf.Bytes()}
126+
if err := out.Send(&apiv25.StreamSnapshotRequest{Pairs: kvs}); err != nil && err != io.EOF {
127+
return fmt.Errorf("failed to send data chunk: %w", err)
128+
}
129+
return nil
130+
}
131+
132+
// Execute the stream process
133+
if err := stream.Orchestrate(ctx); err != nil {
134+
return fmt.Errorf("stream orchestration failed for group %d: %w", groupId, err)
135+
}
136+
137+
// Send the final 'done' signal to mark completion
138+
glog.V(2).Infof("Sending completion signal for group %d", groupId)
139+
done := &apiv25.KVS{Done: true}
140+
141+
if err := out.Send(&apiv25.StreamSnapshotRequest{Pairs: done}); err != nil && err != io.EOF {
142+
return fmt.Errorf("failed to send 'done' signal for group %d: %w", groupId, err)
143+
}
144+
// Wait for acknowledgment from the server
145+
ack, err := out.CloseAndRecv()
146+
if err != nil {
147+
return fmt.Errorf("failed to receive ACK for group %d: %w", groupId, err)
148+
}
149+
glog.Infof("Group %d: Received ACK with message: %v", groupId, ack.Done)
150+
151+
return nil
152+
}

0 commit comments

Comments
 (0)