11import { dynamicTool , type Tool , jsonSchema , type JSONSchema7 } from "ai"
22import { Client } from "@modelcontextprotocol/sdk/client/index.js"
33import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js"
4+ import { isTransportError } from "./transport-error"
45import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js"
56import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"
67import { UnauthorizedError } from "@modelcontextprotocol/sdk/client/auth.js"
@@ -119,37 +120,6 @@ function remoteURL(key: string, value: string) {
119120 log . warn ( "invalid remote mcp url" , { key } )
120121}
121122
122- // Convert MCP tool definition to AI SDK Tool type
123- function convertMcpTool ( mcpTool : MCPToolDef , client : MCPClient , timeout ?: number ) : Tool {
124- const inputSchema = mcpTool . inputSchema
125-
126- // Spread first, then override type to ensure it's always "object"
127- const schema : JSONSchema7 = {
128- ...( inputSchema as JSONSchema7 ) ,
129- type : "object" ,
130- properties : ( inputSchema . properties ?? { } ) as JSONSchema7 [ "properties" ] ,
131- additionalProperties : false ,
132- }
133-
134- return dynamicTool ( {
135- description : mcpTool . description ?? "" ,
136- inputSchema : jsonSchema ( schema ) ,
137- execute : async ( args : unknown ) => {
138- return client . callTool (
139- {
140- name : mcpTool . name ,
141- arguments : ( args || { } ) as Record < string , unknown > ,
142- } ,
143- CallToolResultSchema ,
144- {
145- resetTimeoutOnProgress : true ,
146- timeout,
147- } ,
148- )
149- } ,
150- } )
151- }
152-
153123function defs ( key : string , client : MCPClient , timeout ?: number ) {
154124 return Effect . tryPromise ( {
155125 try : ( ) => withTimeout ( client . listTools ( ) , timeout ?? DEFAULT_TIMEOUT ) ,
@@ -243,6 +213,8 @@ export const layer = Layer.effect(
243213 const spawner = yield * ChildProcessSpawner . ChildProcessSpawner
244214 const auth = yield * McpAuth . Service
245215 const bus = yield * Bus . Service
216+ const layerBridge = yield * EffectBridge . make ( )
217+ const reconnecting = new Map < string , Promise < boolean > > ( )
246218
247219 type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
248220
@@ -635,6 +607,69 @@ export const layer = Layer.effect(
635607 const config = cfg . mcp ?? { }
636608 const defaultTimeout = cfg . experimental ?. mcp_timeout
637609
610+ // Single-flight reconnect: concurrent tool calls for the same MCP name
611+ // share one in-flight Promise instead of each triggering a new connect.
612+ // The entry is removed on both success and failure.
613+ const reconnectClient = ( name : string ) : Promise < boolean > => {
614+ const existing = reconnecting . get ( name )
615+ if ( existing ) return existing
616+ const p = layerBridge
617+ . promise ( getMcpConfig ( name ) )
618+ . then ( ( mcp ) => {
619+ if ( ! mcp ) return false
620+ return layerBridge
621+ . promise ( createAndStore ( name , { ...mcp , enabled : true } ) )
622+ . then ( ( status ) => status . status === "connected" )
623+ } )
624+ . catch ( ( err ) => {
625+ log . error ( "mcp reconnect failed" , { name, error : err instanceof Error ? err . message : String ( err ) } )
626+ return false
627+ } )
628+ . finally ( ( ) => {
629+ reconnecting . delete ( name )
630+ } )
631+ reconnecting . set ( name , p )
632+ return p
633+ }
634+
635+ // Wraps an MCP tool as an AI SDK dynamicTool. The key piece is the
636+ // catch branch in execute: on a transport error, call reconnectClient
637+ // and retry once with the fresh client. Non-transport errors and
638+ // failed reconnects are rethrown as-is so business errors stay visible.
639+ const makeTool = ( clientName : string , mcpTool : MCPToolDef , client : MCPClient , timeout ?: number ) : Tool => {
640+ const schema : JSONSchema7 = {
641+ ...( mcpTool . inputSchema as JSONSchema7 ) ,
642+ type : "object" ,
643+ properties : ( mcpTool . inputSchema . properties ?? { } ) as JSONSchema7 [ "properties" ] ,
644+ additionalProperties : false ,
645+ }
646+ return dynamicTool ( {
647+ description : mcpTool . description ?? "" ,
648+ inputSchema : jsonSchema ( schema ) ,
649+ execute : ( args : unknown ) => {
650+ const payload = {
651+ name : mcpTool . name ,
652+ arguments : ( args || { } ) as Record < string , unknown > ,
653+ }
654+ const opts = { resetTimeoutOnProgress : true , timeout }
655+ return client . callTool ( payload , CallToolResultSchema , opts ) . catch ( async ( e ) => {
656+ if ( ! isTransportError ( e ) ) throw e
657+ log . warn ( "mcp transport error, attempting reconnect" , {
658+ clientName,
659+ tool : mcpTool . name ,
660+ error : e instanceof Error ? e . message : String ( e ) ,
661+ } )
662+ const ok = await reconnectClient ( clientName )
663+ if ( ! ok ) throw e
664+ const next = await layerBridge . promise ( InstanceState . get ( state ) )
665+ const fresh = next . clients [ clientName ]
666+ if ( ! fresh || next . status [ clientName ] ?. status !== "connected" ) throw e
667+ return fresh . callTool ( payload , CallToolResultSchema , opts )
668+ } )
669+ } ,
670+ } )
671+ }
672+
638673 const connectedClients = Object . entries ( s . clients ) . filter (
639674 ( [ clientName ] ) => s . status [ clientName ] ?. status === "connected" ,
640675 )
@@ -654,7 +689,12 @@ export const layer = Layer.effect(
654689
655690 const timeout = entry ?. timeout ?? defaultTimeout
656691 for ( const mcpTool of listed ) {
657- result [ sanitize ( clientName ) + "_" + sanitize ( mcpTool . name ) ] = convertMcpTool ( mcpTool , client , timeout )
692+ result [ sanitize ( clientName ) + "_" + sanitize ( mcpTool . name ) ] = makeTool (
693+ clientName ,
694+ mcpTool ,
695+ client ,
696+ timeout ,
697+ )
658698 }
659699 } ) ,
660700 { concurrency : "unbounded" } ,
0 commit comments