From ef854d9856f6d9aabe2aa682aa77d149142ed3ae Mon Sep 17 00:00:00 2001 From: dimakis Date: Fri, 12 Jun 2026 22:32:55 +0100 Subject: [PATCH 01/10] fix(client): send reconnect POST on every SSE welcome to fix session reattach MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit EventSource auto-reconnect reuses the original URL, which never included the ?sessions= query param. This meant the server never ran handleReconnect on auto-reconnect — no watch, no reattach, no event replay. Users had to send multiple messages before the session would respond. Move reconnect from URL query param to explicit POST in the welcome handler. This fires on every reconnect (auto or explicit), ensuring the server always runs the full reconnect flow: watch + reattach + event replay. Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 51 +++++++++++++++++-- packages/client/src/sse-connection.ts | 28 +++++++--- 2 files changed, 67 insertions(+), 12 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index d0a9db18..c8e9f288 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -290,20 +290,63 @@ describe('SseConnection', () => { expect(conn.isConnected()).toBe(false); }); - it('includes sessions in reconnect URL', () => { - const conn = new SseConnection(createConfig()); + it('sends reconnect POST on welcome when has tracked sessions', () => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); conn.connect(); lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); // Track a session conn.trackSeq('sess-1', 10); + // Force reconnect — creates new EventSource + conn.checkAndReconnect(true); + mockFetch.mockClear(); + + // New welcome arrives — should trigger reconnect POST + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + expect(mockFetch).toHaveBeenCalledWith( + 'https://localhost:3100/api/chat/reconnect', + expect.objectContaining({ + method: 'POST', + body: JSON.stringify({ + type: 'reconnect', + sessions: [{ sessionId: 'sess-1', lastSeq: 10 }], + }), + }), + ); + }); + + it('does not send reconnect POST on first connection', () => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + + // Track a session BEFORE welcome (simulating a pre-existing session) + conn.trackSeq('sess-1', 5); + + // First welcome — _isReconnect is false, so no reconnect POST + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + + expect(mockFetch).not.toHaveBeenCalledWith( + 'https://localhost:3100/api/chat/reconnect', + expect.any(Object), + ); + }); + + it('reconnect URL never includes sessions query param', () => { + const conn = new SseConnection(createConfig()); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + // Force reconnect conn.checkAndReconnect(true); + // URL should be clean — reconnect is handled via POST, not query param const newES = lastES(); - expect(newES.url).toContain('sessions='); - expect(newES.url).toContain('sess-1%7C10'); + expect(newES.url).toBe('https://localhost:3100/api/chat/events'); }); it('checkAndReconnect(false) is no-op when connected', () => { diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 938374e9..7a1d2adc 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -195,14 +195,11 @@ export class SseConnection implements ChatConnection { this.reconnectTimer = null; } - // Build URL with reconnect sessions if this is a reconnect - let url = `${this.config.baseUrl}/api/chat/events`; - if (this._isReconnect && this.seqBySession.size > 0) { - const sessionsParam = Array.from(this.seqBySession.entries()) - .map(([sid, seq]) => `${sid}|${seq}`) - .join(','); - url += `?sessions=${encodeURIComponent(sessionsParam)}`; - } + // Always use the base URL — reconnect sessions are sent via POST in the + // welcome handler. This avoids the bug where EventSource auto-reconnect + // reuses the original URL (missing ?sessions=), and eliminates double + // handleReconnect when doConnect() AND welcome both trigger it. + const url = `${this.config.baseUrl}/api/chat/events`; const es = this.config.createEventSource(url); this.es = es; @@ -217,6 +214,21 @@ export class SseConnection implements ChatConnection { } this._connectionId = msg.connectionId as string; this._connected = true; + + // Always send reconnect on welcome if we have tracked sessions. + // Native EventSource auto-reconnect reuses the original URL (without + // ?sessions= param), so the server never runs handleReconnect — no + // watch, no reattach, no event replay. This POST ensures every + // reconnect (auto or explicit) triggers the full server-side + // reconnect flow: watch + reattach + event replay. + if (this._isReconnect && this.seqBySession.size > 0) { + this.doPost('reconnect', { + type: 'reconnect', + sessions: Array.from(this.seqBySession.entries()).map( + ([sessionId, lastSeq]) => ({ sessionId, lastSeq }), + ), + }); + } this._isReconnect = true; this.flushPendingSends(); From f83df2a2a3b6cda9e86d0d74b3352bd1738ebc10 Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 11:06:07 +0100 Subject: [PATCH 02/10] fix(client): await reconnect POST before flushing pending sends The welcome handler was firing flushPendingSends() and _open immediately after doPost('reconnect', ...) without awaiting. This caused user messages queued during tool execution to race the server-side reconnect setup (watch + reattach + replay), resulting in messages being silently dropped. Use .finally() to ensure pending sends only flush after the reconnect POST completes, matching the server's session reattach lifecycle. Co-Authored-By: Claude Opus 4.6 --- packages/client/src/sse-connection.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 7a1d2adc..cface7e4 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -224,15 +224,19 @@ export class SseConnection implements ChatConnection { if (this._isReconnect && this.seqBySession.size > 0) { this.doPost('reconnect', { type: 'reconnect', - sessions: Array.from(this.seqBySession.entries()).map( - ([sessionId, lastSeq]) => ({ sessionId, lastSeq }), - ), + sessions: Array.from(this.seqBySession.entries()).map(([sessionId, lastSeq]) => ({ + sessionId, + lastSeq, + })), + }).finally(() => { + this.flushPendingSends(); + this.listener?.({ type: '_open' }); }); + } else { + this.flushPendingSends(); + this.listener?.({ type: '_open' }); } this._isReconnect = true; - - this.flushPendingSends(); - this.listener?.({ type: '_open' }); }); // Catch-all for session events. Server sends all non-welcome events as From 1d0c25ca25a07d2acf7e052b6dce18c54b77810a Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 11:12:16 +0100 Subject: [PATCH 03/10] fix(client): defer _connected until reconnect POST completes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses Centaur review: _connected = true was set before the reconnect POST, allowing external send() calls to bypass the pending queue and fire before the server reattached the session — defeating the flush ordering fix. Move _connected into both branches so it's only set when it's safe to send directly. Co-Authored-By: Claude Opus 4.6 --- packages/client/src/sse-connection.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index cface7e4..9034a6f7 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -213,7 +213,6 @@ export class SseConnection implements ChatConnection { return; } this._connectionId = msg.connectionId as string; - this._connected = true; // Always send reconnect on welcome if we have tracked sessions. // Native EventSource auto-reconnect reuses the original URL (without @@ -221,6 +220,10 @@ export class SseConnection implements ChatConnection { // watch, no reattach, no event replay. This POST ensures every // reconnect (auto or explicit) triggers the full server-side // reconnect flow: watch + reattach + event replay. + // + // _connected is deferred until reconnect completes — setting it earlier + // lets external send() bypass the pending queue before the server has + // reattached the session. if (this._isReconnect && this.seqBySession.size > 0) { this.doPost('reconnect', { type: 'reconnect', @@ -229,10 +232,12 @@ export class SseConnection implements ChatConnection { lastSeq, })), }).finally(() => { + this._connected = true; this.flushPendingSends(); this.listener?.({ type: '_open' }); }); } else { + this._connected = true; this.flushPendingSends(); this.listener?.({ type: '_open' }); } From ae90f9214d0329e1e58d1177fde3bca9d492563c Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 11:32:01 +0100 Subject: [PATCH 04/10] fix(client): guard .finally() against disconnect and double-welcome races Addresses Centaur review findings: - Defer _connected into both branches so external send() can't bypass the pending queue while reconnect POST is in-flight - Guard .finally() with es-null check (disconnect during POST) and connectionId staleness check (newer welcome arrived) - Add 4 tests: deferred _connected, dangling finally after disconnect, stale finally on double welcome, flush ordering after reconnect Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 157 ++++++++++++++++++ packages/client/src/sse-connection.ts | 4 + 2 files changed, 161 insertions(+) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index c8e9f288..775104f2 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -318,6 +318,163 @@ describe('SseConnection', () => { ); }); + it('defers _connected until reconnect POST completes', async () => { + let resolveReconnect!: (v: { ok: true }) => void; + const mockFetch = vi.fn().mockImplementation((_url: string) => { + if (_url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + mockFetch.mockClear(); + listener.mockClear(); + + // New welcome — reconnect POST fires but doesn't resolve yet + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // _connected should still be false while POST is in-flight + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + + // Resolve the reconnect POST + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + // Now _connected should be true and _open emitted + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + + it('discards dangling .finally() if disconnect() called during reconnect POST', async () => { + let resolveReconnect!: (v: { ok: true }) => void; + const mockFetch = vi.fn().mockImplementation((_url: string) => { + if (_url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + + // New welcome — reconnect POST in-flight + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // Disconnect while POST is in-flight + conn.disconnect(); + expect(conn.isConnected()).toBe(false); + listener.mockClear(); + + // Resolve the reconnect POST — .finally() should bail out + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + // Must remain disconnected — .finally() must not overwrite + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + }); + + it('discards stale .finally() when a newer welcome arrives', async () => { + const reconnectCalls: Array<(v: { ok: true }) => void> = []; + const mockFetch = vi.fn().mockImplementation((_url: string) => { + if (_url.includes('/reconnect')) { + return new Promise((resolve) => { + reconnectCalls.push(resolve); + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + + // First welcome — reconnect POST #1 in-flight + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + const resolveFirst = reconnectCalls[0]; + + // Second welcome arrives (rapid reconnect race) — reconnect POST #2 in-flight + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + const resolveSecond = reconnectCalls[1]; + + // Resolve the FIRST (stale) reconnect POST + resolveFirst({ ok: true }); + await vi.runAllTimersAsync(); + + // Must NOT set _connected — connectionId has moved on to conn-ghi + expect(conn.isConnected()).toBe(false); + expect(conn.getConnectionId()).toBe('conn-ghi'); + + // Resolve the SECOND (current) reconnect POST + listener.mockClear(); + resolveSecond({ ok: true }); + await vi.runAllTimersAsync(); + + // Now _connected should be true + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + + it('flushes pending sends only after reconnect POST completes', async () => { + let resolveReconnect!: (v: { ok: true }) => void; + const postEndpoints: string[] = []; + const mockFetch = vi.fn().mockImplementation((url: string) => { + const endpoint = url.replace('https://localhost:3100/api/chat/', ''); + postEndpoints.push(endpoint); + if (url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect — sends are now queued + conn.checkAndReconnect(true); + conn.send({ type: 'send', prompt: 'queued msg', clientMsgId: 'q-1' }); + postEndpoints.length = 0; + + // Welcome — reconnect POST fires, queued send waits + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // Only reconnect POST should have fired, not the queued send + expect(postEndpoints).toEqual(['reconnect']); + + // Resolve reconnect — now the queued send should flush + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + expect(postEndpoints).toEqual(['reconnect', 'send']); + }); + it('does not send reconnect POST on first connection', () => { const mockFetch = vi.fn().mockResolvedValue({ ok: true }); const conn = new SseConnection(createConfig({ fetch: mockFetch })); diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 9034a6f7..fc3d73ce 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -224,6 +224,7 @@ export class SseConnection implements ChatConnection { // _connected is deferred until reconnect completes — setting it earlier // lets external send() bypass the pending queue before the server has // reattached the session. + const welcomeConnectionId = msg.connectionId as string; if (this._isReconnect && this.seqBySession.size > 0) { this.doPost('reconnect', { type: 'reconnect', @@ -232,6 +233,9 @@ export class SseConnection implements ChatConnection { lastSeq, })), }).finally(() => { + // Guard: bail if disconnect() was called or a newer welcome + // arrived while the POST was in-flight. + if (!this.es || this._connectionId !== welcomeConnectionId) return; this._connected = true; this.flushPendingSends(); this.listener?.({ type: '_open' }); From f310b84d2f3dcb0ced214bd44bc8ca463e7a3192 Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 12:05:52 +0100 Subject: [PATCH 05/10] fix(client): reject reconnect on POST failure, add recovery tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace .finally() with dedicated doReconnectPost() that checks res.ok — on failure, the client stays disconnected and lets EventSource auto- reconnect retry. Prevents flushing pending sends when the server never ran handleReconnect. Add 3 tests: POST returns 500, POST throws network error, recovery on next auto-reconnect welcome after a failed attempt. Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 88 +++++++++++++++++++ packages/client/src/sse-connection.ts | 57 +++++++++--- 2 files changed, 131 insertions(+), 14 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index 775104f2..32185c98 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -475,6 +475,94 @@ describe('SseConnection', () => { expect(postEndpoints).toEqual(['reconnect', 'send']); }); + it('stays disconnected when reconnect POST fails', async () => { + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return Promise.resolve({ ok: false, status: 500 }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + conn.send({ type: 'send', prompt: 'should stay queued', clientMsgId: 'q-1' }); + listener.mockClear(); + + // New welcome — reconnect POST will fail + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + // Must stay disconnected — server never ran handleReconnect + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + }); + + it('stays disconnected when reconnect POST throws network error', async () => { + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return Promise.reject(new Error('network error')); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + conn.checkAndReconnect(true); + listener.mockClear(); + + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + expect(conn.isConnected()).toBe(false); + expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); + }); + + it('recovers after failed reconnect when EventSource auto-reconnects', async () => { + let callCount = 0; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + callCount++; + // First reconnect fails, second succeeds + if (callCount === 1) return Promise.resolve({ ok: false, status: 500 }); + return Promise.resolve({ ok: true }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + listener.mockClear(); + + // First welcome — reconnect POST fails + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(false); + + // EventSource auto-reconnect fires a new welcome + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + await vi.runAllTimersAsync(); + + // Second attempt succeeds + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + it('does not send reconnect POST on first connection', () => { const mockFetch = vi.fn().mockResolvedValue({ ok: true }); const conn = new SseConnection(createConfig({ fetch: mockFetch })); diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index fc3d73ce..1d5ad565 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -226,20 +226,7 @@ export class SseConnection implements ChatConnection { // reattached the session. const welcomeConnectionId = msg.connectionId as string; if (this._isReconnect && this.seqBySession.size > 0) { - this.doPost('reconnect', { - type: 'reconnect', - sessions: Array.from(this.seqBySession.entries()).map(([sessionId, lastSeq]) => ({ - sessionId, - lastSeq, - })), - }).finally(() => { - // Guard: bail if disconnect() was called or a newer welcome - // arrived while the POST was in-flight. - if (!this.es || this._connectionId !== welcomeConnectionId) return; - this._connected = true; - this.flushPendingSends(); - this.listener?.({ type: '_open' }); - }); + this.doReconnectPost(welcomeConnectionId); } else { this._connected = true; this.flushPendingSends(); @@ -278,6 +265,48 @@ export class SseConnection implements ChatConnection { // but we wait for the 'welcome' event before marking as connected. } + /** + * Send the reconnect POST and only mark connected on success. + * + * On failure the client stays disconnected — the next EventSource + * auto-reconnect will trigger a fresh welcome + retry. This prevents + * flushing pending sends into the void when the server never ran + * handleReconnect (no watch, no reattach, no replay). + */ + private async doReconnectPost(welcomeConnectionId: string): Promise { + try { + const res = await this.config.fetch(`${this.config.baseUrl}/api/chat/reconnect`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Connection-ID': welcomeConnectionId, + }, + body: JSON.stringify({ + type: 'reconnect', + sessions: Array.from(this.seqBySession.entries()).map(([sessionId, lastSeq]) => ({ + sessionId, + lastSeq, + })), + }), + }); + + // Guard: bail if disconnect() was called or a newer welcome + // arrived while the POST was in-flight. + if (!this.es || this._connectionId !== welcomeConnectionId) return; + + if (res.ok) { + this._connected = true; + this.flushPendingSends(); + this.listener?.({ type: '_open' }); + } + // On !res.ok: stay disconnected. EventSource auto-reconnect will + // trigger a new welcome and retry the reconnect POST. + } catch { + // Network error: same as !res.ok — stay disconnected, let + // EventSource auto-reconnect handle retry. + } + } + private async doPost(endpoint: string, body: Record): Promise { if (!this._connectionId) return; try { From a183c4e71c8c78cafad90972a786021e9586abef Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 12:12:10 +0100 Subject: [PATCH 06/10] fix(client): handle reconnect POST failure, remove dead query-param path Address Centaur review round 3: - Remove redundant cast; use this._connectionId snapshot directly - Remove dead server-side ?sessions= query param handler (unreachable since client now uses POST for all reconnects) and unused import - Add test: SSE events dispatch to listener while reconnect POST is in-flight (_connected=false doesn't gate onmessage) - Add test: queued sends survive POST failure and flush on successful retry via next auto-reconnect welcome Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 78 +++++++++++++++++++ packages/client/src/sse-connection.ts | 5 +- server/index.ts | 20 +---- 3 files changed, 86 insertions(+), 17 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index 32185c98..cf4e7171 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -563,6 +563,84 @@ describe('SseConnection', () => { expect(listener).toHaveBeenCalledWith({ type: '_open' }); }); + it('dispatches SSE events to listener while reconnect POST is in-flight', async () => { + let resolveReconnect!: (v: { ok: boolean }) => void; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + conn.checkAndReconnect(true); + listener.mockClear(); + + // Welcome — reconnect POST in-flight, _connected = false + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + expect(conn.isConnected()).toBe(false); + + // Server replays events via SSE while reconnect POST is processing. + // onmessage is independent of _connected — these must still dispatch. + lastES()._emit('block_delta', { + type: 'block_delta', + sessionId: 'sess-1', + seq: 11, + delta: 'replayed', + }); + + expect(listener).toHaveBeenCalledWith( + expect.objectContaining({ type: 'block_delta', delta: 'replayed' }), + ); + + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + }); + + it('queued sends survive POST failure and flush on successful retry', async () => { + let callCount = 0; + const postEndpoints: string[] = []; + const mockFetch = vi.fn().mockImplementation((url: string) => { + const endpoint = url.replace('https://localhost:3100/api/chat/', ''); + if (url.includes('/reconnect')) { + callCount++; + if (callCount === 1) return Promise.resolve({ ok: false, status: 500 }); + postEndpoints.push(endpoint); + return Promise.resolve({ ok: true }); + } + postEndpoints.push(endpoint); + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect and queue a send + conn.checkAndReconnect(true); + conn.send({ type: 'send', prompt: 'must survive', clientMsgId: 'q-1' }); + postEndpoints.length = 0; + + // First welcome — reconnect fails, send stays queued + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(false); + expect(postEndpoints).toEqual([]); + + // Second welcome — reconnect succeeds, queued send flushes + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(true); + expect(postEndpoints).toEqual(['reconnect', 'send']); + }); + it('does not send reconnect POST on first connection', () => { const mockFetch = vi.fn().mockResolvedValue({ ok: true }); const conn = new SseConnection(createConfig({ fetch: mockFetch })); diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 1d5ad565..5d722329 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -224,7 +224,10 @@ export class SseConnection implements ChatConnection { // _connected is deferred until reconnect completes — setting it earlier // lets external send() bypass the pending queue before the server has // reattached the session. - const welcomeConnectionId = msg.connectionId as string; + // Snapshot the current connectionId for the staleness guard in + // doReconnectPost — if a newer welcome arrives while the POST is + // in-flight, the callback bails out. + const welcomeConnectionId = this._connectionId; if (this._isReconnect && this.seqBySession.size > 0) { this.doReconnectPost(welcomeConnectionId); } else { diff --git a/server/index.ts b/server/index.ts index b70235cd..f404a209 100644 --- a/server/index.ts +++ b/server/index.ts @@ -82,7 +82,6 @@ import { ConnectionRegistry } from '@mitzo/harness'; import { isHelloHandshake, handleHello, - handleReconnect, dispatchV2Message, type V2HandlerContext, } from './ws-handler-v2.js'; @@ -388,21 +387,10 @@ app.get('/api/chat/events', (req, res) => { connectionId, }); - // Handle reconnect if sessions provided as query param - const sessionsParam = req.query.sessions as string | undefined; - if (sessionsParam) { - try { - const sessions = sessionsParam.split(',').map((entry) => { - const sep = entry.lastIndexOf('|'); - const sessionId = sep > 0 ? entry.slice(0, sep) : entry; - const seqStr = sep > 0 ? entry.slice(sep + 1) : '0'; - return { sessionId, lastSeq: parseInt(seqStr, 10) || 0 }; - }); - handleReconnect(connectionId, { type: 'reconnect', sessions }, v2Ctx); - } catch (err) { - log.warn('SSE chat reconnect parse error', { connectionId, error: String(err) }); - } - } + // Reconnect is handled via POST /api/chat/reconnect — the client sends + // a reconnect POST on every welcome event. The old ?sessions= query param + // path was removed because EventSource auto-reconnect reuses the original + // URL (without the param), making it unreliable. log.info('SSE chat stream connected', { connectionId }); From 9698a2a0e3a0c53d3904ac64e7317d3379fdaa6c Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 12:24:12 +0100 Subject: [PATCH 07/10] =?UTF-8?q?fix(client):=20address=20Centaur=20round?= =?UTF-8?q?=204=20=E2=80=94=20logging,=20test=20names,=20coverage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add console.warn to doReconnectPost catch block for diagnostics - Rename test names from stale .finally() references to match current await-based implementation - Add test: reconnect with empty seqBySession skips POST and connects immediately Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 29 +++++++++++++++++-- packages/client/src/sse-connection.ts | 3 +- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index cf4e7171..f035d476 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -356,7 +356,7 @@ describe('SseConnection', () => { expect(listener).toHaveBeenCalledWith({ type: '_open' }); }); - it('discards dangling .finally() if disconnect() called during reconnect POST', async () => { + it('bails out if disconnect() called during in-flight reconnect POST', async () => { let resolveReconnect!: (v: { ok: true }) => void; const mockFetch = vi.fn().mockImplementation((_url: string) => { if (_url.includes('/reconnect')) { @@ -393,7 +393,7 @@ describe('SseConnection', () => { expect(listener).not.toHaveBeenCalledWith({ type: '_open' }); }); - it('discards stale .finally() when a newer welcome arrives', async () => { + it('ignores stale reconnect POST when a newer welcome arrives', async () => { const reconnectCalls: Array<(v: { ok: true }) => void> = []; const mockFetch = vi.fn().mockImplementation((_url: string) => { if (_url.includes('/reconnect')) { @@ -641,6 +641,31 @@ describe('SseConnection', () => { expect(postEndpoints).toEqual(['reconnect', 'send']); }); + it('connects immediately on reconnect when seqBySession is empty', () => { + const mockFetch = vi.fn().mockResolvedValue({ ok: true }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + + // Track and then clear — simulates all sessions being closed + conn.trackSeq('sess-1', 10); + conn.clearSession('sess-1'); + + // Force reconnect — _isReconnect=true but seqBySession is empty + conn.checkAndReconnect(true); + mockFetch.mockClear(); + listener.mockClear(); + + // New welcome — should skip reconnect POST and connect immediately + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + expect(mockFetch).not.toHaveBeenCalled(); + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + }); + it('does not send reconnect POST on first connection', () => { const mockFetch = vi.fn().mockResolvedValue({ ok: true }); const conn = new SseConnection(createConfig({ fetch: mockFetch })); diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 5d722329..2a789ee9 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -304,9 +304,10 @@ export class SseConnection implements ChatConnection { } // On !res.ok: stay disconnected. EventSource auto-reconnect will // trigger a new welcome and retry the reconnect POST. - } catch { + } catch (err) { // Network error: same as !res.ok — stay disconnected, let // EventSource auto-reconnect handle retry. + console.warn('[SseConnection] reconnect POST failed, will retry on next welcome', err); } } From 3ed95a1ba16b429c587d18cb3444fdb8bbefec02 Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 12:39:45 +0100 Subject: [PATCH 08/10] fix(client): force reconnect on POST failure, trim comments Address Centaur round 5: - Force checkAndReconnect(true) on POST failure/network error so the client doesn't stay stuck when the SSE stream remains healthy - Trim verbose welcome handler comment to 2 lines - Add tests: forced reconnect on POST failure, multi-retry recovery Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 69 +++++++++++++++++++ packages/client/src/sse-connection.ts | 27 +++----- 2 files changed, 78 insertions(+), 18 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index f035d476..f4a6bab0 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -641,6 +641,75 @@ describe('SseConnection', () => { expect(postEndpoints).toEqual(['reconnect', 'send']); }); + it('forces reconnect when reconnect POST fails (SSE stream stays up)', async () => { + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return Promise.resolve({ ok: false, status: 500 }); + } + return Promise.resolve({ ok: true }); + }); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect + conn.checkAndReconnect(true); + + // Welcome — reconnect POST will fail + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + // Should have forced a new reconnect (new EventSource created) + // The failed POST triggers checkAndReconnect(true), which closes + // the old ES and creates a new one + expect(conn.isConnected()).toBe(false); + const esCount = MockEventSource.instances.length; + expect(esCount).toBeGreaterThanOrEqual(3); // initial + reconnect + forced retry + + warnSpy.mockRestore(); + }); + + it('recovers via checkAndReconnect(false) when POST fails but SSE stays up', async () => { + let reconnectCallCount = 0; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + reconnectCallCount++; + // Fail first two (initial + forced retry), succeed on third + if (reconnectCallCount <= 2) return Promise.resolve({ ok: false, status: 500 }); + return Promise.resolve({ ok: true }); + } + return Promise.resolve({ ok: true }); + }); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect — POST fails, triggers checkAndReconnect(true) + conn.checkAndReconnect(true); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + await vi.runAllTimersAsync(); + + // Forced retry also fails + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-ghi' }); + await vi.runAllTimersAsync(); + expect(conn.isConnected()).toBe(false); + + // Third welcome — reconnect POST succeeds + listener.mockClear(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-jkl' }); + await vi.runAllTimersAsync(); + + expect(conn.isConnected()).toBe(true); + expect(listener).toHaveBeenCalledWith({ type: '_open' }); + warnSpy.mockRestore(); + }); + it('connects immediately on reconnect when seqBySession is empty', () => { const mockFetch = vi.fn().mockResolvedValue({ ok: true }); const conn = new SseConnection(createConfig({ fetch: mockFetch })); diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 2a789ee9..5f3716a5 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -214,19 +214,9 @@ export class SseConnection implements ChatConnection { } this._connectionId = msg.connectionId as string; - // Always send reconnect on welcome if we have tracked sessions. - // Native EventSource auto-reconnect reuses the original URL (without - // ?sessions= param), so the server never runs handleReconnect — no - // watch, no reattach, no event replay. This POST ensures every - // reconnect (auto or explicit) triggers the full server-side - // reconnect flow: watch + reattach + event replay. - // - // _connected is deferred until reconnect completes — setting it earlier - // lets external send() bypass the pending queue before the server has - // reattached the session. - // Snapshot the current connectionId for the staleness guard in - // doReconnectPost — if a newer welcome arrives while the POST is - // in-flight, the callback bails out. + // _connected deferred until doReconnectPost succeeds — prevents + // external send() from bypassing the pending queue mid-reconnect. + // welcomeConnectionId is a staleness guard for concurrent welcomes. const welcomeConnectionId = this._connectionId; if (this._isReconnect && this.seqBySession.size > 0) { this.doReconnectPost(welcomeConnectionId); @@ -301,13 +291,14 @@ export class SseConnection implements ChatConnection { this._connected = true; this.flushPendingSends(); this.listener?.({ type: '_open' }); + } else { + console.warn('[SseConnection] reconnect POST returned', res.status); + this.checkAndReconnect(true); } - // On !res.ok: stay disconnected. EventSource auto-reconnect will - // trigger a new welcome and retry the reconnect POST. } catch (err) { - // Network error: same as !res.ok — stay disconnected, let - // EventSource auto-reconnect handle retry. - console.warn('[SseConnection] reconnect POST failed, will retry on next welcome', err); + if (!this.es || this._connectionId !== welcomeConnectionId) return; + console.warn('[SseConnection] reconnect POST failed', err); + this.checkAndReconnect(true); } } From 36f40303ad5989ad641515f60f35d1a36418b0c6 Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 12:44:32 +0100 Subject: [PATCH 09/10] fix(client): add backoff on reconnect POST failure, fix spurious _close Address Centaur round 6: - Replace immediate checkAndReconnect(true) with scheduleReconnect() that uses reconnectDelayMs to avoid tight retry loops on persistent server failures - Gate _close emission on wasConnected to prevent spurious events when the connection was never opened Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 10 ++++----- packages/client/src/sse-connection.ts | 22 ++++++++++++++++--- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index f4a6bab0..76810e4a 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -641,7 +641,7 @@ describe('SseConnection', () => { expect(postEndpoints).toEqual(['reconnect', 'send']); }); - it('forces reconnect when reconnect POST fails (SSE stream stays up)', async () => { + it('schedules delayed reconnect when reconnect POST fails', async () => { const mockFetch = vi.fn().mockImplementation((url: string) => { if (url.includes('/reconnect')) { return Promise.resolve({ ok: false, status: 500 }); @@ -656,17 +656,15 @@ describe('SseConnection', () => { // Force reconnect conn.checkAndReconnect(true); + const esCountBefore = MockEventSource.instances.length; // Welcome — reconnect POST will fail lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); await vi.runAllTimersAsync(); - // Should have forced a new reconnect (new EventSource created) - // The failed POST triggers checkAndReconnect(true), which closes - // the old ES and creates a new one + // Should have scheduled a delayed reconnect (new ES after timer) expect(conn.isConnected()).toBe(false); - const esCount = MockEventSource.instances.length; - expect(esCount).toBeGreaterThanOrEqual(3); // initial + reconnect + forced retry + expect(MockEventSource.instances.length).toBeGreaterThan(esCountBefore); warnSpy.mockRestore(); }); diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 5f3716a5..2c83db99 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -180,8 +180,11 @@ export class SseConnection implements ChatConnection { this.es.close(); this.es = null; } + const wasConnected = this._connected; this._connected = false; - this.listener?.({ type: '_close' }); + if (wasConnected) { + this.listener?.({ type: '_close' }); + } this.doConnect(); } @@ -293,15 +296,28 @@ export class SseConnection implements ChatConnection { this.listener?.({ type: '_open' }); } else { console.warn('[SseConnection] reconnect POST returned', res.status); - this.checkAndReconnect(true); + this.scheduleReconnect(); } } catch (err) { if (!this.es || this._connectionId !== welcomeConnectionId) return; console.warn('[SseConnection] reconnect POST failed', err); - this.checkAndReconnect(true); + this.scheduleReconnect(); } } + /** Tear down and reconnect after a delay to avoid tight retry loops. */ + private scheduleReconnect(): void { + if (this.reconnectTimer) return; + if (this.es) { + this.es.close(); + this.es = null; + } + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.doConnect(); + }, this.config.reconnectDelayMs); + } + private async doPost(endpoint: string, body: Record): Promise { if (!this._connectionId) return; try { From 3d9e7de0f2cd187d97ff9272e079221ef896884c Mon Sep 17 00:00:00 2001 From: dimakis Date: Mon, 22 Jun 2026 12:52:53 +0100 Subject: [PATCH 10/10] fix(client): guard against stale POST when ES instance replaced mid-flight MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Centaur round 7: - Capture ES instance in doReconnectPost closure and check this.es identity — prevents stale POST from setting _connected when checkAndReconnect replaced the EventSource during the await - Add test: stale POST resolves after checkAndReconnect, must not set _connected - Add test: _close not emitted when already disconnected - Rename misleading test name to match scheduleReconnect behavior Co-Authored-By: Claude Opus 4.6 --- .../src/__tests__/sse-connection.test.ts | 47 ++++++++++++++++++- packages/client/src/sse-connection.ts | 18 ++++--- 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/packages/client/src/__tests__/sse-connection.test.ts b/packages/client/src/__tests__/sse-connection.test.ts index 76810e4a..13a4c582 100644 --- a/packages/client/src/__tests__/sse-connection.test.ts +++ b/packages/client/src/__tests__/sse-connection.test.ts @@ -669,7 +669,52 @@ describe('SseConnection', () => { warnSpy.mockRestore(); }); - it('recovers via checkAndReconnect(false) when POST fails but SSE stays up', async () => { + it('stale doReconnectPost does not set _connected when checkAndReconnect fires mid-flight', async () => { + let resolveReconnect!: (v: { ok: boolean }) => void; + const mockFetch = vi.fn().mockImplementation((url: string) => { + if (url.includes('/reconnect')) { + return new Promise((resolve) => { + resolveReconnect = resolve; + }); + } + return Promise.resolve({ ok: true }); + }); + const conn = new SseConnection(createConfig({ fetch: mockFetch })); + conn.connect(); + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-abc' }); + conn.trackSeq('sess-1', 10); + + // Force reconnect — creates ES2 + conn.checkAndReconnect(true); + + // ES2 welcome — doReconnectPost(conn-def) starts + lastES()._emit('welcome', { type: 'welcome', protocolVersion: 2, connectionId: 'conn-def' }); + + // checkAndReconnect fires again while POST is in-flight — creates ES3 + conn.checkAndReconnect(true); + + // Stale POST resolves successfully — must NOT set _connected + resolveReconnect({ ok: true }); + await vi.runAllTimersAsync(); + + // ES3 hasn't welcomed yet, so _connected must remain false + expect(conn.isConnected()).toBe(false); + }); + + it('does not emit _close when checkAndReconnect called while already disconnected', () => { + const conn = new SseConnection(createConfig()); + const listener = vi.fn(); + conn.onMessage(listener); + conn.connect(); + // Don't send welcome — _connected stays false + + conn.checkAndReconnect(true); + + // _close should NOT have been emitted since we were never connected + expect(listener).not.toHaveBeenCalledWith({ type: '_close' }); + }); + + it('recovers via scheduleReconnect after repeated POST failures', async () => { let reconnectCallCount = 0; const mockFetch = vi.fn().mockImplementation((url: string) => { if (url.includes('/reconnect')) { diff --git a/packages/client/src/sse-connection.ts b/packages/client/src/sse-connection.ts index 2c83db99..8ca85ed8 100644 --- a/packages/client/src/sse-connection.ts +++ b/packages/client/src/sse-connection.ts @@ -219,10 +219,11 @@ export class SseConnection implements ChatConnection { // _connected deferred until doReconnectPost succeeds — prevents // external send() from bypassing the pending queue mid-reconnect. - // welcomeConnectionId is a staleness guard for concurrent welcomes. + // Capture both connectionId and ES instance for the staleness guard. const welcomeConnectionId = this._connectionId; + const welcomeEs = this.es; if (this._isReconnect && this.seqBySession.size > 0) { - this.doReconnectPost(welcomeConnectionId); + this.doReconnectPost(welcomeConnectionId, welcomeEs); } else { this._connected = true; this.flushPendingSends(); @@ -269,7 +270,10 @@ export class SseConnection implements ChatConnection { * flushing pending sends into the void when the server never ran * handleReconnect (no watch, no reattach, no replay). */ - private async doReconnectPost(welcomeConnectionId: string): Promise { + private async doReconnectPost( + welcomeConnectionId: string, + welcomeEs: EventSource | null, + ): Promise { try { const res = await this.config.fetch(`${this.config.baseUrl}/api/chat/reconnect`, { method: 'POST', @@ -286,9 +290,9 @@ export class SseConnection implements ChatConnection { }), }); - // Guard: bail if disconnect() was called or a newer welcome - // arrived while the POST was in-flight. - if (!this.es || this._connectionId !== welcomeConnectionId) return; + // Guard: bail if disconnect() was called, a newer welcome arrived, + // or checkAndReconnect replaced the EventSource while in-flight. + if (!this.es || this.es !== welcomeEs || this._connectionId !== welcomeConnectionId) return; if (res.ok) { this._connected = true; @@ -299,7 +303,7 @@ export class SseConnection implements ChatConnection { this.scheduleReconnect(); } } catch (err) { - if (!this.es || this._connectionId !== welcomeConnectionId) return; + if (!this.es || this.es !== welcomeEs || this._connectionId !== welcomeConnectionId) return; console.warn('[SseConnection] reconnect POST failed', err); this.scheduleReconnect(); }