Skip to content

feat: durable enqueue outbox dispatch — publish-in-commit via attached bus#54

Merged
patrickleet merged 13 commits into
codex/hops-service-create-microsvc-scaffoldfrom
feat/durable-enqueue-outbox-dispatch
Jun 5, 2026
Merged

feat: durable enqueue outbox dispatch — publish-in-commit via attached bus#54
patrickleet merged 13 commits into
codex/hops-service-create-microsvc-scaffoldfrom
feat/durable-enqueue-outbox-dispatch

Conversation

@patrickleet
Copy link
Copy Markdown
Collaborator

Stacked on #53 (base = codex/hops-service-create-microsvc-scaffold); rebases onto main once #53 merges. Implements [[specs/durable-enqueue-outbox-dispatch]].

What this does

Connects the producing side of the bus to a service so one bus config drives both consume and produce, with publish-in-commit as the primary path:

let service = distributed::register_handlers!(
    Service::with_repo(repo.queued().aggregate::<Todo>()),
    command handlers::todo_create,
    event   handlers::todo_projection,
);

service
    .with_bus(InMemoryBus::new())   // transport config on the service
    .run(RunOptions::idempotent())  // auto listen (commands) + subscribe (events)
    .await?;
// In a handler:
ctx.commit_outbox(&mut todo, message).await?;
//  bus attached → claim the outbox row in the commit transaction (InFlight,
//                 short lease, attempts=1) → publish immediately via the bus →
//                 complete; publish failure releases for the poller and never
//                 rolls back the committed aggregate.
//  no bus       → commit pending; the polling worker publishes.

The claim-in-transaction model means the after-commit publish needs no separate claim and cannot race the poller; the lease is the crash-handoff to the worker.

Commits

  1. CommitReceipt from OutboxCommit::commit — the inserted-ids seam (source-compatible across all 52 callers).
  2. BusPublisherBus → AsyncMessagePublisher adapter routing Command/Event to send_message/publish_message (the missing piece that lets the dispatcher publish through any *Bus).
  3. HasOutboxStore — capability resolving the outbox store through the AggregateRepository → QueuedRepository → leaf wrapper chain.
  4. Service::with_bus + Microservice + dispatcher() — transport config on the service (produce side).
  5. OutboxCommit::commit_claimed — claim-in-transaction (row inserts InFlight/leased, not poller-claimable).
  6. Context::commit_outbox + DynPublisher — publish-in-commit end to end. DynPublisher is an object-safe (boxed-future) shim around AsyncMessagePublisher so Service<D> keeps its type rather than becoming Service<D, P>.
  7. Microservice::run — derives listen/subscribe from the registered handlers, executor-agnostic poll-join (no tokio pulled into core). Clone derived for RunOptions.

Tests

cargo test --lib236 passed, 0 failed. All integration test targets compile. New coverage: commit_receipt, bus_publisher routing, has_outbox_store wrapper resolution, commit_claimed (in-flight/leased/not-poller-claimable), commit_outbox immediate publish e2e, run() consuming a queued command.

Not in this PR (follow-ups, tracked in tasks/durable-enqueue-outbox-dispatch-impl)

  • Docs: README "durable enqueue" framing + closing the Quick Start produce loop.
  • Continuous backstop poll loop inside run(): needs an async timer (tokio), which isn't in core. dispatcher() is exposed to drive it; the immediate path covers the happy case, so the loop is the crash backstop only. Open: gate behind a runtime feature vs. a separate worker entrypoint.
  • commit_outbox covers AggregateRepository, not yet SnapshotAggregateRepository.
  • SQL-backed (postgres/sqlite) end-to-end test of commit_outbox (trait impls compile under those features; only HashMap exercised so far).
  • Immediate-publish lease customization (currently defaults; the Microservice setters configure the poller).

🤖 Generated with Claude Code

