Skip to content

Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584]#8063

Open
phacops wants to merge 10 commits into
masterfrom
claude/exciting-hawking-0ju4nd
Open

Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584]#8063
phacops wants to merge 10 commits into
masterfrom
claude/exciting-hawking-0ju4nd

Conversation

@phacops

@phacops phacops commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Overview

This PR replaces the custom RowBinary serialization pipeline for eap_items with a new EapItemsInserterSink that uses the official clickhouse crate's Inserter API. This eliminates the need for custom serialization code and improves memory efficiency by streaming rows directly to ClickHouse.

Key Changes

New Inserter Sink (inserter_sink.rs)

  • Implements a long-lived actor task that owns a clickhouse::Inserter<EAPItemRow> instance
  • Rows are serialized and written to the inserter's byte buffer immediately upon arrival, then dropped
  • Peak memory is bounded by the inserter's buffer size, not by row count
  • Flush boundaries are managed by the inserter's configured batch settings (max_rows, max_bytes, max_period)
  • Only successfully flushed rows have their Kafka offsets committed downstream
  • Failed flushes are never pushed, ensuring batch replay on restart (same durability as the old writer)

Processor Changes (eap_items.rs)

  • Renamed process_message_row_binary to process_message_eap_row for clarity
  • Returns TypedInsertBatch<EAPItemRow> instead of pre-encoded bytes
  • Removed inline RowBinary encoding; the inserter sink handles serialization
  • Includes source proto length as byte-size estimate for batch sizing

Type System (types.rs)

  • Added TypedInsertBatch<R> to represent rows that will be serialized downstream
  • Carries optional row, timestamps, metrics, and byte estimates
  • Supports skip semantics (row == None) for messages that don't produce inserts

Pipeline Wiring (factory_v2.rs, processor.rs)

  • Added make_rust_processor_typed to build pipelines for typed-row processors
  • When use_row_binary is enabled, eap_items now uses the inserter sink path instead of the Reduce + ClickhouseWriterStep pair
  • Assertion ensures RowBinary mode is only used with EAPItemsProcessor

Removed Code

  • Deleted custom rowbinary/ser.rs serializer module (replaced by clickhouse crate)
  • Removed InsertFormat enum and related writer step configuration
  • Removed custom UUID serialization adapter (using clickhouse::serde::uuid instead)

Dependencies

  • Added clickhouse crate (v0.15) with default features disabled
  • Updated Rust toolchain from 1.88.0 to 1.94.1

Benefits

  1. Reduced Memory Footprint: Rows are serialized and dropped immediately rather than accumulating in a batch
  2. Official Support: Uses the upstream clickhouse crate's battle-tested inserter instead of custom serialization
  3. Simplified Code: Eliminates ~400 lines of custom RowBinary serialization logic
  4. Better Durability: Explicit flush tracking ensures offsets are only committed after successful writes
  5. Schema Validation: The clickhouse crate validates rows against the table schema at serialization time

Testing

Existing unit tests for the RowBinary serializer are removed as that functionality is now provided by the upstream crate. The inserter sink's behavior is covered by integration with the processor pipeline and the existing Arroyo strategy test infrastructure.

https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf

Replace the hand-rolled RowBinary write path for eap_items (a vendored
RowBinary serializer + reqwest/native-LZ4 HTTP client) with the official
`clickhouse` crate's `Inserter`, picking up RowBinaryWithNamesAndTypes
schema validation, compression, and connection pooling.

The Rust consumer is a sentry_arroyo push-based pipeline, so the crate's
typed `Inserter` is wired into a new `EapItemsInserterSink` strategy:

- The eap_items processor now emits a typed `EAPItemRow`
  (`process_message_eap_row` → `BytesInsertBatch<Option<EAPItemRow>>`)
  instead of pre-encoded bytes.
- `EapItemsInserterSink` owns a long-lived `Inserter<EAPItemRow>` on a
  dedicated actor task. It writes each row on arrival (the wide struct is
  serialized into the inserter buffer and dropped immediately, so peak
  memory stays bounded by the buffer, not row count) and lets the inserter
  own the flush boundary via `with_max_rows`/`with_max_bytes` +
  `with_period`. On each `commit()` flush it pushes exactly the flushed
  rows' Kafka offsets downstream — the flush is the durability barrier, so
  a failed flush never advances offsets (batch replays).
- `EAPItemRow` derives `clickhouse::Row`; the hand-maintained `COLUMN_NAMES`
  and the vendored `rowbinary` serializer are removed (the crate generates
  the column list and RowBinaryWithNamesAndTypes tolerates field ordering).

