feat: Add operation run dispatcher #3093
Summary by CodeRabbit
Walkthrough
This PR adds an operation-run dispatcher and a completed-with-failures status.
Estimated code review effort: 5 (Critical) | ~120 minutes
Sequence Diagram(s)
sequenceDiagram
participant Dispatcher
participant Store
participant TaskStore
participant TaskManager
Dispatcher->>Store: FetchRunnableIDs
Dispatcher->>Store: RunInTransaction(prepare, decide, execute)
Store->>Store: LockRunnable / LockOperationRunTargets
Dispatcher->>TaskStore: GetTask
Dispatcher->>Store: UpdateTargetState / UpdateRunState
Dispatcher->>TaskManager: SubmitTask
TaskManager-->>Dispatcher: taskID or ErrRackConflict
Compact metadata: No related issues or related PRs were supplied.
Suggested labels: feature, operation-run, dispatcher, database-migration
Suggested reviewers: Reviewers familiar with operation-run state transitions, PostgreSQL locking, and dispatcher orchestration should inspect this closely.
Poem
A run turns green, then failure splits the light,
🚥 Pre-merge checks | ✅ 5 passed
Actionable comments posted: 2
🧹 Nitpick comments (2)
rest-api/flow/internal/operationrun/operationrun_test.go (1)
18-55: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Consider broadening non-terminal coverage.
Only `TaskStatusRunning` is exercised for the non-terminal branch. If `taskcommon.TaskStatus` has other non-terminal values (e.g. pending/queued), adding a row per value would tighten coverage of the "default" mapping without much effort.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rest-api/flow/internal/operationrun/operationrun_test.go` around lines 18 - 55, The test for OperationRunTargetStatusFromTaskStatus only covers TaskStatusRunning for the non-terminal path, so broaden the table in TestOperationRunTargetStatusFromTaskStatus to include any other non-terminal TaskStatus values supported by taskcommon. Keep the existing assertions against OperationRunTargetStatusSubmitted, and add one row per non-terminal status so the default mapping is exercised more thoroughly.
rest-api/flow/internal/operationrun/manager/dispatcher/safety.go (1)
39-53: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Enrich the pause message with which gate tripped.
`evaluate` always returns the same static "failure threshold reached" message regardless of which gate tripped or its configured threshold/kind. When an operator inspects a paused run's `StatusReason`/message to diagnose why it stopped, this generic text gives no clue which gate (or how close to/over threshold) caused the pause, especially when multiple gates are configured.
Consider including the gate kind and the observed stats, e.g.:
♻️ Proposed enrichment
     return pauseDecision{
         pause:   true,
         reason:  operationrun.OperationRunStatusReasonSafetyGate,
    -    message: "failure threshold reached",
    +    message: fmt.Sprintf(
    +        "safety gate %s tripped: %d/%d failed",
    +        gate.SafetyGateKind(), stats.failed, stats.total,
    +    ),
     }
As per path instructions, Flow changes should be reviewed for "observability for stuck or failed operations."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rest-api/flow/internal/operationrun/manager/dispatcher/safety.go` around lines 39 - 53, The pause message in evaluate is too generic because it always returns the same text when a gate trips. Update the pauseDecision returned from evaluate to include the specific gate identity from gate.SafetyGateScope() and the observed stats/threshold context from statsForScope so operators can tell which gate caused the pause. Keep the existing control flow in dispatcher/safety.go, but enrich the message built in the loop where gate.IsTripped(...) succeeds.
Source: Path instructions
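The enrichment suggested for the pause message can be tried in isolation. The following is a minimal sketch, not the PR's code: `gateStats`, `pauseMessage`, and the `"failure_rate"` gate identifier are hypothetical stand-ins, and only the message format is taken from the proposed diff.

```go
package main

import "fmt"

// gateStats is a hypothetical stand-in for the per-scope counters the
// dispatcher evaluates; the real type and field names may differ.
type gateStats struct {
	failed int
	total  int
}

// pauseMessage builds an operator-facing message that names the gate that
// tripped and the counts observed when it tripped. The format string follows
// the one proposed in the review comment.
func pauseMessage(gateKind string, stats gateStats) string {
	return fmt.Sprintf("safety gate %s tripped: %d/%d failed", gateKind, stats.failed, stats.total)
}

func main() {
	// "failure_rate" is an assumed gate identifier used only for illustration.
	fmt.Println(pauseMessage("failure_rate", gateStats{failed: 3, total: 10}))
}
```

Keeping the message construction in a small pure function like this would make it straightforward to assert on the text in a unit test when several gates are configured.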
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rest-api/flow/internal/operationrun/manager/dispatcher/execution.go`:
- Around line 68-72: The submission flow in Dispatcher execution is persisting
OperationRunTargetStatusSubmitted before a child task is safely created and
linked, which can leave claimed work unrecoverable if submit or
UpdateTargetState fails. Update the target state handling in execution.go so the
claim is a recoverable lease/claim until SubmitTask succeeds and TaskID is
persisted, then transition to Submitted only after the task reference is durably
recorded. Make the submit path idempotent or atomic so that a crash or a
canceled context can be reconciled without blocking concurrency.
In `@rest-api/flow/internal/operationrun/manager/store/dispatch.go`:
- Around line 96-136: UpdateRunState and UpdateTargetState currently ignore
whether Exec actually updated any rows, so stale or missing run/target IDs can
silently succeed. Capture the sql.Result returned by
s.idb(ctx).NewUpdate().Exec(ctx) in both PostgresStore methods, check
RowsAffected, and return an error when it is zero (or otherwise unexpected) so
dispatcher state transitions fail loudly. Keep the fix localized to
UpdateRunState and UpdateTargetState, using the existing run.ID and target.ID
filters as the signal for whether persistence really happened.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 256e565b-474b-45dd-b535-f3fa0da86531
⛔ Files ignored due to path filters (1)
rest-api/flow/pkg/proto/v1/flow.pb.go is excluded by !**/*.pb.go, !rest-api/**/*.pb.go
📒 Files selected for processing (30)
- rest-api/flow/internal/converter/protobuf/operationrun_converter.go
- rest-api/flow/internal/converter/protobuf/operationrun_converter_test.go
- rest-api/flow/internal/db/migrations/20260626120000_operation_run_completed_with_failures_status.down.sql
- rest-api/flow/internal/db/migrations/20260626120000_operation_run_completed_with_failures_status.up.sql
- rest-api/flow/internal/operationrun/configuration.go
- rest-api/flow/internal/operationrun/manager/dispatcher/config.go
- rest-api/flow/internal/operationrun/manager/dispatcher/conflict_policy.go
- rest-api/flow/internal/operationrun/manager/dispatcher/decision.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dependencies.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatch_run.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatcher.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatcher_test.go
- rest-api/flow/internal/operationrun/manager/dispatcher/execution.go
- rest-api/flow/internal/operationrun/manager/dispatcher/phase.go
- rest-api/flow/internal/operationrun/manager/dispatcher/policy.go
- rest-api/flow/internal/operationrun/manager/dispatcher/preparation.go
- rest-api/flow/internal/operationrun/manager/dispatcher/reconciliation.go
- rest-api/flow/internal/operationrun/manager/dispatcher/safety.go
- rest-api/flow/internal/operationrun/manager/lifecycle.go
- rest-api/flow/internal/operationrun/manager/manager.go
- rest-api/flow/internal/operationrun/manager/manager_test.go
- rest-api/flow/internal/operationrun/manager/queries.go
- rest-api/flow/internal/operationrun/manager/store.go
- rest-api/flow/internal/operationrun/manager/store/dispatch.go
- rest-api/flow/internal/operationrun/manager/store/store.go
- rest-api/flow/internal/operationrun/operationrun.go
- rest-api/flow/internal/operationrun/operationrun_test.go
- rest-api/flow/internal/service/service.go
- rest-api/flow/internal/task/manager/manager.go
- rest-api/flow/proto/v1/flow.proto
- Add OperationRunDispatcher lifecycle wiring to Flow service startup/shutdown.
- One dispatcher runs periodically to lock, reconcile, decide, claim, and submit.
- Handle the conflict policy, safety policy, and phase policy.
- Add unit tests.
175c457 to 7f71873
🔍 Container Scan Summary
Per-CVE detail lives in the per-service |
🔐 TruffleHog Secret Scan
✅ No secrets or credentials found! Your code has been scanned for 700+ types of secrets and credentials.
🕐 Last updated: 2026-07-02 18:42:13 UTC | Commit: 7f71873
🧹 Nitpick comments (1)
rest-api/flow/internal/operationrun/manager/dispatcher/config.go (1)
8-20: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Promote `defaultSubmitPersistTimeout` into `Config`.
Every other dispatcher timing knob (`PollInterval`, `FetchBatch`, `ClaimLease`) is exposed on `Config` and defaulted via `withDefaults()`. `defaultSubmitPersistTimeout` is defined here but consumed directly as a package constant in `execution.go`'s `updateTargetAfterSubmit`, leaving it neither configurable nor easily overridable in tests that want to exercise timeout behavior deterministically.
♻️ Proposed fix
     type Config struct {
         PollInterval time.Duration
         FetchBatch   int
         ClaimLease   time.Duration
    +    SubmitPersistTimeout time.Duration
     }

     func (c Config) withDefaults() Config {
         if c.PollInterval <= 0 {
             c.PollInterval = defaultPollInterval
         }
         ...
    +    if c.SubmitPersistTimeout <= 0 {
    +        c.SubmitPersistTimeout = defaultSubmitPersistTimeout
    +    }
         return c
     }
Then update `execution.go`'s `updateTargetAfterSubmit` to use `d.cfg.SubmitPersistTimeout` instead of the bare constant.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rest-api/flow/internal/operationrun/manager/dispatcher/config.go` around lines 8 - 20, Promote the submit/persist timeout into dispatcher configuration so it is configurable and testable like the other timing knobs. Add a SubmitPersistTimeout field to Config, default it in withDefaults alongside PollInterval, FetchBatch, and ClaimLease, and keep the existing default value centralized there. Then update updateTargetAfterSubmit in execution.go to read d.cfg.SubmitPersistTimeout instead of the package-level defaultSubmitPersistTimeout.
Source: Path instructions
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 033fc593-6ca0-4e10-9a55-1ef0b7abf576
⛔ Files ignored due to path filters (1)
rest-api/flow/pkg/proto/v1/flow.pb.go is excluded by !**/*.pb.go, !rest-api/**/*.pb.go
📒 Files selected for processing (32)
- rest-api/flow/internal/converter/protobuf/operationrun_converter.go
- rest-api/flow/internal/converter/protobuf/operationrun_converter_test.go
- rest-api/flow/internal/db/migrations/20260626120000_operation_run_completed_with_failures_status.down.sql
- rest-api/flow/internal/db/migrations/20260626120000_operation_run_completed_with_failures_status.up.sql
- rest-api/flow/internal/operationrun/configuration.go
- rest-api/flow/internal/operationrun/manager/dispatcher/config.go
- rest-api/flow/internal/operationrun/manager/dispatcher/conflict_policy.go
- rest-api/flow/internal/operationrun/manager/dispatcher/decision.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dependencies.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatch_run.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatcher.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatcher_test.go
- rest-api/flow/internal/operationrun/manager/dispatcher/execution.go
- rest-api/flow/internal/operationrun/manager/dispatcher/phase.go
- rest-api/flow/internal/operationrun/manager/dispatcher/policy.go
- rest-api/flow/internal/operationrun/manager/dispatcher/preparation.go
- rest-api/flow/internal/operationrun/manager/dispatcher/reconciliation.go
- rest-api/flow/internal/operationrun/manager/dispatcher/safety.go
- rest-api/flow/internal/operationrun/manager/dispatcher/safety_test.go
- rest-api/flow/internal/operationrun/manager/lifecycle.go
- rest-api/flow/internal/operationrun/manager/manager.go
- rest-api/flow/internal/operationrun/manager/manager_test.go
- rest-api/flow/internal/operationrun/manager/queries.go
- rest-api/flow/internal/operationrun/manager/store.go
- rest-api/flow/internal/operationrun/manager/store/dispatch.go
- rest-api/flow/internal/operationrun/manager/store/store.go
- rest-api/flow/internal/operationrun/manager/store/store_test.go
- rest-api/flow/internal/operationrun/operationrun.go
- rest-api/flow/internal/operationrun/operationrun_test.go
- rest-api/flow/internal/service/service.go
- rest-api/flow/internal/task/manager/manager.go
- rest-api/flow/proto/v1/flow.proto
✅ Files skipped from review due to trivial changes (2)
- rest-api/flow/internal/operationrun/manager/store/store.go
- rest-api/flow/internal/operationrun/manager/dispatcher/policy.go
🚧 Files skipped from review as they are similar to previous changes (20)
- rest-api/flow/internal/operationrun/operationrun_test.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dependencies.go
- rest-api/flow/proto/v1/flow.proto
- rest-api/flow/internal/task/manager/manager.go
- rest-api/flow/internal/operationrun/manager/store.go
- rest-api/flow/internal/operationrun/manager/dispatcher/phase.go
- rest-api/flow/internal/operationrun/manager/dispatcher/preparation.go
- rest-api/flow/internal/operationrun/manager/dispatcher/conflict_policy.go
- rest-api/flow/internal/service/service.go
- rest-api/flow/internal/operationrun/manager/store/dispatch.go
- rest-api/flow/internal/operationrun/manager/dispatcher/safety.go
- rest-api/flow/internal/operationrun/manager/dispatcher/reconciliation.go
- rest-api/flow/internal/operationrun/manager/manager_test.go
- rest-api/flow/internal/operationrun/manager/dispatcher/decision.go
- rest-api/flow/internal/operationrun/manager/manager.go
- rest-api/flow/internal/operationrun/operationrun.go
- rest-api/flow/internal/operationrun/configuration.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatch_run.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatcher_test.go
- rest-api/flow/internal/operationrun/manager/dispatcher/dispatcher.go
Related issues
Type of Change
Breaking Changes
Testing
Additional Notes
Manual testing was performed on a local dev deployment to verify the full cycle of an operation run.