diff --git a/README.md b/README.md index c4f66f3..4d76f10 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,7 @@ stability label. | `source/jetstream` | NATS JetStream Inlet over nats.go: pull consumer, ack/nak/term, MaxAckPending. | experimental | | `source/redis` | Redis Streams Inlet over go-redis: consumer group, XACK/pending-claim, DLQ. | experimental | | `source/cloudevents` | CloudEvents codec with structured and binary content modes. | experimental | +| `source/cdc` | Change-data-capture codec: decode Debezium/OpenCDC change events, drive by key. | experimental | | `source/statemachine` | Bridge: an inbound message drives a transition, ack tied to the durable commit. | experimental | source also ships composable reliability middleware as its own opt-in modules @@ -83,8 +84,9 @@ snapshots; inspection; and JSON (de)serialization. It is backed by its `analysis until it reaches v1. The `telemetry` interface and its `slog`, `otel`, and `datadog` adapters are released. The `sink` egress seam and its destination adapters, and the `source` ingress seam with its Kafka, JetStream, and Redis -Streams adapters, CloudEvents codec, reliability middleware, and state-machine -bridge, are now available and documented; the `broker` module is planned. +Streams adapters, CloudEvents and CDC codecs, reliability middleware, and +state-machine bridge, are now available and documented; the `broker` module is +planned. ## Roadmap: event-driven seams @@ -99,7 +101,10 @@ and forcing nothing third-party on the consumer: [Docs](https://stablekernel.github.io/crucible/sink/overview/). - [x] **`source`**: ingress. Subscribe external streams and drive machines, with the ack tied to a durable transition; the symmetric counterpart to `sink`. - [Docs](https://stablekernel.github.io/crucible/source/overview/). + [Docs](https://stablekernel.github.io/crucible/source/overview/). 
The + `source/cdc` codec decodes Debezium/OpenCDC change-event topics into typed + change events; a native database write-ahead-log connector (logical replication + slot, binlog) remains future work. - [ ] **`bellows`** _(exploring)_: resilience seam. Circuit-breaking and backpressure around the IO edges. diff --git a/docs/src/content/docs/source/codecs.md b/docs/src/content/docs/source/codecs.md index b007057..4b9d776 100644 --- a/docs/src/content/docs/source/codecs.md +++ b/docs/src/content/docs/source/codecs.md @@ -46,3 +46,58 @@ The decoded type then flows into your handler, and for the [state-machine binding](/crucible/source/with-state/) it becomes the event the router fires. A decode failure is a typed `*DecodeError` the engine classifies as poison and routes to the [DLQ](/crucible/source/reliability/#dlq). + +## cdc + +`crucible/source/cdc` is the change-data-capture codec. It decodes the standard +Debezium JSON change-event envelope (also the de-facto OpenCDC normalized record +shape) into a typed `ChangeEvent`: + +- an `Operation` (create, snapshot read, update, delete, or tombstone), +- the `before` and `after` row images, kept as deferred JSON so one codec serves + every table on a topic without binding to a row type at decode time, +- a decoded `Source` metadata block (connector, database, schema, table, + snapshot marker, log position, transaction id), and +- the commit `Timestamp` the connector reported. + +Recover the value with `DecodeEvent`, project a row image into a concrete type +with `BeforeAs[T]` / `AfterAs[T]`, and read the source metadata as typed +`source.Headers` through `SourceHeaders`. A log-compaction tombstone (an empty +payload) decodes to an `OpTombstone` event rather than a decode failure, so a +handler routes it (a delete-and-forget for the key) or skips it. A malformed +envelope is a typed `*DecodeError` the engine classifies as poison. 
+ +### Scope: envelope plus topic pattern, not a native connector + +This codec covers the change-event **envelope** and the pattern for driving a +statechart from a change-event **topic**. The intended shape is to let an +existing connector (Debezium, or any producer emitting the same envelope) write +row changes to a topic, consume that topic through a backend inlet such as +[`source/kafka`](/crucible/source/inlets/), decode each message with this codec, +and drive a statechart per primary key through the +[state-machine binding](/crucible/source/with-state/). Because the codec is +instance-scoped, a service can run a CDC consumer alongside a plain-JSON or +CloudEvents consumer with no shared decode state. + +A native database write-ahead-log connector (reading a Postgres logical +replication slot or a MySQL binlog directly, without a broker in between) is +deliberately out of scope and tracked as future work. The codec gives you the +typed change event; the connector that produces those events stays a separate, +operational concern. + +```go +// Decode a Debezium change event, then route its after-image into a transition. +registry := source.NewRegistry().SetDefault(cdc.New()) + +router := func(m source.Message) (Key, Event, error) { + change, err := cdc.DecodeEvent(registry, m) + if err != nil { + return zeroKey, zeroEvent, err + } + row, err := cdc.AfterAs[Row](change) + if err != nil { + return zeroKey, zeroEvent, err + } + return keyOf(row), eventOf(change.Operation, row), nil +} +``` diff --git a/source/cdc/cdc.go b/source/cdc/cdc.go new file mode 100644 index 0000000..73f44af --- /dev/null +++ b/source/cdc/cdc.go @@ -0,0 +1,369 @@ +// SPDX-License-Identifier: Apache-2.0 + +// Package cdc is a source codec that decodes change-data-capture (CDC) events +// into a typed [ChangeEvent]. 
It plugs into a [source.Registry] as an +// instance-scoped [source.Codec]: construct one with [New] and register it +// under the content types your CDC topic carries, or set it as the registry +// default. There is no package-global registration; every codec is constructed +// and injected, never shared by import. +// +// # Scope +// +// This codec handles the change-event envelope only: the standard Debezium JSON +// shape (an "op" of c/r/u/d, "before"/"after" row images, a "source" metadata +// block, and a "ts_ms" commit timestamp), which is also the de-facto OpenCDC +// normalized record shape. It decodes one row-change message into a +// [ChangeEvent] whose row images stay as deferred JSON ([RawJSON]) so a handler +// recovers a concrete row type only when it needs one, via [BeforeAs] / [AfterAs]. +// +// A native database write-ahead-log connector (reading a Postgres logical +// replication slot, a MySQL binlog, and so on) is deliberately out of scope and +// tracked as future work. The intended pattern is to let an existing CDC +// connector (Debezium, or any producer emitting the same envelope) write change +// events to a topic, consume that topic through a backend inlet such as +// source/kafka, decode each message with this codec, and drive a statechart per +// primary key through source/statemachine. +// +// # Decoded value +// +// Decode yields a [ChangeEvent] (by value). Recover it from a registry result +// with [EventOf] or the one-call [DecodeEvent]; project its row images into a +// concrete type with [BeforeAs] / [AfterAs]. Useful fields from the source +// metadata block are surfaced as core [source.Headers] (see [SourceHeaders]) so +// a handler reads them through the same typed-header surface as any other +// inbound metadata instead of a magic-string map. +// +// # Tombstones +// +// A Kafka log-compaction tombstone (an empty payload) decodes to a [ChangeEvent] +// whose Operation is [OpTombstone] and whose row images are both absent. 
It is a +// valid outcome that needs no retry, not a decode failure: a handler routes it (a +// delete-and-forget for the key) or skips it. +// +// # Stability +// +// Experimental (pre-v1); the API may change until the suite locks v1.0.0. +package cdc + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/stablekernel/crucible/source" +) + +// DebeziumJSONContentType is the media type a Debezium JSON converter stamps on +// change-event messages. Register the [Codec] under this type to decode a +// Debezium topic, or set the codec as the registry default when the topic +// carries no content type. +const DebeziumJSONContentType = "application/vnd.debezium.cdc+json" + +// Sentinel errors a malformed envelope reports. Each is wrapped by the codec +// with context and, through the [source.Registry], in a [*source.DecodeError] +// that reports [source.ErrPoison]: a structurally invalid change event cannot be +// retried into validity. Match them with errors.Is. +var ( + // ErrMalformedEnvelope reports a payload that is not a JSON object or whose + // top-level shape is not a change-event envelope. + ErrMalformedEnvelope = errors.New("cdc: malformed change-event envelope") + // ErrUnknownOperation reports an "op" value the codec does not recognize. + ErrUnknownOperation = errors.New("cdc: unknown operation") + // ErrMissingImage reports a typed-image projection ([BeforeAs] / [AfterAs]) + // for a row image the operation does not carry (a "before" image on a create, + // say). + ErrMissingImage = errors.New("cdc: row image absent") +) + +// Operation is the kind of row change a [ChangeEvent] carries, mirroring the +// Debezium "op" field. +type Operation uint8 + +const ( + // OpUnknown is the zero value: an envelope whose operation has not been set. + OpUnknown Operation = iota + // OpCreate is an insert ("c"): only an after-image is present. 
+ OpCreate + // OpRead is a snapshot read ("r"): a row captured during the connector's + // initial snapshot, carrying an after-image and no before-image. + OpRead + // OpUpdate is an update ("u"): both before- and after-images are present + // when the connector is configured to capture them. + OpUpdate + // OpDelete is a delete ("d"): only a before-image is present; the after-image + // is null. + OpDelete + // OpTombstone is a log-compaction tombstone: an empty message that follows a + // delete on a compacted topic. It carries no images. + OpTombstone +) + +// String renders the operation as its Debezium op code ("c", "r", "u", "d"), +// "tombstone", or "unknown" for diagnostics and headers. +func (o Operation) String() string { + switch o { + case OpCreate: + return "c" + case OpRead: + return "r" + case OpUpdate: + return "u" + case OpDelete: + return "d" + case OpTombstone: + return "tombstone" + default: + return "unknown" + } +} + +// operationFromCode maps a Debezium "op" code to an [Operation], reporting +// whether the code was recognized. +func operationFromCode(code string) (Operation, bool) { + switch code { + case "c": + return OpCreate, true + case "r": + return OpRead, true + case "u": + return OpUpdate, true + case "d": + return OpDelete, true + default: + return OpUnknown, false + } +} + +// RawJSON is a deferred row image: the raw JSON bytes of a "before" or "after" +// row, decoded into a concrete type on demand via [BeforeAs] / [AfterAs] (or +// directly with [RawJSON.As]). It is nil when the image is absent. Keeping the +// image deferred lets one codec serve every table on a topic without binding to +// a row type at decode time. +type RawJSON []byte + +// Present reports whether the row image is set (non-nil). A create has no +// before-image and a delete has no after-image, so a handler checks Present +// before projecting. +func (r RawJSON) Present() bool { return r != nil } + +// As decodes the deferred row image into out. 
It returns [ErrMissingImage] when +// the image is absent (nil), and the json.Unmarshal error when the bytes do not +// fit out's shape. +func (r RawJSON) As(out any) error { + if r == nil { + return ErrMissingImage + } + if err := json.Unmarshal(r, out); err != nil { + return fmt.Errorf("cdc: decode row image: %w", err) + } + return nil +} + +// SourceMetadata is the decoded "source" block of a change event: the connector +// metadata Debezium attaches to every record. Fields absent from a given +// connector's payload stay at their zero value. The full block is retained as +// [SourceMetadata.Raw] for fields not surfaced here. +type SourceMetadata struct { + // Connector is the Debezium connector name ("postgresql", "mysql", ...). + Connector string + // Name is the logical server / database-server name configured on the + // connector. + Name string + // Database is the source database the change came from. + Database string + // Schema is the source schema (Postgres) the table lives in. + Schema string + // Table is the source table the row belongs to. + Table string + // Snapshot reports whether the record was captured during the connector's + // initial snapshot (the Debezium "snapshot" marker is truthy). + Snapshot bool + // LSN is the source log sequence number / position, as a string to span the + // per-connector representations (a Postgres LSN, a MySQL binlog coordinate). + LSN string + // TxID is the source transaction identifier, when the connector reports one. + TxID string + // Raw is the undecoded source block, for fields not surfaced above. + Raw RawJSON +} + +// ChangeEvent is a decoded CDC envelope: one row change with its before/after +// images, source metadata, and commit timestamp. The row images are deferred +// ([RawJSON]); project them with [BeforeAs] / [AfterAs] (or +// [ChangeEvent.Before] / [ChangeEvent.After] directly). 
+type ChangeEvent struct { + // Operation is the kind of change (create, read/snapshot, update, delete, or + // tombstone). + Operation Operation + // Before is the row image prior to the change ([RawJSON]); absent (nil) on a + // create, a snapshot read, and a tombstone. + Before RawJSON + // After is the row image after the change ([RawJSON]); absent (nil) on a + // delete and a tombstone. + After RawJSON + // Source is the decoded connector metadata block. + Source SourceMetadata + // Timestamp is the commit time the connector reported ("ts_ms"), in UTC; the + // zero time when the envelope carried none. + Timestamp time.Time +} + +// envelope is the wire shape of a Debezium JSON change event. Images and the +// source block stay as json.RawMessage so the codec defers row decoding and +// surfaces only the metadata it understands. +type envelope struct { + Op string `json:"op"` + Before json.RawMessage `json:"before"` + After json.RawMessage `json:"after"` + Source json.RawMessage `json:"source"` + TsMS *int64 `json:"ts_ms"` +} + +// sourceBlock is the subset of the Debezium "source" block the codec surfaces. +// snapshot is decoded permissively (a bool or a string such as "true"/"last") +// across connector versions. +type sourceBlock struct { + Connector string `json:"connector"` + Name string `json:"name"` + Database string `json:"db"` + Schema string `json:"schema"` + Table string `json:"table"` + Snapshot json.RawMessage `json:"snapshot"` + LSN json.RawMessage `json:"lsn"` + TxID json.RawMessage `json:"txId"` +} + +// Codec decodes a [source.Message] carrying a Debezium/OpenCDC JSON change-event +// envelope into a [ChangeEvent]. It holds no mutable state and is safe for +// concurrent use; the [source.Hopper] decodes from per-lane worker goroutines. +// +// Construct it with [New] and register it on a [source.Registry] (or set it as +// the default). It is the instance seam: there is no package-global +// registration. 
+type Codec struct{} + +var _ source.Codec = Codec{} + +// New returns a [Codec]. It is a constructor for symmetry with the rest of the +// suite and to leave room for future options; the zero value is equally valid +// since the codec carries no state. +func New() Codec { return Codec{} } + +// Decode turns a message's bytes into a [ChangeEvent]. An empty payload is a +// log-compaction tombstone and decodes to a [ChangeEvent] with [OpTombstone]. +// A non-empty payload must be a Debezium JSON envelope; a payload that is not a +// JSON object, or whose "op" is unrecognized, returns an error the +// [source.Registry] wraps in a [*source.DecodeError] (which reports +// [source.ErrPoison]). +// +// Headers are not consulted: the change-event shape is self-describing in the +// body. Recover the value with [EventOf] / [DecodeEvent] and project its row +// images with [BeforeAs] / [AfterAs]. +func (Codec) Decode(data []byte, _ source.Headers) (any, error) { + if len(data) == 0 { + return ChangeEvent{Operation: OpTombstone}, nil + } + + var env envelope + if err := json.Unmarshal(data, &env); err != nil { + return ChangeEvent{}, fmt.Errorf("%w: %v", ErrMalformedEnvelope, err) + } + + op, ok := operationFromCode(env.Op) + if !ok { + return ChangeEvent{}, fmt.Errorf("%w: %q", ErrUnknownOperation, env.Op) + } + + ev := ChangeEvent{ + Operation: op, + Before: rawImage(env.Before), + After: rawImage(env.After), + } + if env.TsMS != nil { + ev.Timestamp = time.UnixMilli(*env.TsMS).UTC() + } + if len(env.Source) > 0 && !isJSONNull(env.Source) { + meta, err := decodeSource(env.Source) + if err != nil { + return ChangeEvent{}, err + } + ev.Source = meta + } + return ev, nil +} + +// decodeSource parses the source metadata block into a [SourceMetadata], +// retaining the raw block and normalizing the permissive snapshot/lsn/txId +// fields. 
+func decodeSource(raw json.RawMessage) (SourceMetadata, error) { + var sb sourceBlock + if err := json.Unmarshal(raw, &sb); err != nil { + return SourceMetadata{}, fmt.Errorf("%w: source block: %v", ErrMalformedEnvelope, err) + } + return SourceMetadata{ + Connector: sb.Connector, + Name: sb.Name, + Database: sb.Database, + Schema: sb.Schema, + Table: sb.Table, + Snapshot: truthySnapshot(sb.Snapshot), + LSN: scalarString(sb.LSN), + TxID: scalarString(sb.TxID), + Raw: rawImage(raw), + }, nil +} + +// rawImage converts a json.RawMessage into a [RawJSON], returning nil for an +// absent or JSON-null image so [RawJSON.Present] reads correctly. +func rawImage(m json.RawMessage) RawJSON { + if len(m) == 0 || isJSONNull(m) { + return nil + } + out := make(RawJSON, len(m)) + copy(out, m) + return out +} + +// isJSONNull reports whether the raw message is the JSON null literal (allowing +// surrounding space). +func isJSONNull(m json.RawMessage) bool { + return bytes.Equal(bytes.TrimSpace(m), []byte("null")) +} + +// truthySnapshot decodes the permissive Debezium snapshot marker: a JSON bool, +// or a string such as "true", "last", "incremental" (anything but "false"/"" is +// truthy). An absent marker is false. +func truthySnapshot(m json.RawMessage) bool { + if len(m) == 0 || isJSONNull(m) { + return false + } + var b bool + if err := json.Unmarshal(m, &b); err == nil { + return b + } + var s string + if err := json.Unmarshal(m, &s); err == nil { + return s != "" && s != "false" + } + return false +} + +// scalarString renders a permissive scalar (a JSON string or number) as a +// string, for LSN/TxID fields whose representation varies by connector. An +// absent or null value yields "". 
+func scalarString(m json.RawMessage) string { + if len(m) == 0 || isJSONNull(m) { + return "" + } + var s string + if err := json.Unmarshal(m, &s); err == nil { + return s + } + var n json.Number + if err := json.Unmarshal(m, &n); err == nil { + return n.String() + } + return "" +} diff --git a/source/cdc/cdc_test.go b/source/cdc/cdc_test.go new file mode 100644 index 0000000..9e383c3 --- /dev/null +++ b/source/cdc/cdc_test.go @@ -0,0 +1,352 @@ +// SPDX-License-Identifier: Apache-2.0 + +package cdc_test + +import ( + "errors" + "testing" + "time" + + "github.com/stablekernel/crucible/source" + "github.com/stablekernel/crucible/source/cdc" +) + +// row is the concrete shape the test projects before/after images into. +type row struct { + ID int64 `json:"id"` + Name string `json:"name"` + Email string `json:"email"` +} + +func TestCodec_Decode_Operations(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + payload string + wantOp cdc.Operation + wantOpStr string + hasBefore bool + hasAfter bool + wantTable string + wantSnap bool + wantTS time.Time + }{ + { + name: "create has after only", + payload: `{"op":"c","before":null, + "after":{"id":1,"name":"ada","email":"ada@x.io"}, + "source":{"connector":"postgresql","db":"shop","schema":"public","table":"users","lsn":42,"txId":"900"}, + "ts_ms":1700000000000}`, + wantOp: cdc.OpCreate, + wantOpStr: "c", + hasAfter: true, + wantTable: "users", + wantTS: time.UnixMilli(1700000000000).UTC(), + }, + { + name: "snapshot read marks snapshot", + payload: `{"op":"r", + "after":{"id":2,"name":"grace","email":"grace@x.io"}, + "source":{"connector":"mysql","db":"shop","table":"users","snapshot":"true"}, + "ts_ms":1700000001000}`, + wantOp: cdc.OpRead, + wantOpStr: "r", + hasAfter: true, + wantTable: "users", + wantSnap: true, + wantTS: time.UnixMilli(1700000001000).UTC(), + }, + { + name: "update has before and after", + payload: `{"op":"u", + "before":{"id":1,"name":"ada","email":"ada@x.io"}, + 
"after":{"id":1,"name":"ada lovelace","email":"ada@x.io"}, + "source":{"connector":"postgresql","db":"shop","schema":"public","table":"users","snapshot":false}, + "ts_ms":1700000002000}`, + wantOp: cdc.OpUpdate, + wantOpStr: "u", + hasBefore: true, + hasAfter: true, + wantTable: "users", + wantTS: time.UnixMilli(1700000002000).UTC(), + }, + { + name: "delete has before only", + payload: `{"op":"d", + "before":{"id":1,"name":"ada","email":"ada@x.io"}, + "after":null, + "source":{"connector":"postgresql","table":"users"}, + "ts_ms":1700000003000}`, + wantOp: cdc.OpDelete, + wantOpStr: "d", + hasBefore: true, + wantTable: "users", + wantTS: time.UnixMilli(1700000003000).UTC(), + }, + { + name: "no ts_ms yields zero time", + payload: `{"op":"c","after":{"id":3,"name":"x","email":"x@x.io"}}`, + wantOp: cdc.OpCreate, + wantOpStr: "c", + hasAfter: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ev := decode(t, []byte(tt.payload)) + + if ev.Operation != tt.wantOp { + t.Fatalf("operation = %v, want %v", ev.Operation, tt.wantOp) + } + if got := ev.Operation.String(); got != tt.wantOpStr { + t.Errorf("operation string = %q, want %q", got, tt.wantOpStr) + } + if ev.Before.Present() != tt.hasBefore { + t.Errorf("before present = %v, want %v", ev.Before.Present(), tt.hasBefore) + } + if ev.After.Present() != tt.hasAfter { + t.Errorf("after present = %v, want %v", ev.After.Present(), tt.hasAfter) + } + if ev.Source.Table != tt.wantTable { + t.Errorf("table = %q, want %q", ev.Source.Table, tt.wantTable) + } + if ev.Source.Snapshot != tt.wantSnap { + t.Errorf("snapshot = %v, want %v", ev.Source.Snapshot, tt.wantSnap) + } + if !ev.Timestamp.Equal(tt.wantTS) { + t.Errorf("timestamp = %v, want %v", ev.Timestamp, tt.wantTS) + } + }) + } +} + +func TestCodec_Decode_TypedImages(t *testing.T) { + t.Parallel() + + ev := decode(t, []byte(`{"op":"u", + "before":{"id":1,"name":"ada","email":"ada@x.io"}, + "after":{"id":1,"name":"ada 
lovelace","email":"ada@x.io"}, + "source":{"table":"users"}}`)) + + before, err := cdc.BeforeAs[row](ev) + if err != nil { + t.Fatalf("BeforeAs: %v", err) + } + if before.Name != "ada" { + t.Errorf("before.Name = %q, want %q", before.Name, "ada") + } + + after, err := cdc.AfterAs[row](ev) + if err != nil { + t.Fatalf("AfterAs: %v", err) + } + if after.Name != "ada lovelace" { + t.Errorf("after.Name = %q, want %q", after.Name, "ada lovelace") + } +} + +func TestCodec_Decode_MissingImage(t *testing.T) { + t.Parallel() + + // A create has no before-image: projecting it reports ErrMissingImage. + ev := decode(t, []byte(`{"op":"c","after":{"id":1,"name":"ada","email":"ada@x.io"}}`)) + + if _, err := cdc.BeforeAs[row](ev); !errors.Is(err, cdc.ErrMissingImage) { + t.Fatalf("BeforeAs error = %v, want ErrMissingImage", err) + } + // A delete has no after-image. + del := decode(t, []byte(`{"op":"d","before":{"id":1,"name":"ada","email":"ada@x.io"}}`)) + if _, err := cdc.AfterAs[row](del); !errors.Is(err, cdc.ErrMissingImage) { + t.Fatalf("AfterAs error = %v, want ErrMissingImage", err) + } +} + +func TestCodec_Decode_Tombstone(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + payload []byte + }{ + {name: "empty slice", payload: []byte{}}, + {name: "nil slice", payload: nil}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ev := decode(t, tt.payload) + if ev.Operation != cdc.OpTombstone { + t.Fatalf("operation = %v, want OpTombstone", ev.Operation) + } + if ev.Before.Present() || ev.After.Present() { + t.Errorf("tombstone carried an image: before=%v after=%v", + ev.Before.Present(), ev.After.Present()) + } + if got := ev.Operation.String(); got != "tombstone" { + t.Errorf("operation string = %q, want %q", got, "tombstone") + } + }) + } +} + +func TestCodec_Decode_Malformed(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + payload string + wantErr error + }{ + {name: "not json", payload: 
`}{`, wantErr: cdc.ErrMalformedEnvelope}, + {name: "json array", payload: `[1,2,3]`, wantErr: cdc.ErrMalformedEnvelope}, + {name: "unknown op", payload: `{"op":"z","after":{}}`, wantErr: cdc.ErrUnknownOperation}, + {name: "empty op", payload: `{"after":{}}`, wantErr: cdc.ErrUnknownOperation}, + { + name: "bad source block", + payload: `{"op":"c","after":{},"source":"not-an-object"}`, + wantErr: cdc.ErrMalformedEnvelope, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + codec := cdc.New() + _, err := codec.Decode([]byte(tt.payload), nil) + if err == nil { + t.Fatalf("Decode(%q) = nil error, want %v", tt.payload, tt.wantErr) + } + if !errors.Is(err, tt.wantErr) { + t.Fatalf("Decode(%q) error = %v, want errors.Is %v", tt.payload, err, tt.wantErr) + } + }) + } +} + +func TestRegistry_Decode_MalformedIsPoison(t *testing.T) { + t.Parallel() + + registry := source.NewRegistry().SetDefault(cdc.New()) + msg := changeMessage{value: []byte(`}{`)} + + _, err := registry.Decode(msg) + if err == nil { + t.Fatal("Decode = nil error, want *source.DecodeError") + } + var decErr *source.DecodeError + if !errors.As(err, &decErr) { + t.Fatalf("error = %v, want *source.DecodeError", err) + } + if !errors.Is(err, source.ErrPoison) { + t.Errorf("error does not report ErrPoison: %v", err) + } + if !errors.Is(err, cdc.ErrMalformedEnvelope) { + t.Errorf("error does not unwrap to ErrMalformedEnvelope: %v", err) + } +} + +func TestDecodeEvent_WrongTypeIsPoison(t *testing.T) { + t.Parallel() + + // A registry default that decodes to a non-ChangeEvent value: DecodeEvent + // must report poison rather than a panic. 
+ registry := source.NewRegistry().SetDefault(source.CodecFunc( + func([]byte, source.Headers) (any, error) { return 42, nil }, + )) + _, err := cdc.DecodeEvent(registry, changeMessage{value: []byte(`{}`)}) + if !errors.Is(err, source.ErrPoison) { + t.Fatalf("DecodeEvent error = %v, want ErrPoison", err) + } +} + +func TestSourceHeaders(t *testing.T) { + t.Parallel() + + ev := decode(t, []byte(`{"op":"r", + "after":{"id":1,"name":"ada","email":"ada@x.io"}, + "source":{"connector":"postgresql","db":"shop","schema":"public","table":"users","snapshot":"last","lsn":42,"txId":"900"}}`)) + + h := cdc.SourceHeaders(ev) + + want := map[string]string{ + cdc.OperationHeader: "r", + cdc.ConnectorHeader: "postgresql", + cdc.DatabaseHeader: "shop", + cdc.SchemaHeader: "public", + cdc.TableHeader: "users", + cdc.SnapshotHeader: "true", + cdc.LSNHeader: "42", + cdc.TxIDHeader: "900", + } + for key, wantVal := range want { + got, ok := h.Get(key) + if !ok { + t.Errorf("header %q absent", key) + continue + } + if got != wantVal { + t.Errorf("header %q = %q, want %q", key, got, wantVal) + } + } +} + +func TestSourceHeaders_OmitsEmpty(t *testing.T) { + t.Parallel() + + // A non-snapshot create with a sparse source block: only operation and the + // present fields appear; no blank snapshot header. 
+ ev := decode(t, []byte(`{"op":"c","after":{"id":1},"source":{"table":"users"}}`)) + h := cdc.SourceHeaders(ev) + + if _, ok := h.Get(cdc.SnapshotHeader); ok { + t.Error("snapshot header present for a non-snapshot record") + } + if _, ok := h.Get(cdc.ConnectorHeader); ok { + t.Error("connector header present for an absent connector") + } + if got, _ := h.Get(cdc.TableHeader); got != "users" { + t.Errorf("table header = %q, want %q", got, "users") + } +} + +func TestOperation_String_Unknown(t *testing.T) { + t.Parallel() + + if got := cdc.OpUnknown.String(); got != "unknown" { + t.Errorf("OpUnknown.String() = %q, want %q", got, "unknown") + } +} + +// decode is a helper that runs the codec and fails the test on error. +func decode(t *testing.T, payload []byte) cdc.ChangeEvent { + t.Helper() + v, err := cdc.New().Decode(payload, nil) + if err != nil { + t.Fatalf("Decode: %v", err) + } + ev, ok := cdc.EventOf(v) + if !ok { + t.Fatalf("decoded value is not a ChangeEvent: %T", v) + } + return ev +} + +// changeMessage is a minimal source.Message for registry-level tests. 
+type changeMessage struct { + value []byte +} + +func (m changeMessage) Key() []byte { return nil } +func (m changeMessage) Value() []byte { return m.value } +func (m changeMessage) Headers() source.Headers { return nil } +func (m changeMessage) Subject() string { return "shop.public.users" } +func (m changeMessage) PartitionKey() string { return "" } +func (m changeMessage) Cursor() source.Cursor { return nil } +func (m changeMessage) As(any) bool { return false } diff --git a/source/cdc/consume_test.go b/source/cdc/consume_test.go new file mode 100644 index 0000000..70742ef --- /dev/null +++ b/source/cdc/consume_test.go @@ -0,0 +1,69 @@ +// SPDX-License-Identifier: Apache-2.0 + +package cdc_test + +import ( + "context" + "testing" + + "github.com/stablekernel/crucible/source" + "github.com/stablekernel/crucible/source/cdc" + "github.com/stablekernel/crucible/source/memsource" +) + +// TestConsume_DebeziumTopic exercises the documented pattern end-to-end with no +// broker: a memsource inlet stands in for a Debezium topic, a Hopper drives a +// handler that decodes each change event with the codec, and the ledger +// confirms every message was acked. A tombstone is skipped rather than failed. 
+func TestConsume_DebeziumTopic(t *testing.T) { + t.Parallel() + + type user struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + registry := source.NewRegistry().SetDefault(cdc.New()) + + var applied []string // ordered record of what the handler did per key + handler := func(_ context.Context, m source.Message) source.Result { + ev, err := cdc.DecodeEvent(registry, m) + if err != nil { + return source.Term(err) + } + switch ev.Operation { + case cdc.OpTombstone: + return source.Skip() + case cdc.OpDelete: + applied = append(applied, "delete") + return source.Ack() + default: + row, err := cdc.AfterAs[user](ev) + if err != nil { + return source.Term(err) + } + applied = append(applied, row.Name) + return source.Ack() + } + } + + h := memsource.NewHarness(t, nil, + memsource.Msg{Key: "1", Value: []byte(`{"op":"c","after":{"id":1,"name":"ada"}}`)}, + memsource.Msg{Key: "1", Value: []byte(`{"op":"u","after":{"id":1,"name":"ada lovelace"}}`)}, + memsource.Msg{Key: "1", Value: []byte(`{"op":"d","before":{"id":1,"name":"ada lovelace"}}`)}, + memsource.Msg{Key: "1", Value: nil}, // tombstone + ) + h.Run(handler) + + h.AssertCounts(memsource.Counts{Acked: 3, Dropped: 1}) + + want := []string{"ada", "ada lovelace", "delete"} + if len(applied) != len(want) { + t.Fatalf("applied = %v, want %v", applied, want) + } + for i := range want { + if applied[i] != want[i] { + t.Fatalf("applied[%d] = %q, want %q", i, applied[i], want[i]) + } + } +} diff --git a/source/cdc/data.go b/source/cdc/data.go new file mode 100644 index 0000000..2e075be --- /dev/null +++ b/source/cdc/data.go @@ -0,0 +1,116 @@ +// SPDX-License-Identifier: Apache-2.0 + +package cdc + +import ( + "github.com/stablekernel/crucible/source" +) + +// Header keys under which [SourceHeaders] surfaces a change event's source +// metadata through the core [source.Headers], keeping CDC metadata legible as +// typed headers instead of a magic-string map. 
+const ( + // OperationHeader carries the change operation's code (see [Operation.String]). + OperationHeader = "cdc-op" + // ConnectorHeader carries the source connector name. + ConnectorHeader = "cdc-connector" + // DatabaseHeader carries the source database. + DatabaseHeader = "cdc-database" + // SchemaHeader carries the source schema. + SchemaHeader = "cdc-schema" + // TableHeader carries the source table. + TableHeader = "cdc-table" + // SnapshotHeader carries "true" when the record came from the initial + // snapshot, and is omitted otherwise. + SnapshotHeader = "cdc-snapshot" + // LSNHeader carries the source log sequence number / position, when present. + LSNHeader = "cdc-lsn" + // TxIDHeader carries the source transaction id, when present. + TxIDHeader = "cdc-txid" +) + +// EventOf recovers the [ChangeEvent] a [Codec] decoded from a registry result. +// It is the typed bridge between [source.Registry.Decode] (which returns any) +// and a handler that works with change events: pass the decoded value, get back +// the event and whether the value was in fact a ChangeEvent. +// +// A false return means the registry routed the message to a different codec (the +// content type matched something other than this codec); it is not a decode +// failure. +func EventOf(v any) (ChangeEvent, bool) { + e, ok := v.(ChangeEvent) + return e, ok +} + +// DecodeEvent decodes m through r and recovers the [ChangeEvent], the +// convenience path a CDC handler uses instead of calling [source.Registry.Decode] +// and [EventOf] in sequence. A decode failure returns the [*source.DecodeError] +// from the registry; a value that is not a ChangeEvent (some other codec matched) +// returns a *source.DecodeError wrapping [source.ErrPoison], since a payload that +// decoded to the wrong shape cannot be retried into the right one. 
+func DecodeEvent(r *source.Registry, m source.Message) (ChangeEvent, error) { + v, err := r.Decode(m) + if err != nil { + return ChangeEvent{}, err + } + e, ok := EventOf(v) + if !ok { + return ChangeEvent{}, &source.DecodeError{ + Subject: m.Subject(), + Err: source.ErrPoison, + } + } + return e, nil +} + +// BeforeAs decodes a change event's before-image into a fresh value of type T: +// the typed view of the row prior to the change. It returns [ErrMissingImage] +// when the operation carries no before-image (a create, a snapshot read, a +// tombstone). +func BeforeAs[T any](e ChangeEvent) (T, error) { + var v T + if err := e.Before.As(&v); err != nil { + return v, err + } + return v, nil +} + +// AfterAs decodes a change event's after-image into a fresh value of type T: the +// typed view of the row after the change. It returns [ErrMissingImage] when the +// operation carries no after-image (a delete, a tombstone). +func AfterAs[T any](e ChangeEvent) (T, error) { + var v T + if err := e.After.As(&v); err != nil { + return v, err + } + return v, nil +} + +// SourceHeaders surfaces a change event's operation and source metadata as core +// [source.Headers], so a handler reads CDC metadata through the same typed-header +// surface as any other inbound metadata. Only non-empty fields are emitted; the +// snapshot header appears only when the record came from a snapshot. The result +// is a fresh slice in a stable order the caller may retain. 
+func SourceHeaders(e ChangeEvent) source.Headers { + headers := make(source.Headers, 0, 8) + headers = append(headers, source.Header{Key: OperationHeader, Value: e.Operation.String()}) + appendNonEmpty(&headers, ConnectorHeader, e.Source.Connector) + appendNonEmpty(&headers, DatabaseHeader, e.Source.Database) + appendNonEmpty(&headers, SchemaHeader, e.Source.Schema) + appendNonEmpty(&headers, TableHeader, e.Source.Table) + if e.Source.Snapshot { + headers = append(headers, source.Header{Key: SnapshotHeader, Value: "true"}) + } + appendNonEmpty(&headers, LSNHeader, e.Source.LSN) + appendNonEmpty(&headers, TxIDHeader, e.Source.TxID) + return headers +} + +// appendNonEmpty appends a header only when its value is non-empty, so absent +// metadata never produces a blank header that shadows a real one. +func appendNonEmpty(h *source.Headers, key, value string) { + if value == "" { + return + } + *h = append(*h, source.Header{Key: key, Value: value}) +} diff --git a/source/cdc/example_test.go b/source/cdc/example_test.go new file mode 100644 index 0000000..feb4965 --- /dev/null +++ b/source/cdc/example_test.go @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 + +package cdc_test + +import ( + "fmt" + + "github.com/stablekernel/crucible/source" + "github.com/stablekernel/crucible/source/cdc" +) + +// Example shows wiring the CDC codec into a source.Registry and decoding a +// Debezium JSON change event into a typed row image. The codec is +// instance-scoped: it is registered on a registry the caller owns, never on a +// process-global table. +func Example() { + type user struct { + ID int64 `json:"id"` + Name string `json:"name"` + } + + // Register the codec under the Debezium JSON content type, and as the + // default so a topic that carries no content-type header still routes to it. + codec := cdc.New() + registry := source.NewRegistry(). + Register(cdc.DebeziumJSONContentType, codec). 
+ SetDefault(codec) + + // A Debezium update envelope for the users table: both row images present. + msg := exampleMessage{ + subject: "shop.public.users", + value: []byte(`{ + "op":"u", + "before":{"id":1,"name":"ada"}, + "after":{"id":1,"name":"ada lovelace"}, + "source":{"connector":"postgresql","db":"shop","schema":"public","table":"users"}, + "ts_ms":1700000000000 + }`), + } + + event, err := cdc.DecodeEvent(registry, msg) + if err != nil { + fmt.Println("decode failed:", err) + return + } + + after, err := cdc.AfterAs[user](event) + if err != nil { + fmt.Println("after image:", err) + return + } + table, _ := cdc.SourceHeaders(event).Get(cdc.TableHeader) + + fmt.Printf("op=%s table=%s id=%d name=%q\n", + event.Operation, table, after.ID, after.Name) + + // Output: + // op=u table=users id=1 name="ada lovelace" +} + +// exampleMessage is a minimal source.Message for the example. +type exampleMessage struct { + subject string + value []byte +} + +func (m exampleMessage) Key() []byte { return nil } +func (m exampleMessage) Value() []byte { return m.value } +func (m exampleMessage) Headers() source.Headers { return nil } +func (m exampleMessage) Subject() string { return m.subject } +func (m exampleMessage) PartitionKey() string { return "" } +func (m exampleMessage) Cursor() source.Cursor { return nil } +func (m exampleMessage) As(any) bool { return false } diff --git a/source/statemachine/example_cdc_test.go b/source/statemachine/example_cdc_test.go new file mode 100644 index 0000000..a52595a --- /dev/null +++ b/source/statemachine/example_cdc_test.go @@ -0,0 +1,90 @@ +// SPDX-License-Identifier: Apache-2.0 + +package statemachine_test + +import ( + "context" + "fmt" + + "github.com/stablekernel/crucible/source" + "github.com/stablekernel/crucible/source/cdc" + "github.com/stablekernel/crucible/source/statemachine" + "github.com/stablekernel/crucible/state" +) + +// ExampleDrive_cdc shows the intended change-data-capture pattern: a Debezium +// topic carries 
row changes for a turnstile table, a cdc.Codec decodes each +// change event, and a Router projects the decoded row into the instance key and +// the event to fire. The Hopper then drives the statechart per primary key, +// acking only after the transition is durably persisted. +// +// Here a single update row (the turnstile becomes funded) decodes to a coin +// event, unlocking the machine. No broker is involved; the message stands in for +// one Debezium record off the topic. +func ExampleDrive_cdc() { + machine := buildTurnstile() + store := statemachine.NewMemStore[turnstileState, turnstileState, turnstileEvent, *turnstile]() + + // Seed a funded turnstile in the locked state at version 1. + seeded := machine.Cast(&turnstile{Funded: true}, state.WithInitialState[turnstileState](locked)) + _ = store.Save(context.Background(), locked, + statemachine.Record[turnstileState, turnstileEvent, *turnstile]{Snapshot: seeded.Snapshot(), Version: 1}, 0) + + // One codec, registered as the registry default for the CDC topic. + registry := source.NewRegistry().SetDefault(cdc.New()) + + // The Router decodes the change event and projects its after-image into a + // (key, event): a create or update on a funded row drives the coin event. + router := func(m source.Message) (turnstileState, turnstileEvent, error) { + event, err := cdc.DecodeEvent(registry, m) + if err != nil { + return 0, 0, err + } + row, err := cdc.AfterAs[turnstile](event) + if err != nil { + return 0, 0, fmt.Errorf("cdc example: project after-image: %w", err) + } + if !row.Funded { + return 0, 0, fmt.Errorf("cdc example: row not funded") + } + return locked, coin, nil + } + + sink := statemachine.SinkFunc(func(_ context.Context, eff any) error { + fmt.Printf("emit: %v\n", eff) + return nil + }) + h := statemachine.Drive[turnstileState, turnstileEvent, *turnstile]( + machine, store, router, statemachine.WithSink(sink), + ) + + // A Debezium update envelope: the row's funded column flips true. 
+ change := cdcMessage(`{ + "op":"u", + "before":{"funded":false}, + "after":{"funded":true}, + "source":{"connector":"postgresql","db":"gate","schema":"public","table":"turnstile"}, + "ts_ms":1700000000000 + }`, "lsn-100") + + res := h(context.Background(), change) + fmt.Println("result:", res.Action) + + rec, _, _ := store.Load(context.Background(), locked) + fmt.Println("state:", rec.Snapshot.Current, "version:", rec.Version) + + // Output: + // emit: {coin} + // result: ack + // state: unlocked version: 2 +} + +// cdcMessage builds a fakeMessage carrying a Debezium JSON change event on a +// CDC topic, with the cursor standing in for the source log position. +func cdcMessage(payload, cursor string) fakeMessage { + return fakeMessage{ + value: []byte(payload), + subject: "gate.public.turnstile", + cursor: fakeCursor(cursor), + } +} diff --git a/tools/docsgen/reference.go b/tools/docsgen/reference.go index b1dd5ce..c7f4385 100644 --- a/tools/docsgen/reference.go +++ b/tools/docsgen/reference.go @@ -106,23 +106,27 @@ var referencePackages = []referencePackage{ desc: "A CloudEvents codec with structured and binary content modes.", }, { - mod: "source/retry", pkg: ".", slug: "source-retry", title: "source/retry", order: 19, + mod: "source/cdc", pkg: ".", slug: "source-cdc", title: "source/cdc", order: 19, + desc: "A change-data-capture codec: decode a Debezium/OpenCDC change event into a typed ChangeEvent and drive a statechart per primary key.", + }, + { + mod: "source/retry", pkg: ".", slug: "source-retry", title: "source/retry", order: 20, desc: "Classification-aware retry middleware: backoff for Retryable, straight to DLQ for Poison and InvalidForState.", }, { - mod: "source/dlq", pkg: ".", slug: "source-dlq", title: "source/dlq", order: 20, + mod: "source/dlq", pkg: ".", slug: "source-dlq", title: "source/dlq", order: 21, desc: "Layered dead-letter middleware whose parking topic is itself an Inlet, so draining it is first-class replay.", }, { - mod: 
"source/idempotency", pkg: ".", slug: "source-idempotency", title: "source/idempotency", order: 21, + mod: "source/idempotency", pkg: ".", slug: "source-idempotency", title: "source/idempotency", order: 22, desc: "A Deduper seam with a no-op default; for the state-machine binding, dedup is transition idempotency.", }, { - mod: "source/schema", pkg: ".", slug: "source-schema", title: "source/schema", order: 22, + mod: "source/schema", pkg: ".", slug: "source-schema", title: "source/schema", order: 23, desc: "An optional proto/Avro/JSON-Schema validator that runs before the handler, routing invalid payloads to the DLQ.", }, { - mod: "source/statemachine", pkg: ".", slug: "source-statemachine", title: "source/statemachine", order: 23, + mod: "source/statemachine", pkg: ".", slug: "source-statemachine", title: "source/statemachine", order: 24, desc: "The state-to-source bridge: an inbound message drives a transition and the ack is tied to the durable transition.", }, }