The path is gated by the existing `use_row_binary` flag (EAPItemsProcessor
only); JSONEachRow remains the default and the rollback. The JSONEachRow
HTTP writer (`writer_v2`'s client + native-LZ4) stays for all other
storages and is simplified to JSON-only.

The crate's validation requires Rust 1.89+, so bump the pinned toolchain
to 1.94.1 (Dockerfile/CI install from rust-toolchain.toml); this surfaced
two newer clippy lints on existing code, fixed in passing.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
@phacops phacops requested a review from a team as a code owner June 18, 2026 12:27
@phacops phacops changed the title Replace RowBinary serializer with clickhouse-crate inserter sink Replace RowBinary serializer with clickhouse-crate inserter sink [EAP-584] Jun 18, 2026
@linear-code

linear-code Bot commented Jun 18, 2026

Copy link
Copy Markdown

EAP-584

Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs Outdated
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs Outdated
Address PR review findings:

- Actor write-ahead past failure (Bugbot, high): on a write/commit error the
  actor previously rebuilt the inserter and kept consuming queued rows. If a
  later window flushed durably while the strategy had already failed the
  consumer on the first error, those rows landed in ClickHouse with their
  offsets uncommitted → duplicates on replay. The actor now fail-stops
  (surfaces the error and returns), matching the old writer's fail-stop +
  replay model; the in-flight window never flushed, so it replays cleanly.

- join() timeout doubling (Seer, medium): the shutdown wait loop could consume
  the full timeout and then pass the original timeout to next_step.join(),
  blocking up to 2x the caller's budget. Pass the remaining time instead.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs Outdated
Re-add the resilience the old JSON writer had (RetryConfig: jittered
exponential backoff, 4 retries) to the clickhouse-crate inserter path,
without an unbounded memory cost:

- The actor retains the current unflushed window's rows (bounded by
  max_batch_size) until that window's flush succeeds.
- On a write/commit error the inserter is poisoned; the actor replays the
  retained window through a fresh inserter via flush_window_with_retry
  (write all rows + end()), with backoff, before failing. On success it
  resets the main inserter, emits the window's offsets, and continues.
- If retries are exhausted, it fail-stops (offsets never pushed → replay),
  matching the old fail-stop-after-retries model.

Peak memory is now one batch of typed rows plus the inserter's byte buffer
(the accepted tradeoff for retry), still bounded by max_batch_size, not by
total row count.

Also: clarify in submit() why an unreserved/closed channel is safe
(try_reserve rejects a closed channel; a dropped row's offset is never
committed, so it replays). The skip-only offset-advance guard is preserved
as `acc.rows.is_empty()` — it must not emit while real rows are buffered
but unflushed, since their offsets are lower than later skips and
committing past them would lose data.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Address a PR review finding (Bugbot, high) about potentially committing
offsets past not-yet-durable rows. In our usage this can't happen: we
commit() after every single write(), and the crate flushes the entire
current INSERT (never a strict subset), so a flush always covers exactly
the retained window and emit_ready only advances offsets for durable rows.

Make that invariant explicit and enforced with a debug_assert that the
flushed row count equals acc.rows.len(), so a future change (e.g. batching
multiple writes before a commit) can't silently regress into committing
ahead.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment thread rust_snuba/src/strategies/clickhouse/inserter_sink.rs
Address a PR review finding (Bugbot, medium): if join() hit its deadline
while the actor was still finalizing, a late FlushOutcome::Ready was never
drained, so rows could be durable in ClickHouse with offsets uncommitted →
duplicate on replay.

On timeout, abort the actor (cancelling an in-flight INSERT that hasn't
landed, so that window simply replays rather than committing rows we'll
never ack), then drain outcomes one final time so any Ready produced right
around the deadline still has its offsets pushed. A flush that already
landed server-side before the abort replays as a duplicate — the same
at-least-once shutdown exposure the old writer had, to be closed by the
(deferred) insert_deduplication_token.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit ab7d324. Configure here.

Comment thread rust_snuba/Cargo.toml
claude added 3 commits June 18, 2026 13:13
…y column

CI's live-ClickHouse test_rust job failed:

  Code: 117 ... Type of 'attributes_array' must be JSON(max_dynamic_paths=128),
  not JSON ... While executing BinaryRowInputFormat. (INCORRECT_DATA)

`eap_items.attributes_array` is a native JSON(max_dynamic_paths=N) column. Under
RowBinaryWithNamesAndTypes the server validates the client's names+types header,
and our `String` field can't satisfy a parametrized JSON column (no Rust type
maps to it). The old hand-rolled writer avoided this by sending plain RowBinary
+ input_format_binary_read_json_as_string=1 (write the JSON as a string, let
ClickHouse parse it into the JSON column).

Set Client::with_validation(false) so the crate uses plain RowBinary, matching
the old wire behavior. The crate still emits the column list (from the Row
derive) in the INSERT, so column mapping stays correct without a hand-rolled
list. Trade-off: this drops RowBinaryWithNamesAndTypes schema validation (it is
incompatible with the JSON column, not merely an optimization).

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Address a PR review finding (Bugbot, medium): the clickhouse crate was built
with default-features = false and no TLS backend, so build_client's https URL
(used when ClickhouseConfig.secure is true) had no TLS stack — row-binary
inserts to secure clusters would fail while the JSON path kept working.

