@@ -568,7 +568,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
568568 CommitOptions {
569569 mut new_meta_files,
570570 new_sst_files,
571- mut new_blob_files,
571+ new_blob_files,
572572 mut sst_seq_numbers_to_delete,
573573 mut blob_seq_numbers_to_delete,
574574 sequence_number : mut seq,
@@ -580,28 +580,67 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
580580 new_meta_files. sort_unstable_by_key ( |( seq, _) | * seq) ;
581581
582582 let sync_span = tracing:: trace_span!( "sync new files" ) . entered ( ) ;
583- let mut new_meta_files = self
583+
584+ enum SyncItem {
585+ Meta ( u32 , File ) ,
586+ Sst ( File ) ,
587+ Blob ( u32 , File ) ,
588+ }
589+ enum SyncResult {
590+ Meta ( MetaFile ) ,
591+ Sst ,
592+ Blob ( u32 , File ) ,
593+ }
594+
595+ let mut sync_items: Vec < SyncItem > =
596+ Vec :: with_capacity ( new_meta_files. len ( ) + new_sst_files. len ( ) + new_blob_files. len ( ) ) ;
597+ for ( seq, file) in new_meta_files {
598+ sync_items. push ( SyncItem :: Meta ( seq, file) ) ;
599+ }
600+ for ( _, file) in new_sst_files {
601+ sync_items. push ( SyncItem :: Sst ( file) ) ;
602+ }
603+ for ( seq, file) in new_blob_files {
604+ sync_items. push ( SyncItem :: Blob ( seq, file) ) ;
605+ }
606+
607+ let results: Vec < SyncResult > = self
584608 . parallel_scheduler
585- . parallel_map_collect_owned :: < _ , _ , Result < Vec < _ > > > ( new_meta_files, |( seq, file) | {
586- file. sync_all ( ) ?;
587- let meta_file = MetaFile :: open ( & self . path , seq) ?;
588- Ok ( meta_file)
609+ . parallel_map_collect_owned :: < _ , _ , Result < Vec < _ > > > ( sync_items, |item| match item {
610+ SyncItem :: Meta ( seq, file) => {
611+ file. sync_data ( ) ?;
612+ let meta_file = MetaFile :: open ( & self . path , seq) ?;
613+ Ok ( SyncResult :: Meta ( meta_file) )
614+ }
615+ SyncItem :: Sst ( file) => {
616+ file. sync_data ( ) ?;
617+ Ok ( SyncResult :: Sst )
618+ }
619+ SyncItem :: Blob ( seq, file) => {
620+ file. sync_data ( ) ?;
621+ Ok ( SyncResult :: Blob ( seq, file) )
622+ }
589623 } ) ?;
590624
625+ let mut new_meta_files: Vec < MetaFile > = Vec :: new ( ) ;
626+ let mut new_blob_files: Vec < ( u32 , File ) > = Vec :: new ( ) ;
627+ for result in results {
628+ match result {
629+ SyncResult :: Meta ( mf) => new_meta_files. push ( mf) ,
630+ SyncResult :: Sst => { }
631+ SyncResult :: Blob ( seq, file) => new_blob_files. push ( ( seq, file) ) ,
632+ }
633+ }
634+
591635 let mut sst_filter = SstFilter :: new ( ) ;
592636 for meta_file in new_meta_files. iter_mut ( ) . rev ( ) {
593637 sst_filter. apply_filter ( meta_file) ;
594638 }
595639
596- self . parallel_scheduler . block_in_place ( || {
597- for ( _, file) in new_sst_files. iter ( ) {
598- file. sync_all ( ) ?;
599- }
600- for ( _, file) in new_blob_files. iter ( ) {
601- file. sync_all ( ) ?;
602- }
603- anyhow:: Ok ( ( ) )
604- } ) ?;
640+ // Sync the directory to ensure the new directory entries (file name → inode mappings)
641+ // are durable before we update CURRENT. Without this, a crash could leave CURRENT pointing
642+ // to files whose directory entries were lost even though their data was flushed.
643+ File :: open ( & self . path ) ?. sync_data ( ) ?;
605644 drop ( sync_span) ;
606645
607646 let new_meta_info = new_meta_files
@@ -679,7 +718,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
679718 }
680719 let mut file = File :: create ( self . path . join ( format ! ( "{seq:08}.del" ) ) ) ?;
681720 file. write_all ( & buf) ?;
682- file. sync_all ( ) ?;
721+ file. sync_data ( ) ?;
683722 }
684723
685724 let mut current_file = OpenOptions :: new ( )
@@ -688,7 +727,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
688727 . read ( false )
689728 . open ( self . path . join ( "CURRENT" ) ) ?;
690729 current_file. write_u32 :: < BE > ( seq) ?;
691- current_file. sync_all ( ) ?;
730+ current_file. sync_data ( ) ?;
692731
693732 for seq in sst_seq_numbers_to_delete. iter ( ) {
694733 fs:: remove_file ( self . path . join ( format ! ( "{seq:08}.sst" ) ) ) ?;
0 commit comments