Commit 36105f6

Use the computed TS from the map phase
Allows the reduce phase to run without Zero
1 parent 77e8023 commit 36105f6

3 files changed: 60 additions & 29 deletions

dgraph/cmd/bulk/loader.go

Lines changed: 13 additions & 3 deletions
@@ -145,7 +145,7 @@ type loader struct {
 	dg *dgo.Dgraph
 }
 
-func newLoader(opt *BulkOptions) *loader {
+func newLoader(opt *BulkOptions, precomputedWriteTs uint64) *loader {
 	if opt == nil {
 		log.Fatalf("Cannot create loader with nil options.")
 	}
@@ -186,17 +186,27 @@ func newLoader(opt *BulkOptions) *loader {
 		fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
 	}
 
+	writeTs := precomputedWriteTs
+	if writeTs == 0 {
+		writeTs = getWriteTimestamp(zero, dg)
+	}
 	st := &state{
 		opt:    opt,
 		prog:   newProgress(),
 		shards: newShardMap(opt.MapShards),
 		// Lots of gz readers, so not much channel buffer needed.
 		readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
-		writeTs:       getWriteTimestamp(zero, dg),
+		writeTs:       writeTs,
 		namespaces:    &sync.Map{},
 		errorLog:      errLog,
 	}
-	st.schema = newSchemaStore(readSchema(opt), opt, st)
+	var parsedSchema *schema.ParsedSchema
+	if !opt.SkipMapPhase {
+		parsedSchema = readSchema(opt)
+	} else {
+		parsedSchema = &schema.ParsedSchema{}
+	}
+	st.schema = newSchemaStore(parsedSchema, opt, st)
 	ld := &loader{
 		state:   st,
 		mappers: make([]*mapper, opt.NumGoroutines),
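
newLoader now accepts a write timestamp computed by an earlier map-phase run. A value of 0 acts as a sentinel for "not precomputed", in which case the loader falls back to leasing a timestamp from Zero as before. When the map phase is skipped, the schema files are likewise not re-read: the loader starts from an empty schema.ParsedSchema, since the schema state is restored from bulk.meta in run.go. A minimal sketch of the sentinel-fallback pattern (resolveWriteTs and fetchFromZero are illustrative names, not part of this commit):

// Sentinel fallback as used by newLoader: a zero timestamp means
// "not precomputed", so fetch one the usual way.
// resolveWriteTs and fetchFromZero are illustrative names only.
func resolveWriteTs(precomputed uint64, fetchFromZero func() uint64) uint64 {
	if precomputed != 0 {
		return precomputed // reuse the value persisted by the map phase
	}
	return fetchFromZero() // normal path: lease a fresh timestamp from Zero
}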

dgraph/cmd/bulk/run.go

Lines changed: 43 additions & 21 deletions
@@ -208,27 +208,29 @@ func RunBulkLoader(opt BulkOptions) {
 	}
 	fmt.Printf("Encrypted input: %v; Encrypted output: %v\n", opt.Encrypted, opt.EncryptedOut)
 
-	if opt.SchemaFile == "" {
-		// if only graphql schema is provided, we can generate DQL schema from it.
-		if opt.GqlSchemaFile == "" {
-			fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
-			os.Exit(1)
+	if !opt.SkipMapPhase {
+		if opt.SchemaFile == "" {
+			// if only graphql schema is provided, we can generate DQL schema from it.
+			if opt.GqlSchemaFile == "" {
+				fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
+				os.Exit(1)
+			}
+		} else {
+			if !filestore.Exists(opt.SchemaFile) {
+				fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
+				os.Exit(1)
+			}
 		}
-	} else {
-		if !filestore.Exists(opt.SchemaFile) {
-			fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
+		if opt.DataFiles == "" {
+			fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
 			os.Exit(1)
-		}
-	}
-	if opt.DataFiles == "" {
-		fmt.Fprint(os.Stderr, "RDF or JSON file(s) location must be specified.\n")
-		os.Exit(1)
-	} else {
-		fileList := strings.SplitSeq(opt.DataFiles, ",")
-		for file := range fileList {
-			if !filestore.Exists(file) {
-				fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
-				os.Exit(1)
+		} else {
+			fileList := strings.SplitSeq(opt.DataFiles, ",")
+			for file := range fileList {
+				if !filestore.Exists(file) {
+					fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
+					os.Exit(1)
+				}
 			}
 		}
 	}
@@ -323,10 +325,26 @@ func RunBulkLoader(opt BulkOptions) {
 	x.Check(os.MkdirAll(bufDir, 0700))
 	defer os.RemoveAll(bufDir)
 
-	loader := newLoader(&opt)
-
 	const bulkMetaFilename = "bulk.meta"
 	bulkMetaPath := filepath.Join(opt.TmpDir, bulkMetaFilename)
+	const writeTsFilename = "write.ts"
+	writeTsPath := filepath.Join(opt.TmpDir, writeTsFilename)
+
+	var precomputedWriteTs uint64
+	if opt.SkipMapPhase {
+		writeTsData, err := os.ReadFile(writeTsPath)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "Error reading write timestamp file; was --skip_reduce_phase used in the map run?")
+			os.Exit(1)
+		}
+		precomputedWriteTs, err = strconv.ParseUint(strings.TrimSpace(string(writeTsData)), 10, 64)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, "Error parsing write timestamp file")
+			os.Exit(1)
+		}
+	}
+
+	loader := newLoader(&opt, precomputedWriteTs)
 
 	if opt.SkipMapPhase {
 		bulkMetaData, err := os.ReadFile(bulkMetaPath)
@@ -363,6 +381,10 @@ func RunBulkLoader(opt BulkOptions) {
 			fmt.Fprintln(os.Stderr, "Error writing to bulk meta file")
 			os.Exit(1)
 		}
+		if err = os.WriteFile(writeTsPath, []byte(strconv.FormatUint(loader.writeTs, 10)), 0600); err != nil {
+			fmt.Fprintln(os.Stderr, "Error writing write timestamp file")
+			os.Exit(1)
+		}
 	}
 
 	if opt.SkipReducePhase {
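
The timestamp handoff between the two phases is a plain-text file: the map run serializes loader.writeTs with strconv.FormatUint, and the reduce run parses it back with strconv.ParseUint after trimming whitespace. A self-contained round-trip sketch of that format (the temp-dir path and the value 42 are illustrative):

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

func main() {
	writeTsPath := filepath.Join(os.TempDir(), "write.ts")

	// After the map phase: persist the timestamp as a base-10 string.
	var writeTs uint64 = 42
	if err := os.WriteFile(writeTsPath, []byte(strconv.FormatUint(writeTs, 10)), 0600); err != nil {
		fmt.Fprintln(os.Stderr, "Error writing write timestamp file")
		os.Exit(1)
	}

	// Before the reduce phase: read it back and parse it.
	data, err := os.ReadFile(writeTsPath)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Error reading write timestamp file")
		os.Exit(1)
	}
	parsed, err := strconv.ParseUint(strings.TrimSpace(string(data)), 10, 64)
	if err != nil {
		fmt.Fprintln(os.Stderr, "Error parsing write timestamp file")
		os.Exit(1)
	}
	fmt.Println(parsed == writeTs) // true
}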

systest/integration2/bulk_loader_test.go

Lines changed: 4 additions & 5 deletions
@@ -89,12 +89,11 @@ func TestBulkLoaderSkipReducePhase(t *testing.T) {
 	}
 	require.NoError(t, c.BulkLoad(mapOpts))
 
-	// Second run: reduce phase only, using the same tmp dir
+	// Second run: reduce phase only, using the same tmp dir.
+	// Data and schema files are not needed; all input was processed in the map phase.
 	reduceOpts := dgraphtest.BulkOpts{
-		DataFiles:      []string{dataFile},
-		GQLSchemaFiles: []string{gqlSchemaFile},
-		TmpDir:         tmpDir,
-		SkipMapPhase:   true,
+		TmpDir:       tmpDir,
+		SkipMapPhase: true,
 	}
 	require.NoError(t, c.BulkLoad(reduceOpts))
 
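
Taken together, the two runs in this test mirror the intended workflow: the first invocation performs only the map phase and leaves bulk.meta and write.ts in the shared tmp dir, and the second performs only the reduce phase from those artifacts, needing neither the input files nor a live Zero. A sketch of both calls; the shape of mapOpts, including a SkipReducePhase field, is assumed from the first half of the test, which is not shown in this hunk:

// First run: map phase only; persists bulk.meta and write.ts into tmpDir.
// The SkipReducePhase field is assumed from the (unshown) start of the test.
mapOpts := dgraphtest.BulkOpts{
	DataFiles:       []string{dataFile},
	GQLSchemaFiles:  []string{gqlSchemaFile},
	TmpDir:          tmpDir,
	SkipReducePhase: true,
}
require.NoError(t, c.BulkLoad(mapOpts))

// Second run: reduce phase only; reuses the artifacts from the first run.
reduceOpts := dgraphtest.BulkOpts{
	TmpDir:       tmpDir,
	SkipMapPhase: true,
}
require.NoError(t, c.BulkLoad(reduceOpts))

Splitting the phases this way also means the memory-heavy reduce step can be retried, or run on a different machine, without redoing the map work.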