Add the crate's native-tls feature. It matches the existing reqwest-based
writer's TLS (OpenSSL + system roots) and reuses openssl-sys, which reqwest
already pulls into the build, so no new system dependency is introduced. Builds
clean.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
…king-0ju4nd

Resolve conflict in rust_snuba/src/processors/eap_items.rs:
- Drop master's manual COLUMN_NAMES const; the clickhouse::Row derive now
  generates the column list from struct field order (covers the new
  session_id, ai_conversation_id, and attributes_array_* columns).
- Point session_id's serde adapter at clickhouse::serde::uuid (the vendored
  rowbinary module was removed in this branch).
- Update test_column_names_match_struct_layout to read the derived
  COLUMN_NAMES and assert the new 102-column layout.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment on lines +559 to +568
Ok(FlushOutcome::Err(e)) => {
counter!(
"rust_consumer.clickhouse_insert_error",
1,
"status" => "insert_error",
"retried" => "false"
);
tracing::error!("ClickHouse inserter flush failed: {}", e);
return Err(StrategyError::Other(e.into()));
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: An unrecoverable insert error causes the rust_consumer.clickhouse_insert_error metric to be emitted twice, once in flush_window_with_retry and again in drain_outcomes, inflating the error count.
Severity: MEDIUM

Suggested Fix

Remove one of the two metric emissions. The most logical place to remove it is from flush_window_with_retry. The drain_outcomes function should be the single source of truth for emitting metrics based on final flush outcomes, whether success or failure. This centralizes metric emission and prevents double-counting.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: rust_snuba/src/strategies/clickhouse/inserter_sink.rs#L559-L568

Potential issue: When an insert to ClickHouse fails and all retries are exhausted, the
error metric `rust_consumer.clickhouse_insert_error` is emitted twice. The first
emission occurs in `flush_window_with_retry` before it returns an error. The caller then
sends this error to the `drain_outcomes` function, which processes the error and emits
the exact same metric a second time. This double-counting inflates the error rate by a
factor of two, which can trigger false alerts and distort monitoring dashboards, making
it difficult to assess the true health of the insert pipeline.

Also affects:

  • rust_snuba/src/strategies/clickhouse/inserter_sink.rs:266~272

…l failure

flush_window_with_retry already emits clickhouse_insert_error (retried=false)
when all retries are exhausted, then returns Err. The actor forwards that as
FlushOutcome::Err, and drain_outcomes was emitting the same metric with
identical tags a second time, inflating the error count 2x.

Drop the emission in drain_outcomes and keep it at the source in
flush_window_with_retry, where the per-attempt retried=true counter also
lives. This also makes the metric robust to the best-effort outcome send
(let _ = out_tx.send(...)): the count is recorded even if the receiver is
already gone during shutdown. drain_outcomes still logs and fail-stops.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Comment on lines +137 to +142
debug_assert_eq!(
rows as usize,
acc.rows.len(),
"flush count must equal the retained window; committing more offsets \
than were made durable would lose data",
);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: A debug_assert_eq! in emit_ready is compiled away in release builds, risking silent data loss if the number of rows flushed to ClickHouse doesn't match the buffered row count.
Severity: HIGH

Suggested Fix

Replace the debug_assert_eq! with a standard assert_eq! or a runtime check that returns an error. This ensures the invariant is enforced in all builds, preventing the system from committing offsets for rows that were not successfully written.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: rust_snuba/src/strategies/clickhouse/inserter_sink.rs#L137-L142

Potential issue: The `emit_ready` function uses a `debug_assert_eq!` to verify that the
number of rows flushed to ClickHouse matches the number of rows buffered locally. This
assertion is compiled out in release builds. If ClickHouse reports a different number of
rows written (e.g., due to server-side deduplication or filtering) than what was sent,
the check will not trigger in production. This can occur in both the normal processing
loop and the error recovery path. This causes the system to commit offsets for rows that
were not durably stored, leading to silent data loss.

Also affects:

  • rust_snuba/src/strategies/clickhouse/inserter_sink.rs:440~456

emit_ready guarded "flushed rows == retained window length" with a
debug_assert!, which compiles out of release builds. The invariant protects
against committing Kafka offsets for rows that weren't made durable (silent
data loss), so gate it in all builds.

The crate's Quantities.rows is a client-side counter (pending.rows += 1 per
write()), not a server ack, so the invariant holds by construction today and
can't be tripped by ClickHouse-side dedup/filtering. The runtime check guards
against a future refactor (e.g. batching writes before a commit) silently
breaking it.

On mismatch, emit_ready now returns false after sending FlushOutcome::Err
(rather than panicking): a panic in the spawned actor would only disconnect
the channel and stall the consumer, whereas FlushOutcome::Err propagates as a
StrategyError out of poll() — the same loud fail-stop used for exhausted
insert retries. Callers stop the actor on false; the batch replays from the
last committed offset.

Co-Authored-By: Claude Opus 4.8 <[email protected]>
Claude-Session: https://claude.ai/code/session_011Bz3aya6Udg2sd9Sc9uuHf
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants