Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stream-read-v3-reconnect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-vercel': patch
---

Use v3 endpoint for stream reads, which supports automatic transparent reconnects.
10 changes: 6 additions & 4 deletions packages/world-vercel/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ describe('streams.get', () => {
vi.restoreAllMocks();
});

it('includes runId in the fetch URL', async () => {
it('reads the live stream from the v3 endpoint (error-on-timeout)', async () => {
const fetchSpy = vi
.spyOn(globalThis, 'fetch')
.mockImplementation(
Expand All @@ -202,10 +202,12 @@ describe('streams.get', () => {

expect(fetchSpy).toHaveBeenCalledTimes(1);
const url = new URL(fetchSpy.mock.calls[0][0] as string);
expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream');
// v3, not v2: the reconnecting reader relies on the server erroring the
// body on a max-duration timeout rather than closing it cleanly.
expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream');
});

it('passes startIndex as a query parameter', async () => {
it('passes startIndex as a query parameter on the v3 read', async () => {
const fetchSpy = vi
.spyOn(globalThis, 'fetch')
.mockImplementation(
Expand All @@ -216,7 +218,7 @@ describe('streams.get', () => {
await streamer.streams.get('run-123', 'my-stream', 5);

const url = new URL(fetchSpy.mock.calls[0][0] as string);
expect(url.pathname).toBe('/v2/runs/run-123/stream/my-stream');
expect(url.pathname).toBe('/v3/runs/run-123/stream/my-stream');
expect(url.searchParams.get('startIndex')).toBe('5');
});
});
Expand Down
17 changes: 16 additions & 1 deletion packages/world-vercel/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,27 @@ export const MAX_CHUNKS_PER_REQUEST = 1000;
// (partial writes, long-lived reads), and duplex streams are incompatible
// with undici's experimental H2 support.

// Writes (PUT) and stream completion use the v2 stream endpoint.
function getStreamUrl(name: string, runId: string, httpConfig: HttpConfig) {
return new URL(
`${httpConfig.baseUrl}/v2/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}`
);
}

// The live-read (GET) endpoint is versioned at v3: on a max-duration timeout
// (or a mid-stream connection drop) the server errors the response body
// instead of closing it cleanly, which is what lets the reconnecting reader
// (`createReconnectingFramedStream`) resume from the next chunk rather than
// treating the timeout as end-of-stream. Reading from v2 would silently
// truncate long-lived streams at the server's 2-minute limit. Only the live
// read is affected by the timeout — writes, completion, and snapshot reads
// (chunks/info/list) stay on v2.
function getStreamReadUrl(name: string, runId: string, httpConfig: HttpConfig) {
return new URL(
`${httpConfig.baseUrl}/v3/runs/${encodeURIComponent(runId)}/stream/${encodeURIComponent(name)}`
);
}

function createStreamRequestError(
operation: 'write' | 'close',
url: URL,
Expand Down Expand Up @@ -190,7 +205,7 @@ export function createStreamer(config?: APIConfig): Streamer {

async get(runId: string, name: string, startIndex?: number) {
const httpConfig = await getHttpConfig(config);
const url = getStreamUrl(name, runId, httpConfig);
const url = getStreamReadUrl(name, runId, httpConfig);
if (typeof startIndex === 'number') {
url.searchParams.set('startIndex', String(startIndex));
}
Expand Down
Loading