diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index fe7180238851..dda657fc1f81 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -1,6 +1,7 @@ import { dynamicTool, type Tool, jsonSchema, type JSONSchema7 } from "ai" import { Client } from "@modelcontextprotocol/sdk/client/index.js" import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js" +import { isTransportError } from "./transport-error" import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js" import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js" import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js" @@ -119,37 +120,6 @@ function remoteURL(key: string, value: string) { log.warn("invalid remote mcp url", { key }) } -// Convert MCP tool definition to AI SDK Tool type -function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { - const inputSchema = mcpTool.inputSchema - - // Spread first, then override type to ensure it's always "object" - const schema: JSONSchema7 = { - ...(inputSchema as JSONSchema7), - type: "object", - properties: (inputSchema.properties ?? {}) as JSONSchema7["properties"], - additionalProperties: false, - } - - return dynamicTool({ - description: mcpTool.description ?? "", - inputSchema: jsonSchema(schema), - execute: async (args: unknown) => { - return client.callTool( - { - name: mcpTool.name, - arguments: (args || {}) as Record, - }, - CallToolResultSchema, - { - resetTimeoutOnProgress: true, - timeout, - }, - ) - }, - }) -} - function defs(key: string, client: MCPClient, timeout?: number) { return Effect.tryPromise({ try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT), @@ -243,6 +213,8 @@ export const layer = Layer.effect( const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const auth = yield* McpAuth.Service const bus = yield* Bus.Service + const layerBridge = yield* EffectBridge.make() + const reconnecting = new Map>() type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport @@ -635,6 +607,69 @@ export const layer = Layer.effect( const config = cfg.mcp ?? {} const defaultTimeout = cfg.experimental?.mcp_timeout + // Single-flight reconnect: concurrent tool calls for the same MCP name + // share one in-flight Promise instead of each triggering a new connect. + // The entry is removed on both success and failure. + const reconnectClient = (name: string): Promise => { + const existing = reconnecting.get(name) + if (existing) return existing + const p = layerBridge + .promise(getMcpConfig(name)) + .then((mcp) => { + if (!mcp) return false + return layerBridge + .promise(createAndStore(name, { ...mcp, enabled: true })) + .then((status) => status.status === "connected") + }) + .catch((err) => { + log.error("mcp reconnect failed", { name, error: err instanceof Error ? err.message : String(err) }) + return false + }) + .finally(() => { + reconnecting.delete(name) + }) + reconnecting.set(name, p) + return p + } + + // Wraps an MCP tool as an AI SDK dynamicTool. The key piece is the + // catch branch in execute: on a transport error, call reconnectClient + // and retry once with the fresh client. Non-transport errors and + // failed reconnects are rethrown as-is so business errors stay visible. + const makeTool = (clientName: string, mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool => { + const schema: JSONSchema7 = { + ...(mcpTool.inputSchema as JSONSchema7), + type: "object", + properties: (mcpTool.inputSchema.properties ?? {}) as JSONSchema7["properties"], + additionalProperties: false, + } + return dynamicTool({ + description: mcpTool.description ?? "", + inputSchema: jsonSchema(schema), + execute: (args: unknown) => { + const payload = { + name: mcpTool.name, + arguments: (args || {}) as Record, + } + const opts = { resetTimeoutOnProgress: true, timeout } + return client.callTool(payload, CallToolResultSchema, opts).catch(async (e) => { + if (!isTransportError(e)) throw e + log.warn("mcp transport error, attempting reconnect", { + clientName, + tool: mcpTool.name, + error: e instanceof Error ? e.message : String(e), + }) + const ok = await reconnectClient(clientName) + if (!ok) throw e + const next = await layerBridge.promise(InstanceState.get(state)) + const fresh = next.clients[clientName] + if (!fresh || next.status[clientName]?.status !== "connected") throw e + return fresh.callTool(payload, CallToolResultSchema, opts) + }) + }, + }) + } + const connectedClients = Object.entries(s.clients).filter( ([clientName]) => s.status[clientName]?.status === "connected", ) @@ -654,7 +689,12 @@ export const layer = Layer.effect( const timeout = entry?.timeout ?? defaultTimeout for (const mcpTool of listed) { - result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout) + result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = makeTool( + clientName, + mcpTool, + client, + timeout, + ) } }), { concurrency: "unbounded" }, diff --git a/packages/opencode/src/mcp/transport-error.ts b/packages/opencode/src/mcp/transport-error.ts new file mode 100644 index 000000000000..431f52d63389 --- /dev/null +++ b/packages/opencode/src/mcp/transport-error.ts @@ -0,0 +1,48 @@ +import { StreamableHTTPError } from "@modelcontextprotocol/sdk/client/streamableHttp.js" + +// Fast-path unit tests live at `test/mcp/transport-error.test.ts`; they feed +// synthetic error objects into `isTransportError`. Whenever this classifier +// changes, also run `bun run test:mcp-probe` (test/mcp/transport-error-probe.mjs) +// to verify those synthetic shapes still match what real servers / sockets +// emit under `bun` and `node`. + +const TRANSPORT_ERROR_CODES = new Set([ + // Node / Undici + "ECONNRESET", + "ECONNREFUSED", + "ETIMEDOUT", + "EHOSTUNREACH", + "ENOTFOUND", + "EPIPE", + "ECONNABORTED", + "UND_ERR_SOCKET", + "UND_ERR_CLOSED", + // Bun (uses PascalCase identifiers on err.code) + "ConnectionRefused", + "ConnectionReset", + "ConnectionAborted", + "ConnectionClosed", + "Timeout", + "SocketClosed", + "NotConnected", + "FailedToOpenSocket", +]) + +export function isTransportError(e: unknown): boolean { + if (e instanceof StreamableHTTPError) { + // -1 = SDK protocol-level breakage (unexpected content-type, etc.) + if (e.code === -1) return true + if (typeof e.code !== "number") return false + // 401/403 belong to auth flow, not transport + if (e.code === 401 || e.code === 403) return false + // Anything else 4xx (stale session 404, bad request 400, gone 410, ...) or 5xx counts + return e.code >= 400 + } + if (!(e instanceof Error)) return false + const err = e as Error & { code?: string; cause?: { code?: string } } + if (err.cause?.code && TRANSPORT_ERROR_CODES.has(err.cause.code)) return true + if (err.code && TRANSPORT_ERROR_CODES.has(err.code)) return true + if (err.message.includes("fetch failed")) return true + if (err.message.includes("Unable to connect")) return true + return false +} diff --git a/packages/opencode/test/mcp/headers.test.ts b/packages/opencode/test/mcp/headers.test.ts index 5bc8f803d27b..ee2e807d21d8 100644 --- a/packages/opencode/test/mcp/headers.test.ts +++ b/packages/opencode/test/mcp/headers.test.ts @@ -23,6 +23,14 @@ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ throw new Error("Mock transport cannot connect") } }, + // Re-export so `src/mcp/transport-error.ts` can still `instanceof`-check. + StreamableHTTPError: class StreamableHTTPError extends Error { + readonly code: number | undefined + constructor(code: number | undefined, message: string | undefined) { + super(message) + this.code = code + } + }, })) void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({ diff --git a/packages/opencode/test/mcp/lifecycle.test.ts b/packages/opencode/test/mcp/lifecycle.test.ts index 10547c9f0821..761b5d670973 100644 --- a/packages/opencode/test/mcp/lifecycle.test.ts +++ b/packages/opencode/test/mcp/lifecycle.test.ts @@ -17,6 +17,11 @@ interface MockClientState { resources: Array<{ name: string; uri: string; description?: string }> closed: boolean notificationHandlers: Map any> + // FIFO queue controlling what each `callTool` invocation returns / throws. + // Each entry is either { error } (throw it) or { result } (return it). + // If the queue is empty, a default ok payload is returned. + callToolOutcomes: Array<{ error?: unknown; result?: unknown }> + callToolCalls: number } const clientStates = new Map() @@ -44,6 +49,8 @@ function getOrCreateClientState(name?: string): MockClientState { resources: [], closed: false, notificationHandlers: new Map(), + callToolOutcomes: [], + callToolCalls: 0, } clientStates.set(key, state) } @@ -96,6 +103,15 @@ void mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ StreamableHTTPClientTransport: MockStreamableHTTP, + // Also re-export the real error class so `transport-error.ts` can still + // instanceof-check against it (the classifier imports it from this module). + StreamableHTTPError: class StreamableHTTPError extends Error { + readonly code: number | undefined + constructor(code: number | undefined, message: string | undefined) { + super(message) + this.code = code + } + }, })) void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({ @@ -156,6 +172,13 @@ void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({ async close() { if (this._state) this._state.closed = true } + + async callTool(_payload: unknown, _schema?: unknown, _opts?: unknown) { + if (this._state) this._state.callToolCalls++ + const next = this._state?.callToolOutcomes.shift() + if (next?.error) throw next.error + return next?.result ?? { content: [{ type: "text", text: "ok" }] } + } }, })) @@ -786,3 +809,190 @@ test( }), ), ) + +// ======================================================================== +// Test: tool.execute() auto-reconnect on transport errors +// Covers the catch branch added in src/mcp/index.ts (makeTool) that calls +// isTransportError(e), reconnects, and retries on the fresh client, plus the +// single-flight dedup inside reconnectClient(). +// ======================================================================== + +test( + "tool.execute() returns without reconnecting on success", + withInstance( + { + "reconn-ok": { type: "local", command: ["echo", "test"] }, + }, + (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "reconn-ok" + const s = getOrCreateClientState("reconn-ok") + s.tools = [{ name: "ping", description: "", inputSchema: { type: "object", properties: {} } }] + + // Warm up tools() to build the closure. + const tools = yield* mcp.tools() + const key = Object.keys(tools).find((k) => k.endsWith("_ping"))! + const tool = tools[key] as { execute: (args: unknown, opts: unknown) => Promise } + + const createdBefore = clientCreateCount + const result = yield* Effect.promise(() => tool.execute({}, { toolCallId: "x", messages: [] })) + + expect(result).toEqual({ content: [{ type: "text", text: "ok" }] }) + expect(s.callToolCalls).toBe(1) + // No reconnect -> no new Client instantiation + expect(clientCreateCount).toBe(createdBefore) + }), + ), +) + +test( + "tool.execute() rethrows non-transport errors without reconnecting", + withInstance( + { + "reconn-biz": { type: "local", command: ["echo", "test"] }, + }, + (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "reconn-biz" + const s = getOrCreateClientState("reconn-biz") + s.tools = [{ name: "run", description: "", inputSchema: { type: "object", properties: {} } }] + // A plain business error: isTransportError() must classify this as false. + s.callToolOutcomes = [{ error: new Error("business: invalid arg") }] + + const tools = yield* mcp.tools() + const key = Object.keys(tools).find((k) => k.endsWith("_run"))! + const tool = tools[key] as { execute: (args: unknown, opts: unknown) => Promise } + + const createdBefore = clientCreateCount + let caught: unknown + yield* Effect.promise(() => + tool.execute({}, { toolCallId: "x", messages: [] }).catch((err: unknown) => { + caught = err + }), + ) + + expect(caught).toBeInstanceOf(Error) + expect((caught as Error).message).toBe("business: invalid arg") + expect(s.callToolCalls).toBe(1) + expect(clientCreateCount).toBe(createdBefore) + }), + ), +) + +test( + "tool.execute() reconnects and retries once on transport errors", + withInstance( + { + "reconn-retry": { type: "local", command: ["echo", "test"] }, + }, + (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "reconn-retry" + const s = getOrCreateClientState("reconn-retry") + s.tools = [{ name: "run", description: "", inputSchema: { type: "object", properties: {} } }] + // 1st call: socket failure recognized by isTransportError; 2nd: succeed on fresh client. + const transportErr = Object.assign(new Error("Unable to connect"), { code: "ConnectionRefused" }) + s.callToolOutcomes = [{ error: transportErr }, { result: { content: [{ type: "text", text: "retried" }] } }] + + const tools = yield* mcp.tools() + const key = Object.keys(tools).find((k) => k.endsWith("_run"))! + const tool = tools[key] as { execute: (args: unknown, opts: unknown) => Promise } + + const createdBefore = clientCreateCount + const listToolsBefore = s.listToolsCalls + + const result = yield* Effect.promise(() => tool.execute({}, { toolCallId: "x", messages: [] })) + + expect(result).toEqual({ content: [{ type: "text", text: "retried" }] }) + // 1 original failure + 1 retry against the fresh client + expect(s.callToolCalls).toBe(2) + // Reconnect must have spun up a new Client + expect(clientCreateCount).toBe(createdBefore + 1) + // Reconnect path also re-lists tools to refresh the cache + expect(s.listToolsCalls).toBe(listToolsBefore + 1) + }), + ), +) + +test( + "tool.execute() throws original transport error when reconnect fails", + withInstance( + { + "reconn-fail": { type: "local", command: ["echo", "test"] }, + }, + (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "reconn-fail" + const s = getOrCreateClientState("reconn-fail") + s.tools = [{ name: "run", description: "", inputSchema: { type: "object", properties: {} } }] + const transportErr = Object.assign(new Error("Unable to connect"), { code: "ConnectionRefused" }) + s.callToolOutcomes = [{ error: transportErr }] + + const tools = yield* mcp.tools() + const key = Object.keys(tools).find((k) => k.endsWith("_run"))! + const tool = tools[key] as { execute: (args: unknown, opts: unknown) => Promise } + + // Sabotage the reconnect: new transport connect will fail. + connectShouldFail = true + connectError = "reconnect refused" + + let caught: unknown + yield* Effect.promise(() => + tool.execute({}, { toolCallId: "x", messages: [] }).catch((err: unknown) => { + caught = err + }), + ) + + // The ORIGINAL transport error is surfaced, not the reconnect error + expect(caught).toBe(transportErr) + // No retry happened because reconnect returned false + expect(s.callToolCalls).toBe(1) + }), + ), +) + +test( + "concurrent tool calls hitting transport errors trigger only one reconnect", + withInstance( + { + "reconn-dedup": { type: "local", command: ["echo", "test"] }, + }, + (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "reconn-dedup" + const s = getOrCreateClientState("reconn-dedup") + s.tools = [ + { name: "a", description: "", inputSchema: { type: "object", properties: {} } }, + { name: "b", description: "", inputSchema: { type: "object", properties: {} } }, + ] + // Two concurrent tools both see transport errors on the first call. + // Retries fall back to the default ok payload from MockClient.callTool. + s.callToolOutcomes = [ + { error: Object.assign(new Error("Unable to connect"), { code: "ConnectionRefused" }) }, + { error: Object.assign(new Error("Unable to connect"), { code: "ConnectionRefused" }) }, + ] + + const tools = yield* mcp.tools() + const keyA = Object.keys(tools).find((k) => k.endsWith("_a"))! + const keyB = Object.keys(tools).find((k) => k.endsWith("_b"))! + const toolA = tools[keyA] as { execute: (args: unknown, opts: unknown) => Promise } + const toolB = tools[keyB] as { execute: (args: unknown, opts: unknown) => Promise } + + const createdBefore = clientCreateCount + + const [ra, rb] = yield* Effect.promise(() => + Promise.all([ + toolA.execute({}, { toolCallId: "a", messages: [] }), + toolB.execute({}, { toolCallId: "b", messages: [] }), + ]), + ) + + expect(ra).toBeDefined() + expect(rb).toBeDefined() + // The single-flight map must collapse 2 parallel reconnects into 1 Client + expect(clientCreateCount).toBe(createdBefore + 1) + // 2 initial failures + 2 retry successes + expect(s.callToolCalls).toBe(4) + }), + ), +) diff --git a/packages/opencode/test/mcp/mcp-reconnect.test.ts b/packages/opencode/test/mcp/mcp-reconnect.test.ts new file mode 100644 index 000000000000..5fbb0d77be02 --- /dev/null +++ b/packages/opencode/test/mcp/mcp-reconnect.test.ts @@ -0,0 +1,493 @@ +import { test, expect, mock, beforeEach } from "bun:test" +import { Effect } from "effect" +import type { MCP as MCPNS } from "../../src/mcp/index" + +// --- Overview --------------------------------------------------------------- +// +// This file mocks an MCP server and reproduces the transport-level failure +// modes that a real MCP server hits when it is *restarted* on the same port, +// *down*, or kills an in-flight request. The goal is to prove end-to-end that +// `tool.execute()` in `src/mcp/index.ts` transparently reconnects and retries +// exactly once on transport errors, and does NOT retry on business errors. +// +// Contrast with `session-error.test.ts`: that file uses a plain +// `new Error("Session not found")` string, which is a weaker signal. Here we +// feed the exact shapes `isTransportError` recognises — a stub +// `StreamableHTTPError` (for HTTP 4xx/5xx from a restarted server) and plain +// Errors carrying a Bun/Node socket `code`. + +// --- Stub StreamableHTTPError ---------------------------------------------- +// +// We must construct a class that `src/mcp/transport-error.ts` will see as +// `StreamableHTTPError` via `instanceof`. To do that we mock the SDK module +// below and re-export THIS class under that name; the errors we throw in +// tests are then `new StubStreamableHTTPError(...)`, which is the same class +// that `transport-error.ts` imports once the mock is active. +class StubStreamableHTTPError extends Error { + readonly code: number | undefined + constructor(code: number | undefined, message: string) { + super(message) + this.code = code + } +} + +// --- Mock infrastructure ---------------------------------------------------- + +interface MockCallToolBehavior { + // Errors to throw on successive calls. After the array drains, calls succeed + // with `successResult`. + throwQueue: Error[] + // Total invocations across all Client instances bound to this name + // (original call + any retries after reconnect). + invocations: number + successResult: unknown +} + +const callToolBehaviors = new Map() + +// Each Client construction bumps this. We use it to observe whether a +// reconnect actually happened (reconnect path: getMcpConfig -> createAndStore +// -> new Client). +let clientCreateCount = 0 +let lastCreatedClientName: string | undefined + +// Queue of errors to inject on the NEXT transport.start(). We use this to +// simulate a server that is still down during the reconnect attempt. +let transportStartFailQueue: Error[] = [] + +class MockTransport { + // oxlint-disable-next-line no-useless-constructor + constructor(_url?: any, _opts?: any) {} + async start() { + const next = transportStartFailQueue.shift() + if (next) throw next + } + async close() {} + async finishAuth() {} +} + +void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ + StreamableHTTPClientTransport: MockTransport, + // Re-export the stub so `src/mcp/transport-error.ts`'s `instanceof` check + // resolves to the same class we throw in the tests below. + StreamableHTTPError: StubStreamableHTTPError, +})) +void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({ + SSEClientTransport: MockTransport, +})) +void mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({ + StdioClientTransport: MockTransport, +})) +void mock.module("@modelcontextprotocol/sdk/client/auth.js", () => ({ + UnauthorizedError: class extends Error { + constructor() { + super("Unauthorized") + } + }, +})) + +void mock.module("@modelcontextprotocol/sdk/client/index.js", () => ({ + Client: class MockClient { + private _name: string | undefined + transport: any + + constructor(_opts: any) { + clientCreateCount++ + } + + async connect(transport: { start: () => Promise }) { + this.transport = transport + await transport.start() + this._name = lastCreatedClientName + } + + setNotificationHandler(_schema: unknown, _handler: (...args: any[]) => any) {} + + async listTools() { + return { + tools: [ + { + name: "do_thing", + description: "does a thing", + inputSchema: { type: "object", properties: {} }, + }, + ], + } + } + + async listPrompts() { + return { prompts: [] } + } + async listResources() { + return { resources: [] } + } + + async callTool(_req: unknown, _schema: unknown, _opts: unknown) { + const key = this._name ?? "default" + const behavior = callToolBehaviors.get(key) + if (!behavior) throw new Error(`No callTool behavior set for "${key}"`) + behavior.invocations++ + const next = behavior.throwQueue.shift() + if (next) throw next + return behavior.successResult + } + + async close() {} + }, +})) + +beforeEach(() => { + callToolBehaviors.clear() + clientCreateCount = 0 + transportStartFailQueue = [] + lastCreatedClientName = undefined +}) + +// Imports AFTER mocks, so src/mcp/* picks up the mocked SDK modules. +const { MCP } = await import("../../src/mcp/index") +const { AppRuntime } = await import("../../src/effect/app-runtime") +const { WithInstance } = await import("../../src/project/with-instance") +const { tmpdir } = await import("../fixture/fixture") +const service = MCP.Service as unknown as Effect.Effect + +function withInstance( + config: Record, + fn: (mcp: MCPNS.Interface) => Effect.Effect, +) { + return async () => { + await using tmp = await tmpdir({ + init: async (dir) => { + await Bun.write( + `${dir}/opencode.json`, + JSON.stringify({ + $schema: "https://opencode.ai/config.json", + mcp: config, + }), + ) + }, + }) + + await WithInstance.provide({ + directory: tmp.path, + fn: async () => { + await AppRuntime.runPromise( + Effect.gen(function* () { + const mcp = yield* service + yield* fn(mcp) + }), + ) + }, + }) + } +} + +// Helper: fetch the wrapped tool and invoke it the way the AI SDK does. +function execTool(tools: Record) { + const key = Object.keys(tools).find((k) => k.includes("do_thing"))! + expect(key).toBeDefined() + return (tools[key] as any).execute({}, undefined as any) as Promise +} + +// ============================================================================ +// HTTP-layer restart: 404 "Session not found" after the server respawned +// ============================================================================ + +test( + "restart window: StreamableHTTPError 404 stale session → reconnect + retry succeeds", + withInstance({ "restart-404": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "restart-404" + callToolBehaviors.set("restart-404", { + throwQueue: [new StubStreamableHTTPError(404, "Session 8f7c3d42 not found")], + invocations: 0, + successResult: { content: [{ type: "text", text: "ok" }] }, + }) + + yield* mcp.add("restart-404", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const result = yield* Effect.promise(() => execTool(tools)) + + expect(result).toEqual({ content: [{ type: "text", text: "ok" }] }) + // original call threw, retry on the fresh client succeeded + expect(callToolBehaviors.get("restart-404")!.invocations).toBe(2) + // exactly one reconnect (no loop) + expect(clientCreateCount - baseline).toBe(1) + }), + ), +) + +// ============================================================================ +// HTTP-layer restart: 400 "No valid session ID provided" (probe-observed) +// ============================================================================ + +test( + "restart window: StreamableHTTPError 400 no valid session → reconnect + retry succeeds", + withInstance({ "restart-400": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "restart-400" + callToolBehaviors.set("restart-400", { + throwQueue: [new StubStreamableHTTPError(400, "Bad Request: No valid session ID provided")], + invocations: 0, + successResult: { content: [{ type: "text", text: "recovered" }] }, + }) + + yield* mcp.add("restart-400", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const result = yield* Effect.promise(() => execTool(tools)) + + expect(result).toEqual({ content: [{ type: "text", text: "recovered" }] }) + expect(callToolBehaviors.get("restart-400")!.invocations).toBe(2) + expect(clientCreateCount - baseline).toBe(1) + }), + ), +) + +// ============================================================================ +// Socket-layer restart: Bun ConnectionRefused (port briefly not listening) +// ============================================================================ + +test( + "restart window: Bun ConnectionRefused on callTool → reconnect + retry succeeds", + withInstance({ "restart-refused": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "restart-refused" + const refused = Object.assign(new Error("Unable to connect. Is the computer able to access the url?"), { + code: "ConnectionRefused", + }) + callToolBehaviors.set("restart-refused", { + throwQueue: [refused], + invocations: 0, + successResult: { content: [{ type: "text", text: "back online" }] }, + }) + + yield* mcp.add("restart-refused", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const result = yield* Effect.promise(() => execTool(tools)) + + expect(result).toEqual({ content: [{ type: "text", text: "back online" }] }) + expect(callToolBehaviors.get("restart-refused")!.invocations).toBe(2) + expect(clientCreateCount - baseline).toBe(1) + }), + ), +) + +// ============================================================================ +// In-flight killed by restart: `fetch failed` + cause.code=ECONNRESET +// ============================================================================ + +test( + "in-flight killed: fetch failed + cause ECONNRESET → reconnect + retry succeeds", + withInstance({ "restart-reset": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "restart-reset" + const reset = new Error("fetch failed", { + cause: Object.assign(new Error("socket hang up"), { code: "ECONNRESET" }), + }) + callToolBehaviors.set("restart-reset", { + throwQueue: [reset], + invocations: 0, + successResult: { content: [{ type: "text", text: "resumed" }] }, + }) + + yield* mcp.add("restart-reset", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const result = yield* Effect.promise(() => execTool(tools)) + + expect(result).toEqual({ content: [{ type: "text", text: "resumed" }] }) + expect(callToolBehaviors.get("restart-reset")!.invocations).toBe(2) + expect(clientCreateCount - baseline).toBe(1) + }), + ), +) + +// ============================================================================ +// Restart but server still stuck: reconnect succeeds, next callTool still 404 +// → retry exactly once, then propagate. No infinite loop. +// ============================================================================ + +test( + "persistent restart failure: 404 on every call → one retry then propagate", + withInstance({ "stuck-restart": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "stuck-restart" + callToolBehaviors.set("stuck-restart", { + throwQueue: [ + new StubStreamableHTTPError(404, "Session not found"), + new StubStreamableHTTPError(404, "Session not found"), + // a 3rd in case the no-loop guarantee ever breaks + new StubStreamableHTTPError(404, "Session not found"), + ], + invocations: 0, + successResult: undefined, + }) + + yield* mcp.add("stuck-restart", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const outcome = yield* Effect.promise(() => + execTool(tools) + .then(() => ({ ok: true as const })) + .catch((err: Error) => ({ ok: false as const, message: err.message })), + ) + + expect(outcome.ok).toBe(false) + if (!outcome.ok) expect(outcome.message).toMatch(/session\s*not\s*found/i) + // Exactly two invocations: original + one retry, no more. + expect(callToolBehaviors.get("stuck-restart")!.invocations).toBe(2) + // Reconnect happened exactly once. + expect(clientCreateCount - baseline).toBe(1) + }), + ), +) + +// ============================================================================ +// Server permanently down: reconnect itself fails (transport.start throws) +// → original transport error propagates, no retry attempted. +// ============================================================================ + +test( + "server down: reconnect attempt fails → original transport error propagates without retry", + withInstance({ "down-srv": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "down-srv" + callToolBehaviors.set("down-srv", { + throwQueue: [new StubStreamableHTTPError(404, "Session not found")], + invocations: 0, + successResult: undefined, + }) + + yield* mcp.add("down-srv", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + // createAndStore tries StreamableHTTP first and falls back to SSE on + // failure — queue up enough failures to defeat BOTH attempts so the + // reconnect fully fails and the original error is what propagates. + const refused = () => Object.assign(new Error("Unable to connect"), { code: "ConnectionRefused" }) + transportStartFailQueue.push(refused(), refused(), refused(), refused()) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const outcome = yield* Effect.promise(() => + execTool(tools) + .then(() => ({ ok: true as const })) + .catch((err: Error) => ({ ok: false as const, message: err.message })), + ) + + expect(outcome.ok).toBe(false) + if (!outcome.ok) expect(outcome.message).toMatch(/session\s*not\s*found/i) + // Original call ran once; reconnect failed so retry was never issued. + expect(callToolBehaviors.get("down-srv")!.invocations).toBe(1) + // Reconnect DID attempt a fresh Client (bumping the counter) even though + // it failed — the important bit is we didn't loop and didn't retry. + expect(clientCreateCount - baseline).toBeGreaterThanOrEqual(1) + }), + ), +) + +// ============================================================================ +// Business error must NOT be treated as a transport error: no reconnect, no +// retry, error surfaces as-is. +// ============================================================================ + +test( + "business error (non-transport) → propagates immediately, zero reconnect", + withInstance({ "biz-err": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "biz-err" + callToolBehaviors.set("biz-err", { + throwQueue: [new Error("tool 'do_thing' not found")], + invocations: 0, + successResult: undefined, + }) + + yield* mcp.add("biz-err", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const outcome = yield* Effect.promise(() => + execTool(tools) + .then(() => ({ ok: true as const })) + .catch((err: Error) => ({ ok: false as const, message: err.message })), + ) + + expect(outcome.ok).toBe(false) + if (!outcome.ok) expect(outcome.message).toMatch(/not found/i) + expect(callToolBehaviors.get("biz-err")!.invocations).toBe(1) + expect(clientCreateCount - baseline).toBe(0) + }), + ), +) + +// ============================================================================ +// Auth errors (401/403) are not transport errors either — they belong to the +// OAuth flow. Must surface without touching the reconnect path. +// ============================================================================ + +test( + "401 unauthorized is not a transport error → propagates, zero reconnect", + withInstance({ "auth-err": { type: "remote", url: "https://example.com/mcp", oauth: false } }, (mcp) => + Effect.gen(function* () { + lastCreatedClientName = "auth-err" + callToolBehaviors.set("auth-err", { + throwQueue: [new StubStreamableHTTPError(401, "unauthorized")], + invocations: 0, + successResult: undefined, + }) + + yield* mcp.add("auth-err", { + type: "remote", + url: "https://example.com/mcp", + oauth: false, + }) + + const baseline = clientCreateCount + const tools = yield* mcp.tools() + const outcome = yield* Effect.promise(() => + execTool(tools) + .then(() => ({ ok: true as const })) + .catch((err: Error) => ({ ok: false as const, message: err.message })), + ) + + expect(outcome.ok).toBe(false) + if (!outcome.ok) expect(outcome.message).toMatch(/unauthorized/i) + expect(callToolBehaviors.get("auth-err")!.invocations).toBe(1) + expect(clientCreateCount - baseline).toBe(0) + }), + ), +) diff --git a/packages/opencode/test/mcp/oauth-auto-connect.test.ts b/packages/opencode/test/mcp/oauth-auto-connect.test.ts index 3cf67742156d..03d5a6dbedb3 100644 --- a/packages/opencode/test/mcp/oauth-auto-connect.test.ts +++ b/packages/opencode/test/mcp/oauth-auto-connect.test.ts @@ -64,6 +64,14 @@ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ } async finishAuth(_code: string) {} }, + // Re-export so `src/mcp/transport-error.ts` can still `instanceof`-check. + StreamableHTTPError: class StreamableHTTPError extends Error { + readonly code: number | undefined + constructor(code: number | undefined, message: string | undefined) { + super(message) + this.code = code + } + }, })) void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({ diff --git a/packages/opencode/test/mcp/oauth-browser.test.ts b/packages/opencode/test/mcp/oauth-browser.test.ts index 20cb90a18e0a..d69eb64c126a 100644 --- a/packages/opencode/test/mcp/oauth-browser.test.ts +++ b/packages/opencode/test/mcp/oauth-browser.test.ts @@ -63,6 +63,14 @@ void mock.module("@modelcontextprotocol/sdk/client/streamableHttp.js", () => ({ // Mock successful auth completion } }, + // Re-export so `src/mcp/transport-error.ts` can still `instanceof`-check. + StreamableHTTPError: class StreamableHTTPError extends Error { + readonly code: number | undefined + constructor(code: number | undefined, message: string | undefined) { + super(message) + this.code = code + } + }, })) void mock.module("@modelcontextprotocol/sdk/client/sse.js", () => ({ diff --git a/packages/opencode/test/mcp/transport-error.test.ts b/packages/opencode/test/mcp/transport-error.test.ts new file mode 100644 index 000000000000..0fe677363bc1 --- /dev/null +++ b/packages/opencode/test/mcp/transport-error.test.ts @@ -0,0 +1,102 @@ +import { describe, test, expect } from "bun:test" +import { StreamableHTTPError } from "@modelcontextprotocol/sdk/client/streamableHttp.js" +import { isTransportError } from "../../src/mcp/transport-error" + +// Synthetic error shapes are observed from the real-world probe at +// `test/mcp/transport-error-probe.mjs`. Whenever the classifier changes, +// re-run `bun run test:mcp-probe` to confirm these shapes still match reality. + +describe("isTransportError — StreamableHTTPError", () => { + test("code -1 (SDK protocol-level breakage)", () => { + expect(isTransportError(new StreamableHTTPError(-1, "bad content-type"))).toBe(true) + }) + + test("400 bad request (e.g. server not initialized after restart)", () => { + expect(isTransportError(new StreamableHTTPError(400, "Bad Request: Server not initialized"))).toBe(true) + }) + + test("404 stale session", () => { + expect(isTransportError(new StreamableHTTPError(404, "session not found"))).toBe(true) + }) + + test("410 gone", () => { + expect(isTransportError(new StreamableHTTPError(410, "gone"))).toBe(true) + }) + + test("500 server error", () => { + expect(isTransportError(new StreamableHTTPError(503, "bad gateway"))).toBe(true) + }) + + test("401 unauthorized belongs to auth flow, not transport", () => { + expect(isTransportError(new StreamableHTTPError(401, "unauthorized"))).toBe(false) + }) + + test("403 forbidden belongs to auth flow, not transport", () => { + expect(isTransportError(new StreamableHTTPError(403, "forbidden"))).toBe(false) + }) + + test("undefined code is not transport", () => { + expect(isTransportError(new StreamableHTTPError(undefined, "no code"))).toBe(false) + }) +}) + +describe("isTransportError — plain Error with socket code", () => { + test("Bun PascalCase err.code: ConnectionRefused", () => { + const e = Object.assign(new Error("Unable to connect. Is the computer able to access the url?"), { + code: "ConnectionRefused", + }) + expect(isTransportError(e)).toBe(true) + }) + + test("Bun PascalCase err.code: ConnectionReset", () => { + const e = Object.assign(new Error("socket reset"), { code: "ConnectionReset" }) + expect(isTransportError(e)).toBe(true) + }) + + test("Node UPPER_SNAKE err.code: ECONNREFUSED", () => { + const e = Object.assign(new Error("connect ECONNREFUSED 127.0.0.1:1"), { code: "ECONNREFUSED" }) + expect(isTransportError(e)).toBe(true) + }) + + test("Node err.cause.code: ECONNRESET inside fetch failed", () => { + const e = new Error("fetch failed", { cause: Object.assign(new Error("reset"), { code: "ECONNRESET" }) }) + expect(isTransportError(e)).toBe(true) + }) + + test("Undici err.cause.code: UND_ERR_SOCKET", () => { + const e = new Error("fetch failed", { cause: Object.assign(new Error(""), { code: "UND_ERR_SOCKET" }) }) + expect(isTransportError(e)).toBe(true) + }) + + test('message fallback: "fetch failed" with no code', () => { + expect(isTransportError(new Error("fetch failed"))).toBe(true) + }) + + test('message fallback: "Unable to connect"', () => { + expect(isTransportError(new Error("Unable to connect. Is the computer able to access the url?"))).toBe(true) + }) +}) + +describe("isTransportError — non-transport cases", () => { + test("unknown err.code is ignored", () => { + const e = Object.assign(new Error("nope"), { code: "SOME_RANDOM_CODE" }) + expect(isTransportError(e)).toBe(false) + }) + + test("business error without code", () => { + expect(isTransportError(new Error("tool 'echo' not found"))).toBe(false) + }) + + test("non-Error value (string)", () => { + expect(isTransportError("boom")).toBe(false) + }) + + test("non-Error value (plain object with code)", () => { + expect(isTransportError({ code: "ConnectionRefused" })).toBe(false) + }) + + test("null/undefined", () => { + expect(isTransportError(null)).toBe(false) + expect(isTransportError(undefined)).toBe(false) + }) +})