diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index d0a9db18..13a4c582 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -290,20 +290,523 @@ describe('SseConnection', () => { expect(conn.isConnected()).toBe(false); }); - it('includes sessions in reconnect URL', () => { - const conn = new SseConnection(createConfig()); + it('sends reconnect POST on welcome when has tracked sessions', () => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); conn.connect(); lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); // Track a session conn.trackSeq('sess-1', 10); + // Force reconnect — creates new EventSource + conn.checkAndReconnect(true); + mockFetch.mockClear(); + + // New welcome arrives — should trigger reconnect POST + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + expect(mockFetch).toHaveBeenCalledWith( + 'https://localhost:3100/api/chat/reconnect', + expect.objectContaining({ + method: 'POST', + body: JSON.stringify({ + type: 'reconnect', + sessions: [{ sessionId: 'sess-1', lastSeq: 10 }], + }), + }), + ); + }); + + it('defers _connected until reconnect POST completes', async () => { + let resolveReconnect!: (v: { ok: true }) => void; + const mockFetch = vi.fn().mockImplementation((_url: string) => { + if (_url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + mockFetch.mockClear(); + listener.mockClear(); + + // New welcome — reconnect POST fires but doesn't resolve yet + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // _connected should still be false while POST is in-flight + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + + // Resolve the reconnect POST + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + // Now _connected should be true and _open emitted + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + + it('bails out if disconnect() called during in-flight reconnect POST', async () => { + let resolveReconnect!: (v: { ok: true }) => void; + const mockFetch = vi.fn().mockImplementation((_url: string) => { + if (_url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + + // New welcome — reconnect POST in-flight + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // Disconnect while POST is in-flight + conn.disconnect(); + expect(conn.isConnected()).toBe(false); + listener.mockClear(); + + // Resolve the reconnect POST — .finally() should bail out + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + // Must remain disconnected — .finally() must not overwrite + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + }); + + it('ignores stale reconnect POST when a newer welcome arrives', async () => { + const reconnectCalls: Array<(v: { ok: true }) => void> = []; + const mockFetch = vi.fn().mockImplementation((_url: string) => { + if (_url.includes('/reconnect')) { + return new Promise((resolve) => { + reconnectCalls.push(resolve); + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + + // First welcome — reconnect POST #1 in-flight + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + const resolveFirst = reconnectCalls[0]; + + // Second welcome arrives (rapid reconnect race) — reconnect POST #2 in-flight + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + const resolveSecond = reconnectCalls[1]; + + // Resolve the FIRST (stale) reconnect POST + resolveFirst({ ok: true }); + await vi.runAllTimersAsync(); + + // Must NOT set _connected — connectionId has moved on to conn-ghi + expect(conn.isConnected()).toBe(false); + expect(conn.getConnectionId()).toBe('conn-ghi'); + + // Resolve the SECOND (current) reconnect POST + listener.mockClear(); + resolveSecond({ ok: true }); + await vi.runAllTimersAsync(); + + // Now _connected should be true + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + + it('flushes pending sends only after reconnect POST completes', async () => { + let resolveReconnect!: (v: { ok: true }) => void; + const postEndpoints: string[] = []; + const mockFetch = vi.fn().mockImplementation((url: string) => { + const endpoint = url.replace('https://localhost:3100/api/chat/', ''); + postEndpoints.push(endpoint); + if (url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect — sends are now queued + conn.checkAndReconnect(true); + conn.send({ type: 'send', prompt: 'queued msg', clientMsgId: 'q-1' }); + postEndpoints.length = 0; + + // Welcome — reconnect POST fires, queued send waits + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // Only reconnect POST should have fired, not the queued send + expect(postEndpoints).toEqual(['reconnect']); + + // Resolve reconnect — now the queued send should flush + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + expect(postEndpoints).toEqual(['reconnect', 'send']); + }); + + it('stays disconnected when reconnect POST fails', async () => { + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return Promise.resolve({ ok: false, status: 500 }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + conn.send({ type: 'send', prompt: 'should stay queued', clientMsgId: 'q-1' }); + listener.mockClear(); + + // New welcome — reconnect POST will fail + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + // Must stay disconnected — server never ran handleReconnect + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + }); + + it('stays disconnected when reconnect POST throws network error', async () => { + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return Promise.reject(new Error('network error')); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + conn.checkAndReconnect(true); + listener.mockClear(); + + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + }); + + it('recovers after failed reconnect when EventSource auto-reconnects', async () => { + let callCount = 0; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + callCount++; + // First reconnect fails, second succeeds + if (callCount === 1) return Promise.resolve({ ok: false, status: 500 }); + return Promise.resolve({ ok: true }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + listener.mockClear(); + + // First welcome — reconnect POST fails + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(false); + + // EventSource auto-reconnect fires a new welcome + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + await vi.runAllTimersAsync(); + + // Second attempt succeeds + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + + it('dispatches SSE events to listener while reconnect POST is in-flight', async () => { + let resolveReconnect!: (v: { ok: boolean }) => void; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + conn.checkAndReconnect(true); + listener.mockClear(); + + // Welcome — reconnect POST in-flight, _connected = false + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + expect(conn.isConnected()).toBe(false); + + // Server replays events via SSE while reconnect POST is processing. + // onmessage is independent of _connected — these must still dispatch. + lastES()._emit('block_delta', { + type: 'block_delta', + sessionId: 'sess-1', + seq: 11, + delta: 'replayed', + }); + + expect(listener).toHaveBeenCalledWith( + expect.objectContaining({ type: 'block_delta', delta: 'replayed' }), + ); + + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + }); + + it('queued sends survive POST failure and flush on successful retry', async () => { + let callCount = 0; + const postEndpoints: string[] = []; + const mockFetch = vi.fn().mockImplementation((url: string) => { + const endpoint = url.replace('https://localhost:3100/api/chat/', ''); + if (url.includes('/reconnect')) { + callCount++; + if (callCount === 1) return Promise.resolve({ ok: false, status: 500 }); + postEndpoints.push(endpoint); + return Promise.resolve({ ok: true }); + } + postEndpoints.push(endpoint); + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect and queue a send + conn.checkAndReconnect(true); + conn.send({ type: 'send', prompt: 'must survive', clientMsgId: 'q-1' }); + postEndpoints.length = 0; + + // First welcome — reconnect fails, send stays queued + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(false); + expect(postEndpoints).toEqual([]); + + // Second welcome — reconnect succeeds, queued send flushes + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(true); + expect(postEndpoints).toEqual(['reconnect', 'send']); + }); + + it('schedules delayed reconnect when reconnect POST fails', async () => { + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return Promise.resolve({ ok: false, status: 500 }); + } + return Promise.resolve({ ok: true }); + }); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + const esCountBefore = MockEventSource.instances.length; + + // Welcome — reconnect POST will fail + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + // Should have scheduled a delayed reconnect (new ES after timer) + expect(conn.isConnected()).toBe(false); + expect(MockEventSource.instances.length).toBeGreaterThan(esCountBefore); + + warnSpy.mockRestore(); + }); + + it('stale doReconnectPost does not set _connected when checkAndReconnect fires mid-flight', async () => { + let resolveReconnect!: (v: { ok: boolean }) => void; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect — creates ES2 + conn.checkAndReconnect(true); + + // ES2 welcome — doReconnectPost(conn-def) starts + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // checkAndReconnect fires again while POST is in-flight — creates ES3 + conn.checkAndReconnect(true); + + // Stale POST resolves successfully — must NOT set _connected + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + // ES3 hasn't welcomed yet, so _connected must remain false + expect(conn.isConnected()).toBe(false); + }); + + it('does not emit _close when checkAndReconnect called while already disconnected', () => { + const conn = new SseConnection(createConfig()); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + // Don't send welcome — _connected stays false + + conn.checkAndReconnect(true); + + // _close should NOT have been emitted since we were never connected + expect(listener).not.toHaveBeenCalledWith({ type: '_close' }); + }); + + it('recovers via scheduleReconnect after repeated POST failures', async () => { + let reconnectCallCount = 0; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + reconnectCallCount++; + // Fail first two (initial + forced retry), succeed on third + if (reconnectCallCount <= 2) return Promise.resolve({ ok: false, status: 500 }); + return Promise.resolve({ ok: true }); + } + return Promise.resolve({ ok: true }); + }); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect — POST fails, triggers checkAndReconnect(true) + conn.checkAndReconnect(true); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + // Forced retry also fails + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(false); + + // Third welcome — reconnect POST succeeds + listener.mockClear(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-jkl' }); + await vi.runAllTimersAsync(); + + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + warnSpy.mockRestore(); + }); + + it('connects immediately on reconnect when seqBySession is empty', () => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + + // Track and then clear — simulates all sessions being closed + conn.trackSeq('sess-1', 10); + conn.clearSession('sess-1'); + + // Force reconnect — _isReconnect=true but seqBySession is empty + conn.checkAndReconnect(true); + mockFetch.mockClear(); + listener.mockClear(); + + // New welcome — should skip reconnect POST and connect immediately + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + expect(mockFetch).not.toHaveBeenCalled(); + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + + it('does not send reconnect POST on first connection', () => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + + // Track a session BEFORE welcome (simulating a pre-existing session) + conn.trackSeq('sess-1', 5); + + // First welcome — _isReconnect is false, so no reconnect POST + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + + expect(mockFetch).not.toHaveBeenCalledWith( + 'https://localhost:3100/api/chat/reconnect', + expect.any(Object), + ); + }); + + it('reconnect URL never includes sessions query param', () => { + const conn = new SseConnection(createConfig()); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + // Force reconnect conn.checkAndReconnect(true); + // URL should be clean — reconnect is handled via POST, not query param const newES = lastES(); - expect(newES.url).toContain('sessions='); - expect(newES.url).toContain('sess-1%7C10'); + expect(newES.url).toBe('https://localhost:3100/api/chat/events'); }); it('checkAndReconnect(false) is no-op when connected', () => { diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 938374e9..8ca85ed8 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -180,8 +180,11 @@ export class SseConnection implements ChatConnection { this.es.close(); this.es = null; } + const wasConnected = this._connected; this._connected = false; - this.listener?.({ type: '_close' }); + if (wasConnected) { + this.listener?.({ type: '_close' }); + } this.doConnect(); } @@ -195,14 +198,11 @@ export class SseConnection implements ChatConnection { this.reconnectTimer = null; } - // Build URL with reconnect sessions if this is a reconnect - let url = `${this.config.baseUrl}/api/chat/events`; - if (this._isReconnect && this.seqBySession.size > 0) { - const sessionsParam = Array.from(this.seqBySession.entries()) - .map(([sid, seq]) => `${sid}|${seq}`) - .join(','); - url += `?sessions=${encodeURIComponent(sessionsParam)}`; - } + // Always use the base URL — reconnect sessions are sent via POST in the + // welcome handler. This avoids the bug where EventSource auto-reconnect + // reuses the original URL (missing ?sessions=), and eliminates double + // handleReconnect when doConnect() AND welcome both trigger it. + const url = `${this.config.baseUrl}/api/chat/events`; const es = this.config.createEventSource(url); this.es = es; @@ -216,11 +216,20 @@ export class SseConnection implements ChatConnection { return; } this._connectionId = msg.connectionId as string; - this._connected = true; - this._isReconnect = true; - this.flushPendingSends(); - this.listener?.({ type: '_open' }); + // _connected deferred until doReconnectPost succeeds — prevents + // external send() from bypassing the pending queue mid-reconnect. + // Capture both connectionId and ES instance for the staleness guard. + const welcomeConnectionId = this._connectionId; + const welcomeEs = this.es; + if (this._isReconnect && this.seqBySession.size > 0) { + this.doReconnectPost(welcomeConnectionId, welcomeEs); + } else { + this._connected = true; + this.flushPendingSends(); + this.listener?.({ type: '_open' }); + } + this._isReconnect = true; }); // Catch-all for session events. Server sends all non-welcome events as @@ -253,6 +262,66 @@ export class SseConnection implements ChatConnection { // but we wait for the 'welcome' event before marking as connected. } + /** + * Send the reconnect POST and only mark connected on success. + * + * On failure the client stays disconnected — the next EventSource + * auto-reconnect will trigger a fresh welcome + retry. This prevents + * flushing pending sends into the void when the server never ran + * handleReconnect (no watch, no reattach, no replay). + */ + private async doReconnectPost( + welcomeConnectionId: string, + welcomeEs: EventSource | null, + ): Promise { + try { + const res = await this.config.fetch(`${this.config.baseUrl}/api/chat/reconnect`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Connection-ID': welcomeConnectionId, + }, + body: JSON.stringify({ + type: 'reconnect', + sessions: Array.from(this.seqBySession.entries()).map(([sessionId, lastSeq]) => ({ + sessionId, + lastSeq, + })), + }), + }); + + // Guard: bail if disconnect() was called, a newer welcome arrived, + // or checkAndReconnect replaced the EventSource while in-flight. + if (!this.es || this.es !== welcomeEs || this._connectionId !== welcomeConnectionId) return; + + if (res.ok) { + this._connected = true; + this.flushPendingSends(); + this.listener?.({ type: '_open' }); + } else { + console.warn('[SseConnection] reconnect POST returned', res.status); + this.scheduleReconnect(); + } + } catch (err) { + if (!this.es || this.es !== welcomeEs || this._connectionId !== welcomeConnectionId) return; + console.warn('[SseConnection] reconnect POST failed', err); + this.scheduleReconnect(); + } + } + + /** Tear down and reconnect after a delay to avoid tight retry loops. */ + private scheduleReconnect(): void { + if (this.reconnectTimer) return; + if (this.es) { + this.es.close(); + this.es = null; + } + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.doConnect(); + }, this.config.reconnectDelayMs); + } + private async doPost(endpoint: string, body: Record): Promise { if (!this._connectionId) return; try { diff --git a/server/index.ts b/server/index.ts index b70235cd..f404a209 100644 --- a/server/index.ts +++ b/server/index.ts @@ -82,7 +82,6 @@ import { ConnectionRegistry } from '@mitzo/harness'; import { isHelloHandshake, handleHello, - handleReconnect, dispatchV2Message, type V2HandlerContext, } from './ws-handler-v2.js'; @@ -388,21 +387,10 @@ app.get('/api/chat/events', (req, res) => { connectionId, }); - // Handle reconnect if sessions provided as query param - const sessionsParam = req.query.sessions as string | undefined; - if (sessionsParam) { - try { - const sessions = sessionsParam.split(',').map((entry) => { - const sep = entry.lastIndexOf('|'); - const sessionId = sep > 0 ? entry.slice(0, sep) : entry; - const seqStr = sep > 0 ? entry.slice(sep + 1) : '0'; - return { sessionId, lastSeq: parseInt(seqStr, 10) || 0 }; - }); - handleReconnect(connectionId, { type: 'reconnect', sessions }, v2Ctx); - } catch (err) { - log.warn('SSE chat reconnect parse error', { connectionId, error: String(err) }); - } - } + // Reconnect is handled via POST /api/chat/reconnect — the client sends + // a reconnect POST on every welcome event. The old ?sessions= query param + // path was removed because EventSource auto-reconnect reuses the original + // URL (without the param), making it unreliable. log.info('SSE chat stream connected', { connectionId });