Skip to content

Commit 3c66d50

Browse files
ampampcode-com
andcommitted
perf(provider): lazily refresh static file jar cache
Pass the writer's live segment header into static file index updates so the steady-state append path can reuse in-memory range metadata instead of reloading the jar from disk on every commit. Only reopen and remap a jar when that segment and range are already cached by a reader, while preserving the disk-backed fallback for callers that update indexes without a live writer header. Add a regression test that covers lazy uncached updates and cache refresh for an active reader. Amp-Thread-ID: https://ampcode.com/threads/T-019df228-9b31-759c-9b1d-a426d6bb6808 Co-authored-by: Amp <[email protected]>
1 parent 8e70260 commit 3c66d50

1 file changed

Lines changed: 25 additions & 52 deletions

File tree

  • crates/storage/provider/src/providers/static_file

crates/storage/provider/src/providers/static_file/manager.rs

Lines changed: 25 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,8 +1070,7 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
10701070
{
10711071
segment_header.clone()
10721072
} else {
1073-
let jar =
1074-
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?;
1073+
let jar = NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?;
10751074
let header = jar.user_header().clone();
10761075
loaded_jar = Some(jar);
10771076
header
@@ -1080,16 +1079,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
10801079
let index = indexes
10811080
.entry(segment)
10821081
.and_modify(|index| {
1083-
// Update max block
10841082
index.max_block = segment_max_block;
1085-
1086-
// Update expected block range index
1087-
1088-
// Remove all expected block ranges that are less than the new max block
10891083
index
10901084
.expected_block_ranges_by_max_block
10911085
.retain(|_, block_range| block_range.start() < fixed_range.start());
1092-
// Insert new expected block range
10931086
index
10941087
.expected_block_ranges_by_max_block
10951088
.insert(fixed_range.end(), fixed_range);
@@ -1121,8 +1114,6 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11211114
// this update, it would incorrectly return 0 (stale)
11221115
if let Some(current_block_range) = current_header.block_range() {
11231116
if let Some(min_block_range) = index.min_block_range.as_mut() {
1124-
// delete_jar WILL ALWAYS re-initialize all indexes, so we are always
1125-
// sure that current_min is always the lowest.
11261117
if current_block_range.start() == min_block_range.start() {
11271118
*min_block_range = current_block_range;
11281119
}
@@ -1131,22 +1122,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11311122
}
11321123
}
11331124

1134-
// Updates the tx index by first removing all entries which have a higher
1135-
// block_start than our current static file.
11361125
if let Some(tx_range) = current_header.tx_range() {
1137-
// Current block range has the same block start as `fixed_range``, but block end
1138-
// might be different if we are still filling this static file.
11391126
if let Some(current_block_range) = current_header.block_range() {
11401127
let tx_end = tx_range.end();
11411128

1142-
// Considering that `update_index` is called when we either append/truncate,
1143-
// we are sure that we are handling the latest data
1144-
// points.
1145-
//
1146-
// Here we remove every entry of the index that has a block start higher or
1147-
// equal than our current one. This is important in the case
1148-
// that we prune a lot of rows resulting in a file (and thus
1149-
// a higher block range) deletion.
11501129
if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
11511130
index
11521131
.retain(|_, block_range| block_range.start() < fixed_range.start());
@@ -1157,14 +1136,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11571136
}
11581137
}
11591138
} else if segment.is_tx_based() {
1160-
// The unwinded file has no more transactions/receipts. However, the highest
1161-
// block is within this files' block range. We only retain
1162-
// entries with block ranges before the current one.
11631139
if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
11641140
index.retain(|_, block_range| block_range.start() < fixed_range.start());
11651141
}
11661142

1167-
// If the index is empty, just remove it.
11681143
index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
11691144
}
11701145

@@ -1179,7 +1154,6 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
11791154
self.map.insert(key, LoadedJar::new(jar)?);
11801155
}
11811156

1182-
// Delete any cached provider that no longer has an associated jar.
11831157
debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
11841158
self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
11851159
}
@@ -3048,7 +3022,9 @@ mod tests {
30483022
use reth_db::test_utils::create_test_static_files_dir;
30493023
use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
30503024

3051-
use crate::{providers::StaticFileProvider, StaticFileProviderBuilder, StaticFileWriter};
3025+
use crate::{
3026+
providers::StaticFileProvider, HeaderProvider, StaticFileProviderBuilder, StaticFileWriter,
3027+
};
30523028

30533029
#[test]
30543030
fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
@@ -3170,40 +3146,37 @@ mod tests {
31703146
Ok(())
31713147
}
31723148

3173-
#[test]
3174-
fn update_index_only_refreshes_cached_jars() -> eyre::Result<()> {
3149+
fn test_update_index_only_refreshes_cached_jar() -> eyre::Result<()> {
31753150
let (static_dir, _) = create_test_static_files_dir();
3176-
let sf_rw: StaticFileProvider<EthPrimitives> =
3151+
let provider: StaticFileProvider<EthPrimitives> =
31773152
StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(10).build()?;
31783153

3179-
{
3180-
let mut writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
3181-
let mut header = Header::default();
3182-
for number in 0..=4 {
3183-
header.number = number;
3184-
writer.append_header(&header, &BlockHash::default())?;
3185-
}
3186-
writer.commit()?;
3154+
let mut writer = provider.latest_writer(StaticFileSegment::Headers)?;
3155+
let mut header = Header::default();
3156+
for number in 0..=4 {
3157+
header.number = number;
3158+
writer.append_header(&header, &BlockHash::default())?;
31873159
}
3160+
writer.commit()?;
31883161

3189-
assert!(sf_rw.map.is_empty(), "uncached writes should not populate the jar cache");
3162+
assert!(provider.map.is_empty(), "uncached updates should stay lazy");
31903163

3191-
{
3192-
let _provider =
3193-
sf_rw.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
3194-
}
3195-
assert!(sf_rw.map.contains_key(&(9, StaticFileSegment::Headers)));
3164+
let cached =
3165+
provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
3166+
assert_eq!(cached.header_by_number(4)?.unwrap().number, 4);
3167+
assert_eq!(provider.map.len(), 1, "reader access should populate the cache");
3168+
drop(cached);
31963169

3197-
{
3198-
let mut writer = sf_rw.latest_writer(StaticFileSegment::Headers)?;
3199-
let mut header = Header::default();
3200-
header.number = 5;
3170+
for number in 5..=6 {
3171+
header.number = number;
32013172
writer.append_header(&header, &BlockHash::default())?;
3202-
writer.commit()?;
32033173
}
3174+
writer.commit()?;
32043175

3205-
let cached = sf_rw.map.get(&(9, StaticFileSegment::Headers)).expect("cached jar exists");
3206-
assert_eq!(cached.user_header().block_range(), Some(SegmentRangeInclusive::new(0, 5)));
3176+
assert_eq!(provider.map.len(), 1, "refreshing should reuse the existing cache slot");
3177+
let refreshed =
3178+
provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
3179+
assert_eq!(refreshed.header_by_number(6)?.unwrap().number, 6);
32073180

32083181
Ok(())
32093182
}

0 commit comments

Comments
 (0)