Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion packages/opencode/src/bus/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { InstanceState } from "@/effect"
import { makeRuntime } from "@/effect/run-service"

const log = Log.create({ service: "bus" })
const WILDCARD_CAPACITY = 1024

type BusProperties<D extends BusEvent.Definition<string, Schema.Top>> = Schema.Schema.Type<D["properties"]>

Expand Down Expand Up @@ -45,7 +46,10 @@ export const layer = Layer.effect(
Effect.gen(function* () {
const state = yield* InstanceState.make<State>(
Effect.fn("Bus.state")(function* (ctx) {
const wildcard = yield* PubSub.unbounded<Payload>()
// Wildcard subscribers (SSE streams, plugins) can be much slower than high-frequency
// publishers such as message/tool updates. Keep only a recent sliding window so a
// stalled subscriber does not retain an unbounded event backlog in memory.
const wildcard = yield* PubSub.sliding<Payload>(WILDCARD_CAPACITY)
const typed = new Map<string, PubSub.PubSub<Payload>>()

yield* Effect.addFinalizer(() =>
Expand Down
47 changes: 31 additions & 16 deletions packages/opencode/src/server/routes/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,39 @@ import { Config } from "../../config"
import { errors } from "../error"

const log = Log.create({ service: "server" })
const SSE_QUEUE_MAX_SIZE = 256

export const GlobalDisposedEvent = BusEvent.define("global.disposed", Schema.Struct({}))

async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>) => () => void) {
async function streamEvents(
c: Context,
subscribe: (push: (data: string) => void, stop: () => void) => () => void,
) {
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
const q = new AsyncQueue<string | null>(SSE_QUEUE_MAX_SIZE)
let done = false
let unsub = () => {}

q.push(
const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
log.info("global event disconnected")
}

const push = (data: string) => {
if (done) return
if (q.size >= SSE_QUEUE_MAX_SIZE) {
log.warn("global event queue overflow")
stop()
return
}
q.push(data)
}

push(
JSON.stringify({
payload: {
type: "server.connected",
Expand All @@ -36,7 +60,7 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>

// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
push(
JSON.stringify({
payload: {
type: "server.heartbeat",
Expand All @@ -46,16 +70,7 @@ async function streamEvents(c: Context, subscribe: (q: AsyncQueue<string | null>
)
}, 10_000)

const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
log.info("global event disconnected")
}

const unsub = subscribe(q)
unsub = subscribe(push, stop)

stream.onAbort(stop)

Expand Down Expand Up @@ -127,9 +142,9 @@ export const GlobalRoutes = lazy(() =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")

return streamEvents(c, (q) => {
return streamEvents(c, (push) => {
async function handler(event: any) {
q.push(JSON.stringify(event))
push(JSON.stringify(event))
}
GlobalBus.on("event", handler)
return () => GlobalBus.off("event", handler)
Expand Down
40 changes: 26 additions & 14 deletions packages/opencode/src/server/routes/instance/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Bus } from "@/bus"
import { AsyncQueue } from "@/util/queue"

const log = Log.create({ service: "server" })
const SSE_QUEUE_MAX_SIZE = 256

export const EventRoutes = () =>
new Hono().get(
Expand Down Expand Up @@ -37,10 +38,30 @@ export const EventRoutes = () =>
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
const q = new AsyncQueue<string | null>()
const q = new AsyncQueue<string | null>(SSE_QUEUE_MAX_SIZE)
let done = false
let unsub = () => {}

q.push(
const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
log.info("event disconnected")
}

const push = (data: string) => {
if (done) return
if (q.size >= SSE_QUEUE_MAX_SIZE) {
log.warn("event queue overflow")
stop()
return
}
q.push(data)
}

push(
JSON.stringify({
type: "server.connected",
properties: {},
Expand All @@ -49,25 +70,16 @@ export const EventRoutes = () =>

// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
q.push(
push(
JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
)
}, 10_000)

const stop = () => {
if (done) return
done = true
clearInterval(heartbeat)
unsub()
q.push(null)
log.info("event disconnected")
}

const unsub = Bus.subscribeAll((event) => {
q.push(JSON.stringify(event))
unsub = Bus.subscribeAll((event) => {
push(JSON.stringify(event))
if (event.type === Bus.InstanceDisposed.type) {
stop()
}
Expand Down
123 changes: 106 additions & 17 deletions packages/opencode/src/session/prompt.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from "path"
import os from "os"
import { createWriteStream } from "node:fs"
import z from "zod"
import * as EffectZod from "@/util/effect-zod"
import { SessionID, MessageID, PartID } from "./schema"
Expand Down Expand Up @@ -66,9 +67,25 @@ IMPORTANT:

const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.`

const SHELL_STREAM_PREVIEW_MAX_CHARS = 30_000
const SHELL_STREAM_UPDATE_INTERVAL_MS = 50
const TOOL_METADATA_UPDATE_INTERVAL_MS = 100
const log = Log.create({ service: "session.prompt" })
const elog = EffectLogger.create({ service: "session.prompt" })

function shellStreamPreview(text: string) {
if (text.length <= SHELL_STREAM_PREVIEW_MAX_CHARS) return text
return "...\n\n" + text.slice(-SHELL_STREAM_PREVIEW_MAX_CHARS)
}

function toolMetadataFingerprint(input: { title?: string; metadata?: Record<string, any> }) {
try {
return JSON.stringify(input)
} catch {
return undefined
}
}

export interface Interface {
readonly cancel: (sessionID: SessionID) => Effect.Effect<void>
readonly prompt: (input: PromptInput) => Effect.Effect<MessageV2.WithParts>
Expand Down Expand Up @@ -367,6 +384,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const tools: Record<string, AITool> = {}
const run = yield* runner()
const promptOps = yield* ops()
const toolMetadata = new Map<string, { at: number; key?: string }>()

const context = (args: any, options: ToolExecutionOptions): Tool.Context => ({
sessionID: input.session.id,
Expand All @@ -376,20 +394,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the
extra: { model: input.model, bypassAgentCheck: input.bypassAgentCheck, promptOps },
agent: input.agent.name,
messages: input.messages,
metadata: (val) =>
input.processor.updateToolCall(options.toolCallId, (match) => {
metadata: (val) => {
const current = Date.now()
const previous = toolMetadata.get(options.toolCallId)
if (previous && current - previous.at < TOOL_METADATA_UPDATE_INTERVAL_MS) return Effect.void
const key = toolMetadataFingerprint(val)
if (key !== undefined && previous?.key === key) return Effect.void
toolMetadata.set(options.toolCallId, { at: current, key })
return input.processor.updateToolCall(options.toolCallId, (match) => {
if (!["running", "pending"].includes(match.state.status)) return match
const running = match.state.status === "running" ? match.state : undefined
return {
...match,
state: {
title: val.title,
metadata: val.metadata,
title: val.title ?? running?.title,
metadata: val.metadata ?? running?.metadata,
status: "running",
input: args,
time: { start: Date.now() },
time: { start: running?.time.start ?? current },
},
}
}),
})
},
ask: (req) =>
permission
.ask({
Expand Down Expand Up @@ -438,7 +464,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* input.processor.completeToolCall(options.toolCallId, output)
}
return output
}),
}).pipe(Effect.ensuring(Effect.sync(() => toolMetadata.delete(options.toolCallId)))),
)
},
})
Expand Down Expand Up @@ -517,7 +543,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
yield* input.processor.completeToolCall(opts.toolCallId, output)
}
return output
}),
}).pipe(Effect.ensuring(Effect.sync(() => toolMetadata.delete(opts.toolCallId)))),
)
tools[key] = item
}
Expand Down Expand Up @@ -840,26 +866,83 @@ NOTE: At any point in time through this workflow you should feel free to ask the
forceKillAfter: "3 seconds",
})

let output = ""
let bufferedOutput = ""
let aborted = false
let outputPath: string | undefined
let outputSink: ReturnType<typeof createWriteStream> | undefined
let lastStreamUpdate = 0

const keep = limits.maxBytes * 2
const chunks: { text: string; size: number }[] = []
let used = 0

const appendOutput = Effect.fn("SessionPrompt.shellAppendOutput")(function* (chunk: string) {
const size = Buffer.byteLength(chunk, "utf-8")
chunks.push({ text: chunk, size })
used += size
while (used > keep && chunks.length > 1) {
const item = chunks.shift()
if (!item) break
used -= item.size
}

if (outputPath) {
yield* Effect.sync(() => outputSink?.write(chunk))
return
}

bufferedOutput += chunk
if (Buffer.byteLength(bufferedOutput, "utf-8") <= limits.maxBytes) return

outputPath = yield* truncate.write(bufferedOutput)
outputSink = createWriteStream(outputPath, { flags: "a" })
bufferedOutput = ""
})

const toolOutput = () => chunks.map((item) => item.text).join("")

const finish = Effect.uninterruptible(
Effect.gen(function* () {
if (aborted) {
output += "\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n")
yield* appendOutput("\n\n" + ["<metadata>", "User aborted the command", "</metadata>"].join("\n"))
}
if (!msg.time.completed) {
msg.time.completed = Date.now()
yield* sessions.updateMessage(msg)
}
const truncated = outputPath
? {
content: [`...output truncated...`, "", `Full output saved to: ${outputPath}`, "", toolOutput() || "(no output)"].join(
"\n",
),
truncated: true as const,
outputPath,
}
: yield* truncate.output(bufferedOutput, {}, agent)
outputPath = truncated.truncated ? truncated.outputPath : undefined
if (outputSink) {
const stream = outputSink
yield* Effect.promise(
() =>
new Promise<void>((resolve) => {
stream.end(() => resolve())
stream.on("error", () => resolve())
}),
)
}
if (part.state.status === "running") {
part.state = {
status: "completed",
time: { ...part.state.time, end: Date.now() },
input: part.state.input,
title: "",
metadata: { output, description: "" },
output,
metadata: {
output: shellStreamPreview(toolOutput()),
description: "",
truncated: truncated.truncated,
...(outputPath && { outputPath }),
},
output: truncated.content,
}
yield* sessions.updatePart(part)
}
Expand All @@ -869,12 +952,18 @@ NOTE: At any point in time through this workflow you should feel free to ask the
const exit = yield* Effect.gen(function* () {
const handle = yield* spawner.spawn(cmd)
yield* Stream.runForEach(Stream.decodeText(handle.all), (chunk) =>
Effect.sync(() => {
output += chunk
if (part.state.status === "running") {
part.state.metadata = { output, description: "" }
void run.fork(sessions.updatePart(part))
Effect.gen(function* () {
yield* appendOutput(chunk)
if (part.state.status !== "running") return
if (Date.now() - lastStreamUpdate < SHELL_STREAM_UPDATE_INTERVAL_MS) return
lastStreamUpdate = Date.now()
part.state.metadata = {
output: shellStreamPreview(toolOutput()),
description: "",
truncated: !!outputPath,
...(outputPath && { outputPath }),
}
void run.fork(sessions.updatePart(part))
}),
)
yield* handle.exitCode
Expand Down
Loading
Loading