@@ -34,7 +34,6 @@ export type Info = z.infer<typeof Info>
3434export const ConnectionStatus = z . object ( {
3535 workspaceID : WorkspaceID . zod ,
3636 status : z . enum ( [ "connected" , "connecting" , "disconnected" , "error" ] ) ,
37- error : z . string ( ) . optional ( ) ,
3837} )
3938export type ConnectionStatus = z . infer < typeof ConnectionStatus >
4039
@@ -345,10 +344,10 @@ const connections = new Map<WorkspaceID, ConnectionStatus>()
345344const aborts = new Map < WorkspaceID , AbortController > ( )
346345const TIMEOUT = 5000
347346
348- function setStatus ( id : WorkspaceID , status : ConnectionStatus [ "status" ] , error ?: string ) {
347+ function setStatus ( id : WorkspaceID , status : ConnectionStatus [ "status" ] ) {
349348 const prev = connections . get ( id )
350- if ( prev ?. status === status && prev ?. error === error ) return
351- const next = { workspaceID : id , status, error }
349+ if ( prev ?. status === status ) return
350+ const next = { workspaceID : id , status }
352351 connections . set ( id , next )
353352
354353 if ( status === "error" ) {
@@ -425,68 +424,78 @@ function route(url: string | URL, path: string) {
425424 return next
426425}
427426
428- async function syncWorkspace ( space : Info , signal : AbortSignal ) {
429- while ( ! signal . aborted ) {
430- log . info ( "connecting to global sync" , { workspace : space . name } )
431- setStatus ( space . id , "connecting" )
427+ async function connectSSE ( url : URL | string , headers : HeadersInit | undefined , signal : AbortSignal ) {
428+ const res = await fetch ( route ( url , "/global/event" ) , {
429+ method : "GET" ,
430+ headers,
431+ signal,
432+ } )
432433
433- const adaptor = await getAdaptor ( space . projectID , space . type )
434- const target = await adaptor . target ( space )
434+ if ( ! res . ok ) throw new Error ( `Workspace sync HTTP failure: ${ res . status } ` )
435+ if ( ! res . body ) throw new Error ( "No response body from global sync" )
435436
436- if ( target . type === "local" ) return
437+ return res . body
438+ }
437439
438- const res = await fetch ( route ( target . url , "/global/event" ) , {
439- method : "GET" ,
440- headers : target . headers ,
441- signal,
442- } ) . catch ( ( err : unknown ) => {
443- setStatus ( space . id , "error" , err instanceof Error ? err . message : String ( err ) )
440+ async function syncWorkspaceLoop ( space : Info , signal : AbortSignal ) {
441+ const adaptor = await getAdaptor ( space . projectID , space . type )
442+ const target = await adaptor . target ( space )
443+
444+ if ( target . type === "local" ) return null
445+
446+ let attempt = 0
447+
448+ while ( ! signal . aborted ) {
449+ log . info ( "connecting to global sync" , { workspace : space . name } )
450+ setStatus ( space . id , "connecting" )
444451
452+ let stream
453+ try {
454+ stream = await connectSSE ( target . url , target . headers , signal )
455+ } catch ( err ) {
456+ setStatus ( space . id , "error" )
445457 log . info ( "failed to connect to global sync" , {
446458 workspace : space . name ,
447- error : err ,
459+ err,
448460 } )
449- return undefined
450- } )
451-
452- if ( ! res || ! res . ok || ! res . body ) {
453- const error = ! res ? "No response from global sync" : `Global sync HTTP ${ res . status } `
454- log . info ( "failed to connect to global sync" , { workspace : space . name , error } )
455- setStatus ( space . id , "error" , error )
456- await sleep ( 1000 )
457- continue
458461 }
459462
460- log . info ( "global sync connected" , { workspace : space . name } )
461- setStatus ( space . id , "connected" )
463+ if ( stream ) {
464+ attempt = 0
462465
463- await parseSSE ( res . body , signal , ( evt : any ) => {
464- try {
465- if ( ! ( "payload" in evt ) ) return
466+ log . info ( "global sync connected" , { workspace : space . name } )
467+ setStatus ( space . id , "connected" )
466468
467- if ( evt . payload . type === "sync" ) {
468- SyncEvent . replay ( evt . payload . syncEvent as SyncEvent . SerializedEvent )
469- }
469+ await parseSSE ( stream , signal , ( evt : any ) => {
470+ try {
471+ if ( ! ( "payload" in evt ) ) return
470472
471- GlobalBus . emit ( "event" , {
472- directory : evt . directory ,
473- project : evt . project ,
474- workspace : space . id ,
475- payload : evt . payload ,
476- } )
477- } catch ( err ) {
478- log . info ( "failed to replay global event" , {
479- workspaceID : space . id ,
480- error : err ,
481- } )
482- }
483- } )
473+ if ( evt . payload . type === "sync" ) {
474+ SyncEvent . replay ( evt . payload . syncEvent as SyncEvent . SerializedEvent )
475+ }
484476
485- log . info ( "disconnected from global sync: " + space . id )
486- setStatus ( space . id , "disconnected" )
477+ GlobalBus . emit ( "event" , {
478+ directory : evt . directory ,
479+ project : evt . project ,
480+ workspace : space . id ,
481+ payload : evt . payload ,
482+ } )
483+ } catch ( err ) {
484+ log . info ( "failed to replay global event" , {
485+ workspaceID : space . id ,
486+ error : err ,
487+ } )
488+ }
489+ } )
490+
491+ log . info ( "disconnected from global sync: " + space . id )
492+ setStatus ( space . id , "disconnected" )
493+ }
487494
488- // TODO: Implement exponential backoff
489- await sleep ( 1000 )
495+ // Back off reconnect attempts up to 2 minutes while the workspace
496+ // stays unavailable.
497+ await sleep ( Math . min ( 120_000 , 1_000 * 2 ** attempt ) )
498+ attempt += 1
490499 }
491500}
492501
@@ -498,7 +507,7 @@ async function startSync(space: Info) {
498507
499508 if ( target . type === "local" ) {
500509 void Filesystem . exists ( target . directory ) . then ( ( exists ) => {
501- setStatus ( space . id , exists ? "connected" : "error" , exists ? undefined : "directory does not exist" )
510+ setStatus ( space . id , exists ? "connected" : "error" )
502511 } )
503512 return
504513 }
@@ -510,10 +519,10 @@ async function startSync(space: Info) {
510519 const abort = new AbortController ( )
511520 aborts . set ( space . id , abort )
512521
513- void syncWorkspace ( space , abort . signal ) . catch ( ( error ) => {
522+ void syncWorkspaceLoop ( space , abort . signal ) . catch ( ( error ) => {
514523 aborts . delete ( space . id )
515524
516- setStatus ( space . id , "error" , String ( error ) )
525+ setStatus ( space . id , "error" )
517526 log . warn ( "workspace listener failed" , {
518527 workspaceID : space . id ,
519528 error,
0 commit comments