Skip to content

Commit 12c6837

Browse files
resolve review comments
1 parent c4105f3 commit 12c6837

4 files changed

Lines changed: 72 additions & 57 deletions

File tree

dgraph/cmd/bulk/loader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type options struct {
4343
DataFormat string
4444
SchemaFile string
4545
GqlSchemaFile string
46-
outDirNoMount string
46+
OutDir string
4747
ReplaceOutDir bool
4848
TmpDir string
4949
NumGoroutines int

dgraph/cmd/bulk/run.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func run() {
141141
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
142142
Encrypted: Bulk.Conf.GetBool("encrypted"),
143143
EncryptedOut: Bulk.Conf.GetBool("encrypted_out"),
144-
outDirNoMount: Bulk.Conf.GetString("out"),
144+
OutDir: Bulk.Conf.GetString("out"),
145145
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
146146
TmpDir: Bulk.Conf.GetString("tmp"),
147147
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
@@ -259,8 +259,8 @@ func run() {
259259

260260
// Make sure it's OK to create or replace the directory specified with the --out option.
261261
// It is always OK to create or replace the default output directory.
262-
if opt.outDirNoMount != defaultOutDir && !opt.ReplaceOutDir {
263-
err := x.IsMissingOrEmptyDir(opt.outDirNoMount)
262+
if opt.OutDir != defaultOutDir && !opt.ReplaceOutDir {
263+
err := x.IsMissingOrEmptyDir(opt.OutDir)
264264
if err == nil {
265265
fmt.Fprintf(os.Stderr, "Output directory exists and is not empty."+
266266
" Use --replace_out to overwrite it.\n")
@@ -271,9 +271,9 @@ func run() {
271271
}
272272

273273
// Delete and recreate the output dirs to ensure they are empty.
274-
x.Check(os.RemoveAll(opt.outDirNoMount))
274+
x.Check(os.RemoveAll(opt.OutDir))
275275
for i := range opt.ReduceShards {
276-
dir := filepath.Join(opt.outDirNoMount, strconv.Itoa(i), "p")
276+
dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
277277
x.Check(os.MkdirAll(dir, 0700))
278278
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)
279279

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/dgraph-io/badger/v4"
1717
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
1818
"github.com/dgraph-io/ristretto/v2/z"
19+
1920
"github.com/golang/glog"
2021
"google.golang.org/grpc"
2122
)
@@ -58,8 +59,8 @@ func startSnapshotStream(ctx context.Context, dg apiv25.DgraphClient) (*apiv25.I
5859
}
5960

6061
// sendSnapshot takes a p directory and a set of group IDs and streams the data from the
61-
// p directory to the corresponding group IDs. The function will skip any groups that do not
62-
// have a corresponding p directory.
62+
// p directory to the corresponding group IDs. It first scans the provided directory for
63+
// subdirectories named with numeric group IDs.
6364
func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, pDir string, groups []uint32) error {
6465
glog.Infof("Starting to stream snapshots from directory: %s", pDir)
6566

@@ -74,18 +75,11 @@ func sendSnapshot(ctx context.Context, dg apiv25.DgraphClient, pDir string, grou
7475
for _, group := range groups {
7576
pDir, exists := groupDirs[group-1]
7677
if !exists {
77-
glog.Warningf("No p directory found for group %d, skipping...", group)
78-
continue
79-
}
80-
81-
if _, err := os.Stat(pDir); os.IsNotExist(err) {
82-
glog.Warningf("P directory does not exist: %s, skipping...", pDir)
83-
continue
78+
return fmt.Errorf("No p directory found for group %d, skipping...", group)
8479
}
8580

8681
glog.Infof("Streaming data for group %d from directory: %s", group, pDir)
87-
err = streamData(ctx, dg, pDir, group)
88-
if err != nil {
82+
if err := streamData(ctx, dg, pDir, group); err != nil {
8983
glog.Errorf("Failed to stream snapshot for group %d: %v", group, err)
9084
return err
9185
}
@@ -131,8 +125,7 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
131125
stream.KeyToList = nil
132126
stream.Send = func(buf *z.Buffer) error {
133127
kvs := &apiv25.KVS{Data: buf.Bytes()}
134-
if err := out.Send(&apiv25.StreamSnapshotRequest{
135-
Pairs: kvs}); err != nil {
128+
if err := out.Send(&apiv25.StreamSnapshotRequest{Pairs: kvs}); err != nil {
136129
return fmt.Errorf("failed to send data chunk: %w", err)
137130
}
138131
return nil
@@ -164,8 +157,10 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
164157
return nil
165158
}
166159

167-
// scanPDirs scans the base path and returns a mapping of group IDs to their
168-
// corresponding p directory paths. It looks for numbered subdirectories which contain a "p" folder.
160+
// scanPDirs scans the specified base directory for subdirectories named with numeric group IDs.
161+
// It looks for a "p" directory inside each group directory and maps the group ID to the path
162+
// of the "p" directory. If a "p" directory is not found or an error occurs, the function
163+
// returns an error.
169164
func scanPDirs(basePath string) (map[uint32]string, error) {
170165
glog.V(2).Infof("Scanning for p directories in %s", basePath)
171166
groupDirs := make(map[uint32]string)
@@ -183,6 +178,8 @@ func scanPDirs(basePath string) (map[uint32]string, error) {
183178
if _, err := os.Stat(pDir); err == nil {
184179
groupDirs[uint32(groupID)] = pDir
185180
glog.V(2).Infof("Found p directory for group %d: %s", groupID, pDir)
181+
} else {
182+
return nil, fmt.Errorf("P directory does not exist: %s, skipping...", pDir)
186183
}
187184
}
188185
}

worker/import.go

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@ import (
1212

1313
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
1414
"github.com/dgraph-io/ristretto/v2/z"
15-
"github.com/dustin/go-humanize"
16-
"github.com/golang/glog"
1715
"github.com/hypermodeinc/dgraph/v25/conn"
1816
"github.com/hypermodeinc/dgraph/v25/posting"
19-
2017
"github.com/hypermodeinc/dgraph/v25/protos/pb"
21-
2218
"github.com/hypermodeinc/dgraph/v25/schema"
19+
20+
"github.com/dustin/go-humanize"
21+
"github.com/golang/glog"
2322
"github.com/pkg/errors"
2423
)
2524

@@ -43,8 +42,7 @@ func ProposeDrain(ctx context.Context, drainMode *pb.Drainmode) error {
4342
}
4443
con := pl.Get()
4544
c := pb.NewWorkerClient(con)
46-
_, err := c.ApplyDrainmode(ctx, drainMode)
47-
if err != nil {
45+
if _, err := c.ApplyDrainmode(ctx, drainMode); err != nil {
4846
return err
4947
}
5048
}
@@ -54,18 +52,18 @@ func ProposeDrain(ctx context.Context, drainMode *pb.Drainmode) error {
5452

5553
// DoStreamPDir handles streaming of snapshots to a target group. It first checks the group
5654
// associated with the incoming stream and, if it's the same as the current node's group, it
57-
// flushes the data using FlushKvs1. If the group is different, it establishes a connection
55+
// flushes the data using flushKvs. If the group is different, it establishes a connection
5856
// with the leader of that group and streams data to it. The function returns an error if
5957
// there are any issues in the process, such as a broken connection or failure to establish
6058
// a stream with the leader.
6159
func DoStreamPDir(stream apiv25.Dgraph_StreamSnapshotServer) error {
62-
groupId, err := checkGroup(stream)
60+
groupId, err := getGroup(stream)
6361
if err != nil {
6462
return err
6563
}
6664

6765
if groupId == groups().Node.gid {
68-
return FlushKvs(stream)
66+
return flushKvs(stream)
6967
}
7068

7169
pl := groups().Leader(groupId)
@@ -82,46 +80,67 @@ func DoStreamPDir(stream apiv25.Dgraph_StreamSnapshotServer) error {
8280

8381
return streamToAnotherGroup(stream, out)
8482
}
85-
8683
func streamToAnotherGroup(in apiv25.Dgraph_StreamSnapshotServer, out pb.Worker_StreamPDirClient) error {
8784
chan1 := make(chan *apiv25.StreamSnapshotRequest, 10)
85+
errCh := make(chan error, 1)
86+
ctx := in.Context()
8887

8988
go func() {
9089
defer close(chan1)
9190
for {
92-
msg, err := in.Recv()
93-
if err != nil {
94-
if err != io.EOF {
95-
glog.Errorf("Error receiving from in stream: %v", err)
96-
}
91+
select {
92+
case <-ctx.Done():
93+
glog.Infof("Context cancelled, stopping receive goroutine.")
94+
errCh <- fmt.Errorf("context deadline exceeded")
9795
return
96+
default:
97+
msg, err := in.Recv()
98+
if err != nil {
99+
if err != io.EOF {
100+
glog.Errorf("Error receiving from in stream: %v", err)
101+
errCh <- err
102+
}
103+
return
104+
}
105+
chan1 <- msg
98106
}
99-
chan1 <- msg
100107
}
101108
}()
102109

103110
size := 0
104111

105-
for msg := range chan1 {
106-
data := &pb.KVS{Data: msg.Pairs.Data}
112+
Loop:
113+
for {
114+
select {
115+
case err := <-errCh:
116+
return err
117+
118+
case msg, ok := <-chan1:
119+
if !ok {
120+
// Channel closed, exit loop
121+
break Loop
122+
}
123+
124+
data := &pb.KVS{Data: msg.Pairs.Data}
125+
126+
if msg.Pairs.Done {
127+
if err := out.Send(&pb.KVS{Done: true}); err != nil {
128+
glog.Errorf("Error sending 'done' to out stream: %v", err)
129+
return err
130+
}
131+
glog.Infoln("All key-values have been transferred.")
132+
break Loop
133+
}
107134

108-
if msg.Pairs.Done {
109-
if err := out.Send(&pb.KVS{Done: true}); err != nil {
110-
glog.Errorf("Error sending 'done' to out stream: %v", err)
135+
if err := out.Send(data); err != nil {
136+
glog.Errorf("Error sending to outstream: %v", err)
111137
return err
112138
}
113-
glog.Infoln("All key-values have been transferred.")
114-
break
115-
}
116139

117-
if err := out.Send(data); err != nil {
118-
glog.Errorf("Error sending to outstream: %v", err)
119-
return err
140+
size += len(msg.Pairs.Data)
141+
glog.Infof("Sent batch of size: %s. Total so far: %s\n",
142+
humanize.IBytes(uint64(len(msg.Pairs.Data))), humanize.IBytes(uint64(size)))
120143
}
121-
122-
size += len(msg.Pairs.Data)
123-
glog.Infof("Sent batch of size: %s. Total so far: %s\n",
124-
humanize.IBytes(uint64(len(msg.Pairs.Data))), humanize.IBytes(uint64(size)))
125144
}
126145

127146
// Close the incoming stream properly
@@ -136,14 +155,13 @@ func streamToAnotherGroup(in apiv25.Dgraph_StreamSnapshotServer, out pb.Worker_S
136155
}
137156

138157
glog.Infof("Received ACK with message: %v\n", ack.Done)
139-
140158
return nil
141159
}
142160

143-
// checkGroup receives the initial message from the stream and extracts the group ID.
161+
// getGroup receives the initial message from the stream and extracts the group ID.
144162
// It returns the group ID if successful, otherwise an error if there is an issue
145163
// receiving the message.
146-
func checkGroup(stream apiv25.Dgraph_StreamSnapshotServer) (uint32, error) {
164+
func getGroup(stream apiv25.Dgraph_StreamSnapshotServer) (uint32, error) {
147165
req, err := stream.Recv()
148166
if err != nil {
149167
return 0, fmt.Errorf("failed to receive initial stream message: %v", err)
@@ -152,9 +170,9 @@ func checkGroup(stream apiv25.Dgraph_StreamSnapshotServer) (uint32, error) {
152170
return req.GroupId, nil
153171
}
154172

155-
// FlushKvs receives the stream of data from the client and writes it to BadgerDB.
173+
// flushKvs receives the stream of data from the client and writes it to BadgerDB.
156174
// It also streams the data to other nodes of the same group and reloads the schema from the DB.
157-
func FlushKvs(stream apiv25.Dgraph_StreamSnapshotServer) error {
175+
func flushKvs(stream apiv25.Dgraph_StreamSnapshotServer) error {
158176
var writer badgerWriter
159177
sw := pstore.NewStreamWriter()
160178
defer sw.Cancel()

0 commit comments

Comments
 (0)