Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions crates/static-file/static-file/src/static_file_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ where

self.provider.static_file_provider().commit()?;
for (segment, block_range) in segments {
self.provider
.static_file_provider()
.update_index(segment.segment(), Some(*block_range.end()))?;
self.provider.static_file_provider().update_index(
segment.segment(),
Some(*block_range.end()),
None,
)?;
}

let elapsed = start.elapsed(); // TODO(alexey): track in metrics
Expand Down
111 changes: 74 additions & 37 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1036,10 +1036,15 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
/// Any entry higher than `segment_max_block` will be deleted from the previous structures.
///
/// If `segment_max_block` is None it means there's no static file for this segment.
///
/// If `segment_header` comes from the live writer for the indexed range, its in-memory state is
/// used for bookkeeping so the append path does not need to reopen the jar unless readers are
/// already caching that range.
pub fn update_index(
&self,
segment: StaticFileSegment,
segment_max_block: Option<BlockNumber>,
segment_header: Option<&SegmentHeader>,
) -> ProviderResult<()> {
debug!(
target: "providers::static_file",
Expand All @@ -1056,25 +1061,29 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
segment_max_block,
);

let jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_range)),
)
.map_err(ProviderError::other)?;
let key = (fixed_range.end(), segment);
let path = self.path.join(segment.filename(&fixed_range));
let should_refresh_cached_jar = self.map.contains_key(&key);
let mut loaded_jar = None;
let current_header = if let Some(segment_header) =
segment_header.filter(|header| header.expected_block_range() == fixed_range)
{
segment_header.clone()
} else {
let jar =
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?;
let header = jar.user_header().clone();
loaded_jar = Some(jar);
header
};

let index = indexes
.entry(segment)
.and_modify(|index| {
// Update max block
index.max_block = segment_max_block;

// Update expected block range index

// Remove all expected block ranges that are less than the new max block
index
.expected_block_ranges_by_max_block
.retain(|_, block_range| block_range.start() < fixed_range.start());
// Insert new expected block range
index
.expected_block_ranges_by_max_block
.insert(fixed_range.end(), fixed_range);
Expand Down Expand Up @@ -1104,10 +1113,8 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
// 2. Sync to block 100, this update sets min_block = [0..=100]
// 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
// this update, it would incorrectly return 0 (stale)
if let Some(current_block_range) = jar.user_header().block_range() {
if let Some(current_block_range) = current_header.block_range() {
if let Some(min_block_range) = index.min_block_range.as_mut() {
// delete_jar WILL ALWAYS re-initialize all indexes, so we are always
// sure that current_min is always the lowest.
if current_block_range.start() == min_block_range.start() {
*min_block_range = current_block_range;
}
Expand All @@ -1116,22 +1123,10 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
}
}

// Updates the tx index by first removing all entries which have a higher
// block_start than our current static file.
if let Some(tx_range) = jar.user_header().tx_range() {
// Current block range has the same block start as `fixed_range``, but block end
// might be different if we are still filling this static file.
if let Some(current_block_range) = jar.user_header().block_range() {
if let Some(tx_range) = current_header.tx_range() {
if let Some(current_block_range) = current_header.block_range() {
let tx_end = tx_range.end();

// Considering that `update_index` is called when we either append/truncate,
// we are sure that we are handling the latest data
// points.
//
// Here we remove every entry of the index that has a block start higher or
// equal than our current one. This is important in the case
// that we prune a lot of rows resulting in a file (and thus
// a higher block range) deletion.
if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
index
.retain(|_, block_range| block_range.start() < fixed_range.start());
Expand All @@ -1142,22 +1137,24 @@ impl<N: NodePrimitives> StaticFileProvider<N> {
}
}
} else if segment.is_tx_based() {
// The unwinded file has no more transactions/receipts. However, the highest
// block is within this files' block range. We only retain
// entries with block ranges before the current one.
if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
index.retain(|_, block_range| block_range.start() < fixed_range.start());
}

// If the index is empty, just remove it.
index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
}

// Update the cached provider.
debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
// Refresh the cached provider only when readers already have this range resident.
if should_refresh_cached_jar {
debug!(target: "providers::static_file", ?segment, ?fixed_range, "Refreshing cached jar");
let jar = if let Some(jar) = loaded_jar {
jar
} else {
NippyJar::<SegmentHeader>::load(&path).map_err(ProviderError::other)?
};
self.map.insert(key, LoadedJar::new(jar)?);
}

// Delete any cached provider that no longer has an associated jar.
debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
}
Expand Down Expand Up @@ -3020,11 +3017,15 @@ where
mod tests {
use std::collections::BTreeMap;

use alloy_consensus::Header;
use alloy_primitives::BlockHash;
use reth_chain_state::EthPrimitives;
use reth_db::test_utils::create_test_static_files_dir;
use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
use crate::{
providers::StaticFileProvider, HeaderProvider, StaticFileProviderBuilder, StaticFileWriter,
};

#[test]
fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
Expand Down Expand Up @@ -3145,4 +3146,40 @@ mod tests {

Ok(())
}

#[test]
// Verifies that `update_index` only refreshes a cached jar when readers already
// hold that range in the provider's cache map, and otherwise stays lazy
// (no jar is loaded or inserted on the pure append/commit path).
fn test_update_index_only_refreshes_cached_jar() -> eyre::Result<()> {
let (static_dir, _) = create_test_static_files_dir();
// Small files (10 blocks each) so a single fixed range covers the whole test.
let provider: StaticFileProvider<EthPrimitives> =
StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(10).build()?;

// Append headers 0..=4 and commit; commit triggers `update_index` internally.
let mut writer = provider.latest_writer(StaticFileSegment::Headers)?;
let mut header = Header::default();
for number in 0..=4 {
header.number = number;
writer.append_header(&header, &BlockHash::default())?;
}
writer.commit()?;

// No reader has touched this range yet, so the cache map must remain empty:
// the commit path should not have eagerly loaded a jar.
assert!(provider.map.is_empty(), "uncached updates should stay lazy");

// First reader access loads the jar and populates exactly one cache slot.
let cached =
provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
assert_eq!(cached.header_by_number(4)?.unwrap().number, 4);
assert_eq!(provider.map.len(), 1, "reader access should populate the cache");
drop(cached);

// Append more headers while the range is cached; the subsequent commit's
// `update_index` should now refresh the existing cached jar in place.
for number in 5..=6 {
header.number = number;
writer.append_header(&header, &BlockHash::default())?;
}
writer.commit()?;

// Still one entry (refreshed, not duplicated), and the refreshed jar must
// expose the newly appended data (header 6).
assert_eq!(provider.map.len(), 1, "refreshing should reuse the existing cache slot");
let refreshed =
provider.get_segment_provider_for_block(StaticFileSegment::Headers, 0, None)?;
assert_eq!(refreshed.header_by_number(6)?.unwrap().number, 6);

Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
/// Updates the `self.reader` internal index.
fn update_index(&self) -> ProviderResult<()> {
let segment = self.writer.user_header().segment();
let segment_header = self.writer.user_header();

// We find the maximum block of the segment by checking this writer's last block.
//
Expand All @@ -723,7 +724,7 @@ impl<N: NodePrimitives> StaticFileProviderRW<N> {
prev_path.exists().then_some(prev_block)
});

self.reader().update_index(segment, segment_max_block)
self.reader().update_index(segment, segment_max_block, Some(segment_header))
}

/// Ensures that the writer is positioned at the specified block number.
Expand Down
Loading