Merged
11 changes: 8 additions & 3 deletions README.md
@@ -63,6 +63,7 @@ stability label.
| `source/jetstream` | NATS JetStream Inlet over nats.go: pull consumer, ack/nak/term, MaxAckPending. | experimental |
| `source/redis` | Redis Streams Inlet over go-redis: consumer group, XACK/pending-claim, DLQ. | experimental |
| `source/cloudevents` | CloudEvents codec with structured and binary content modes. | experimental |
| `source/cdc` | Change-data-capture codec: decode Debezium/OpenCDC change events, drive by key. | experimental |
| `source/statemachine` | Bridge: an inbound message drives a transition, ack tied to the durable commit. | experimental |

source also ships composable reliability middleware as its own opt-in modules
@@ -83,8 +84,9 @@ snapshots; inspection; and JSON (de)serialization. It is backed by its `analysis
until it reaches v1. The `telemetry` interface and its `slog`, `otel`, and
`datadog` adapters are released. The `sink` egress seam and its destination
adapters, and the `source` ingress seam with its Kafka, JetStream, and Redis
-Streams adapters, CloudEvents codec, reliability middleware, and state-machine
-bridge, are now available and documented; the `broker` module is planned.
+Streams adapters, CloudEvents and CDC codecs, reliability middleware, and
+state-machine bridge, are now available and documented; the `broker` module is
+planned.

## Roadmap: event-driven seams

@@ -99,7 +101,10 @@ and forcing nothing third-party on the consumer:
[Docs](https://stablekernel.github.io/crucible/sink/overview/).
- [x] **`source`**: ingress. Subscribe external streams and drive machines, with
the ack tied to a durable transition; the symmetric counterpart to `sink`.
-[Docs](https://stablekernel.github.io/crucible/source/overview/).
+[Docs](https://stablekernel.github.io/crucible/source/overview/). The
+`source/cdc` codec decodes Debezium/OpenCDC change-event topics into typed
+change events; a native database write-ahead-log connector (logical replication
+slot, binlog) remains future work.
- [ ] **`bellows`** _(exploring)_: resilience seam. Circuit-breaking and
backpressure around the IO edges.

55 changes: 55 additions & 0 deletions docs/src/content/docs/source/codecs.md
@@ -46,3 +46,58 @@ The decoded type then flows into your handler, and for the
[state-machine binding](/crucible/source/with-state/) it becomes the event the
router fires. A decode failure is a typed `*DecodeError` the engine classifies
as poison and routes to the [DLQ](/crucible/source/reliability/#dlq).

## cdc

`crucible/source/cdc` is the change-data-capture codec. It decodes the standard
Debezium JSON change-event envelope (also the de facto OpenCDC normalized record
shape) into a typed `ChangeEvent`:

- an `Operation` (create, snapshot read, update, delete, or tombstone),
- the `before` and `after` row images, kept as deferred JSON so one codec serves
every table on a topic without binding to a row type at decode time,
- a decoded `Source` metadata block (connector, database, schema, table,
snapshot marker, log position, transaction id), and
- the commit `Timestamp` the connector reported.

Recover the value with `DecodeEvent`, project a row image into a concrete type
with `BeforeAs[T]` / `AfterAs[T]`, and read the source metadata as typed
`source.Headers` through `SourceHeaders`. A log-compaction tombstone (an empty
payload) decodes to an `OpTombstone` event rather than a decode failure, so a
handler can route it (as a delete-and-forget for the key) or skip it. A
malformed envelope is a typed `*DecodeError` that the engine classifies as poison.
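
To make the envelope handling concrete, here is a minimal standalone sketch using only the Go standard library. It is not the `crucible/source/cdc` implementation: the `decode` and `afterAs` functions, the `ChangeEvent` shape, the `userRow` type, and the operation constants are hypothetical stand-ins. It assumes the unwrapped Debezium JSON envelope (`op`, `before`, `after`, `source`, `ts_ms`) with no schema wrapper, and shows the two ideas described above: row images kept as deferred JSON, and an empty payload treated as a tombstone instead of an error.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"
)

// Operation is a hypothetical stand-in for the codec's operation type.
type Operation string

const (
	OpCreate    Operation = "create"
	OpRead      Operation = "read" // snapshot read
	OpUpdate    Operation = "update"
	OpDelete    Operation = "delete"
	OpTombstone Operation = "tombstone"
)

// ChangeEvent is an illustrative shape, not the crucible type.
// Before and After stay as raw JSON so one decoder serves every table.
type ChangeEvent struct {
	Operation Operation
	Before    json.RawMessage
	After     json.RawMessage
	Table     string
	Timestamp time.Time
}

// envelope mirrors the Debezium JSON fields this sketch reads.
type envelope struct {
	Op     string          `json:"op"`
	Before json.RawMessage `json:"before"`
	After  json.RawMessage `json:"after"`
	Source struct {
		Table string `json:"table"`
	} `json:"source"`
	TsMs int64 `json:"ts_ms"`
}

// ops maps Debezium's one-letter op codes to the stand-in operations.
var ops = map[string]Operation{"c": OpCreate, "r": OpRead, "u": OpUpdate, "d": OpDelete}

// decode turns a message payload into a ChangeEvent. An empty payload is
// a log-compaction tombstone, not a failure.
func decode(payload []byte) (ChangeEvent, error) {
	if len(bytes.TrimSpace(payload)) == 0 {
		return ChangeEvent{Operation: OpTombstone}, nil
	}
	var env envelope
	if err := json.Unmarshal(payload, &env); err != nil {
		return ChangeEvent{}, fmt.Errorf("decode envelope: %w", err)
	}
	op, ok := ops[env.Op]
	if !ok {
		return ChangeEvent{}, fmt.Errorf("unknown op %q", env.Op)
	}
	return ChangeEvent{
		Operation: op,
		Before:    env.Before,
		After:     env.After,
		Table:     env.Source.Table,
		Timestamp: time.UnixMilli(env.TsMs).UTC(),
	}, nil
}

// afterAs projects the deferred after-image into a concrete row type.
func afterAs[T any](ev ChangeEvent) (T, error) {
	var row T
	if len(ev.After) == 0 || string(ev.After) == "null" {
		return row, fmt.Errorf("no after image for %s event", ev.Operation)
	}
	err := json.Unmarshal(ev.After, &row)
	return row, err
}

// userRow is a hypothetical row type for the example.
type userRow struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func main() {
	msg := []byte(`{"op":"u","before":{"id":7,"name":"old"},"after":{"id":7,"name":"new"},"source":{"table":"users"},"ts_ms":1700000000000}`)
	ev, err := decode(msg)
	if err != nil {
		fmt.Println("poison:", err)
		return
	}
	row, err := afterAs[userRow](ev)
	fmt.Println(ev.Operation, ev.Table, row, err)

	tomb, _ := decode(nil)
	fmt.Println(tomb.Operation)
}
```

Keeping the row images as `json.RawMessage` is what lets the row type be chosen per handler rather than per decoder; the cost is a second unmarshal when a handler actually needs the row.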

### Scope: envelope plus topic pattern, not a native connector

This codec covers the change-event **envelope** and the pattern for driving a
statechart from a change-event **topic**. The intended shape is to let an
existing connector (Debezium, or any producer emitting the same envelope) write
row changes to a topic, consume that topic through a backend inlet such as
[`source/kafka`](/crucible/source/inlets/), decode each message with this codec,
and drive a statechart per primary key through the
[state-machine binding](/crucible/source/with-state/). Because the codec is
instance-scoped, a service can run a CDC consumer alongside a plain-JSON or
CloudEvents consumer with no shared decode state.

A native database write-ahead-log connector (reading a Postgres logical
replication slot or a MySQL binlog directly, without a broker in between) is
deliberately out of scope and tracked as future work. The codec gives you the
typed change event; the connector that produces those events stays a separate,
operational concern.

```go
// Decode a Debezium change event, then route its after-image into a transition.
registry := source.NewRegistry().SetDefault(cdc.New())

router := func(m source.Message) (Key, Event, error) {
	change, err := cdc.DecodeEvent(registry, m)
	if err != nil {
		return zeroKey, zeroEvent, err
	}
	row, err := cdc.AfterAs[Row](change)
	if err != nil {
		return zeroKey, zeroEvent, err
	}
	return keyOf(row), eventOf(change.Operation, row), nil
}
```
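
The "statechart per primary key" pattern can be illustrated without any crucible types. The sketch below is an assumption-laden toy, not the state-machine binding: `Machines`, `Apply`, the three states, and the transition table are all hypothetical names chosen for the example. It keeps one state per key in memory, advances it on each change operation, and forgets the key on a tombstone.

```go
package main

import "fmt"

// Op and State are hypothetical stand-ins for the example.
type Op string
type State string

const (
	OpCreate    Op = "create"
	OpRead      Op = "read" // snapshot read
	OpUpdate    Op = "update"
	OpDelete    Op = "delete"
	OpTombstone Op = "tombstone"
)

const (
	Absent  State = "absent"
	Live    State = "live"
	Deleted State = "deleted"
)

// transitions[current][op] gives the next state; a missing entry means
// the operation is not valid in the current state.
var transitions = map[State]map[Op]State{
	Absent:  {OpCreate: Live, OpRead: Live},
	Live:    {OpUpdate: Live, OpDelete: Deleted},
	Deleted: {OpCreate: Live},
}

// Machines holds one state per primary key.
type Machines struct {
	states map[string]State
}

func NewMachines() *Machines {
	return &Machines{states: make(map[string]State)}
}

// Apply advances the machine for key. A tombstone drops the key entirely
// (delete-and-forget); an invalid operation leaves the state unchanged.
func (m *Machines) Apply(key string, op Op) (State, error) {
	if op == OpTombstone {
		delete(m.states, key)
		return Absent, nil
	}
	cur, ok := m.states[key]
	if !ok {
		cur = Absent
	}
	next, ok := transitions[cur][op]
	if !ok {
		return cur, fmt.Errorf("key %s: no transition from %s on %s", key, cur, op)
	}
	m.states[key] = next
	return next, nil
}

func main() {
	m := NewMachines()
	for _, op := range []Op{OpCreate, OpUpdate, OpDelete, OpTombstone} {
		state, err := m.Apply("users/7", op)
		fmt.Println(op, state, err)
	}
}
```

A real binding would persist the state and tie the message ack to that durable commit, as the page describes; the in-memory map here only shows the routing shape.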