Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions packages/opencode/src/cli/cmd/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { BashTool } from "../../tool/bash"
import { TodoWriteTool } from "../../tool/todo"
import { Locale } from "../../util"
import { AppRuntime } from "@/effect/app-runtime"
import { Plugin } from "../../plugin"

type ToolProps<T> = {
input: Tool.InferParameters<T>
Expand Down Expand Up @@ -534,6 +535,20 @@ export const RunCommand = cmd({
event.properties.sessionID === sessionID &&
event.properties.status.type === "idle"
) {
// Wait for plugins to finish processing the session.idle event
if (!args.attach) {
await AppRuntime.runPromise(Plugin.Service.use((svc) => svc.waitForPendingEvents())).catch((e) => {
console.error("Failed to wait for pending plugin events:", e)
})
// If a plugin reprompted the session during handling, new plugin event handlers
// were queued (and the SSE stream has buffered new events including another
// session.idle). Continue the loop to consume those buffered events.
// Only break when there are truly no more pending plugin events.
const hasPending = await AppRuntime.runPromise(Plugin.Service.use((svc) => svc.hasPendingEvents())).catch(
() => false,
)
if (hasPending) continue
}
break
}

Expand Down Expand Up @@ -631,7 +646,7 @@ export const RunCommand = cmd({
}
await share(sdk, sessionID)

loop().catch((e) => {
const loopPromise = loop().catch((e) => {
console.error(e)
process.exit(1)
})
Expand All @@ -655,6 +670,8 @@ export const RunCommand = cmd({
parts: [...files, { type: "text", text: message }],
})
}

await loopPromise
}

if (args.attach) {
Expand All @@ -674,7 +691,17 @@ export const RunCommand = cmd({
const request = new Request(input, init)
return Server.Default().app.fetch(request)
}) as typeof globalThis.fetch
const sdk = createOpencodeClient({ baseUrl: "http://opencode.internal", fetch: fetchFn })
const inProcessPassword = Flag.OPENCODE_SERVER_PASSWORD
const inProcessHeaders = inProcessPassword
? {
Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${inProcessPassword}`).toString("base64")}`,
}
: undefined
const sdk = createOpencodeClient({
baseUrl: "http://opencode.internal",
fetch: fetchFn,
headers: inProcessHeaders,
})
await execute(sdk)
})
},
Expand Down
1 change: 1 addition & 0 deletions packages/opencode/src/flag/flag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export const Flag = {
copy === undefined ? process.platform === "win32" : truthy("OPENCODE_EXPERIMENTAL_DISABLE_COPY_ON_SELECT"),
OPENCODE_ENABLE_EXA: truthy("OPENCODE_ENABLE_EXA") || OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EXA"),
OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS: number("OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS"),
OPENCODE_EXPERIMENTAL_PLUGIN_EXIT_DEFAULT_TIMEOUT_MS: number("OPENCODE_EXPERIMENTAL_PLUGIN_EXIT_DEFAULT_TIMEOUT_MS"),
OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX: number("OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX"),
OPENCODE_EXPERIMENTAL_OXFMT: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT"),
OPENCODE_EXPERIMENTAL_LSP_TY: truthy("OPENCODE_EXPERIMENTAL_LSP_TY"),
Expand Down
44 changes: 41 additions & 3 deletions packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const log = Log.create({ service: "plugin" })

type State = {
hooks: Hooks[]
pendingEvents: Set<Promise<void>>
}

// Hook names that follow the (input, output) => Promise<void> trigger pattern
Expand All @@ -49,6 +50,8 @@ export interface Interface {
) => Effect.Effect<Output>
readonly list: () => Effect.Effect<Hooks[]>
readonly init: () => Effect.Effect<void>
readonly waitForPendingEvents: (timeoutMs?: number) => Effect.Effect<void>
readonly hasPendingEvents: () => Effect.Effect<boolean>
}

export class Service extends Context.Service<Service, Interface>()("@opencode/Plugin") {}
Expand Down Expand Up @@ -111,6 +114,7 @@ export const layer = Layer.effect(
const state = yield* InstanceState.make<State>(
Effect.fn("Plugin.state")(function* (ctx) {
const hooks: Hooks[] = []
const pendingEvents = new Set<Promise<void>>()
const bridge = yield* EffectBridge.make()

function publishPluginError(message: string) {
Expand Down Expand Up @@ -245,14 +249,21 @@ export const layer = Layer.effect(
Stream.runForEach((input) =>
Effect.sync(() => {
for (const hook of hooks) {
void hook["event"]?.({ event: input as any })
const eventHandler = hook["event"]
if (!eventHandler) continue
const promise = Promise.resolve()
.then(() => eventHandler({ event: input as any }))
.finally(() => {
pendingEvents.delete(promise)
})
pendingEvents.add(promise)
}
}),
),
Effect.forkScoped,
)

return { hooks }
return { hooks, pendingEvents }
}),
)

Expand Down Expand Up @@ -280,7 +291,34 @@ export const layer = Layer.effect(
yield* InstanceState.get(state)
})

return Service.of({ trigger, list, init })
const waitForPendingEvents = Effect.fn("Plugin.waitForPendingEvents")(function* (timeoutMs?: number) {
const s = yield* InstanceState.get(state)
const timeout = timeoutMs ?? Flag.OPENCODE_EXPERIMENTAL_PLUGIN_EXIT_DEFAULT_TIMEOUT_MS ?? 60000

yield* Effect.tryPromise({
try: async () => {
// Wait a tick to let event handlers be added to pendingEvents
await Promise.resolve()
const pending = Array.from(s.pendingEvents)
if (pending.length === 0) return
await Promise.race([Promise.all(pending), new Promise<void>((resolve) => setTimeout(resolve, timeout))])
// Wait a tick to allow any new event handlers triggered during the wait
// (e.g. reprompts that caused new session.idle events) to be registered
// before the caller checks hasPendingEvents()
await Promise.resolve()
},
catch: (err) => {
log.error("failed to wait for pending plugin events", { error: err })
},
}).pipe(Effect.ignore)
})

const hasPendingEvents = Effect.fn("Plugin.hasPendingEvents")(function* () {
const s = yield* InstanceState.get(state)
return s.pendingEvents.size > 0
})

return Service.of({ trigger, list, init, waitForPendingEvents, hasPendingEvents })
}),
)

