Skip to content

Commit 570681d

Browse files
feat(bulk): add a "skip reduce" flag to the bulk loader (#9618)
**Description** This PR adds a `--skip_reduce_phase` flag to the bulk loader. When supplied, the bulk loader stops after the map phase. Workflow: ```sh # start a dgraph zero (for uid and ts mgmt) dgraph bulk --skip_reduce_phase -f data.rdf.gz -s dql.schema --tmp tmp ``` Move your tmp folder, or not... ```sh dgraph bulk --skip_map_phase --tmp tmp --out out ``` Closes #9615 **Checklist** - [x] The PR title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/#summary) syntax, leading with `fix:`, `feat:`, `chore:`, `ci:`, etc. - [x] Code compiles correctly and linting (via trunk) passes locally - [x] Tests added for new functionality, or regression tests for bug fixes added as applicable - [ ] For public APIs, new features, etc., a PR on the [docs repo](https://github.com/dgraph-io/dgraph-docs) staged and linked here. This process can be simplified by going to the [public docs site](https://docs.dgraph.io/) and clicking the "Edit this page" button at the bottom of page(s) relevant to your changes. Ensure that you indicate in the PR that this is an **unreleased** feature so that it does not get merged into the main docs prematurely.
1 parent 4120211 commit 570681d

4 files changed

Lines changed: 181 additions & 46 deletions

File tree

dgraph/cmd/bulk/loader.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type BulkOptions struct {
5151
MapBufSize uint64
5252
PartitionBufSize int64
5353
SkipMapPhase bool
54+
SkipReducePhase bool
5455
CleanupTmp bool
5556
NumReducers int
5657
Version bool
@@ -144,7 +145,7 @@ type loader struct {
144145
dg *dgo.Dgraph
145146
}
146147

147-
func newLoader(opt *BulkOptions) *loader {
148+
func newLoader(opt *BulkOptions, precomputedWriteTs uint64) *loader {
148149
if opt == nil {
149150
log.Fatalf("Cannot create loader with nil options.")
150151
}
@@ -185,17 +186,27 @@ func newLoader(opt *BulkOptions) *loader {
185186
fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
186187
}
187188

189+
writeTs := precomputedWriteTs
190+
if writeTs == 0 {
191+
writeTs = getWriteTimestamp(zero, dg)
192+
}
188193
st := &state{
189194
opt: opt,
190195
prog: newProgress(),
191196
shards: newShardMap(opt.MapShards),
192197
// Lots of gz readers, so not much channel buffer needed.
193198
readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
194-
writeTs: getWriteTimestamp(zero, dg),
199+
writeTs: writeTs,
195200
namespaces: &sync.Map{},
196201
errorLog: errLog,
197202
}
198-
st.schema = newSchemaStore(readSchema(opt), opt, st)
203+
var parsedSchema *schema.ParsedSchema
204+
if !opt.SkipMapPhase {
205+
parsedSchema = readSchema(opt)
206+
} else {
207+
parsedSchema = &schema.ParsedSchema{}
208+
}
209+
st.schema = newSchemaStore(parsedSchema, opt, st)
199210
ld := &loader{
200211
state: st,
201212
mappers: make([]*mapper, opt.NumGoroutines),

dgraph/cmd/bulk/run.go

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ func init() {
7878
flag.Int64("partition_mb", 4, "Pick a partition key every N megabytes of data.")
7979
flag.Bool("skip_map_phase", false,
8080
"Skip the map phase (assumes that map output files already exist).")
81+
flag.Bool("skip_reduce_phase", false,
82+
"Skip the reduce phase (stops after map phase completion).")
8183
flag.Bool("cleanup_tmp", true,
8284
"Clean up the tmp directory after the loader finishes. Setting this to false allows the"+
8385
" bulk loader can be re-run while skipping the map phase.")
@@ -150,6 +152,7 @@ func run() {
150152
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
151153
PartitionBufSize: int64(Bulk.Conf.GetInt("partition_mb")),
152154
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
155+
SkipReducePhase: Bulk.Conf.GetBool("skip_reduce_phase"),
153156
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
154157
NumReducers: Bulk.Conf.GetInt("reducers"),
155158
Version: Bulk.Conf.GetBool("version"),
@@ -205,27 +208,29 @@ func RunBulkLoader(opt BulkOptions) {
205208
}
206209
fmt.Printf("Encrypted input: %v; Encrypted output: %v\n", opt.Encrypted, opt.EncryptedOut)
207210

208-
if opt.SchemaFile == "" {
209-
// if only graphql schema is provided, we can generate DQL schema from it.
210-
if opt.GqlSchemaFile == "" {
211-
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
212-
os.Exit(1)
211+
if !opt.SkipMapPhase {
212+
if opt.SchemaFile == "" {
213+
// if only graphql schema is provided, we can generate DQL schema from it.
214+
if opt.GqlSchemaFile == "" {
215+
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
216+
os.Exit(1)
217+
}
218+
} else {
219+
if !filestore.Exists(opt.SchemaFile) {
220+
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
221+
os.Exit(1)
222+
}
213223
}
214-
} else {
215-
if !filestore.Exists(opt.SchemaFile) {
216-
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
224+
if opt.DataFiles == "" {
225+
fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
217226
os.Exit(1)
218-
}
219-
}
220-
if opt.DataFiles == "" {
221-
fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
222-
os.Exit(1)
223-
} else {
224-
fileList := strings.SplitSeq(opt.DataFiles, ",")
225-
for file := range fileList {
226-
if !filestore.Exists(file) {
227-
fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
228-
os.Exit(1)
227+
} else {
228+
fileList := strings.SplitSeq(opt.DataFiles, ",")
229+
for file := range fileList {
230+
if !filestore.Exists(file) {
231+
fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
232+
os.Exit(1)
233+
}
229234
}
230235
}
231236
}
@@ -240,6 +245,20 @@ func RunBulkLoader(opt BulkOptions) {
240245
opt.NumReducers, opt.ReduceShards)
241246
os.Exit(1)
242247
}
248+
249+
// Validate skip phase flags
250+
if opt.SkipMapPhase && opt.SkipReducePhase {
251+
fmt.Fprint(os.Stderr, "Cannot skip both map and reduce phases.\n")
252+
os.Exit(1)
253+
}
254+
if opt.SkipReducePhase {
255+
if Bulk.Cmd.Flags().Changed("cleanup_tmp") && opt.CleanupTmp {
256+
fmt.Fprint(os.Stderr, "Cannot use --skip_reduce_phase with --cleanup_tmp=true. "+
257+
"Temp files must be preserved for the later reduce phase.\n")
258+
os.Exit(1)
259+
}
260+
opt.CleanupTmp = false
261+
}
243262
if opt.CustomTokenizers != "" {
244263
for _, soFile := range strings.Split(opt.CustomTokenizers, ",") {
245264
tok.LoadCustomTokenizer(soFile)
@@ -267,25 +286,28 @@ func RunBulkLoader(opt BulkOptions) {
267286

268287
// Make sure it's OK to create or replace the directory specified with the --out option.
269288
// It is always OK to create or replace the default output directory.
270-
if opt.OutDir != defaultOutDir && !opt.ReplaceOutDir {
271-
err := x.IsMissingOrEmptyDir(opt.OutDir)
272-
if err == nil {
273-
fmt.Fprintf(os.Stderr, "Output directory exists and is not empty."+
274-
" Use --replace_out to overwrite it.\n")
275-
os.Exit(1)
276-
} else if err != x.ErrMissingDir {
277-
x.CheckfNoTrace(err)
289+
// Skip output directory validation if we're only doing map phase
290+
if !opt.SkipReducePhase {
291+
if opt.OutDir != defaultOutDir && !opt.ReplaceOutDir {
292+
err := x.IsMissingOrEmptyDir(opt.OutDir)
293+
if err == nil {
294+
fmt.Fprintf(os.Stderr, "Output directory exists and is not empty."+
295+
" Use --replace_out to overwrite it.\n")
296+
os.Exit(1)
297+
} else if err != x.ErrMissingDir {
298+
x.CheckfNoTrace(err)
299+
}
278300
}
279-
}
280301

281-
// Delete and recreate the output dirs to ensure they are empty.
282-
x.Check(os.RemoveAll(opt.OutDir))
283-
for i := range opt.ReduceShards {
284-
dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
285-
x.Check(os.MkdirAll(dir, 0700))
286-
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)
302+
// Delete and recreate the output dirs to ensure they are empty.
303+
x.Check(os.RemoveAll(opt.OutDir))
304+
for i := range opt.ReduceShards {
305+
dir := filepath.Join(opt.OutDir, strconv.Itoa(i), "p")
306+
x.Check(os.MkdirAll(dir, 0700))
307+
opt.shardOutputDirs = append(opt.shardOutputDirs, dir)
287308

288-
x.Check(x.WriteGroupIdFile(dir, uint32(i+1)))
309+
x.Check(x.WriteGroupIdFile(dir, uint32(i+1)))
310+
}
289311
}
290312

291313
// Create a directory just for bulk loader's usage.
@@ -303,10 +325,26 @@ func RunBulkLoader(opt BulkOptions) {
303325
x.Check(os.MkdirAll(bufDir, 0700))
304326
defer os.RemoveAll(bufDir)
305327

306-
loader := newLoader(&opt)
307-
308328
const bulkMetaFilename = "bulk.meta"
309329
bulkMetaPath := filepath.Join(opt.TmpDir, bulkMetaFilename)
330+
const writeTsFilename = "write.ts"
331+
writeTsPath := filepath.Join(opt.TmpDir, writeTsFilename)
332+
333+
var precomputedWriteTs uint64
334+
if opt.SkipMapPhase {
335+
writeTsData, err := os.ReadFile(writeTsPath)
336+
if err != nil {
337+
fmt.Fprintln(os.Stderr, "Error reading write timestamp file; was --skip_reduce_phase used in the map run?")
338+
os.Exit(1)
339+
}
340+
precomputedWriteTs, err = strconv.ParseUint(strings.TrimSpace(string(writeTsData)), 10, 64)
341+
if err != nil {
342+
fmt.Fprintln(os.Stderr, "Error parsing write timestamp file")
343+
os.Exit(1)
344+
}
345+
}
346+
347+
loader := newLoader(&opt, precomputedWriteTs)
310348

311349
if opt.SkipMapPhase {
312350
bulkMetaData, err := os.ReadFile(bulkMetaPath)
@@ -343,7 +381,20 @@ func RunBulkLoader(opt BulkOptions) {
343381
fmt.Fprintln(os.Stderr, "Error writing to bulk meta file")
344382
os.Exit(1)
345383
}
384+
if err = os.WriteFile(writeTsPath, []byte(strconv.FormatUint(loader.writeTs, 10)), 0600); err != nil {
385+
fmt.Fprintln(os.Stderr, "Error writing write timestamp file")
386+
os.Exit(1)
387+
}
346388
}
389+
390+
if opt.SkipReducePhase {
391+
fmt.Println("Skipping reduce phase. Map phase completed successfully.")
392+
fmt.Println("Temp files preserved for later reduce phase processing.")
393+
// Don't call cleanup() to preserve temp files
394+
loader.prog.endSummary()
395+
return
396+
}
397+
347398
loader.reduceStage()
348399
loader.writeSchema()
349400
loader.cleanup()

dgraphtest/load.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -475,12 +475,15 @@ func (c *LocalCluster) LiveLoadFromExport(exportDir string) error {
475475
}
476476

477477
type BulkOpts struct {
478-
DataFiles []string
479-
SchemaFiles []string
480-
GQLSchemaFiles []string
481-
OutDir string
482-
MapShards int // Number of map shards (0 = auto based on numAlphas/replicas)
483-
ReduceShards int // Number of reduce shards (0 = auto based on numAlphas/replicas)
478+
DataFiles []string
479+
SchemaFiles []string
480+
GQLSchemaFiles []string
481+
OutDir string
482+
MapShards int // Number of map shards (0 = auto based on numAlphas/replicas)
483+
ReduceShards int // Number of reduce shards (0 = auto based on numAlphas/replicas)
484+
SkipReducePhase bool // Stop after map phase; preserve tmp dir for later reduce
485+
SkipMapPhase bool // Skip map phase; assumes map output files already exist
486+
TmpDir string // Custom tmp directory (required when splitting map/reduce runs)
484487
}
485488

486489
func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
@@ -518,6 +521,16 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
518521
"--http", ":0",
519522
}
520523

524+
if opts.TmpDir != "" {
525+
args = append(args, "--tmp", opts.TmpDir)
526+
}
527+
if opts.SkipReducePhase {
528+
args = append(args, "--skip_reduce_phase")
529+
}
530+
if opts.SkipMapPhase {
531+
args = append(args, "--skip_map_phase")
532+
}
533+
521534
if len(opts.DataFiles) > 0 {
522535
args = append(args, "-f", strings.Join(opts.DataFiles, ","))
523536
}

systest/integration2/bulk_loader_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,66 @@ const (
6262
]`
6363
)
6464

65+
func TestBulkLoaderSkipReducePhase(t *testing.T) {
66+
tmpDir := t.TempDir()
67+
68+
conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).
69+
WithACL(time.Hour).WithReplicas(1).WithBulkLoadOutDir(t.TempDir())
70+
c, err := dgraphtest.NewLocalCluster(conf)
71+
require.NoError(t, err)
72+
defer func() { c.Cleanup(t.Failed()) }()
73+
74+
require.NoError(t, c.StartZero(0))
75+
require.NoError(t, c.HealthCheck(true))
76+
77+
baseDir := t.TempDir()
78+
dataFile := filepath.Join(baseDir, "data.json")
79+
require.NoError(t, os.WriteFile(dataFile, []byte(jsonData), os.ModePerm))
80+
gqlSchemaFile := filepath.Join(baseDir, "gql.schema")
81+
require.NoError(t, os.WriteFile(gqlSchemaFile, []byte(gqlSchema), os.ModePerm))
82+
83+
// First run: map phase only, preserve tmp dir
84+
mapOpts := dgraphtest.BulkOpts{
85+
DataFiles: []string{dataFile},
86+
GQLSchemaFiles: []string{gqlSchemaFile},
87+
TmpDir: tmpDir,
88+
SkipReducePhase: true,
89+
}
90+
require.NoError(t, c.BulkLoad(mapOpts))
91+
92+
// Second run: reduce phase only, using the same tmp dir.
93+
// Data and schema files are not needed; all input was processed in the map phase.
94+
reduceOpts := dgraphtest.BulkOpts{
95+
TmpDir: tmpDir,
96+
SkipMapPhase: true,
97+
}
98+
require.NoError(t, c.BulkLoad(reduceOpts))
99+
100+
require.NoError(t, c.Start())
101+
102+
hc, err := c.HTTPClient()
103+
require.NoError(t, err)
104+
require.NoError(t, hc.LoginIntoNamespace(dgraphapi.DefaultUser,
105+
dgraphapi.DefaultPassword, x.RootNamespace))
106+
107+
params := dgraphapi.GraphQLParams{
108+
Query: `query {
109+
getMessage(uniqueId: 3) {
110+
content
111+
author
112+
}
113+
}`,
114+
}
115+
data, err := hc.RunGraphqlQuery(params, false)
116+
require.NoError(t, err)
117+
require.NoError(t, dgraphapi.CompareJSON(`{
118+
"getMessage": {
119+
"content": "DVTCTXCVYI",
120+
"author": "USYMVFJYXA"
121+
}
122+
}`, string(data)))
123+
}
124+
65125
func TestBulkLoaderNoDqlSchema(t *testing.T) {
66126
conf := dgraphtest.NewClusterConfig().WithNumAlphas(2).WithNumZeros(1).
67127
WithACL(time.Hour).WithReplicas(1).WithBulkLoadOutDir(t.TempDir())

0 commit comments

Comments
 (0)