Skip to content

Commit 64a5a9c

Browse files
ampcode-com committed
perf(provider): lazily refresh static-file jar cache
Pass the live SegmentHeader into static-file index updates so append-side bookkeeping can reuse the writer's in-memory state instead of reopening the jar for every write. This keeps the steady-state save_blocks path from doing redundant jar loads and mmaps when only the provider indexes need to advance. Only rebuild the cached LoadedJar when that segment range is already resident in the reader cache, and leave uncached ranges to be loaded lazily on demand. Update the static-file producer call site for the new API and cover the lazy-refresh behavior with a provider test. Amp-Thread-ID: https://ampcode.com/threads/T-019df23e-aac0-71af-8498-f77b1613ea22 Co-authored-by: Amp <[email protected]>
1 parent d2b4ab5 commit 64a5a9c

3 files changed

Lines changed: 81 additions & 41 deletions

File tree

crates/static-file/static-file/src/static_file_producer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,11 @@ where
152152

153153
self.provider.static_file_provider().commit()?;
154154
for (segment, block_range) in segments {
155-
self.provider
156-
.static_file_provider()
157-
.update_index(segment.segment(), Some(*block_range.end()))?;
155+
self.provider.static_file_provider().update_index(
156+
segment.segment(),
157+
Some(*block_range.end()),
158+
None,
159+
)?;
158160
}
159161

160162
let elapsed = start.elapsed(); // TODO(alexey): track in metrics

crates/storage/provider/src/providers/static_file/manager.rs

