diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index 0874beee16c8..fc345f792544 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -27,6 +27,7 @@ import { BashTool } from "../../tool/bash" import { TodoWriteTool } from "../../tool/todo" import { Locale } from "../../util" import { AppRuntime } from "@/effect/app-runtime" +import { Plugin } from "../../plugin" type ToolProps = { input: Tool.InferParameters @@ -534,6 +535,20 @@ export const RunCommand = cmd({ event.properties.sessionID === sessionID && event.properties.status.type === "idle" ) { + // Wait for plugins to finish processing the session.idle event + if (!args.attach) { + await AppRuntime.runPromise(Plugin.Service.use((svc) => svc.waitForPendingEvents())).catch((e) => { + console.error("Failed to wait for pending plugin events:", e) + }) + // If a plugin reprompted the session during handling, new plugin event handlers + // were queued (and the SSE stream has buffered new events including another + // session.idle). Continue the loop to consume those buffered events. + // Only break when there are truly no more pending plugin events. + const hasPending = await AppRuntime.runPromise(Plugin.Service.use((svc) => svc.hasPendingEvents())).catch( + () => false, + ) + if (hasPending) continue + } break } @@ -631,7 +646,7 @@ export const RunCommand = cmd({ } await share(sdk, sessionID) - loop().catch((e) => { + const loopPromise = loop().catch((e) => { console.error(e) process.exit(1) }) @@ -655,6 +670,8 @@ export const RunCommand = cmd({ parts: [...files, { type: "text", text: message }], }) } + + await loopPromise } if (args.attach) { @@ -674,7 +691,17 @@ export const RunCommand = cmd({ const request = new Request(input, init) return Server.Default().app.fetch(request) }) as typeof globalThis.fetch - const sdk = createOpencodeClient({ baseUrl: "http://opencode.internal", fetch: fetchFn }) + const inProcessPassword = Flag.OPENCODE_SERVER_PASSWORD + const inProcessHeaders = inProcessPassword + ? { + Authorization: `Basic ${Buffer.from(`${Flag.OPENCODE_SERVER_USERNAME ?? "opencode"}:${inProcessPassword}`).toString("base64")}`, + } + : undefined + const sdk = createOpencodeClient({ + baseUrl: "http://opencode.internal", + fetch: fetchFn, + headers: inProcessHeaders, + }) await execute(sdk) }) }, diff --git a/packages/opencode/src/flag/flag.ts b/packages/opencode/src/flag/flag.ts index 72c8931f5b71..f2be90b1b457 100644 --- a/packages/opencode/src/flag/flag.ts +++ b/packages/opencode/src/flag/flag.ts @@ -66,6 +66,7 @@ export const Flag = { copy === undefined ? process.platform === "win32" : truthy("OPENCODE_EXPERIMENTAL_DISABLE_COPY_ON_SELECT"), OPENCODE_ENABLE_EXA: truthy("OPENCODE_ENABLE_EXA") || OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_EXA"), OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS: number("OPENCODE_EXPERIMENTAL_BASH_DEFAULT_TIMEOUT_MS"), + OPENCODE_EXPERIMENTAL_PLUGIN_EXIT_DEFAULT_TIMEOUT_MS: number("OPENCODE_EXPERIMENTAL_PLUGIN_EXIT_DEFAULT_TIMEOUT_MS"), OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX: number("OPENCODE_EXPERIMENTAL_OUTPUT_TOKEN_MAX"), OPENCODE_EXPERIMENTAL_OXFMT: OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT"), OPENCODE_EXPERIMENTAL_LSP_TY: truthy("OPENCODE_EXPERIMENTAL_LSP_TY"), diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index 4587d8fb1c5b..9ba252463f36 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -30,6 +30,7 @@ const log = Log.create({ service: "plugin" }) type State = { hooks: Hooks[] + pendingEvents: Set> } // Hook names that follow the (input, output) => Promise trigger pattern @@ -49,6 +50,8 @@ export interface Interface { ) => Effect.Effect readonly list: () => Effect.Effect readonly init: () => Effect.Effect + readonly waitForPendingEvents: (timeoutMs?: number) => Effect.Effect + readonly hasPendingEvents: () => Effect.Effect } export class Service extends Context.Service()("@opencode/Plugin") {} @@ -111,6 +114,7 @@ export const layer = Layer.effect( const state = yield* InstanceState.make( Effect.fn("Plugin.state")(function* (ctx) { const hooks: Hooks[] = [] + const pendingEvents = new Set>() const bridge = yield* EffectBridge.make() function publishPluginError(message: string) { @@ -245,14 +249,21 @@ export const layer = Layer.effect( Stream.runForEach((input) => Effect.sync(() => { for (const hook of hooks) { - void hook["event"]?.({ event: input as any }) + const eventHandler = hook["event"] + if (!eventHandler) continue + const promise = Promise.resolve() + .then(() => eventHandler({ event: input as any })) + .finally(() => { + pendingEvents.delete(promise) + }) + pendingEvents.add(promise) } }), ), Effect.forkScoped, ) - return { hooks } + return { hooks, pendingEvents } }), ) @@ -280,7 +291,34 @@ export const layer = Layer.effect( yield* InstanceState.get(state) }) - return Service.of({ trigger, list, init }) + const waitForPendingEvents = Effect.fn("Plugin.waitForPendingEvents")(function* (timeoutMs?: number) { + const s = yield* InstanceState.get(state) + const timeout = timeoutMs ?? Flag.OPENCODE_EXPERIMENTAL_PLUGIN_EXIT_DEFAULT_TIMEOUT_MS ?? 60000 + + yield* Effect.tryPromise({ + try: async () => { + // Wait a tick to let event handlers be added to pendingEvents + await Promise.resolve() + const pending = Array.from(s.pendingEvents) + if (pending.length === 0) return + await Promise.race([Promise.all(pending), new Promise((resolve) => setTimeout(resolve, timeout))]) + // Wait a tick to allow any new event handlers triggered during the wait + // (e.g. reprompts that caused new session.idle events) to be registered + // before the caller checks hasPendingEvents() + await Promise.resolve() + }, + catch: (err) => { + log.error("failed to wait for pending plugin events", { error: err }) + }, + }).pipe(Effect.ignore) + }) + + const hasPendingEvents = Effect.fn("Plugin.hasPendingEvents")(function* () { + const s = yield* InstanceState.get(state) + return s.pendingEvents.size > 0 + }) + + return Service.of({ trigger, list, init, waitForPendingEvents, hasPendingEvents }) }), ) diff --git a/packages/opencode/test/plugin/event-wait-simple.test.ts b/packages/opencode/test/plugin/event-wait-simple.test.ts new file mode 100644 index 000000000000..38e5a05681e7 --- /dev/null +++ b/packages/opencode/test/plugin/event-wait-simple.test.ts @@ -0,0 +1,168 @@ +import { afterAll, afterEach, describe, expect, test } from "bun:test" +import { Effect } from "effect" +import path from "path" +import { pathToFileURL } from "url" +import { tmpdir } from "../fixture/fixture" + +const disableDefault = process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS +process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = "1" + +const { Plugin } = await import("../../src/plugin/index") +const { Instance } = await import("../../src/project/instance") +const { Bus } = await import("../../src/bus") +const { SessionStatus } = await import("../../src/session/status") +const { SessionID } = await import("../../src/session/schema") + +afterEach(async () => { + await Instance.disposeAll() +}) + +afterAll(() => { + if (disableDefault === undefined) { + delete process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS + return + } + process.env.OPENCODE_DISABLE_DEFAULT_PLUGINS = disableDefault +}) + +describe("plugin.waitForPendingEvents - regression test", () => { + test("does not exit before async event handlers complete", async () => { + await using tmp = await tmpdir({ + init: async (dir) => { + const completionFile = path.join(dir, "completion.txt") + const file = path.join(dir, "plugin.ts") + + await Bun.write( + file, + [ + `const completionFile = ${JSON.stringify(completionFile)}`, + "export default async () => ({", + " event: async ({ event }) => {", + " if (event.type === 'session.idle') {", + " // Simulate a slow plugin operation", + " await Bun.sleep(200)", + " await Bun.write(completionFile, 'completed')", + " }", + " },", + "})", + "", + ].join("\n"), + ) + + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify( + { + $schema: "https://opencode.ai/config.json", + plugin: [pathToFileURL(file).href], + }, + null, + 2, + ), + ) + + return completionFile + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => + Effect.gen(function* () { + const plugin = yield* Plugin.Service + const bus = yield* Bus.Service + + // Initialize plugin + yield* plugin.init() + + // Give plugin time to set up event subscriptions + yield* Effect.sleep("50 millis") + + // Publish session.idle event + yield* bus.publish(SessionStatus.Event.Idle, { + sessionID: SessionID.make("test-session"), + }) + + // Wait for pending events (this is the functionality we're testing) + yield* plugin.waitForPendingEvents(1000) + + // Check if the plugin completed + const fileExists = yield* Effect.tryPromise({ + try: () => Bun.file(tmp.extra).exists(), + catch: () => false, + }) + + // The completion file should exist if waitForPendingEvents worked + expect(fileExists).toBe(true) + + if (fileExists) { + const content = yield* Effect.promise(() => Bun.file(tmp.extra).text()) + expect(content).toBe("completed") + } + }).pipe(Effect.provide(Plugin.defaultLayer), Effect.provide(Bus.layer), Effect.runPromise), + }) + }) + + test("respects timeout and doesn't wait forever", async () => { + await using tmp = await tmpdir({ + init: async (dir) => { + const file = path.join(dir, "plugin.ts") + + await Bun.write( + file, + [ + "export default async () => ({", + " event: async ({ event }) => {", + " if (event.type === 'session.idle') {", + " // This should timeout", + " await Bun.sleep(500)", + " }", + " },", + "})", + "", + ].join("\n"), + ) + + await Bun.write( + path.join(dir, "opencode.json"), + JSON.stringify( + { + $schema: "https://opencode.ai/config.json", + plugin: [pathToFileURL(file).href], + }, + null, + 2, + ), + ) + }, + }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => + Effect.gen(function* () { + const plugin = yield* Plugin.Service + const bus = yield* Bus.Service + + yield* plugin.init() + + // Give plugin time to set up event subscriptions + yield* Effect.sleep("50 millis") + + yield* bus.publish(SessionStatus.Event.Idle, { + sessionID: SessionID.make("test-session"), + }) + + const start = Date.now() + // Use a short timeout (100ms) - plugin sleeps for 500ms + yield* plugin.waitForPendingEvents(100) + const elapsed = Date.now() - start + + // Should timeout around 100ms, not wait for the full 500ms + // Allow 150ms margin for timing variance + expect(elapsed).toBeGreaterThan(50) + expect(elapsed).toBeLessThan(250) + }).pipe(Effect.provide(Plugin.defaultLayer), Effect.provide(Bus.layer), Effect.runPromise), + }) + }) +})