patrickleet and others added 7 commits June 3, 2026 14:43
OutboxCommit::commit now returns a CommitReceipt carrying the inserted
outbox message id(s) instead of (), so an after-commit dispatcher can
publish exactly the rows the transaction wrote. Source-compatible:
?-statement callers discard the receipt.

Step 1 of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Routes outbox-derived messages by MessageKind: commands to send_message
(point-to-point), events to publish_message (fan-out). This is the missing
adapter that lets the outbox dispatcher publish through any *Bus uniformly.

Step 2 of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
New trait abstracting 'produce a durable outbox store', resolving through the
AggregateRepository -> QueuedRepository -> leaf repo wrapper chain. Lets the
runtime build an OutboxDispatcher without naming the concrete repository type.
Impls for HashMap (and feature-gated Sqlite/Postgres) leaves + the wrappers.

Step 3 (store access) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Service::with_bus(bus) wraps the consumer Service into a Microservice carrying
the transport config. Microservice::dispatcher() assembles an OutboxDispatcher
over the service's own outbox store + a BusPublisher, so committed outbox rows
drain to the bus routed by kind. Test proves commit -> dispatch -> published
end to end over InMemoryBus.

Consume side (run() auto listen/subscribe) and the in-transaction commit_outbox
land next.

Step 6 (runtime, produce side) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Claims the outbox row for publication in the same transaction that commits the
aggregate: the row inserts already InFlight under the worker's lease
(attempts = 1), so the after-commit publish needs no separate claim and cannot
race the poller. Returns the claimed message clone so the caller can build the
transport message and settle the claim. Test proves the row is in-flight,
leased, and not poller-claimable.

Step 4 (claim-in-transaction) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Wires the durable-enqueue command path end to end:
- DynPublisher: object-safe (boxed-future) form of AsyncMessagePublisher, so a
  publisher can sit behind Arc<dyn> without making Service generic over it.
- Service carries an optional ImmediatePublish (publisher + worker id + lease +
  attempts), set by with_bus; Context receives it.
- Context::commit_outbox: with a bus attached, claims the outbox row in the
  commit transaction then publishes immediately through the bus, completing or
  releasing the claim; with no bus, commits pending for the poller. Best-effort
  publish never rolls back the committed aggregate.

Test: dispatch -> commit_outbox -> row published immediately, none left pending.

Steps 3+5 (DynPublisher + commit_outbox) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
run() reads the service's subscription_plan and drives the consumers
concurrently on the caller's runtime: command handlers via competing listen,
event handlers via fan-out subscribe. Uses an executor-agnostic poll-join (no
spawn, no timer) so it works in core without pulling tokio. Returns on first
error or when the consumers stop. Derive Clone for RunOptions/ConsumerDeliveryMode
so one options value drives both consumers. Test: run() consumes a queued
command and the handler's commit_outbox publishes immediately.

Producing happy-path is commit_outbox (immediate); the backstop poll loop (needs
a timer) is driven from dispatcher() by a runtime that provides one.

