Commit b43c4ef

use sorted inserts to improve builder performance (#91553)
## What

Optimizes AMQF (Approximate Membership Query Filter) construction in `StreamingSstWriter` by deferring filter building to `close()` time and using qfilter's sorted `Builder` API.

## Why

The previous approach inserted each key hash into the AMQF filter eagerly during `add()`, using random-access `insert_fingerprint` calls. The new qfilter `Builder` API supports sequential sorted insertion which is significantly faster, but requires fingerprints in non-decreasing order.

## How

1. **Deferred construction**: Instead of building the AMQF incrementally during `add()`, collect key hashes (truncated to `u32`) into a vec during writes, then build the filter in one pass at `close()` time.
2. **Sorted Builder insertion**: Sort collected hashes by fingerprint value, then feed them to `Builder::insert_fingerprint` in order. This uses the Builder's optimized sorted-insert path.
3. **u32 storage**: Since fingerprint size is always ≤32 bits, store collected hashes as `u32` instead of `u64`, halving memory usage and improving sort cache behavior.
4. **Exact sizing**: The Builder is constructed with the exact entry count (known at `close()` time) rather than the `max_entry_count` estimate, producing optimally-sized filters.

## Benchmark results (vs `filter_ref` baseline)

`write/key_8/value_4/` benchmark:

| Entries | filter_ref | sorted_insert | Change |
|---------|-----------|---------------|--------|
| 85K | 22.3 ms | 21.1 ms | **-5.3%** |
| 853K | 149.2 ms | 111.9 ms | **-25.0%** |
| 8.3M | 1049 ms | 998.8 ms | **-4.8%** |

The 853K case (typical compacted SST size) shows the largest improvement as AMQF construction is a significant fraction of total write time at that scale.
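The four steps under "How" can be sketched without the real filter. In the sketch below, `OrderCheckingSink`, `collect`, and `build_deferred` are hypothetical names introduced only for illustration; the sink is a stand-in that enforces the non-decreasing-fingerprint contract and is not the qfilter `Builder` API.

```rust
/// Hypothetical stand-in for a sorted-insert filter builder. It only checks
/// the ordering contract; it is NOT the qfilter API.
struct OrderCheckingSink {
    mask: u32,
    last: Option<u32>,
    inserted: usize,
}

impl OrderCheckingSink {
    fn new(fp_bits: u32) -> Self {
        assert!(fp_bits < 32, "fp_bits {} does not fit the u32 mask", fp_bits);
        Self { mask: (1u32 << fp_bits) - 1, last: None, inserted: 0 }
    }

    /// Accepts a hash and fails if its masked fingerprint goes backwards.
    fn insert_sorted(&mut self, hash: u32) -> Result<(), String> {
        let fp = hash & self.mask;
        if let Some(prev) = self.last {
            if fp < prev {
                return Err(format!("fingerprint {} after {}", fp, prev));
            }
        }
        self.last = Some(fp);
        self.inserted += 1;
        Ok(())
    }
}

/// Write-time step: keep only the low 32 bits of each key hash.
fn collect(hashes: &[u64]) -> Vec<u32> {
    hashes.iter().map(|&h| h as u32).collect()
}

/// Close-time step: sort by masked fingerprint, then insert in that order.
/// Returns the number of entries inserted.
fn build_deferred(mut collected: Vec<u32>, fp_bits: u32) -> Result<usize, String> {
    let mut sink = OrderCheckingSink::new(fp_bits);
    let mask = sink.mask;
    collected.sort_unstable_by_key(|&h| h & mask);
    for &h in &collected {
        sink.insert_sorted(h)?;
    }
    Ok(sink.inserted)
}
```

Feeding the collected hashes in write order can violate the ordering contract, while `build_deferred` sorts first and so always satisfies it. The exact entry count is simply `collected.len()` at that point, which is what makes exact sizing possible.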
1 parent d3cbd5b commit b43c4ef

1 file changed

Lines changed: 25 additions & 18 deletions

turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs

@@ -506,8 +506,9 @@ pub struct StreamingSstWriter<E: Entry> {
     // Reusable buffer for building key blocks
     key_buffer: Vec<u8>,

-    // AMQF filter (built incrementally). Wrapped in Option for the same reason as `file`.
-    filter: Option<qfilter::Filter>,
+    // Collected key hashes truncated to u32 for deferred AMQF construction via sorted Builder
+    // in close(). Fingerprint size is always <32 bits, so the lower 32 bits suffice.
+    collected_fingerprints: Vec<u32>,

     // Index block data: (first_hash, block_index) for each key block written
     key_block_boundaries: Vec<(u64, u16)>,
@@ -536,13 +537,9 @@ pub struct StreamingSstWriter<E: Entry> {
 impl<E: Entry> StreamingSstWriter<E> {
     /// Creates a new streaming SST writer.
     ///
-    /// `max_entry_count` is used to size the AMQF filter. It must be an upper bound on the number
-    /// of entries that will be added; the filter is not resizable. A slightly oversized value only
-    /// improves the false-positive rate.
+    /// `max_entry_count` is used to pre-allocate buffers and estimate block counts.
     pub fn new(file: &Path, flags: MetaEntryFlags, max_entry_count: u64) -> Result<Self> {
         let file = BufWriter::new(File::create(file)?);
-        let filter = qfilter::Filter::new(max_entry_count.max(1), AMQF_FALSE_POSITIVE_RATE)
-            .expect("Filter can't be constructed");

         // Estimate number of key blocks based on max entry count.
         // Each key block holds up to MAX_KEY_BLOCK_ENTRIES entries.
@@ -569,7 +566,7 @@ impl<E: Entry> StreamingSstWriter<E> {
                 MIN_SMALL_VALUE_BLOCK_SIZE + MAX_SMALL_VALUE_SIZE,
             ),
             key_buffer: Vec::with_capacity(MAX_KEY_BLOCK_SIZE),
-            filter: Some(filter),
+            collected_fingerprints: Vec::with_capacity(max_entry_count as usize),
             key_block_boundaries: Vec::with_capacity(estimated_key_blocks),
             min_hash: u64::MAX,
             max_hash: 0,
@@ -627,12 +624,8 @@ impl<E: Entry> StreamingSstWriter<E> {
         self.max_hash = key_hash;
         self.entry_count += 1;

-        // Insert into AMQF
-        self.filter
-            .as_mut()
-            .unwrap()
-            .insert_fingerprint(false, key_hash)
-            .expect("AMQF insert failed");
+        // Collect hash for deferred AMQF construction in close()
+        self.collected_fingerprints.push(key_hash as u32);

         // Track key size for fullness and block capacity
         self.total_key_size += key_len;
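The cast `key_hash as u32` keeps only the low 32 bits, which is why the collected values can be out of order even though the 64-bit hashes arrive sorted. A minimal sketch of that effect and of the buffer-size difference follows; the helper names `truncate`, `is_sorted_u32`, and `buffer_bytes` are hypothetical and do not appear in the commit.

```rust
use std::mem::size_of;

/// Keeps the low 32 bits of a 64-bit key hash, as `key_hash as u32` does.
fn truncate(key_hash: u64) -> u32 {
    key_hash as u32
}

/// True if the slice is in non-decreasing order.
fn is_sorted_u32(values: &[u32]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}

/// Bytes needed to buffer `n` hashes as (u64, u32) elements.
fn buffer_bytes(n: usize) -> (usize, usize) {
    (n * size_of::<u64>(), n * size_of::<u32>())
}
```

Two hashes such as `0x1_0000_0009` and `0x2_0000_0001` are ascending as `u64` but truncate to `9` and `1`, so a sort is still needed before any sorted-insert path can be used.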
@@ -950,10 +943,24 @@ impl<E: Entry> StreamingSstWriter<E> {
             .try_into()
             .expect("Block count overflow");

-        // Shrink the AMQF filter to the actual entry count. The filter was created with
-        // `max_entry_count` which may be larger than the number of entries actually added.
-        let mut filter = self.filter.take().unwrap();
-        filter.shrink_to_fit();
+        // Build AMQF from collected hashes using sorted Builder insertion.
+        // Hashes are already sorted by key_hash (SST invariant), but fingerprints
+        // (truncated hashes) may not be sorted, so we sort by `fingerprint & mask`.
+        let actual_count = self.collected_fingerprints.len() as u64;
+        let mut builder = qfilter::Builder::new(actual_count.max(1), AMQF_FALSE_POSITIVE_RATE)
+            .expect("Filter can't be constructed");
+        let fp_size = builder.fingerprint_size();
+        assert!(fp_size < 32, "fp_size {fp_size} exceeds u32");
+        let fp_mask = (1u32 << fp_size) - 1;
+        // Mask in-place to fingerprint size and sort.
+        self.collected_fingerprints
+            .sort_unstable_by_key(|&h| h & fp_mask);
+        for &h in &self.collected_fingerprints {
+            builder
+                .insert_fingerprint(false, h as u64)
+                .expect("AMQF insert failed");
+        }
+        let filter = builder.into_filter();

         // Serialize AMQF using postcard for zero-copy deserialization via FilterRef
         let amqf = postcard::to_allocvec(&filter).expect("AMQF serialization failed");
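Two details of the hunk above are easy to miss. First, the `fp_size < 32` assertion guards the shift: `1u32 << 32` overflows. Second, `sort_unstable_by_key` only uses the masked value as a sort key, so the stored hashes keep their upper bits when they are passed on. That presumably relies on the builder masking internally, which is an assumption here and not something the diff shows. The sketch below illustrates both points using only the standard library; `fingerprint_mask` and `sort_by_fingerprint` are hypothetical helper names.

```rust
/// Mask selecting the low `fp_bits` bits of a u32, or None when the shift
/// would overflow (fp_bits >= 32).
fn fingerprint_mask(fp_bits: u32) -> Option<u32> {
    1u32.checked_shl(fp_bits).map(|bit| bit - 1)
}

/// Sorts by masked fingerprint. The key closure only reads each element, so
/// the stored values keep their upper bits.
fn sort_by_fingerprint(hashes: &mut [u32], mask: u32) {
    hashes.sort_unstable_by_key(|&h| h & mask);
}
```

With an 8-bit mask, the values `0x1FF`, `0x203`, `0x310` sort to `0x203`, `0x310`, `0x1FF`: ordered by their low byte, with the upper bits untouched.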
