Skip to content

Commit e30a2c3

Browse files
ampampcode-com authored and committed
perf(provider): lazily refresh cached static-file jars
Pass the live segment header from static-file writers into provider index updates so append and truncate bookkeeping can use in-memory block and transaction ranges without reopening the jar. Only reload and remap a static-file jar when that segment/range is already cached by readers, and keep uncached ranges lazy. Add a regression test covering both the uncached fast path and cached refresh behavior.

Amp-Thread-ID: https://ampcode.com/threads/T-019df204-bb69-724c-a1e4-fb1180d49bec
Co-authored-by: Amp <[email protected]>
1 parent ddb3819 commit e30a2c3

3 files changed

Lines changed: 81 additions & 16 deletions

File tree

crates/static-file/static-file/src/static_file_producer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,11 @@ where
152152

153153
self.provider.static_file_provider().commit()?;
154154
for (segment, block_range) in segments {
155-
self.provider
156-
.static_file_provider()
157-
.update_index(segment.segment(), Some(*block_range.end()))?;
155+
self.provider.static_file_provider().update_index(
156+
segment.segment(),
157+
Some(*block_range.end()),
158+
None,
159+
)?;
158160
}
159161

160162
let elapsed = start.elapsed(); // TODO(alexey): track in metrics

crates/storage/provider/src/providers/static_file/manager.rs

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,10 +1036,15 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
10361036
/// Any entry higher than `segment_max_block` will be deleted from the previous structures.
10371037
///
10381038
/// If `segment_max_block` is None it means there's no static file for this segment.
1039+
///
1040+
/// If `segment_header` comes from the live writer for the indexed range, its in-memory state is
1041+
/// used for bookkeeping so the append path does not need to reopen the jar unless readers are
1042+
/// already caching that range.
10391043
pub fn update_index(
10401044
&self,
10411045
segment: StaticFileSegment,
10421046
segment_max_block: Option<BlockNumber>,
1047+
segment_header: Option<&SegmentHeader>,
10431048
) -> ProviderResult<()> {
10441049
debug!(
10451050
target: "providers::static_file",
@@ -1056,11 +1061,21 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
10561061
indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
10571062
segment_max_block,
10581063
);
1059-
1060-
let jar = NippyJar::<SegmentHeader>::load(
1061-
&self.path.join(segment.filename(&fixed_range)),
1062-
)
1063-
.map_err(ProviderError::other)?;
1064+
let key = (fixed_range.end(), segment);
1065+
let path = self.path.join(segment.filename(&fixed_range));
1066+
let should_refresh_cached_jar = self.map.contains_key(&key);
1067+
let mut loaded_jar = None;
1068+
let current_header = if let Some(segment_header) =
1069+
segment_header.filter(|header| header.expected_block_range() == fixed_range)
1070+
{
1071+
segment_header.clone()
1072+
} else {
1073+
let jar =
1074+
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?;
1075+
let header = jar.user_header().clone();
1076+
loaded_jar = Some(jar);
1077+
header
1078+
};
10641079

10651080
let index = indexes
10661081
.entry(segment)
@@ -1104,7 +1119,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11041119
// 2. Sync to block 100, this update sets min_block = [0..=100]
11051120
// 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
11061121
// this update, it would incorrectly return 0 (stale)
1107-
if let Some(current_block_range) = jar.user_header().block_range() {
1122+
if let Some(current_block_range) = current_header.block_range() {
11081123
if let Some(min_block_range) = index.min_block_range.as_mut() {
11091124
// delete_jar WILL ALWAYS re-initialize all indexes, so we are always
11101125
// sure that current_min is always the lowest.
@@ -1118,10 +1133,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11181133

11191134
// Updates the tx index by first removing all entries which have a higher
11201135
// block_start than our current static file.
1121-
if let Some(tx_range) = jar.user_header().tx_range() {
1136+
if let Some(tx_range) = current_header.tx_range() {
11221137
// Current block range has the same block start as `fixed_range``, but block end
11231138
// might be different if we are still filling this static file.
1124-
if let Some(current_block_range) = jar.user_header().block_range() {
1139+
if let Some(current_block_range) = current_header.block_range() {
11251140
let tx_end = tx_range.end();
11261141

11271142
// Considering that `update_index` is called when we either append/truncate,
@@ -1153,9 +1168,16 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11531168
index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
11541169
}
11551170

1156-
// Update the cached provider.
1157-
debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
1158-
self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
1171+
// Refresh the cached provider only when readers already have this range resident.
1172+
if should_refresh_cached_jar {
1173+
debug!(target: "providers::static_file", ?segment, ?fixed_range, "Refreshing cached jar");
1174+
let jar = if let Some(jar) = loaded_jar {
1175+
jar
1176+
} else {
1177+
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?
1178+
};
1179+
self.map.insert(key, LoadedJar::new(jar)?);
1180+
}
11591181

11601182
// Delete any cached provider that no longer has an associated jar.
11611183
debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
@@ -3020,11 +3042,13 @@ where
30203042
mod tests {
30213043
use std::collections::BTreeMap;
30223044

3045+
use alloy_consensus::Header;
3046+
use alloy_primitives::BlockHash;
30233047
use reth_chain_state::EthPrimitives;
30243048
use reth_db::test_utils::create_test_static_files_dir;
30253049
use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
30263050

3027-
use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
3051+
use crate::{providers::StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter};
30283052

30293053
#[test]
30303054
fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
@@ -3145,4 +3169,42 @@ mod tests {
31453169

31463170
Ok(())
31473171
}
3172+
3173+
#[test]
3174+
fn update_index_only_refreshes_cached_jars() -> eyre::Result<()> {
3175+
let (static_dir, _) = create_test_static_files_dir();
3176+
let sf_rw: StaticFileProvider<EthPrimitives> =
3177+
StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(10).build()?;
3178+
3179+
{
3180+
let mut writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
3181+
let mut header = Header::default();
3182+
for number in 0..=4 {
3183+
header.number = number;
3184+
writer.append_header(&header, &BlockHash::default())?;
3185+
}
3186+
writer.commit()?;
3187+
}
3188+
3189+
assert!(sf_rw.map.is_empty(), "uncached writes should not populate the jar cache");
3190+
3191+
{
3192+
let _provider =
3193+
sf_rw.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
3194+
}
3195+
assert!(sf_rw.map.contains_key(&(9, StaticFileSegment::Headers)));
3196+
3197+
{
3198+
let mut writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
3199+
let mut header = Header::default();
3200+
header.number = 5;
3201+
writer.append_header(&header, &BlockHash::default())?;
3202+
writer.commit()?;
3203+
}
3204+
3205+
let cached = sf_rw.map.get(&(9, StaticFileSegment::Headers)).expect("cached jar exists");
3206+
assert_eq!(cached.user_header().block_range(), Some(SegmentRangeInclusive::new(0, 5)));
3207+
3208+
Ok(())
3209+
}
31483210
}

crates/storage/provider/src/providers/static_file/writer.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
697697
/// Updates the `self.reader` internal index.
698698
fn update_index(&self) -> ProviderResult<()> {
699699
let segment = self.writer.user_header().segment();
700+
let segment_header = self.writer.user_header();
700701

701702
// We find the maximum block of the segment by checking this writer's last block.
702703
//
@@ -723,7 +724,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
723724
prev_path.exists().then_some(prev_block)
724725
});
725726

726-
self.reader().update_index(segment, segment_max_block)
727+
self.reader().update_index(segment, segment_max_block, Some(segment_header))
727728
}
728729

729730
/// Ensures that the writer is positioned at the specified block number.

0 commit comments

Comments
 (0)