diff --git a/.changeset/stream-read-v3-reconnect.md b/.changeset/stream-read-v3-reconnect.md new file mode 100644 index 0000000000..71fd0c843c --- /dev/null +++ b/.changeset/stream-read-v3-reconnect.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-vercel': patch +--- + +Use v3 endpoint for stream reads, which supports automatic transparent reconnects. diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index af44a6c755..515981b784 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -190,7 +190,7 @@ describe('streams.get', () => { vi.restoreAllMocks(); }); - it('includes runId in the fetch URL', async () => { + it('reads the live stream from the v3 endpoint (error-on-timeout)', async () => { const fetchSpy = vi .spyOn(globalThis, 'fetch') .mockImplementation( @@ -202,10 +202,12 @@ describe('streams.get', () => { expect(fetchSpy).toHaveBeenCalledTimes(1); const url = new URL(fetchSpy.mock.calls[0][0] as string); - expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream'); + // v3, not v2: the reconnecting reader relies on the server erroring the + // body on a max-duration timeout rather than closing it cleanly. + expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream'); }); - it('passes startIndex as a query parameter', async () => { + it('passes startIndex as a query parameter on the v3 read', async () => { const fetchSpy = vi .spyOn(globalThis, 'fetch') .mockImplementation( @@ -216,7 +218,7 @@ describe('streams.get', () => { await streamer.streams.get('run-123', 'my-stream', 5); const url = new URL(fetchSpy.mock.calls[0][0] as string); - expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream'); + expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream'); expect(url.searchParams.get('startIndex')).toBe('5'); }); }); diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index b74e2ae2e9..9be08bbe9a 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -23,12 +23,27 @@ export const MAX_CHUNKS_PER_REQUEST = 1000; // (partial writes, long-lived reads), and duplex streams are incompatible // with undici's experimental H2 support. +// Writes (PUT) and stream completion use the v2 stream endpoint. function getStreamUrl(name: string, runId: string, httpConfig: HttpConfig) { return new URL( `${httpConfig.baseUrl}/v2/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}` ); } +// The live-read (GET) endpoint is versioned at v3: on a max-duration timeout +// (or a mid-stream connection drop) the server errors the response body +// instead of closing it cleanly, which is what lets the reconnecting reader +// (`createReconnectingFramedStream`) resume from the next chunk rather than +// treating the timeout as end-of-stream. Reading from v2 would silently +// truncate long-lived streams at the server's 2-minute limit. Only the live +// read is affected by the timeout — writes, completion, and snapshot reads +// (chunks/info/list) stay on v2. +function getStreamReadUrl(name: string, runId: string, httpConfig: HttpConfig) { + return new URL( + `${httpConfig.baseUrl}/v3/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}` + ); +} + function createStreamRequestError( operation: 'write' | 'close', url: URL, @@ -190,7 +205,7 @@ export function createStreamer(config?: APIConfig): Streamer { async get(runId: string, name: string, startIndex?: number) { const httpConfig = await getHttpConfig(config); - const url = getStreamUrl(name, runId, httpConfig); + const url = getStreamReadUrl(name, runId, httpConfig); if (typeof startIndex === 'number') { url.searchParams.set('startIndex', String(startIndex)); }