@@ -78,6 +78,8 @@ func init() {
7878 flag .Int64 ("partition_mb" , 4 , "Pick a partition key every N megabytes of data." )
7979 flag .Bool ("skip_map_phase" , false ,
8080 "Skip the map phase (assumes that map output files already exist)." )
81+ flag .Bool ("skip_reduce_phase" , false ,
82+ "Skip the reduce phase (stops after map phase completion)." )
8183 flag .Bool ("cleanup_tmp" , true ,
8284 "Clean up the tmp directory after the loader finishes. Setting this to false allows the" +
8385 " bulk loader can be re-run while skipping the map phase." )
@@ -150,6 +152,7 @@ func run() {
150152 MapBufSize : uint64 (Bulk .Conf .GetInt ("mapoutput_mb" )),
151153 PartitionBufSize : int64 (Bulk .Conf .GetInt ("partition_mb" )),
152154 SkipMapPhase : Bulk .Conf .GetBool ("skip_map_phase" ),
155+ SkipReducePhase : Bulk .Conf .GetBool ("skip_reduce_phase" ),
153156 CleanupTmp : Bulk .Conf .GetBool ("cleanup_tmp" ),
154157 NumReducers : Bulk .Conf .GetInt ("reducers" ),
155158 Version : Bulk .Conf .GetBool ("version" ),
@@ -205,27 +208,29 @@ func RunBulkLoader(opt BulkOptions) {
205208 }
206209 fmt .Printf ("Encrypted input: %v; Encrypted output: %v\n " , opt .Encrypted , opt .EncryptedOut )
207210
208- if opt .SchemaFile == "" {
209- // if only graphql schema is provided, we can generate DQL schema from it.
210- if opt .GqlSchemaFile == "" {
211- fmt .Fprint (os .Stderr , "Schema file must be specified.\n " )
212- os .Exit (1 )
211+ if ! opt .SkipMapPhase {
212+ if opt .SchemaFile == "" {
213+ // if only graphql schema is provided, we can generate DQL schema from it.
214+ if opt .GqlSchemaFile == "" {
215+ fmt .Fprint (os .Stderr , "Schema file must be specified.\n " )
216+ os .Exit (1 )
217+ }
218+ } else {
219+ if ! filestore .Exists (opt .SchemaFile ) {
220+ fmt .Fprintf (os .Stderr , "Schema path(%v) does not exist.\n " , opt .SchemaFile )
221+ os .Exit (1 )
222+ }
213223 }
214- } else {
215- if ! filestore .Exists (opt .SchemaFile ) {
216- fmt .Fprintf (os .Stderr , "Schema path(%v) does not exist.\n " , opt .SchemaFile )
224+ if opt .DataFiles == "" {
225+ fmt .Fprint (os .Stderr , "RDF or JSON file(s) location must be specified.\n " )
217226 os .Exit (1 )
218- }
219- }
220- if opt .DataFiles == "" {
221- fmt .Fprint (os .Stderr , "RDF or JSON file(s) location must be specified.\n " )
222- os .Exit (1 )
223- } else {
224- fileList := strings .SplitSeq (opt .DataFiles , "," )
225- for file := range fileList {
226- if ! filestore .Exists (file ) {
227- fmt .Fprintf (os .Stderr , "Data path(%v) does not exist.\n " , file )
228- os .Exit (1 )
227+ } else {
228+ fileList := strings .SplitSeq (opt .DataFiles , "," )
229+ for file := range fileList {
230+ if ! filestore .Exists (file ) {
231+ fmt .Fprintf (os .Stderr , "Data path(%v) does not exist.\n " , file )
232+ os .Exit (1 )
233+ }
229234 }
230235 }
231236 }
@@ -240,6 +245,20 @@ func RunBulkLoader(opt BulkOptions) {
240245 opt .NumReducers , opt .ReduceShards )
241246 os .Exit (1 )
242247 }
248+
249+ // Validate skip phase flags
250+ if opt .SkipMapPhase && opt .SkipReducePhase {
251+ fmt .Fprint (os .Stderr , "Cannot skip both map and reduce phases.\n " )
252+ os .Exit (1 )
253+ }
254+ if opt .SkipReducePhase {
255+ if Bulk .Cmd .Flags ().Changed ("cleanup_tmp" ) && opt .CleanupTmp {
256+ fmt .Fprint (os .Stderr , "Cannot use --skip_reduce_phase with --cleanup_tmp=true. " +
257+ "Temp files must be preserved for the later reduce phase.\n " )
258+ os .Exit (1 )
259+ }
260+ opt .CleanupTmp = false
261+ }
243262 if opt .CustomTokenizers != "" {
244263 for _ , soFile := range strings .Split (opt .CustomTokenizers , "," ) {
245264 tok .LoadCustomTokenizer (soFile )
@@ -267,25 +286,28 @@ func RunBulkLoader(opt BulkOptions) {
267286
268287 // Make sure it's OK to create or replace the directory specified with the --out option.
269288 // It is always OK to create or replace the default output directory.
270- if opt .OutDir != defaultOutDir && ! opt .ReplaceOutDir {
271- err := x .IsMissingOrEmptyDir (opt .OutDir )
272- if err == nil {
273- fmt .Fprintf (os .Stderr , "Output directory exists and is not empty." +
274- " Use --replace_out to overwrite it.\n " )
275- os .Exit (1 )
276- } else if err != x .ErrMissingDir {
277- x .CheckfNoTrace (err )
289+ // Skip output directory validation if we're only doing map phase
290+ if ! opt .SkipReducePhase {
291+ if opt .OutDir != defaultOutDir && ! opt .ReplaceOutDir {
292+ err := x .IsMissingOrEmptyDir (opt .OutDir )
293+ if err == nil {
294+ fmt .Fprintf (os .Stderr , "Output directory exists and is not empty." +
295+ " Use --replace_out to overwrite it.\n " )
296+ os .Exit (1 )
297+ } else if err != x .ErrMissingDir {
298+ x .CheckfNoTrace (err )
299+ }
278300 }
279- }
280301
281- // Delete and recreate the output dirs to ensure they are empty.
282- x .Check (os .RemoveAll (opt .OutDir ))
283- for i := range opt .ReduceShards {
284- dir := filepath .Join (opt .OutDir , strconv .Itoa (i ), "p" )
285- x .Check (os .MkdirAll (dir , 0700 ))
286- opt .shardOutputDirs = append (opt .shardOutputDirs , dir )
302+ // Delete and recreate the output dirs to ensure they are empty.
303+ x .Check (os .RemoveAll (opt .OutDir ))
304+ for i := range opt .ReduceShards {
305+ dir := filepath .Join (opt .OutDir , strconv .Itoa (i ), "p" )
306+ x .Check (os .MkdirAll (dir , 0700 ))
307+ opt .shardOutputDirs = append (opt .shardOutputDirs , dir )
287308
288- x .Check (x .WriteGroupIdFile (dir , uint32 (i + 1 )))
309+ x .Check (x .WriteGroupIdFile (dir , uint32 (i + 1 )))
310+ }
289311 }
290312
291313 // Create a directory just for bulk loader's usage.
@@ -303,10 +325,26 @@ func RunBulkLoader(opt BulkOptions) {
303325 x .Check (os .MkdirAll (bufDir , 0700 ))
304326 defer os .RemoveAll (bufDir )
305327
306- loader := newLoader (& opt )
307-
308328 const bulkMetaFilename = "bulk.meta"
309329 bulkMetaPath := filepath .Join (opt .TmpDir , bulkMetaFilename )
330+ const writeTsFilename = "write.ts"
331+ writeTsPath := filepath .Join (opt .TmpDir , writeTsFilename )
332+
333+ var precomputedWriteTs uint64
334+ if opt .SkipMapPhase {
335+ writeTsData , err := os .ReadFile (writeTsPath )
336+ if err != nil {
337+ fmt .Fprintln (os .Stderr , "Error reading write timestamp file; was --skip_reduce_phase used in the map run?" )
338+ os .Exit (1 )
339+ }
340+ precomputedWriteTs , err = strconv .ParseUint (strings .TrimSpace (string (writeTsData )), 10 , 64 )
341+ if err != nil {
342+ fmt .Fprintln (os .Stderr , "Error parsing write timestamp file" )
343+ os .Exit (1 )
344+ }
345+ }
346+
347+ loader := newLoader (& opt , precomputedWriteTs )
310348
311349 if opt .SkipMapPhase {
312350 bulkMetaData , err := os .ReadFile (bulkMetaPath )
@@ -343,7 +381,20 @@ func RunBulkLoader(opt BulkOptions) {
343381 fmt .Fprintln (os .Stderr , "Error writing to bulk meta file" )
344382 os .Exit (1 )
345383 }
384+ if err = os .WriteFile (writeTsPath , []byte (strconv .FormatUint (loader .writeTs , 10 )), 0600 ); err != nil {
385+ fmt .Fprintln (os .Stderr , "Error writing write timestamp file" )
386+ os .Exit (1 )
387+ }
346388 }
389+
390+ if opt .SkipReducePhase {
391+ fmt .Println ("Skipping reduce phase. Map phase completed successfully." )
392+ fmt .Println ("Temp files preserved for later reduce phase processing." )
393+ // Don't call cleanup() to preserve temp files
394+ loader .prog .endSummary ()
395+ return
396+ }
397+
347398 loader .reduceStage ()
348399 loader .writeSchema ()
349400 loader .cleanup ()
0 commit comments