feat(orchestrator): multi-agent goal execution via session_policy spawn#377
feat(orchestrator): multi-agent goal execution via session_policy spawn#377dimakis wants to merge 2 commits into
Conversation
dimakis
left a comment
There was a problem hiding this comment.
Centaur Review
Found 8 issue(s) (5 warning).
server/task-orchestrator.ts
Solid feature addition with clean separation between spawn/reuse paths; main concerns are race conditions in the async spawn-failure callback (stale this.goalId/this.pinnedClientId after stop()) and a missing human_approval case in the signal resolve endpoint.
- 🟡 bugs (L355): In the spawn failure callback (
.thenhandler),this.goalId!is used with a non-null assertion, butstop()could have been called between spawn initiation and callback resolution, settinggoalIdtonull. The!assertion would then pass a null tosetTaskContext. Guard withif (!this.goalId) return;before the fallback logic.[fixable] - 🟡 bugs (L356): The spawn-failure fallback sets task context and sends prompt to the pinned session, but does NOT set
this.activeTaskId = next.id. This means the orchestrator's status reports no active task, yet a task is being worked on by the pinned session. If anothertick()fires (e.g., from a concurrent task completing),getNextExecutablewon't return this task (it's alreadyactive), but the missingactiveTaskIdbreaks the status reporting contract. Additionally,this.pinnedClientIdcould also be null by the time the callback fires ifstop()was called.[fixable] - 🔵 unsafe_assumptions (L368): The recursive
this.tick()after spawning is safe (bounded by tasks transitioning frompendingtoactive), but it runs synchronously on the call stack. With many spawn-policy tasks (e.g., 50+ parallel subtasks), this creates deep recursion. Consider usingqueueMicrotask(() => this.tick())orsetImmediateto avoid stack overflow in large task trees.[fixable]
server/app.ts
Solid feature addition with clean separation between spawn/reuse paths; main concerns are race conditions in the async spawn-failure callback (stale this.goalId/this.pinnedClientId after stop()) and a missing human_approval case in the signal resolve endpoint.
- 🟡 bugs (L1006):
SignalResolveBodyacceptstype: 'human_approval'but the switch statement in the resolve endpoint has nocase 'human_approval'branch. Ahuman_approvalsignal will match the gate type viafindActiveSignalTasksbutisMatchwill remainfalsesince no case handles it — the signal will silently be ignored. Either add a matching case or removehuman_approvalfrom the schema.[fixable] - 🟡 missing_tests (L980): The new
POST /api/signals/resolveendpoint has no integration tests. The existing/api/tasks/:id/signalendpoint also lacks tests, but adding a new gate-matching endpoint with per-type dispatch logic (centaur_review, gh_ci, gh_review) without tests is risky — especially the matching logic with optionalrepo/pr/pr_urlfields.[fixable] - 🔵 style (L1008): The gate config property access pattern
(gc as Record<string, unknown>).repo as string | undefinedis repeated 6 times across 3 switch cases. Consider destructuring the gate config once before the switch:const { repo: taskRepo, pr: taskPr, pr_url: taskPrUrl } = gc as Record<string, unknown>. This reduces duplication and makes the matching logic easier to read.[fixable]
server/index.ts
Solid feature addition with clean separation between spawn/reuse paths; main concerns are race conditions in the async spawn-failure callback (stale this.goalId/this.pinnedClientId after stop()) and a missing human_approval case in the signal resolve endpoint.
- 🟡 unsafe_assumptions (L240):
createSessionWorktrees(wtId, BASE_REPO, config.repos)is called synchronously and can throw (e.g., git failures). Thetry/catchhandles this, but if worktree creation fails, the function returnsnulland the orchestrator falls back to the pinned session — yet the task is already markedactivein the orchestrator (line 342). The fallback in the orchestrator's.then(null)path sends to the pinned session but the task was spawned for isolation. Consider whether worktree failure should revert the task topendinginstead.[fixable]
server/__tests__/task-orchestrator.test.ts
Solid feature addition with clean separation between spawn/reuse paths; main concerns are race conditions in the async spawn-failure callback (stale this.goalId/this.pinnedClientId after stop()) and a missing human_approval case in the signal resolve endpoint.
- 🔵 missing_tests (L576): The spawn tests verify the happy path and fallback, but don't test the rejection path (line 359-363 in task-orchestrator.ts) where
spawnSessionthrows — the task should revert topending. Also missing: a test for multiple spawn tasks being dispatched in parallel (verifying the recursivetick()spawns all pending tasks in sequence).[fixable]
| log.info('spawned session for task', { taskId: next.id, clientId }); | ||
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); |
There was a problem hiding this comment.
🟡 bugs: In the spawn failure callback (.then handler), this.goalId! is used with a non-null assertion, but stop() could have been called between spawn initiation and callback resolution, setting goalId to null. The ! assertion would then pass a null to setTaskContext. Guard with if (!this.goalId) return; before the fallback logic. [fixable]
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); | ||
| if (this.pinnedClientId) sendToChat(this.pinnedClientId, prompt); |
There was a problem hiding this comment.
🟡 bugs: The spawn-failure fallback sets task context and sends prompt to the pinned session, but does NOT set this.activeTaskId = next.id. This means the orchestrator's status reports no active task, yet a task is being worked on by the pinned session. If another tick() fires (e.g., from a concurrent task completing), getNextExecutable won't return this task (it's already active), but the missing activeTaskId breaks the status reporting contract. Additionally, this.pinnedClientId could also be null by the time the callback fires if stop() was called. [fixable]
|
|
||
| // Don't set activeTaskId — spawned tasks run independently. | ||
| // Continue ticking to find more parallel work. | ||
| this.tick(); |
There was a problem hiding this comment.
🔵 unsafe_assumptions: The recursive this.tick() after spawning is safe (bounded by tasks transitioning from pending to active), but it runs synchronously on the call stack. With many spawn-policy tasks (e.g., 50+ parallel subtasks), this creates deep recursion. Consider using queueMicrotask(() => this.tick()) or setImmediate to avoid stack overflow in large task trees. [fixable]
| if (!gc) continue; | ||
|
|
||
| let isMatch = false; | ||
| switch (type) { |
There was a problem hiding this comment.
🟡 bugs: SignalResolveBody accepts type: 'human_approval' but the switch statement in the resolve endpoint has no case 'human_approval' branch. A human_approval signal will match the gate type via findActiveSignalTasks but isMatch will remain false since no case handles it — the signal will silently be ignored. Either add a matching case or remove human_approval from the schema. [fixable]
| * External agents (e.g. Centaur) POST here after completing work — | ||
| * they don't need to know task IDs, just the gate parameters. | ||
| */ | ||
| app.post('/api/signals/resolve', (req, res) => { |
There was a problem hiding this comment.
🟡 missing_tests: The new POST /api/signals/resolve endpoint has no integration tests. The existing /api/tasks/:id/signal endpoint also lacks tests, but adding a new gate-matching endpoint with per-type dispatch logic (centaur_review, gh_ci, gh_review) without tests is risky — especially the matching logic with optional repo/pr/pr_url fields. [fixable]
| let isMatch = false; | ||
| switch (type) { | ||
| case 'centaur_review': { | ||
| const taskPrUrl = (gc as Record<string, unknown>).pr_url as string | undefined; |
There was a problem hiding this comment.
🔵 style: The gate config property access pattern (gc as Record<string, unknown>).repo as string | undefined is repeated 6 times across 3 switch cases. Consider destructuring the gate config once before the switch: const { repo: taskRepo, pr: taskPr, pr_url: taskPrUrl } = gc as Record<string, unknown>. This reduces duplication and makes the matching logic easier to read. [fixable]
| const config = getRepoConfig(); | ||
|
|
||
| try { | ||
| createSessionWorktrees(wtId, BASE_REPO, config.repos); |
There was a problem hiding this comment.
🟡 unsafe_assumptions: createSessionWorktrees(wtId, BASE_REPO, config.repos) is called synchronously and can throw (e.g., git failures). The try/catch handles this, but if worktree creation fails, the function returns null and the orchestrator falls back to the pinned session — yet the task is already marked active in the orchestrator (line 342). The fallback in the orchestrator's .then(null) path sends to the pinned session but the task was spawned for isolation. Consider whether worktree failure should revert the task to pending instead. [fixable]
|
|
||
| const [, prompt, goalArg] = spawnSession.mock.calls[0]; | ||
| expect(goalArg).toBe(goal.id); | ||
| expect(prompt).toContain('Spawn task'); |
There was a problem hiding this comment.
🔵 missing_tests: The spawn tests verify the happy path and fallback, but don't test the rejection path (line 359-363 in task-orchestrator.ts) where spawnSession throws — the task should revert to pending. Also missing: a test for multiple spawn tasks being dispatched in parallel (verifying the recursive tick() spawns all pending tasks in sequence). [fixable]
Enable the TaskOrchestrator to spawn dedicated headless sessions for individual tasks, unlocking multi-agent goal coordination. Changes: - task-orchestrator: implement session_policy 'spawn' path in tick(). Spawned tasks get independent sessions and don't block activeTaskId, allowing parallel execution. Falls back to pinned session on failure. - index.ts: wire spawnSession dep — creates worktrees, registers in event store, starts headless session with NullTransport. - app.ts: add POST /api/signals/resolve endpoint for external agents (e.g. Centaur) to resolve wait_for_signal tasks by gate metadata (type + repo + PR) without knowing task IDs. - api-schemas: add SignalResolveBody zod schema. - task-store: add findActiveSignalTasks(gateType) query method. - Tests: 7 new tests covering spawn dispatch, activeTaskId semantics, fallback behavior, and signal task lookup. Part of multi-agent orchestration initiative — Phase 1 of 4. Next: Centaur signal bridge (Phase 2), PR Shepherd goals (Phase 3). Co-Authored-By: Claude Opus 4.6 <[email protected]>
615d0d3 to
c000d4f
Compare
|
@centaur-review Please review this PR. This is Phase 1 of the multi-agent orchestration initiative — enables Key areas to review:
|
dimakis
left a comment
There was a problem hiding this comment.
Centaur Review
Found 7 issue(s) (4 warning).
server/task-orchestrator.ts
Sound multi-agent spawn design with correct DFS dispatch and orphan recovery, but the async .then() callbacks read this.goalId/this.pinnedClientId live — creating a race with stop() that can pass null to setTaskContext. The /api/signals/resolve endpoint needs test coverage and has an unhandled human_approval gate type.
- 🟡 bugs (L355): Race condition:
this.goalId!is read inside an async.then()callback, butstop()nullifiesthis.goalId(line 141). If the orchestrator is stopped beforespawnSessionresolves and returns null,this.goalId!evaluates tonull— passed tosetTaskContextas a null goalId. CapturegoalIdbefore the async call (e.g.const goalId = this.goalId;) and use the captured value in the callback, with a guard for the null case.[fixable] - 🟡 bugs (L356): Same race:
this.pinnedClientIdis read inside the async.then()callback, butstop()nullifies it (line 145). If the orchestrator stops before the spawn-failure fallback fires, the fallback silently does nothing — the task staysactivewith no session and no pinned client to handle it, effectively orphaned until the next tick reclaims it. Consider guarding withif (this.state !== 'running') return;at the top of both.then()callbacks.[fixable] - 🔵 unsafe_assumptions (L368): The recursive
this.tick()call after spawning enables parallel task dispatch, but there's no depth guard. If a goal has many spawn-policy leaf tasks, tick() recurses once per task before any stack frame returns. For a goal with ~100 spawn tasks this could approach stack limits. Consider using a loop orqueueMicrotask(()=> this.tick())to avoid unbounded recursion.[fixable]
server/app.ts
Sound multi-agent spawn design with correct DFS dispatch and orphan recovery, but the async .then() callbacks read this.goalId/this.pinnedClientId live — creating a race with stop() that can pass null to setTaskContext. The /api/signals/resolve endpoint needs test coverage and has an unhandled human_approval gate type.
- 🟡 bugs (L1027):
human_approvalis accepted by theSignalResolveBodyschema (api-schemas.ts:188) but has no case in the switch statement (lines 1027-1043). Tasks withtype: 'human_approval'will be fetched from the DB but never matched, silently returning{ ok: true, matched: [] }. Either add a matching case or removehuman_approvalfrom the schema's enum to avoid misleading callers into thinking their resolve request succeeded.[fixable] - 🟡 missing_tests (L1001): The
/api/signals/resolveendpoint has no test coverage. The gate-matching logic (centaur_review by pr_url/repo+pr, gh_ci/gh_review by repo+pr) is non-trivial and untested at the HTTP layer. At minimum, add tests for: successful match by repo+pr, match by pr_url, no-match when gate metadata doesn't align, and auth rejection.[fixable] - 🔵 style (L1029): Repeated
(gc as Record<string, unknown>).fieldName as Typecasts across the switch cases are noisy. Consider extracting a typed interface for gate config metadata (e.g.{ repo?: string; pr?: number; pr_url?: string }) or a helper function to reduce the casting boilerplate.[fixable]
server/__tests__/task-orchestrator.test.ts
Sound multi-agent spawn design with correct DFS dispatch and orphan recovery, but the async .then() callbacks read this.goalId/this.pinnedClientId live — creating a race with stop() that can pass null to setTaskContext. The /api/signals/resolve endpoint needs test coverage and has an unhandled human_approval gate type.
- 🔵 missing_tests: No test covers the spawn-failure fallback path (spawnSession returning null or rejecting). The existing tests verify the happy path and the missing-spawnSession fallback, but not the case where spawnSession is provided and fails. This is where the
this.goalId!race condition lives.[fixable]
| log.info('spawned session for task', { taskId: next.id, clientId }); | ||
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); |
There was a problem hiding this comment.
🟡 bugs: Race condition: this.goalId! is read inside an async .then() callback, but stop() nullifies this.goalId (line 141). If the orchestrator is stopped before spawnSession resolves and returns null, this.goalId! evaluates to null — passed to setTaskContext as a null goalId. Capture goalId before the async call (e.g. const goalId = this.goalId;) and use the captured value in the callback, with a guard for the null case. [fixable]
| } else { | ||
| log.error('failed to spawn session, falling back to pinned', { taskId: next.id }); | ||
| this.deps.setTaskContext(next.id, this.goalId!); | ||
| if (this.pinnedClientId) sendToChat(this.pinnedClientId, prompt); |
There was a problem hiding this comment.
🟡 bugs: Same race: this.pinnedClientId is read inside the async .then() callback, but stop() nullifies it (line 145). If the orchestrator stops before the spawn-failure fallback fires, the fallback silently does nothing — the task stays active with no session and no pinned client to handle it, effectively orphaned until the next tick reclaims it. Consider guarding with if (this.state !== 'running') return; at the top of both .then() callbacks. [fixable]
|
|
||
| // Don't set activeTaskId — spawned tasks run independently. | ||
| // Continue ticking to find more parallel work. | ||
| this.tick(); |
There was a problem hiding this comment.
🔵 unsafe_assumptions: The recursive this.tick() call after spawning enables parallel task dispatch, but there's no depth guard. If a goal has many spawn-policy leaf tasks, tick() recurses once per task before any stack frame returns. For a goal with ~100 spawn tasks this could approach stack limits. Consider using a loop or queueMicrotask(()=> this.tick()) to avoid unbounded recursion. [fixable]
| if (!gc) continue; | ||
|
|
||
| let isMatch = false; | ||
| switch (type) { |
There was a problem hiding this comment.
🟡 bugs: human_approval is accepted by the SignalResolveBody schema (api-schemas.ts:188) but has no case in the switch statement (lines 1027-1043). Tasks with type: 'human_approval' will be fetched from the DB but never matched, silently returning { ok: true, matched: [] }. Either add a matching case or remove human_approval from the schema's enum to avoid misleading callers into thinking their resolve request succeeded. [fixable]
| * External agents (e.g. Centaur) POST here after completing work — | ||
| * they don't need to know task IDs, just the gate parameters. | ||
| */ | ||
| app.post('/api/signals/resolve', (req, res) => { |
There was a problem hiding this comment.
🟡 missing_tests: The /api/signals/resolve endpoint has no test coverage. The gate-matching logic (centaur_review by pr_url/repo+pr, gh_ci/gh_review by repo+pr) is non-trivial and untested at the HTTP layer. At minimum, add tests for: successful match by repo+pr, match by pr_url, no-match when gate metadata doesn't align, and auth rejection. [fixable]
| let isMatch = false; | ||
| switch (type) { | ||
| case 'centaur_review': { | ||
| const taskPrUrl = (gc as Record<string, unknown>).pr_url as string | undefined; |
There was a problem hiding this comment.
🔵 style: Repeated (gc as Record<string, unknown>).fieldName as Type casts across the switch cases are noisy. Consider extracting a typed interface for gate config metadata (e.g. { repo?: string; pr?: number; pr_url?: string }) or a helper function to reduce the casting boilerplate. [fixable]
- Fix race condition: capture goalId/pinnedClientId before async boundary and compare against current state in callbacks. Handles stop()+start() with different goal, not just stop() alone. - Add spawn depth guard (MAX_SPAWN_DEPTH=50) to prevent runaway recursive tick dispatch for goals with many spawn tasks. - Fix fallback conflict: when spawn returns null, only claim pinned session if activeTaskId is null; otherwise mark task as blocked. - Add human_approval case to /api/signals/resolve switch statement. - Extract typed destructuring for gate config in resolve endpoint. - Add tests: goal-restart race, pinned-session-busy fallback, gate matching logic for all signal types, endpoint resolve flow. All 63 tests pass. Addresses all findings from Centaur reviews. Co-Authored-By: Claude Opus 4.6 <[email protected]>
|
@centaur-review Please re-review — all findings from the previous two reviews have been addressed:
|
Summary
Enable the TaskOrchestrator to spawn dedicated headless sessions for individual tasks, unlocking multi-agent goal coordination. This is Phase 1 of 4 in the multi-agent orchestration initiative.
What changed
task-orchestrator.ts: Implement
session_policy: 'spawn'path intick(). When a task hassessionPolicy: 'spawn', the orchestrator creates a dedicated headless session instead of routing to the pinned session. Spawned tasks don't setactiveTaskId, enabling parallel execution. Falls back to pinned session on failure.index.ts: Wire
spawnSessiondependency — creates worktrees, registers in EventStore, starts headless session withNullTransport.app.ts: Add
POST /api/signals/resolveendpoint. External agents (e.g. Centaur) can resolvewait_for_signaltasks by gate metadata (type+repo+PR) without knowing task IDs. Authenticated via internal token.api-schemas.ts: Add
SignalResolveBodyzod schema for the new endpoint.task-store.ts: Add
findActiveSignalTasks(gateType)query method for gate-metadata-based task lookup.Context: Multi-Agent Orchestration Roadmap
The
session_policycolumn andwait_for_signalstage type already existed in the schema but were unused. This PR activates them.Phase 1 (this PR): Multi-session
tick()+ signal resolve endpointPhase 2: Centaur signal bridge — Centaur POSTs
ReviewCompletedto/api/signals/resolvePhase 3: PR Shepherd creates Task Board goals instead of ad-hoc sessions
Phase 4: Scheduled PR discovery trigger
A PR lifecycle would look like:
Key files
server/task-orchestrator.ts— spawn path in tick()server/index.ts— spawnSession wiringserver/app.ts— POST /api/signals/resolveserver/task-store.ts— findActiveSignalTasks()Test plan
🤖 Generated with Claude Code