Step 6 (runtime, consume side) of [[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jun 3, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 27abf415-5ddb-45c2-bb66-2ca706f32f45

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/durable-enqueue-outbox-dispatch

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

patrickleet and others added 6 commits June 3, 2026 15:54
Exercises commit_outbox (claim-in-transaction + immediate publish) and run()
against a real SQL backend (in-memory SQLite), not just HashMapRepository.
Proves the HasOutboxStore impls and the SQL commit path persist the in-flight
claim and complete it. Also fixes a must_use warning on the finished-consumer
future in run().

[[tasks/durable-enqueue-outbox-dispatch-impl]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Generalize the durable-enqueue command path with an OutboxCommitting<A> trait
that commits an aggregate + outbox row in one transaction, staging whatever the
repo needs. Implemented for AggregateRepository (delegates to the existing
OutboxCommit) and SnapshotAggregateRepository (stages the snapshot + outbox row
together via CommitBatch — previously these could not compose). Context::commit_outbox
now binds D::Repo: OutboxCommitting<A> + HasOutboxStore instead of the concrete
AggregateRepository, so snapshot-backed services get claim-in-transaction +
immediate publish too. Test: snapshot-backed commit_outbox publishes immediately.

[[tasks/durable-enqueue-outbox-dispatch-impl]]
Builds on [[specs/transactional-commit-boundary]]
Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Fold SnapshotAggregateRepository into AggregateRepository via an optional
SnapshotPolicy whose Snapshottable/SnapshotStore requirements are captured as
monomorphized fn-pointers at with_snapshots() time, keeping the generic
get/commit methods unbounded. Now:

- .with_snapshots(n) returns AggregateRepository<R,A> (same type), so handler
  dependency types are identical with/without snapshots.
- every method works either way; commit stages a snapshot (when due) in the same
  CommitBatch, get hydrates from a snapshot when present. The full repo surface
  (peek/abort/get_with/outbox/...) is available with snapshots on — previously
  the wrapper dropped most of it.
- exactly ONE OutboxCommitting impl (on AggregateRepository); the snapshot-
  specific impl and the whole SnapshotAggregateRepository type are removed.

with_snapshots now requires R: SnapshotStore (you can't cache snapshots in a
store that can't hold them) — stricter and more correct than the old wrapper.
Tests migrated to the unified type; assertions unchanged. Full suite + sqlite green.

Implements [[specs/snapshots-as-transparent-optimization]] [[tasks/snapshots-transparent-optimization]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Make the existing API do the new functionality instead of adding a method.
Attaching a bus (Service::with_bus) installs an outbox publisher on the
repository; OutboxCommit::commit then claims the row in the commit transaction
and publishes it immediately via that bus, settling the claim (complete, or
release for the worker on failure). No bus configured -> commit stays pending
for the worker, exactly as before.

Removed: ctx.commit_outbox, the OutboxCommitting trait, OutboxCommit::commit_claimed,
Service's ImmediatePublish + the Context publisher plumbing, and the now-unused
DynPublisher (the snapshot unification already collapsed the two repo types, so
the polymorphism trait was dead weight).

Added: OutboxPublishHook (object-safe) + OutboxPublisherConfig on the repo,
BusOutboxPublishHook (store + BusPublisher), ConfigurableOutboxPublisher. Tests
migrated to repo.outbox(msg).commit(agg); full default + sqlite suites green.

Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Fold Microservice back into Service. Attaching a bus no longer changes the
type: with_bus(bus) returns the same Service<D> and run() is a method on it, so
the whole thing reads as one fluent builder —
  Service::with_repo(r).command(..).handle(..).with_bus(bus).run(opts)

The bus's consume behavior is type-erased into a single closure field on the
service (ServiceRunner), so Service stays single-param — message_router, the
register_handlers! macro, and every existing Service<D> call site are untouched.
Removes the Microservice type and the speculative dispatcher() accessor (the
backstop poll loop is a later, runtime-gated addition).

Net simpler: one type, one builder, less code.

Implements [[specs/durable-enqueue-outbox-dispatch]]

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
Replace the with_repo / with_read_model_store / with_repo_and_read_model_store
constructors with one fluent builder: every service starts at Service::new() and
chains dependency + bus steps —
  Service::new().with_repo(r).with_read_model_store(s).with_bus(bus)

with_repo/with_read_model_store are type-state transitions that produce exactly
the same D as before (Service<R>, or RepoReadModelDependencies<R,S> for both), so
handler signatures are unchanged — only construction call sites move. Combined
deps now delegate HasOutboxStore + ConfigurableOutboxPublisher to the repo so a
repo+read-model service can also with_bus. Migrated all call sites + README.

Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
@patrickleet patrickleet merged commit 25d5895 into codex/hops-service-create-microsvc-scaffold Jun 5, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant