diff --git a/packages/opencode/src/bus/index.ts b/packages/opencode/src/bus/index.ts index 12251f26c7d1..dd26f2c5c3e5 100644 --- a/packages/opencode/src/bus/index.ts +++ b/packages/opencode/src/bus/index.ts @@ -7,6 +7,7 @@ import { InstanceState } from "@/effect" import { makeRuntime } from "@/effect/run-service" const log = Log.create({ service: "bus" }) +const WILDCARD_CAPACITY = 1024 type BusProperties> = Schema.Schema.Type @@ -45,7 +46,10 @@ export const layer = Layer.effect( Effect.gen(function* () { const state = yield* InstanceState.make( Effect.fn("Bus.state")(function* (ctx) { - const wildcard = yield* PubSub.unbounded() + // Wildcard subscribers (SSE streams, plugins) can be much slower than high-frequency + // publishers such as message/tool updates. Keep only a recent sliding window so a + // stalled subscriber does not retain an unbounded event backlog in memory. + const wildcard = yield* PubSub.sliding(WILDCARD_CAPACITY) const typed = new Map>() yield* Effect.addFinalizer(() => diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index c2f8b695d2da..5177d6b76086 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -17,15 +17,39 @@ import { Config } from "../../config" import { errors } from "../error" const log = Log.create({ service: "server" }) +const SSE_QUEUE_MAX_SIZE = 256 export const GlobalDisposedEvent = BusEvent.define("global.disposed", Schema.Struct({})) -async function streamEvents(c: Context, subscribe: (q: AsyncQueue) => () => void) { +async function streamEvents( + c: Context, + subscribe: (push: (data: string) => void, stop: () => void) => () => void, +) { return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + const q = new AsyncQueue(SSE_QUEUE_MAX_SIZE) let done = false + let unsub = () => {} - q.push( + const stop = () => { + if (done) return + done = true + clearInterval(heartbeat) + unsub() + q.push(null) + log.info("global event disconnected") + } + + const push = (data: string) => { + if (done) return + if (q.size >= SSE_QUEUE_MAX_SIZE) { + log.warn("global event queue overflow") + stop() + return + } + q.push(data) + } + + push( JSON.stringify({ payload: { type: "server.connected", @@ -36,7 +60,7 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue // Send heartbeat every 10s to prevent stalled proxy streams. const heartbeat = setInterval(() => { - q.push( + push( JSON.stringify({ payload: { type: "server.heartbeat", @@ -46,16 +70,7 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue ) }, 10_000) - const stop = () => { - if (done) return - done = true - clearInterval(heartbeat) - unsub() - q.push(null) - log.info("global event disconnected") - } - - const unsub = subscribe(q) + unsub = subscribe(push, stop) stream.onAbort(stop) @@ -127,9 +142,9 @@ export const GlobalRoutes = lazy(() => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") - return streamEvents(c, (q) => { + return streamEvents(c, (push) => { async function handler(event: any) { - q.push(JSON.stringify(event)) + push(JSON.stringify(event)) } GlobalBus.on("event", handler) return () => GlobalBus.off("event", handler) diff --git a/packages/opencode/src/server/routes/instance/event.ts b/packages/opencode/src/server/routes/instance/event.ts index 1d883bd88314..ca6b60d0f230 100644 --- a/packages/opencode/src/server/routes/instance/event.ts +++ b/packages/opencode/src/server/routes/instance/event.ts @@ -8,6 +8,7 @@ import { Bus } from "@/bus" import { AsyncQueue } from "@/util/queue" const log = Log.create({ service: "server" }) +const SSE_QUEUE_MAX_SIZE = 256 export const EventRoutes = () => new Hono().get( @@ -37,10 +38,30 @@ export const EventRoutes = () => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - const q = new AsyncQueue() + const q = new AsyncQueue(SSE_QUEUE_MAX_SIZE) let done = false + let unsub = () => {} - q.push( + const stop = () => { + if (done) return + done = true + clearInterval(heartbeat) + unsub() + q.push(null) + log.info("event disconnected") + } + + const push = (data: string) => { + if (done) return + if (q.size >= SSE_QUEUE_MAX_SIZE) { + log.warn("event queue overflow") + stop() + return + } + q.push(data) + } + + push( JSON.stringify({ type: "server.connected", properties: {}, @@ -49,7 +70,7 @@ export const EventRoutes = () => // Send heartbeat every 10s to prevent stalled proxy streams. const heartbeat = setInterval(() => { - q.push( + push( JSON.stringify({ type: "server.heartbeat", properties: {}, @@ -57,17 +78,8 @@ export const EventRoutes = () => ) }, 10_000) - const stop = () => { - if (done) return - done = true - clearInterval(heartbeat) - unsub() - q.push(null) - log.info("event disconnected") - } - - const unsub = Bus.subscribeAll((event) => { - q.push(JSON.stringify(event)) + unsub = Bus.subscribeAll((event) => { + push(JSON.stringify(event)) if (event.type === Bus.InstanceDisposed.type) { stop() } diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index 600eb42f795e..b9014d3592b4 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -1,5 +1,6 @@ import path from "path" import os from "os" +import { createWriteStream } from "node:fs" import z from "zod" import * as EffectZod from "@/util/effect-zod" import { SessionID, MessageID, PartID } from "./schema" @@ -66,9 +67,25 @@ IMPORTANT: const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.` +const SHELL_STREAM_PREVIEW_MAX_CHARS = 30_000 +const SHELL_STREAM_UPDATE_INTERVAL_MS = 50 +const TOOL_METADATA_UPDATE_INTERVAL_MS = 100 const log = Log.create({ service: "session.prompt" }) const elog = EffectLogger.create({ service: "session.prompt" }) +function shellStreamPreview(text: string) { + if (text.length <= SHELL_STREAM_PREVIEW_MAX_CHARS) return text + return "...\n\n" + text.slice(-SHELL_STREAM_PREVIEW_MAX_CHARS) +} + +function toolMetadataFingerprint(input: { title?: string; metadata?: Record }) { + try { + return JSON.stringify(input) + } catch { + return undefined + } +} + export interface Interface { readonly cancel: (sessionID: SessionID) => Effect.Effect readonly prompt: (input: PromptInput) => Effect.Effect @@ -367,6 +384,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the const tools: Record = {} const run = yield* runner() const promptOps = yield* ops() + const toolMetadata = new Map() const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({ sessionID: input.session.id, @@ -376,20 +394,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck, promptOps }, agent: input.agent.name, messages: input.messages, - metadata: (val) => - input.processor.updateToolCall(options.toolCallId, (match) => { + metadata: (val) => { + const current = Date.now() + const previous = toolMetadata.get(options.toolCallId) + if (previous && current - previous.at < TOOL_METADATA_UPDATE_INTERVAL_MS) return Effect.void + const key = toolMetadataFingerprint(val) + if (key !== undefined && previous?.key === key) return Effect.void + toolMetadata.set(options.toolCallId, { at: current, key }) + return input.processor.updateToolCall(options.toolCallId, (match) => { if (!["running", "pending"].includes(match.state.status)) return match + const running = match.state.status === "running" ? match.state : undefined return { ...match, state: { - title: val.title, - metadata: val.metadata, + title: val.title ?? running?.title, + metadata: val.metadata ?? running?.metadata, status: "running", input: args, - time: { start: Date.now() }, + time: { start: running?.time.start ?? current }, }, } - }), + }) + }, ask: (req) => permission .ask({ @@ -438,7 +464,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the yield* input.processor.completeToolCall(options.toolCallId, output) } return output - }), + }).pipe(Effect.ensuring(Effect.sync(() => toolMetadata.delete(options.toolCallId)))), ) }, }) @@ -517,7 +543,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the yield* input.processor.completeToolCall(opts.toolCallId, output) } return output - }), + }).pipe(Effect.ensuring(Effect.sync(() => toolMetadata.delete(opts.toolCallId)))), ) tools[key] = item } @@ -840,26 +866,83 @@ NOTE: At any point in time through this workflow you should feel free to ask the forceKillAfter: "3 seconds", }) - let output = "" + let bufferedOutput = "" let aborted = false + let outputPath: string | undefined + let outputSink: ReturnType | undefined + let lastStreamUpdate = 0 + + const keep = limits.maxBytes * 2 + const chunks: { text: string; size: number }[] = [] + let used = 0 + + const appendOutput = Effect.fn("SessionPrompt.shellAppendOutput")(function* (chunk: string) { + const size = Buffer.byteLength(chunk, "utf-8") + chunks.push({ text: chunk, size }) + used += size + while (used > keep && chunks.length > 1) { + const item = chunks.shift() + if (!item) break + used -= item.size + } + + if (outputPath) { + yield* Effect.sync(() => outputSink?.write(chunk)) + return + } + + bufferedOutput += chunk + if (Buffer.byteLength(bufferedOutput, "utf-8") <= limits.maxBytes) return + + outputPath = yield* truncate.write(bufferedOutput) + outputSink = createWriteStream(outputPath, { flags: "a" }) + bufferedOutput = "" + }) + + const toolOutput = () => chunks.map((item) => item.text).join("") const finish = Effect.uninterruptible( Effect.gen(function* () { if (aborted) { - output += "\n\n" + ["", "User aborted the command", ""].join("\n") + yield* appendOutput("\n\n" + ["", "User aborted the command", ""].join("\n")) } if (!msg.time.completed) { msg.time.completed = Date.now() yield* sessions.updateMessage(msg) } + const truncated = outputPath + ? { + content: [`...output truncated...`, "", `Full output saved to: ${outputPath}`, "", toolOutput() || "(no output)"].join( + "\n", + ), + truncated: true as const, + outputPath, + } + : yield* truncate.output(bufferedOutput, {}, agent) + outputPath = truncated.truncated ? truncated.outputPath : undefined + if (outputSink) { + const stream = outputSink + yield* Effect.promise( + () => + new Promise((resolve) => { + stream.end(() => resolve()) + stream.on("error", () => resolve()) + }), + ) + } if (part.state.status === "running") { part.state = { status: "completed", time: { ...part.state.time, end: Date.now() }, input: part.state.input, title: "", - metadata: { output, description: "" }, - output, + metadata: { + output: shellStreamPreview(toolOutput()), + description: "", + truncated: truncated.truncated, + ...(outputPath && { outputPath }), + }, + output: truncated.content, } yield* sessions.updatePart(part) } @@ -869,12 +952,18 @@ NOTE: At any point in time through this workflow you should feel free to ask the const exit = yield* Effect.gen(function* () { const handle = yield* spawner.spawn(cmd) yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) => - Effect.sync(() => { - output += chunk - if (part.state.status === "running") { - part.state.metadata = { output, description: "" } - void run.fork(sessions.updatePart(part)) + Effect.gen(function* () { + yield* appendOutput(chunk) + if (part.state.status !== "running") return + if (Date.now() - lastStreamUpdate < SHELL_STREAM_UPDATE_INTERVAL_MS) return + lastStreamUpdate = Date.now() + part.state.metadata = { + output: shellStreamPreview(toolOutput()), + description: "", + truncated: !!outputPath, + ...(outputPath && { outputPath }), } + void run.fork(sessions.updatePart(part)) }), ) yield* handle.exitCode diff --git a/packages/opencode/src/util/queue.ts b/packages/opencode/src/util/queue.ts index a1af53fe8f09..83a21097282a 100644 --- a/packages/opencode/src/util/queue.ts +++ b/packages/opencode/src/util/queue.ts @@ -1,11 +1,21 @@ export class AsyncQueue implements AsyncIterable { private queue: T[] = [] private resolvers: ((value: T) => void)[] = [] + private maxSize: number + + constructor(maxSize = Number.POSITIVE_INFINITY) { + this.maxSize = maxSize + } push(item: T) { const resolve = this.resolvers.shift() if (resolve) resolve(item) - else this.queue.push(item) + else { + this.queue.push(item) + while (this.queue.length > this.maxSize) { + this.queue.shift() + } + } } async next(): Promise { @@ -13,6 +23,10 @@ export class AsyncQueue implements AsyncIterable { return new Promise((resolve) => this.resolvers.push(resolve)) } + get size() { + return this.queue.length + } + async *[Symbol.asyncIterator]() { while (true) yield await this.next() }