From 64a5a9cb10f952d801574707de1d47fa95de6a48 Mon Sep 17 00:00:00 2001 From: Amp Date: Mon, 4 May 2026 09:14:35 +0000 Subject: [PATCH] perf(provider): lazily refresh static-file jar cache Pass the live SegmentHeader into static-file index updates so append-side bookkeeping can reuse the writer's in-memory state instead of reopening the jar for every write. This keeps the steady-state save_blocks path from doing redundant jar loads and mmaps when only the provider indexes need to advance. Only rebuild the cached LoadedJar when that segment range is already resident in the reader cache, and leave uncached ranges to be loaded lazily on demand. Update the static-file producer call site for the new API and cover the lazy-refresh behavior with a provider test. Amp-Thread-ID: https://ampcode.com/threads/T-019df23e-aac0-71af-8498-f77b1613ea22 Co-authored-by: Amp --- .../static-file/src/static_file_producer.rs | 8 +- .../src/providers/static_file/manager.rs | 111 ++++++++++++------ .../src/providers/static_file/writer.rs | 3 +- 3 files changed, 81 insertions(+), 41 deletions(-) diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs index 5edd5a995de..61d08abc585 100644 --- a/crates/static-file/static-file/src/static_file_producer.rs +++ b/crates/static-file/static-file/src/static_file_producer.rs @@ -152,9 +152,11 @@ where self.provider.static_file_provider().commit()?; for (segment, block_range) in segments { - self.provider - .static_file_provider() - .update_index(segment.segment(), Some(*block_range.end()))?; + self.provider.static_file_provider().update_index( + segment.segment(), + Some(*block_range.end()), + None, + )?; } let elapsed = start.elapsed(); // TODO(alexey): track in metrics diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index dc79d779097..9792337d72b 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1036,10 +1036,15 @@ impl StaticFileProvider { /// Any entry higher than `segment_max_block` will be deleted from the previous structures. /// /// If `segment_max_block` is None it means there's no static file for this segment. + /// + /// If `segment_header` comes from the live writer for the indexed range, its in-memory state is + /// used for bookkeeping so the append path does not need to reopen the jar unless readers are + /// already caching that range. pub fn update_index( &self, segment: StaticFileSegment, segment_max_block: Option, + segment_header: Option<&SegmentHeader>, ) -> ProviderResult<()> { debug!( target: "providers::static_file", @@ -1056,25 +1061,29 @@ impl StaticFileProvider { indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block), segment_max_block, ); - - let jar = NippyJar::::load( - &self.path.join(segment.filename(&fixed_range)), - ) - .map_err(ProviderError::other)?; + let key = (fixed_range.end(), segment); + let path = self.path.join(segment.filename(&fixed_range)); + let should_refresh_cached_jar = self.map.contains_key(&key); + let mut loaded_jar = None; + let current_header = if let Some(segment_header) = + segment_header.filter(|header| header.expected_block_range() == fixed_range) + { + segment_header.clone() + } else { + let jar = + NippyJar::::load(&path).map_err(ProviderError::other)?; + let header = jar.user_header().clone(); + loaded_jar = Some(jar); + header + }; let index = indexes .entry(segment) .and_modify(|index| { - // Update max block index.max_block = segment_max_block; - - // Update expected block range index - - // Remove all expected block ranges that are less than the new max block index .expected_block_ranges_by_max_block .retain(|_, block_range| block_range.start() < fixed_range.start()); - // Insert new expected block range index .expected_block_ranges_by_max_block .insert(fixed_range.end(), fixed_range); @@ -1104,10 +1113,8 @@ impl StaticFileProvider { // 2. Sync to block 100, this update sets min_block = [0..=100] // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without // this update, it would incorrectly return 0 (stale) - if let Some(current_block_range) = jar.user_header().block_range() { + if let Some(current_block_range) = current_header.block_range() { if let Some(min_block_range) = index.min_block_range.as_mut() { - // delete_jar WILL ALWAYS re-initialize all indexes, so we are always - // sure that current_min is always the lowest. if current_block_range.start() == min_block_range.start() { *min_block_range = current_block_range; } @@ -1116,22 +1123,10 @@ impl StaticFileProvider { } } - // Updates the tx index by first removing all entries which have a higher - // block_start than our current static file. - if let Some(tx_range) = jar.user_header().tx_range() { - // Current block range has the same block start as `fixed_range``, but block end - // might be different if we are still filling this static file. - if let Some(current_block_range) = jar.user_header().block_range() { + if let Some(tx_range) = current_header.tx_range() { + if let Some(current_block_range) = current_header.block_range() { let tx_end = tx_range.end(); - // Considering that `update_index` is called when we either append/truncate, - // we are sure that we are handling the latest data - // points. - // - // Here we remove every entry of the index that has a block start higher or - // equal than our current one. This is important in the case - // that we prune a lot of rows resulting in a file (and thus - // a higher block range) deletion. if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() { index .retain(|_, block_range| block_range.start() < fixed_range.start()); @@ -1142,22 +1137,24 @@ impl StaticFileProvider { } } } else if segment.is_tx_based() { - // The unwinded file has no more transactions/receipts. However, the highest - // block is within this files' block range. We only retain - // entries with block ranges before the current one. if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() { index.retain(|_, block_range| block_range.start() < fixed_range.start()); } - // If the index is empty, just remove it. index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty()); } - // Update the cached provider. - debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache"); - self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?); + // Refresh the cached provider only when readers already have this range resident. + if should_refresh_cached_jar { + debug!(target: "providers::static_file", ?segment, ?fixed_range, "Refreshing cached jar"); + let jar = if let Some(jar) = loaded_jar { + jar + } else { + NippyJar::::load(&path).map_err(ProviderError::other)? + }; + self.map.insert(key, LoadedJar::new(jar)?); + } - // Delete any cached provider that no longer has an associated jar. debug!(target: "providers::static_file", ?segment, "Cleaning up jar map"); self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end())); } @@ -3020,11 +3017,15 @@ where mod tests { use std::collections::BTreeMap; + use alloy_consensus::Header; + use alloy_primitives::BlockHash; use reth_chain_state::EthPrimitives; use reth_db::test_utils::create_test_static_files_dir; use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment}; - use crate::{providers::StaticFileProvider, StaticFileProviderBuilder}; + use crate::{ + providers::StaticFileProvider, HeaderProvider, StaticFileProviderBuilder, StaticFileWriter, + }; #[test] fn test_find_fixed_range_with_block_index() -> eyre::Result<()> { @@ -3145,4 +3146,40 @@ mod tests { Ok(()) } + + #[test] + fn test_update_index_only_refreshes_cached_jar() -> eyre::Result<()> { + let (static_dir, _) = create_test_static_files_dir(); + let provider: StaticFileProvider = + StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(10).build()?; + + let mut writer = provider.latest_writer(StaticFileSegment::Headers)?; + let mut header = Header::default(); + for number in 0..=4 { + header.number = number; + writer.append_header(&header, &BlockHash::default())?; + } + writer.commit()?; + + assert!(provider.map.is_empty(), "uncached updates should stay lazy"); + + let cached = + provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?; + assert_eq!(cached.header_by_number(4)?.unwrap().number, 4); + assert_eq!(provider.map.len(), 1, "reader access should populate the cache"); + drop(cached); + + for number in 5..=6 { + header.number = number; + writer.append_header(&header, &BlockHash::default())?; + } + writer.commit()?; + + assert_eq!(provider.map.len(), 1, "refreshing should reuse the existing cache slot"); + let refreshed = + provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?; + assert_eq!(refreshed.header_by_number(6)?.unwrap().number, 6); + + Ok(()) + } } diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index b98569a6106..0052f0e373e 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -697,6 +697,7 @@ impl StaticFileProviderRW { /// Updates the `self.reader` internal index. fn update_index(&self) -> ProviderResult<()> { let segment = self.writer.user_header().segment(); + let segment_header = self.writer.user_header(); // We find the maximum block of the segment by checking this writer's last block. // @@ -723,7 +724,7 @@ impl StaticFileProviderRW { prev_path.exists().then_some(prev_block) }); - self.reader().update_index(segment, segment_max_block) + self.reader().update_index(segment, segment_max_block, Some(segment_header)) } /// Ensures that the writer is positioned at the specified block number.