Lines changed: 74 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,10 +1036,15 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
10361036
/// Any entry higher than `segment_max_block` will be deleted from the previous structures.
10371037
///
10381038
/// If `segment_max_block` is None it means there's no static file for this segment.
1039+
///
1040+
/// If `segment_header` comes from the live writer for the indexed range, its in-memory state is
1041+
/// used for bookkeeping so the append path does not need to reopen the jar unless readers are
1042+
/// already caching that range.
10391043
pub fn update_index(
10401044
&self,
10411045
segment: StaticFileSegment,
10421046
segment_max_block: Option<BlockNumber>,
1047+
segment_header: Option<&SegmentHeader>,
10431048
) -> ProviderResult<()> {
10441049
debug!(
10451050
target: "providers::static_file",
@@ -1056,25 +1061,29 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
10561061
indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
10571062
segment_max_block,
10581063
);
1059-
1060-
let jar = NippyJar::<SegmentHeader>::load(
1061-
&self.path.join(segment.filename(&fixed_range)),
1062-
)
1063-
.map_err(ProviderError::other)?;
1064+
let key = (fixed_range.end(), segment);
1065+
let path = self.path.join(segment.filename(&fixed_range));
1066+
let should_refresh_cached_jar = self.map.contains_key(&key);
1067+
let mut loaded_jar = None;
1068+
let current_header = if let Some(segment_header) =
1069+
segment_header.filter(|header| header.expected_block_range() == fixed_range)
1070+
{
1071+
segment_header.clone()
1072+
} else {
1073+
let jar =
1074+
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?;
1075+
let header = jar.user_header().clone();
1076+
loaded_jar = Some(jar);
1077+
header
1078+
};
10641079

10651080
let index = indexes
10661081
.entry(segment)
10671082
.and_modify(|index| {
1068-
// Update max block
10691083
index.max_block = segment_max_block;
1070-
1071-
// Update expected block range index
1072-
1073-
// Remove all expected block ranges that are less than the new max block
10741084
index
10751085
.expected_block_ranges_by_max_block
10761086
.retain(|_, block_range| block_range.start() < fixed_range.start());
1077-
// Insert new expected block range
10781087
index
10791088
.expected_block_ranges_by_max_block
10801089
.insert(fixed_range.end(), fixed_range);
@@ -1104,10 +1113,8 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11041113
// 2. Sync to block 100, this update sets min_block = [0..=100]
11051114
// 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
11061115
// this update, it would incorrectly return 0 (stale)
1107-
if let Some(current_block_range) = jar.user_header().block_range() {
1116+
if let Some(current_block_range) = current_header.block_range() {
11081117
if let Some(min_block_range) = index.min_block_range.as_mut() {
1109-
// delete_jar WILL ALWAYS re-initialize all indexes, so we are always
1110-
// sure that current_min is always the lowest.
11111118
if current_block_range.start() == min_block_range.start() {
11121119
*min_block_range = current_block_range;
11131120
}
@@ -1116,22 +1123,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11161123
}
11171124
}
11181125

1119-
// Updates the tx index by first removing all entries which have a higher
1120-
// block_start than our current static file.
1121-
if let Some(tx_range) = jar.user_header().tx_range() {
1122-
// Current block range has the same block start as `fixed_range``, but block end
1123-
// might be different if we are still filling this static file.
1124-
if let Some(current_block_range) = jar.user_header().block_range() {
1126+
if let Some(tx_range) = current_header.tx_range() {
1127+
if let Some(current_block_range) = current_header.block_range() {
11251128
let tx_end = tx_range.end();
11261129

1127-
// Considering that `update_index` is called when we either append/truncate,
1128-
// we are sure that we are handling the latest data
1129-
// points.
1130-
//
1131-
// Here we remove every entry of the index that has a block start higher or
1132-
// equal than our current one. This is important in the case
1133-
// that we prune a lot of rows resulting in a file (and thus
1134-
// a higher block range) deletion.
11351130
if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
11361131
index
11371132
.retain(|_, block_range| block_range.start() < fixed_range.start());
@@ -1142,22 +1137,24 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11421137
}
11431138
}
11441139
} else if segment.is_tx_based() {
1145-
// The unwinded file has no more transactions/receipts. However, the highest
1146-
// block is within this files' block range. We only retain
1147-
// entries with block ranges before the current one.
11481140
if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
11491141
index.retain(|_, block_range| block_range.start() < fixed_range.start());
11501142
}
11511143

1152-
// If the index is empty, just remove it.
11531144
index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
11541145
}
11551146

1156-
// Update the cached provider.
1157-
debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
1158-
self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
1147+
// Refresh the cached provider only when readers already have this range resident.
1148+
if should_refresh_cached_jar {
1149+
debug!(target: "providers::static_file", ?segment, ?fixed_range, "Refreshing cached jar");
1150+
let jar = if let Some(jar) = loaded_jar {
1151+
jar
1152+
} else {
1153+
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?
1154+
};
1155+
self.map.insert(key, LoadedJar::new(jar)?);
1156+
}
11591157

1160-
// Delete any cached provider that no longer has an associated jar.
11611158
debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
11621159
self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
11631160
}
@@ -3020,11 +3017,15 @@ where
30203017
mod tests {
30213018
use std::collections::BTreeMap;
30223019

3020+
use alloy_consensus::Header;
3021+
use alloy_primitives::BlockHash;
30233022
use reth_chain_state::EthPrimitives;
30243023
use reth_db::test_utils::create_test_static_files_dir;
30253024
use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
30263025

3027-
use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
3026+
use crate::{
3027+
providers::StaticFileProvider, HeaderProvider, StaticFileProviderBuilder, StaticFileWriter,
3028+
};
30283029

30293030
#[test]
30303031
fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
@@ -3145,4 +3146,40 @@ mod tests {
31453146

31463147
Ok(())
31473148
}
3149+
3150+
#[test]
3151+
fn test_update_index_only_refreshes_cached_jar() -> eyre::Result<()> {
3152+
let (static_dir, _) = create_test_static_files_dir();
3153+
let provider: StaticFileProvider<EthPrimitives> =
3154+
StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(10).build()?;
3155+
3156+
let mut writer = provider.latest_writer(StaticFileSegment::Headers)?;
3157+
let mut header = Header::default();
3158+
for number in 0..=4 {
3159+
header.number = number;
3160+
writer.append_header(&header, &BlockHash::default())?;
3161+
}
3162+
writer.commit()?;
3163+
3164+
assert!(provider.map.is_empty(), "uncached updates should stay lazy");
3165+
3166+
let cached =
3167+
provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
3168+
assert_eq!(cached.header_by_number(4)?.unwrap().number, 4);
3169+
assert_eq!(provider.map.len(), 1, "reader access should populate the cache");
3170+
drop(cached);
3171+
3172+
for number in 5..=6 {
3173+
header.number = number;
3174+
writer.append_header(&header, &BlockHash::default())?;
3175+
}
3176+
writer.commit()?;
3177+
3178+
assert_eq!(provider.map.len(), 1, "refreshing should reuse the existing cache slot");
3179+
let refreshed =
3180+
provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
3181+
assert_eq!(refreshed.header_by_number(6)?.unwrap().number, 6);
3182+
3183+
Ok(())
3184+
}
31483185
}

crates/storage/provider/src/providers/static_file/writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
697697
/// Updates the `self.reader` internal index.
698698
fn update_index(&self) -> ProviderResult<()> {
699699
let segment = self.writer.user_header().segment();
700+
let segment_header = self.writer.user_header();
700701

701702
// We find the maximum block of the segment by checking this writer's last block.
702703
//
@@ -723,7 +724,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
723724
prev_path.exists().then_some(prev_block)
724725
});
725726

726-
self.reader().update_index(segment, segment_max_block)
727+
self.reader().update_index(segment, segment_max_block, Some(segment_header))
727728
}
728729

729730
/// Ensures that the writer is positioned at the specified block number.

0 commit comments

Comments
 (0)