feat(connectors): add Apache Doris sink connector#3215
Open
ryankert01 wants to merge 5 commits into
Open
Conversation
a9f3652 to
b5434dd
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3215 +/- ##
=============================================
- Coverage 74.46% 51.74% -22.72%
Complexity 943 943
=============================================
Files 1188 1187 -1
Lines 106543 93393 -13150
Branches 83560 70428 -13132
=============================================
- Hits 79332 48325 -31007
- Misses 24459 42484 +18025
+ Partials 2752 2584 -168
🚀 New features to boost your workflow:
|
7e263c9 to
03ed46c
Compare
Sink connector that writes Iggy messages to Apache Doris via the HTTP
Stream Load API. v1 scope: JSON payloads only, HTTP Basic auth,
pre-created tables only (no DDL).
Behaviour:
- Manual 307/308 redirect following (capped at 5) so the Authorization
header survives the FE -> BE hop, which reqwest strips by default.
- Deterministic per-batch label
({prefix}-{stream}-{topic}-{partition}-{first_offset}-{last_offset})
so replays are deduplicated by Doris within label_keep_max_second.
- Response body Status field drives error classification: Success and
"Label Already Exists" -> Ok; Publish Timeout -> CannotStoreData
(transient); Fail or any unknown status -> PermanentHttpError so the
runtime DLQs the batch instead of looping.
- Optional columns / where / max_filter_ratio / batch_size / timeout
forwarded as Stream Load headers.
- Password held as secrecy::SecretString; auth header wrapped in
SecretString so Debug derivation never leaks the base64 credential.
- Client built in open() with InitError on failure; fe_url validated
there too so a bad config fails at startup rather than first batch.
Tests: 6 integration tests under core/integration/tests/connectors/doris
backed by an apache/doris all-in-one testcontainer (FE HTTP + FE MySQL).
Coverage includes happy path, 1k-row bulk, max_filter_ratio skip path,
label-replay dedupe, missing-target-table (proves no auto-create), and
the columns derived-expression header. The container must bind host:8040
1:1 because the FE 307-redirects to 127.0.0.1:8040; tests are serialized
via a 'doris' nextest test-group (max-threads = 1) so concurrent test
processes don't race for that port.
Addresses review feedback on the Doris sink connector before merge.
Correctness:
- Label format now appends an 8-hex blake3 of the *raw* stream/topic names,
so streams that sanitize identically (e.g. `events.v1` vs `events_v1`)
can no longer collide and silently dedupe against each other in Doris.
Each variable-length segment is also truncated; total label is bounded
under Doris's 128-char cap regardless of input length.
- `build_label` is now a pure `pub` free function. The integration test's
manual label construction (used to verify server-side dedupe) now calls
it directly, so the test cannot drift from the production format.
- `consume` tracks the *most severe* error across chunks via `record_error`:
permanent shadows transient. The previous first-error strategy let a
transient error from chunk N hide a permanent error from chunk M and
caused the runtime to retry forever instead of routing to DLQ.
- HTTP 408 (Request Timeout) and 429 (Too Many Requests) classified as
`CannotStoreData` (transient). They are 4xx but recoverable; the old
code lumped them with all 4xx and DLQ'd retryable conditions.
- Parse failures on the response body now return `PermanentHttpError`.
An unparseable 200-OK is almost always a Doris bug or proxy interference
— retrying the same bytes won't help.
Security:
- `open()` rejects `database`/`table` values outside `[A-Za-z0-9_]+`.
Doris would reject them server-side anyway, but rejecting at config-load
also prevents path traversal in the `/api/{db}/{table}/_stream_load` URL.
- `open()` emits a `warn!` when `fe_url` is `http://` and the host is
not loopback. README's new "Security notes" section spells out the
trust boundary the manual-redirect-following implies (a compromised FE
could exfiltrate credentials via a hostile `Location` header).
- Response body truncated to 4 KB at a UTF-8 boundary before being
formatted into errors or logs, so a misbehaving proxy that returns a
giant body cannot OOM the connector or flood logs.
Robustness:
- Explicit `connect_timeout` (5 s) so an unreachable FE fails fast
instead of consuming the full request timeout on the handshake alone.
- `send_stream_load` takes `bytes::Bytes`; clones inside the redirect
loop are now refcount bumps instead of full `Vec<u8>` copies.
Observability:
- `warn!` when Doris reports `number_filtered_rows > 0` — schema drift
in upstream messages was previously logged at `info!` and easy to miss.
- Per-batch success log demoted from `info!` to `debug!`.
- README documents `Expect: 100-continue`, `label_keep_max_second`
guidance, and the filtered-row alert.
Tests: 21 unit tests pass (was 13, added 8 covering hash-suffix label
collision resistance, label length cap, severity ordering, identifier
validation, and log truncation). All 6 testcontainer integration tests
pass against a real Doris all-in-one image.
03ed46c to
9fd85d6
Compare
…308 redirects Three small README gaps surfaced during a re-read against the post-review code: - `database` / `table` must match `[A-Za-z0-9_]+`. The connector rejects anything else at startup with `Error::InvalidConfigValue` — surface the constraint where operators look for it (Requirements + Configuration table). - Non-JSON payloads are dropped with `warn!` and the offset advances past them. That is silent data loss, not a recoverable skip, so the README now spells it out instead of glossing it as "skipped with a warning". - `308 Permanent Redirect` is followed in addition to `307` (defensive), and the redirect cap of 5 is documented.
Member
Author
|
There's a performance optimization(may or may not works) that I want to leave as a follow-up PR: |
Four pre-merge check failures from the previous commit, all mechanical: - typos: `unparseable` → `unparsable` (1 in README, 2 in lib.rs comments). - markdown lint MD013: README's label-format bullet was 583 chars; split into a parent bullet + 3 sub-bullets, all within the 500-char cap. - rustfmt: trailing blank line in the integration test after the recent removal of the local `sanitize` helper. - cargo sort: `iggy_connector_doris_sink` was added under `iggy_connector_sdk` in core/integration/Cargo.toml; reordered so the dependency list stays alphabetical. No behavior change. 21 unit tests still pass; `cargo fmt --check` and `cargo sort --workspace --check` both clean locally.
Contributor
|
we'll check this in upcoming 2-3 days. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Closes #3112
Rationale
Adds an Apache Doris sink so Iggy streams can be written into Doris for analytical querying.
What changed?
Iggy had no path to land messages in Apache Doris. A new
iggy_connector_doris_sinkcrate consumes JSON payloads and writes them via Doris's HTTP Stream Load API (PUT /api/{db}/{table}/_stream_load).The non-obvious bits the connector handles: re-attaching
Authorizationacross the FE→BE 307 redirect (whichreqweststrips by default), parsing the JSONStatusbody to classify success /Label Already Exists/ transient (Publish Timeout, 5xx) / permanent (Fail, 4xx, unknown), and emitting a deterministic per-batch label so replays are deduplicated by Doris's label-keep window. v1 is sink-only, JSON-only, HTTP Basic auth only, and assumes pre-created tables — no DDL.Local Execution
AI Usage
quickwit_sink/influxdb_sink, testcontainer fixture, and iteration on the Stream Load redirect +Status-body classification.apache/doris:doris-all-in-one-2.1.0container, covering happy path, 1k-row bulk,max_filter_ratio, label-replay dedupe, missing-target-table, andcolumnsderived expressions; row state verified via the MySQL frontend. help write docs.