From edae14d0eb6655d45bef212e7d4b48439b5188ad Mon Sep 17 00:00:00 2001 From: LJ Li Date: Sat, 2 May 2026 23:10:27 +0800 Subject: [PATCH 1/2] fix(acp): drain message events before returning end_turn MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `Agent.prompt()` returns `stopReason: "end_turn"` as soon as `sdk.session.prompt()` resolves, but `message.part.delta` events for the final assistant message text are still queued in the SDK event stream at that moment. They get processed by `runEventSubscription` and forwarded to ACP as `agent_message_chunk` frames AFTER the RPC reply has already been sent — a protocol violation visible to ACP clients as text appearing post-end_turn. Cause: two independent async paths share the same ACP wire. - Path A — event subscription (`runEventSubscription`): consumes `sdk.global.event()` and forwards `message.part.delta` as `agent_message_chunk` via `connection.sessionUpdate(...)`. - Path B — prompt RPC: `await sdk.session.prompt(...)` resolves when the LLM finishes, then immediately returns `end_turn`. Path B can return before Path A drains the trailing deltas. Order on the wire is then: ... earlier chunks ... → end_turn reply → trailing chunk. Fix: in `prompt()`, after `sdk.session.prompt()` resolves, await the `message.updated` event for the response message id (i.e. `info.time.completed` set). Because `runEventSubscription` processes events sequentially via `for await` and awaits each `handleEvent`, the `message.updated` (completed) event for a message is necessarily processed AFTER all prior `message.part.delta` events for the same message — so waiting on it guarantees every chunk has already been forwarded. A 5s timeout fallback prevents deadlock if the upstream completion event is never observed. Repro: Send a streaming prompt via ACP. Inspect the wire (DevTools → WS Messages). 
Observe that the final `agent_message_chunk` (the agent's last text delta) arrives 5–50ms AFTER the RPC reply with `stopReason: end_turn` and matching `id`. Affects every ACP client that gates UI / input on end_turn (e.g. disables streaming indicator, re-enables send button) — they snap to "done" prematurely while text is still being appended. --- packages/opencode/src/acp/agent.ts | 53 ++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index af16cba114fe..11a9b92e51f3 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -147,6 +147,8 @@ export class Agent implements ACPAgent { private bashSnapshots = new Map() private toolStarts = new Set() private permissionQueues = new Map>() + private messageCompletionResolvers = new Map<string, () => void>() + private completedAssistantMessageIds = new Set<string>() private permissionOptions: PermissionOption[] = [ { optionId: "once", kind: "allow_once", name: "Allow once" }, { optionId: "always", kind: "allow_always", name: "Always allow" }, @@ -270,6 +272,19 @@ export class Agent implements ACPAgent { return } + case "message.updated": { + const info = event.properties.info + if (info.role === "assistant" && info.time.completed !== undefined) { + this.completedAssistantMessageIds.add(info.id) + const resolver = this.messageCompletionResolvers.get(info.id) + if (resolver) { + this.messageCompletionResolvers.delete(info.id) + resolver() + } + } + return + } + case "message.part.updated": { log.info("message part updated", { event: event.properties }) const props = event.properties @@ -531,6 +546,34 @@ export class Agent implements ACPAgent { } } + // Block until `message.updated` for `messageId` (with `time.completed` + // set) has been observed by the event subscription.
Because + // `runEventSubscription` processes events sequentially via `for await` + // and awaits each `handleEvent` (which awaits the inner + // `connection.sessionUpdate(...)`), waiting for the completed event + // guarantees every prior `message.part.delta` chunk for this turn has + // already been forwarded to ACP. Without this, `prompt()` returns + // `stopReason: "end_turn"` while trailing chunk events are still queued + // in the SDK event stream, putting `agent_message_chunk` frames on the + // wire AFTER the RPC reply (a protocol violation visible to ACP clients + // as text appearing post-end_turn). + private waitForMessageCompletion(messageId: string, timeoutMs: number): Promise<void> { + if (this.completedAssistantMessageIds.has(messageId)) { + return Promise.resolve() + } + return new Promise((resolve) => { + let settled = false + const finish = () => { + if (settled) return + settled = true + this.messageCompletionResolvers.delete(messageId) + resolve() + } + this.messageCompletionResolvers.set(messageId, finish) + setTimeout(finish, timeoutMs) + }) + } + async initialize(params: InitializeRequest): Promise { log.info("initialize", { protocolVersion: params.protocolVersion }) @@ -1481,6 +1524,12 @@ export class Agent implements ACPAgent { }) const msg = response.data?.info + // Drain trailing message.part.delta events before returning end_turn — + // see `waitForMessageCompletion` for why.
+ if (msg?.id) { + await this.waitForMessageCompletion(msg.id, 5000) + } + await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) return { @@ -1504,6 +1553,10 @@ export class Agent implements ACPAgent { }) const msg = response.data?.info + if (msg?.id) { + await this.waitForMessageCompletion(msg.id, 5000) + } + await sendUsageUpdate(this.connection, this.sdk, sessionID, directory) return { From 27e44f9228ed5a761491bb633242a45d43d50172 Mon Sep 17 00:00:00 2001 From: LJ Li Date: Tue, 5 May 2026 15:42:57 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix(acp):=20tighten=20end=5Fturn=20drain=20?= =?UTF-8?q?=E2=80=94=20bound=20id=20set,=20log=20timeout,=20add=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three follow-up nits on top of the message-event drain fix: 1. `completedAssistantMessageIds` was add-only — long-lived agent processes would accumulate one entry per turn forever. Consume the entry on `waitForMessageCompletion` cache hit so ids that completed before their waiter arrived are dropped once awaited. (Ids recorded while a resolver is already pending are still added by the `message.updated` handler and are not removed by this change.) 2. The 5s timeout fallback was silent. If the SDK ever stops emitting `message.updated` (regression, server bug), `prompt()` would return `end_turn` after a 5s stall with zero log signal. Now logs a warning so the regression is visible in the field rather than a silent UX glitch. Also clears the timer on event-driven completion so it doesn't outlive the resolved promise. 3. Adds a regression test using the existing `createFakeAgent` harness that drives `sdk.session.prompt` deterministically and asserts `prompt()` does not return until `message.updated` (with `info.time.completed`) has been observed by the event subscription.
Test plan: - `bun test test/acp/` — 12 pass / 0 fail (was 11) - `bun run typecheck` — clean --- packages/opencode/src/acp/agent.ts | 20 +++- .../test/acp/event-subscription.test.ts | 96 +++++++++++++++++++ 2 files changed, 113 insertions(+), 3 deletions(-) diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index 11a9b92e51f3..4d0b49626884 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -558,19 +558,33 @@ export class Agent implements ACPAgent { // wire AFTER the RPC reply (a protocol violation visible to ACP clients // as text appearing post-end_turn). private waitForMessageCompletion(messageId: string, timeoutMs: number): Promise<void> { + // Cache hit means the completion event landed before we got here. Consume + // the entry so the set doesn't grow unbounded over the agent's lifetime. if (this.completedAssistantMessageIds.has(messageId)) { + this.completedAssistantMessageIds.delete(messageId) return Promise.resolve() } return new Promise((resolve) => { let settled = false - const finish = () => { + let timer: ReturnType<typeof setTimeout> | null = null + const finish = (timedOut: boolean) => { if (settled) return settled = true + if (timer) clearTimeout(timer) this.messageCompletionResolvers.delete(messageId) + if (timedOut) { + // Defensive: returning end_turn without observing completion means a + // chunk could still land post-reply. Log so the regression is visible + // in the field rather than a silent UX glitch.
+ log.warn("waitForMessageCompletion: timeout waiting for message.updated", { + messageId, + timeoutMs, + }) + } resolve() } - this.messageCompletionResolvers.set(messageId, finish) - setTimeout(finish, timeoutMs) + this.messageCompletionResolvers.set(messageId, () => finish(false)) + timer = setTimeout(() => finish(true), timeoutMs) }) } diff --git a/packages/opencode/test/acp/event-subscription.test.ts b/packages/opencode/test/acp/event-subscription.test.ts index bce5e94598cf..0144f216fd80 100644 --- a/packages/opencode/test/acp/event-subscription.test.ts +++ b/packages/opencode/test/acp/event-subscription.test.ts @@ -722,4 +722,100 @@ describe("acp.agent event subscription", () => { }, }) }) + + test("prompt() awaits message.updated for response messageID before returning end_turn", async () => { + await using tmp = await tmpdir() + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const { agent, controller, sdk, stop } = createFakeAgent() + const cwd = "/tmp/opencode-acp-test" + + const sessionId = await agent.newSession({ cwd, mcpServers: [] } as any).then((x) => x.sessionId) + const messageID = "msg_completion_test_1" + + // Mock sdk.session.prompt: resolve only when we tell it to. The + // returned `info` carries the messageID our fix awaits via + // `waitForMessageCompletion`. 
+ let resolveSdkPrompt: (() => void) | null = null + const sdkPromptPromise = new Promise((resolve) => { + resolveSdkPrompt = () => + resolve({ + data: { + info: { + id: messageID, + sessionID: sessionId, + role: "assistant", + time: { created: 1, completed: 2 }, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + cost: 0, + parentID: "u1", + modelID: "big-pickle", + providerID: "opencode", + mode: "build", + agent: "build", + path: { cwd, root: cwd }, + }, + }, + }) + }) + sdk.session.prompt = async () => sdkPromptPromise + + let promptDone = false + const promptPromise = (agent as any) + .prompt({ + sessionId, + prompt: [{ type: "text", text: "test" }], + }) + .then((r: any) => { + promptDone = true + return r + }) + .catch((err: unknown) => { + promptDone = true + throw err + }) + + // Let agent.prompt() reach the `await sdk.session.prompt(...)` boundary, + // then resolve it. After resolution, prompt() should be blocked inside + // waitForMessageCompletion(messageID, ...) — the regression check. + await new Promise((r) => setTimeout(r, 20)) + resolveSdkPrompt!() + await new Promise((r) => setTimeout(r, 60)) + expect(promptDone).toBe(false) + + // Push the assistant message-completed event. The new + // `case "message.updated"` handler should resolve the waiter. + controller.push({ + directory: cwd, + payload: { + type: "message.updated", + properties: { + sessionID: sessionId, + info: { + id: messageID, + sessionID: sessionId, + role: "assistant", + time: { created: 1, completed: 2 }, + parentID: "u1", + modelID: "big-pickle", + providerID: "opencode", + mode: "build", + agent: "build", + path: { cwd, root: cwd }, + cost: 0, + tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } }, + }, + }, + } as any, + }) + + const result = await promptPromise + expect(promptDone).toBe(true) + expect(result.stopReason).toBe("end_turn") + + stop() + }, + }) + }) })