diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs
index 5edd5a995de..61d08abc585 100644
--- a/crates/static-file/static-file/src/static_file_producer.rs
+++ b/crates/static-file/static-file/src/static_file_producer.rs
@@ -152,9 +152,11 @@ where
         self.provider.static_file_provider().commit()?;
 
         for (segment, block_range) in segments {
-            self.provider
-                .static_file_provider()
-                .update_index(segment.segment(), Some(*block_range.end()))?;
+            self.provider.static_file_provider().update_index(
+                segment.segment(),
+                Some(*block_range.end()),
+                None,
+            )?;
         }
 
         let elapsed = start.elapsed(); // TODO(alexey): track in metrics
diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs
index dc79d779097..9792337d72b 100644
--- a/crates/storage/provider/src/providers/static_file/manager.rs
+++ b/crates/storage/provider/src/providers/static_file/manager.rs
@@ -1036,10 +1036,15 @@ impl StaticFileProvider {
     /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
     ///
     /// If `segment_max_block` is None it means there's no static file for this segment.
+    ///
+    /// If `segment_header` comes from the live writer for the indexed range, its in-memory state is
+    /// used for bookkeeping so the append path does not need to reopen the jar unless readers are
+    /// already caching that range.
     pub fn update_index(
         &self,
         segment: StaticFileSegment,
         segment_max_block: Option<BlockNumber>,
+        segment_header: Option<&SegmentHeader>,
     ) -> ProviderResult<()> {
         debug!(
             target: "providers::static_file",
@@ -1056,25 +1061,29 @@ impl StaticFileProvider {
             indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
             segment_max_block,
         );
-
-        let jar = NippyJar::<SegmentHeader>::load(
-            &self.path.join(segment.filename(&fixed_range)),
-        )
-        .map_err(ProviderError::other)?;
+        let key = (fixed_range.end(), segment);
+        let path = self.path.join(segment.filename(&fixed_range));
+        let should_refresh_cached_jar = self.map.contains_key(&key);
+        let mut loaded_jar = None;
+        let current_header = if let Some(segment_header) =
+            segment_header.filter(|header| header.expected_block_range() == fixed_range)
+        {
+            segment_header.clone()
+        } else {
+            let jar =
+                NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?;
+            let header = jar.user_header().clone();
+            loaded_jar = Some(jar);
+            header
+        };
 
         let index = indexes
             .entry(segment)
             .and_modify(|index| {
-                // Update max block
                 index.max_block = segment_max_block;
-
-                // Update expected block range index
-
-                // Remove all expected block ranges that are less than the new max block
                 index
                     .expected_block_ranges_by_max_block
                     .retain(|_, block_range| block_range.start() < fixed_range.start());
-                // Insert new expected block range
                 index
                     .expected_block_ranges_by_max_block
                     .insert(fixed_range.end(), fixed_range);
@@ -1104,10 +1113,8 @@ impl StaticFileProvider {
                 // 2. Sync to block 100, this update sets min_block = [0..=100]
                 // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                 //    this update, it would incorrectly return 0 (stale)
-                if let Some(current_block_range) = jar.user_header().block_range() {
+                if let Some(current_block_range) = current_header.block_range() {
                     if let Some(min_block_range) = index.min_block_range.as_mut() {
-                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
-                        // sure that current_min is always the lowest.
                         if current_block_range.start() == min_block_range.start() {
                             *min_block_range = current_block_range;
                         }
@@ -1116,22 +1123,10 @@ impl StaticFileProvider {
                     }
                 }
 
-                // Updates the tx index by first removing all entries which have a higher
-                // block_start than our current static file.
-                if let Some(tx_range) = jar.user_header().tx_range() {
-                    // Current block range has the same block start as `fixed_range``, but block end
-                    // might be different if we are still filling this static file.
-                    if let Some(current_block_range) = jar.user_header().block_range() {
+                if let Some(tx_range) = current_header.tx_range() {
+                    if let Some(current_block_range) = current_header.block_range() {
                         let tx_end = tx_range.end();
 
-                        // Considering that `update_index` is called when we either append/truncate,
-                        // we are sure that we are handling the latest data
-                        // points.
-                        //
-                        // Here we remove every entry of the index that has a block start higher or
-                        // equal than our current one. This is important in the case
-                        // that we prune a lot of rows resulting in a file (and thus
-                        // a higher block range) deletion.
                         if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                             index
                                 .retain(|_, block_range| block_range.start() < fixed_range.start());
@@ -1142,22 +1137,24 @@
                         }
                     }
                 } else if segment.is_tx_based() {
-                    // The unwinded file has no more transactions/receipts. However, the highest
-                    // block is within this files' block range. We only retain
-                    // entries with block ranges before the current one.
                     if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                         index.retain(|_, block_range| block_range.start() < fixed_range.start());
                     }
 
-                    // If the index is empty, just remove it.
                     index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                 }
 
-                // Update the cached provider.
-                debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
-                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
+                // Refresh the cached provider only when readers already have this range resident.
+                if should_refresh_cached_jar {
+                    debug!(target: "providers::static_file", ?segment, ?fixed_range, "Refreshing cached jar");
+                    let jar = if let Some(jar) = loaded_jar {
+                        jar
+                    } else {
+                        NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?
+                    };
+                    self.map.insert(key, LoadedJar::new(jar)?);
+                }
 
-                // Delete any cached provider that no longer has an associated jar.
                 debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
                 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
             }
@@ -3020,11 +3017,15 @@ where
 mod tests {
     use std::collections::BTreeMap;
 
+    use alloy_consensus::Header;
+    use alloy_primitives::BlockHash;
     use reth_chain_state::EthPrimitives;
     use reth_db::test_utils::create_test_static_files_dir;
     use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
 
-    use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
+    use crate::{
+        providers::StaticFileProvider, HeaderProvider, StaticFileProviderBuilder, StaticFileWriter,
+    };
 
     #[test]
     fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
@@ -3145,4 +3146,40 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_update_index_only_refreshes_cached_jar() -> eyre::Result<()> {
+        let (static_dir, _) = create_test_static_files_dir();
+        let provider: StaticFileProvider<EthPrimitives> =
+            StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(10).build()?;
+
+        let mut writer = provider.latest_writer(StaticFileSegment::Headers)?;
+        let mut header = Header::default();
+        for number in 0..=4 {
+            header.number = number;
+            writer.append_header(&header, &BlockHash::default())?;
+        }
+        writer.commit()?;
+
+        assert!(provider.map.is_empty(), "uncached updates should stay lazy");
+
+        let cached =
+            provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
+        assert_eq!(cached.header_by_number(4)?.unwrap().number, 4);
+        assert_eq!(provider.map.len(), 1, "reader access should populate the cache");
+        drop(cached);
+
+        for number in 5..=6 {
+            header.number = number;
+            writer.append_header(&header, &BlockHash::default())?;
+        }
+        writer.commit()?;
+
+        assert_eq!(provider.map.len(), 1, "refreshing should reuse the existing cache slot");
+        let refreshed =
+            provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
+        assert_eq!(refreshed.header_by_number(6)?.unwrap().number, 6);
+
+        Ok(())
+    }
 }
diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs
index b98569a6106..0052f0e373e 100644
--- a/crates/storage/provider/src/providers/static_file/writer.rs
+++ b/crates/storage/provider/src/providers/static_file/writer.rs
@@ -697,6 +697,7 @@ impl StaticFileProviderRW {
     /// Updates the `self.reader` internal index.
     fn update_index(&self) -> ProviderResult<()> {
         let segment = self.writer.user_header().segment();
+        let segment_header = self.writer.user_header();
 
         // We find the maximum block of the segment by checking this writer's last block.
         //
@@ -723,7 +724,7 @@ where
                 prev_path.exists().then_some(prev_block)
             });
 
-        self.reader().update_index(segment, segment_max_block)
+        self.reader().update_index(segment, segment_max_block, Some(segment_header))
     }
 
     /// Ensures that the writer is positioned at the specified block number.
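
Note on the approach, with a minimal sketch (illustrative names only, not the actual reth types or API): `update_index` now prefers the writer's in-memory segment header over reloading the jar from disk, and it only rewrites a cache entry when readers already hold that range, staying lazy otherwise. Assuming a plain `HashMap` cache and simplified stand-in types, the idea looks roughly like this:

// Illustrative sketch only; `Provider`, `SegmentState`, and `load_from_disk`
// are hypothetical stand-ins, not reth identifiers.
use std::collections::HashMap;

#[derive(Clone)]
struct SegmentState {
    block_end: u64,
}

struct Provider {
    // Keyed like the provider map in the patch: (fixed range end, segment name).
    map: HashMap<(u64, &'static str), SegmentState>,
}

impl Provider {
    /// Update bookkeeping after an append/commit. `live_state` is the writer's
    /// in-memory state; `load_from_disk` simulates reopening the jar file.
    fn update_index(
        &mut self,
        segment: &'static str,
        range_end: u64,
        live_state: Option<&SegmentState>,
        load_from_disk: impl FnOnce() -> SegmentState,
    ) {
        let key = (range_end, segment);
        // Only refresh the cache if readers already have this range resident.
        let should_refresh = self.map.contains_key(&key);

        // Prefer the in-memory state so the common append path avoids disk I/O.
        let state = live_state.cloned().unwrap_or_else(load_from_disk);

        if should_refresh {
            self.map.insert(key, state);
        }
        // Otherwise stay lazy: the next reader access populates the cache.
    }
}

fn main() {
    let mut provider = Provider { map: HashMap::new() };
    let live = SegmentState { block_end: 6 };

    // Nothing cached yet: no refresh, and the disk loader is never called.
    provider.update_index("headers", 9, Some(&live), || unreachable!());
    assert!(provider.map.is_empty());

    // A reader populates the cache; a later update then refreshes it in place.
    provider.map.insert((9, "headers"), SegmentState { block_end: 4 });
    provider.update_index("headers", 9, Some(&live), || unreachable!());
    assert_eq!(provider.map[&(9, "headers")].block_end, 6);
}

This mirrors what the new `test_update_index_only_refreshes_cached_jar` test asserts: uncached commits leave the map empty, and once a reader has populated a slot, subsequent commits refresh that same slot.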