Expand Down
168 changes: 168 additions & 0 deletions packages/opencode/test/plugin/event-wait-simple.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import { afterAll, afterEach, describe, expect, test } from "bun:test"
import { Effect } from "effect"
import path from "path"
import { pathToFileURL } from "url"
import { tmpdir } from "../fixture/fixture"

const disableDefault = process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS
process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = "1"

const { Plugin } = await import("../../src/plugin/index")
const { Instance } = await import("../../src/project/instance")
const { Bus } = await import("../../src/bus")
const { SessionStatus } = await import("../../src/session/status")
const { SessionID } = await import("../../src/session/schema")

afterEach(async () => {
await Instance.disposeAll()
})

afterAll(() => {
if (disableDefault === undefined) {
delete process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS
return
}
process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault
})

describe("plugin.waitForPendingEvents - regression test", () => {
test("does not exit before async event handlers complete", async () => {
await using tmp = await tmpdir({
init: async (dir) => {
const completionFile = path.join(dir, "completion.txt")
const file = path.join(dir, "plugin.ts")

await Bun.write(
file,
[
`const completionFile = ${JSON.stringify(completionFile)}`,
"export default async () => ({",
" event: async ({ event }) => {",
" if (event.type === 'session.idle') {",
" // Simulate a slow plugin operation",
" await Bun.sleep(200)",
" await Bun.write(completionFile, 'completed')",
" }",
" },",
"})",
"",
].join("\n"),
)

await Bun.write(
path.join(dir, "opencode.json"),
JSON.stringify(
{
$schema: "https://opencode.ai/config.json",
plugin: [pathToFileURL(file).href],
},
null,
2,
),
)

return completionFile
},
})

await Instance.provide({
directory: tmp.path,
fn: async () =>
Effect.gen(function* () {
const plugin = yield* Plugin.Service
const bus = yield* Bus.Service

// Initialize plugin
yield* plugin.init()

// Give plugin time to set up event subscriptions
yield* Effect.sleep("50 millis")

// Publish session.idle event
yield* bus.publish(SessionStatus.Event.Idle, {
sessionID: SessionID.make("test-session"),
})

// Wait for pending events (this is the functionality we're testing)
yield* plugin.waitForPendingEvents(1000)

// Check if the plugin completed
const fileExists = yield* Effect.tryPromise({
try: () => Bun.file(tmp.extra).exists(),
catch: () => false,
})

// The completion file should exist if waitForPendingEvents worked
expect(fileExists).toBe(true)

if (fileExists) {
const content = yield* Effect.promise(() => Bun.file(tmp.extra).text())
expect(content).toBe("completed")
}
}).pipe(Effect.provide(Plugin.defaultLayer), Effect.provide(Bus.layer), Effect.runPromise),
})
})

test("respects timeout and doesn't wait forever", async () => {
await using tmp = await tmpdir({
init: async (dir) => {
const file = path.join(dir, "plugin.ts")

await Bun.write(
file,
[
"export default async () => ({",
" event: async ({ event }) => {",
" if (event.type === 'session.idle') {",
" // This should timeout",
" await Bun.sleep(500)",
" }",
" },",
"})",
"",
].join("\n"),
)

await Bun.write(
path.join(dir, "opencode.json"),
JSON.stringify(
{
$schema: "https://opencode.ai/config.json",
plugin: [pathToFileURL(file).href],
},
null,
2,
),
)
},
})

await Instance.provide({
directory: tmp.path,
fn: async () =>
Effect.gen(function* () {
const plugin = yield* Plugin.Service
const bus = yield* Bus.Service

yield* plugin.init()

// Give plugin time to set up event subscriptions
yield* Effect.sleep("50 millis")

yield* bus.publish(SessionStatus.Event.Idle, {
sessionID: SessionID.make("test-session"),
})

const start = Date.now()
// Use a short timeout (100ms) - plugin sleeps for 500ms
yield* plugin.waitForPendingEvents(100)
const elapsed = Date.now() - start

// Should timeout around 100ms, not wait for the full 500ms
// Allow 150ms margin for timing variance
expect(elapsed).toBeGreaterThan(50)
expect(elapsed).toBeLessThan(250)
}).pipe(Effect.provide(Plugin.defaultLayer), Effect.provide(Bus.layer), Effect.runPromise),
})
})
})
Loading