Skip to content

Commit 6457b26

Browse files
add import api support for multiple groups with a single alphs
1 parent 7546846 commit 6457b26

18 files changed

Lines changed: 1797 additions & 545 deletions

File tree

check_upgrade/check_upgrade_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestCheckUpgrade(t *testing.T) {
6969
require.NoError(t, err)
7070
defer func() { c1.Cleanup(t.Failed()) }()
7171
require.NoError(t, c1.Start())
72-
alphaHttp, err := c.GetAlphaHttpPublicPort()
72+
alphaHttp, err := c.GetAlphaHttpPublicPort(0)
7373
require.NoError(t, err)
7474

7575
args := []string{

dgraph/cmd/alpha/run_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1644,7 +1644,7 @@ func TestMain(m *testing.M) {
16441644
x.Panic(err)
16451645

16461646
alphaSockAdd = "0.0.0.0:" + alphaGrpcPort
1647-
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort()
1647+
alphaSockAddHttp, err := c.GetAlphaHttpPublicPort(0)
16481648
x.Panic(err)
16491649
addr = "http://0.0.0.0:" + alphaSockAddHttp
16501650
zeroSockAdd, err := c.GetZeroGrpcPublicPort()
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* SPDX-FileCopyrightText: © Hypermode Inc. <[email protected]>
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package dgraphimport
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"math"
12+
"os"
13+
"path/filepath"
14+
"strconv"
15+
16+
"github.com/dgraph-io/badger/v4"
17+
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
18+
"github.com/dgraph-io/ristretto/v2/z"
19+
"github.com/golang/glog"
20+
"google.golang.org/grpc"
21+
)
22+
23+
// Client represents a Dgraph import client that handles snapshot streaming.
24+
type Client struct {
25+
opts grpc.DialOption
26+
dg apiv25.DgraphClient
27+
}
28+
29+
// NewClient creates a new import client with the specified endpoint and gRPC options.
30+
func NewClient(endpoint string, opts grpc.DialOption) (*Client, error) {
31+
conn, err := grpc.NewClient(endpoint, opts)
32+
if err != nil {
33+
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
34+
}
35+
36+
glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
37+
return &Client{dg: apiv25.NewDgraphClient(conn), opts: opts}, nil
38+
}
39+
40+
// StartSnapshotStream initiates a snapshot stream session with the Dgraph server.
41+
func (c *Client) StartSnapshotStream(ctx context.Context) (*apiv25.InitiateSnapshotStreamResponse, error) {
42+
glog.V(2).Infof("Initiating snapshot stream")
43+
req := &apiv25.InitiateSnapshotStreamRequest{}
44+
resp, err := c.dg.InitiateSnapshotStream(ctx, req)
45+
if err != nil {
46+
glog.Errorf("Failed to initiate snapshot stream: %v", err)
47+
return nil, err
48+
}
49+
glog.Infof("Snapshot stream initiated successfully")
50+
return resp, nil
51+
}
52+
53+
// SendSnapshot takes a p directory and a set of group IDs and streams the data from the
54+
// p directory to the corresponding group IDs. The function will skip any groups that do not
55+
// have a corresponding p directory.
56+
func (c *Client) SendSnapshot(ctx context.Context, pDir string, groups []uint32) error {
57+
glog.Infof("Starting to stream snapshots from directory: %s", pDir)
58+
59+
// Get mapping of group IDs to their respective p directories
60+
groupDirs, err := scanPDirs(pDir)
61+
if err != nil {
62+
glog.Errorf("Error getting p directories: %v", err)
63+
return fmt.Errorf("error getting p directories: %w", err)
64+
}
65+
66+
// Process each group in the provided list
67+
for _, group := range groups {
68+
pDir, exists := groupDirs[group-1]
69+
if !exists {
70+
glog.Warningf("No p directory found for group %d, skipping...", group)
71+
continue
72+
}
73+
74+
if _, err := os.Stat(pDir); os.IsNotExist(err) {
75+
glog.Warningf("P directory does not exist: %s, skipping...", pDir)
76+
continue
77+
}
78+
79+
glog.Infof("Streaming data for group %d from directory: %s", group, pDir)
80+
err = streamData(ctx, c.dg, pDir, group)
81+
if err != nil {
82+
glog.Errorf("Failed to stream snapshot for group %d: %v", group, err)
83+
return err
84+
}
85+
glog.Infof("Successfully streamed snapshot for group %d", group)
86+
}
87+
88+
glog.Infof("Completed streaming all snapshots")
89+
return nil
90+
}
91+
92+
// streamData handles the actual data streaming process for a single group.
93+
// It opens the BadgerDB at the specified directory and streams all data to the server.
94+
func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupId uint32) error {
95+
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)
96+
97+
// Initialize stream with the server
98+
out, err := dg.StreamSnapshot(ctx)
99+
if err != nil {
100+
return fmt.Errorf("failed to start snapshot stream: %w", err)
101+
}
102+
103+
// Open the BadgerDB instance at the specified directory
104+
opt := badger.DefaultOptions(pdir)
105+
ps, err := badger.OpenManaged(opt)
106+
if err != nil {
107+
return fmt.Errorf("failed to open BadgerDB at %s: %w", pdir, err)
108+
}
109+
defer ps.Close()
110+
111+
// Send group ID as the first message in the stream
112+
glog.V(2).Infof("Sending group ID %d to server", groupId)
113+
groupReq := &apiv25.StreamSnapshotRequest{
114+
GroupId: groupId,
115+
}
116+
if err := out.Send(groupReq); err != nil {
117+
return fmt.Errorf("failed to send group ID %d: %w", groupId, err)
118+
}
119+
120+
// Configure and start the BadgerDB stream
121+
glog.V(2).Infof("Starting BadgerDB stream for group %d", groupId)
122+
stream := ps.NewStreamAt(math.MaxUint64)
123+
stream.LogPrefix = fmt.Sprintf("Sending P dir (group %d)", groupId)
124+
stream.KeyToList = nil
125+
stream.Send = func(buf *z.Buffer) error {
126+
kvs := &apiv25.KVS{Data: buf.Bytes()}
127+
if err := out.Send(&apiv25.StreamSnapshotRequest{
128+
Pairs: kvs}); err != nil {
129+
return fmt.Errorf("failed to send data chunk: %w", err)
130+
}
131+
return nil
132+
}
133+
134+
// Execute the stream process
135+
if err := stream.Orchestrate(ctx); err != nil {
136+
return fmt.Errorf("stream orchestration failed for group %d: %w", groupId, err)
137+
}
138+
139+
// Send the final 'done' signal to mark completion
140+
glog.V(2).Infof("Sending completion signal for group %d", groupId)
141+
done := &apiv25.KVS{
142+
Done: true,
143+
}
144+
145+
if err := out.Send(&apiv25.StreamSnapshotRequest{
146+
Pairs: done}); err != nil {
147+
return fmt.Errorf("failed to send 'done' signal for group %d: %w", groupId, err)
148+
}
149+
150+
// Wait for acknowledgment from the server
151+
ack, err := out.CloseAndRecv()
152+
if err != nil {
153+
return fmt.Errorf("failed to receive ACK for group %d: %w", groupId, err)
154+
}
155+
glog.Infof("Group %d: Received ACK with message: %v", groupId, ack.Done)
156+
157+
return nil
158+
}
159+
160+
// scanPDirs scans the base path and returns a mapping of group IDs to their
161+
// corresponding p directory paths. It looks for numbered subdirectories which contain a "p" folder.
162+
func scanPDirs(basePath string) (map[uint32]string, error) {
163+
glog.V(2).Infof("Scanning for p directories in %s", basePath)
164+
groupDirs := make(map[uint32]string)
165+
166+
entries, err := os.ReadDir(basePath)
167+
if err != nil {
168+
return nil, fmt.Errorf("failed to read directory %s: %w", basePath, err)
169+
}
170+
171+
for _, entry := range entries {
172+
if entry.IsDir() {
173+
groupID, err := strconv.ParseUint(entry.Name(), 10, 32)
174+
if err == nil {
175+
pDir := filepath.Join(basePath, entry.Name(), "p")
176+
if _, err := os.Stat(pDir); err == nil {
177+
groupDirs[uint32(groupID)] = pDir
178+
glog.V(2).Infof("Found p directory for group %d: %s", groupID, pDir)
179+
}
180+
}
181+
}
182+
}
183+
184+
glog.Infof("Found %d group directories", len(groupDirs))
185+
return groupDirs, nil
186+
}

0 commit comments

Comments
 (0)