From 3acf057c5c93d33a736cfa9ac10953a313006ee4 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 18 Jun 2026 14:46:56 -0700 Subject: [PATCH 1/9] feat(core): turbo mode for the first invocation On the first delivery of a run's first invocation, background run_started, skip the initial event-log load, and force optimistic inline start so the run reaches its first steps with no preceding network round-trips. Safe because the first delivery has no concurrent handler to race the step create-claim; turbo exits the moment a suspension creates a hook or wait, and is a no-op for every other invocation. On by default; disable with WORKFLOW_TURBO=0. Co-Authored-By: Claude Opus 4.8 (1M context) --- .changeset/turbo-mode-first-invocation.md | 6 + docs/content/docs/v5/changelog/meta.json | 3 +- docs/content/docs/v5/changelog/turbo-mode.md | 56 +++ packages/core/src/runtime.test.ts | 260 ++++++++++++++ packages/core/src/runtime.ts | 327 ++++++++++++------ packages/core/src/runtime/constants.test.ts | 40 +++ packages/core/src/runtime/constants.ts | 23 ++ packages/core/src/runtime/step-executor.ts | 41 ++- .../core/src/runtime/step-handler.test.ts | 63 ++++ .../core/src/runtime/suspension-handler.ts | 43 +++ 10 files changed, 755 insertions(+), 107 deletions(-) create mode 100644 .changeset/turbo-mode-first-invocation.md create mode 100644 docs/content/docs/v5/changelog/turbo-mode.md diff --git a/.changeset/turbo-mode-first-invocation.md b/.changeset/turbo-mode-first-invocation.md new file mode 100644 index 0000000000..f58dfdc8c2 --- /dev/null +++ b/.changeset/turbo-mode-first-invocation.md @@ -0,0 +1,6 @@ +--- +'workflow': minor +'@workflow/core': minor +--- + +Add turbo mode (on by default, disable with `WORKFLOW_TURBO=0`): on the first delivery of a run's first invocation the runtime backgrounds `run_started`, skips the initial event-log load, and forces optimistic inline start so the run reaches its first steps with no preceding network round-trips. 
It is safe there because the first delivery has no concurrent handler to race; turbo exits the moment a suspension creates a hook or wait, and it is a no-op for every other invocation. diff --git a/docs/content/docs/v5/changelog/meta.json b/docs/content/docs/v5/changelog/meta.json index b65b777d83..10ab651c65 100644 --- a/docs/content/docs/v5/changelog/meta.json +++ b/docs/content/docs/v5/changelog/meta.json @@ -4,7 +4,8 @@ "index", "eager-processing", "resilient-start", - "lazy-event-creation" + "lazy-event-creation", + "turbo-mode" ], "defaultOpen": false } diff --git a/docs/content/docs/v5/changelog/turbo-mode.md b/docs/content/docs/v5/changelog/turbo-mode.md new file mode 100644 index 0000000000..7513f7a682 --- /dev/null +++ b/docs/content/docs/v5/changelog/turbo-mode.md @@ -0,0 +1,56 @@ +--- +title: Turbo mode (fast first invocation) +description: Fast-path the very first delivery of the first invocation — background run_started, skip the initial event-log load, and force optimistic inline start — so a run blazes through its first steps. A no-op for everything else. +--- + +# Turbo mode + +## Motivation + +The first invocation of a workflow run is where time-to-first-step matters most, yet it pays the most fixed network latency before any user code runs. Three round-trips sit on that critical path today: + +1. **`run_started` is awaited.** The handler writes `run_started` and waits for it to return the run entity before doing anything else. +2. **The event log is loaded.** A full `events.list` runs before the first replay — even though on the very first delivery nothing has written any events yet. +3. **Optimistic inline start is off by default.** The [optimistic inline start](./lazy-event-creation#optimistic-inline-start-opt-in-off-by-default) optimization (running a step body before its `step_started` is confirmed) is off by default because under contention two handlers can both run a body and corrupt non-idempotent side effects. 
+ +Turbo mode removes all three costs **for the first delivery of the first invocation only**, where each is provably safe to remove, then gets out of the way. For every subsequent invocation it is a complete no-op. + +## What turbo mode does + +When the handler detects the first delivery of the first invocation, it: + +1. **Backgrounds `run_started`.** The event is written without awaiting; the run entity is synthesized locally from the queued run input (status `running`, `startedAt` now) so replay can begin immediately. The `run_started` round-trip overlaps replay instead of blocking it. This reuses the [resilient start](./resilient-start) contract — `run_started` carrying the run input creates the run on the fly (synthetic `run_created`) if it doesn't exist yet. +2. **Skips the initial event-log load.** Nothing has been written, so the first replay runs against an empty log. The second loop iteration then does a normal full reload (which also picks up a cursor) once the first step's events exist. +3. **Forces optimistic inline start** for that invocation, independent of `WORKFLOW_OPTIMISTIC_INLINE_START`. The step body runs immediately against locally-synthesized state; only the `step_started` network write waits for the backgrounded `run_started`. + +The net effect: the first step body starts after just the in-process replay, with `run_started` and `step_started` happening in the background around it, and no `events.list` before it. + +## Why this is safe (and where it stops) + +### Detection + +The first-invocation message is the only one that carries the queued **run input**, and on its first delivery the queue **attempt is 1** (a redelivery is attempt ≥ 2). Together with "not a background-step invocation" and "not a divergence recovery", that uniquely identifies the first delivery of the first invocation — with no new message field and no world/backend change. 
+ +### The single-handler guarantee + +Forcing optimistic start is unsafe *in general* because two handlers racing the same step's create-claim can both run the body before one wins. On the first delivery of the first invocation there is **no concurrent peer handler** — the run was created moments ago by `start()` and only this one message is in flight. So the body runs exactly once, and forcing optimistic start is safe here even though the global flag is off. + +### Turbo exits on the first hook or wait + +That single-handler guarantee ends the moment the run creates a **hook** or **wait** (or writes attributes): those introduce later resume/parallel invocations that *can* race. So turbo stops forcing optimistic start as soon as a suspension creates any of them — the inline steps of that suspension fall back to the normal await-then-run path, and the rest of the run behaves exactly as it does today. A pure-step suspension (the common hot path) stays on the fast path. + +### Write ordering is preserved + +Because `run_started` is backgrounded, every event write is gated on a run-ready barrier so nothing is written before the run exists: + +- The optimistic `step_started` is **chained** on the barrier — the body still runs immediately, only the network write waits. +- The suspension handler **awaits** the barrier before any eager write (`hook_created`, `wait_created`, overflow `step_created`). The pure inline hot path defers all its steps and writes nothing here, so it never blocks on the barrier. +- Terminal run writes (`run_completed` / `run_failed`) await the barrier too, so a workflow that finishes with no steps still orders its completion after `run_started`. + +The event log therefore still reads `run_created → run_started → step_created → step_started → step_completed`. If the backgrounded `run_started` genuinely fails (e.g. 
the run was cancelled in the meantime), the chained writes surface the real error (`gone` / run-not-found) and the message redelivers as a normal, non-turbo attempt. + +## Configuration + +Turbo mode is **on by default**. Set `WORKFLOW_TURBO=0` (or `false`) to disable it — every invocation then takes the existing awaited path. This is a useful kill-switch for deployments whose first-step bodies are not both idempotent and stream-safe (the same caveat as optimistic inline start), or for isolating behavior while debugging. + +Turbo mode is purely client-side and builds on the lazy/optimistic inline start support already shipped — it requires no world or backend changes. diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index f2738f471f..37363c4e2c 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -1330,3 +1330,263 @@ describe('workflowEntrypoint step-dispatch ack ordering', () => { expect(stepIdMessages).toHaveLength(0); }); }); + +describe('workflowEntrypoint turbo mode', () => { + const ORIG_TURBO = process.env.WORKFLOW_TURBO; + const ORIG_OPT = process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + + // Default: turbo ON (unset) and the global optimistic flag OFF (unset). Any + // optimistic behavior observed in these tests therefore comes from turbo + // forcing it — never from WORKFLOW_OPTIMISTIC_INLINE_START. 
+ beforeEach(() => { + delete process.env.WORKFLOW_TURBO; + delete process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + turboOrder = []; + }); + afterEach(() => { + if (ORIG_TURBO === undefined) delete process.env.WORKFLOW_TURBO; + else process.env.WORKFLOW_TURBO = ORIG_TURBO; + if (ORIG_OPT === undefined) { + delete process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + } else { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = ORIG_OPT; + } + setWorld(undefined); + vi.clearAllMocks(); + waitUntilPromises.length = 0; + }); + + const xform = (name: string) => + `;globalThis.__private_workflows = new Map(); + globalThis.__private_workflows.set(${JSON.stringify(name)}, ${name});`; + + // The step body records 'body' the moment it runs — its position relative to + // 'run_started_resolved' / 'step_started_called' is what proves (or disproves) + // optimistic start. Registered once; reads the current `turboOrder` binding. + let turboOrder: string[] = []; + registerStepFunction('turboStep', async () => { + turboOrder.push('body'); + return undefined; + }); + + const oneStepWorkflow = `const s = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("turboStep"); + async function workflow() { return await s(); }${xform('workflow')}`; + + // A step raced against a sleep: the suspension creates a wait, which makes + // turbo exit (no forced optimistic start) for the inline step. 
+ const stepAndSleepWorkflow = `const s = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("turboStep"); + const sleep = globalThis[Symbol.for("WORKFLOW_SLEEP")]; + async function workflow() { + const [r] = await Promise.all([s(), sleep('1h')]); + return r; + }${xform('workflow')}`; + + async function makeRunInput(runId: string) { + return { + input: await dehydrateWorkflowArguments([], runId, undefined, []), + deploymentId: 'test-deployment', + workflowName: 'workflow', + specVersion: SPEC_VERSION_CURRENT, + executionContext: {}, + }; + } + + /** + * Drives the handler with a first-invocation message (runInput present) at the + * given delivery `attempt`. `runStartedGate`, when provided, holds the + * `run_started` create until released — its resolution pushes + * 'run_started_resolved' so tests can assert the body ran before or after it. + */ + async function driveTurbo(opts: { + runId: string; + attempt: number; + source: string; + runStartedGate?: Promise; + }) { + const { runId, attempt, source } = opts; + const order = turboOrder; + const durable: Event[] = []; + let seq = 0; + const rec = (data: any): Event => { + seq += 1; + const e = { + eventId: `e-${seq}`, + runId, + createdAt: new Date(), + ...data, + } as Event; + durable.push(e); + return e; + }; + const runEntity: WorkflowRun = { + runId, + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments([], runId, undefined, []), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + + const eventsCreate = vi.fn(async (_runId: string, data: any) => { + if (data.eventType === 'run_started') { + if (opts.runStartedGate) await opts.runStartedGate; + order.push('run_started_resolved'); + return { run: runEntity, events: [] as Event[] }; + } + if (data.eventType === 'step_started') { + order.push('step_started_called'); + const d = data.eventData 
as { stepName?: string; input?: unknown }; + if (d?.input !== undefined) { + rec({ + eventType: 'step_created', + specVersion: SPEC_VERSION_CURRENT, + correlationId: data.correlationId, + eventData: { stepName: d.stepName, input: d.input }, + }); + } + return { + event: rec(data), + step: { + runId, + stepId: data.correlationId, + stepName: d?.stepName, + status: 'running' as const, + attempt: 1, + input: d?.input, + startedAt: new Date(), + createdAt: new Date(), + updatedAt: new Date(), + }, + ...(d?.input !== undefined ? { stepCreated: true } : {}), + }; + } + if (data.eventType === 'wait_created') order.push('wait_created'); + return { event: rec(data) }; + }); + + setWorld({ + specVersion: SPEC_VERSION_CURRENT, + createQueueHandler: vi.fn( + (_p: string, handler: (m: unknown, md: unknown) => Promise) => + async () => { + await handler( + { + runId, + requestedAt: new Date('2024-01-01T00:00:00.000Z'), + runInput: await makeRunInput(runId), + }, + { + requestId: 'req_turbo', + attempt, + queueName: '__wkf_workflow_workflow', + messageId: 'msg_turbo', + } + ); + return new Response(null, { status: 204 }); + } + ), + events: { + create: eventsCreate, + list: vi.fn(async () => ({ + data: [...durable], + hasMore: false, + cursor: 'cursor_turbo', + })), + }, + runs: { get: vi.fn(async () => runEntity) }, + queue: vi.fn(async () => ({ messageId: null })), + getEncryptionKeyForRun: vi.fn(async () => undefined), + } as any); + + const handlerPromise = workflowEntrypoint(source)( + new Request('https://example.test') + ) as Promise; + return { handlerPromise, order, eventsCreate }; + } + + it('backgrounds run_started and forces optimistic start on the first delivery', async () => { + let release!: () => void; + const gate = new Promise((r) => { + release = r; + }); + + const { handlerPromise, order, eventsCreate } = await driveTurbo({ + runId: 'wrun_turbo_first', + attempt: 1, + source: oneStepWorkflow, + runStartedGate: gate, + }); + + // The body runs while run_started 
is still in flight — proving run_started + // was backgrounded AND optimistic start was forced (the env flag is off). + await vi.waitFor(() => expect(order).toContain('body')); + expect(order).not.toContain('run_started_resolved'); + // The lazy step_started is chained on the run-ready barrier, so it is not + // even issued until run_started lands. + expect(order).not.toContain('step_started_called'); + + release(); + const res = await handlerPromise; + expect(res.status).toBe(204); + // After release: step_started fires, ordered strictly after run_started. + expect(order).toContain('step_started_called'); + expect(order.indexOf('run_started_resolved')).toBeLessThan( + order.indexOf('step_started_called') + ); + // run_started was created exactly once (idempotent first write). + const runStartedCreates = eventsCreate.mock.calls.filter( + (c) => (c[1] as any).eventType === 'run_started' + ); + expect(runStartedCreates).toHaveLength(1); + }); + + it('does not turbo on a redelivery (attempt > 1): run_started is awaited first', async () => { + const { handlerPromise, order } = await driveTurbo({ + runId: 'wrun_turbo_redeliver', + attempt: 2, + source: oneStepWorkflow, + }); + + const res = await handlerPromise; + expect(res.status).toBe(204); + // Non-turbo awaits run_started up front, so the body runs strictly after it. 
+ expect(order.indexOf('run_started_resolved')).toBeLessThan( + order.indexOf('body') + ); + }); + + it('does not turbo when WORKFLOW_TURBO=0 (parity with the awaited path)', async () => { + process.env.WORKFLOW_TURBO = '0'; + const { handlerPromise, order } = await driveTurbo({ + runId: 'wrun_turbo_off', + attempt: 1, + source: oneStepWorkflow, + }); + + const res = await handlerPromise; + expect(res.status).toBe(204); + expect(order.indexOf('run_started_resolved')).toBeLessThan( + order.indexOf('body') + ); + }); + + it('exits turbo (no forced optimistic) when the suspension creates a wait', async () => { + const { handlerPromise, order } = await driveTurbo({ + runId: 'wrun_turbo_wait', + attempt: 1, + source: stepAndSleepWorkflow, + }); + + const res = await handlerPromise; + expect(res.status).toBe(204); + // A wait was created this suspension, so turbo exited: the inline step took + // the normal await-then-run path, i.e. step_started was awaited BEFORE the + // body ran (the opposite ordering from the forced-optimistic case above). 
+ expect(order).toContain('wait_created'); + expect(order.indexOf('step_started_called')).toBeLessThan( + order.indexOf('body') + ); + }); +}); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index da1f7ca615..c9e84758bc 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -28,6 +28,7 @@ import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; import { + isTurboEnabled, MAX_QUEUE_DELIVERIES, REPLAY_DIVERGENCE_MAX_RETRIES, } from './runtime/constants.js'; @@ -503,6 +504,49 @@ export function workflowEntrypoint( let preloadedEvents: Event[] | undefined; let preloadedEventsCursor: string | null | undefined; + // Turbo mode fast-paths the very first delivery of the very + // first invocation, where it is provably safe to: background + // `run_started`, skip the initial event-log load (nothing has + // been written yet), and force optimistic inline start (no + // concurrent peer handler exists to race the create-claim). + // `runInput` is only present on the start()-enqueued message, + // and `attempt === 1` (1-based) means this is the first + // delivery; `incomingStepId` would mark a background-step + // invocation and `replayDivergence` a recovery replay — both + // ineligible. The single-handler guarantee that makes forced + // optimistic start safe ends once a hook/wait/attr is created, + // so turbo exits at that point (see `forceOptimisticStart`). + const turbo = + isTurboEnabled() && + runInput !== undefined && + metadata.attempt === 1 && + incomingStepId === undefined && + !replayDivergence; + + // Turbo mode only: resolves once the backgrounded + // `run_started` has landed (or rejects if it failed). Threaded + // into handleSuspension and executeStep so no step/hook/wait + // write races ahead of the run's creation. Undefined outside + // turbo, where `run_started` is awaited up front. 
+ let runReadyBarrier: Promise | undefined; + + // Order a terminal run write (run_completed / run_failed) after + // the backgrounded run_started in turbo mode — a no-step + // workflow can otherwise reach run_completed before the run + // exists. Best-effort: a barrier rejection is swallowed for + // ordering only; if run_started truly failed the terminal write + // surfaces the real error (run not found / gone) and the message + // redelivers. No-op outside turbo. + const awaitRunReady = async (): Promise => { + if (runReadyBarrier) { + try { + await runReadyBarrier; + } catch { + // intentional: ordering barrier only — see above. + } + } + }; + // If incoming message has a stepId, this is a background step // execution. Execute the step, then check if all parallel steps // from the batch are done. If so, replay inline (saving a queue @@ -652,118 +696,169 @@ export function workflowEntrypoint( // Contract: events.create('run_started') must be idempotent // for runs already in 'running' status (return the run // without error), not just for pending → running transitions. - try { - const result = await world.events.create( + const runStartedEvent = { + eventType: 'run_started' as const, + // Use the spec version from the original start() call + // when available, so the resilient start path creates + // the run with the correct version (not always current). + specVersion: + runInput?.specVersion ?? SPEC_VERSION_CURRENT, + // Pass run input from queue so the server can + // create the run if run_created was missed. + // Uint8Array values survive the queue natively + // (CBOR on world-vercel, JSON reviver on world-local). + ...(runInput + ? 
{ + eventData: { + input: runInput.input, + deploymentId: runInput.deploymentId, + workflowName: runInput.workflowName, + executionContext: runInput.executionContext, + attributes: runInput.attributes, + allowReservedAttributes: + runInput.allowReservedAttributes, + }, + } + : {}), + }; + + if (turbo && runInput) { + // Turbo: background `run_started` and synthesize the run + // entity locally so replay can begin without waiting for + // the round-trip. Safe here because this is the first + // delivery of the first invocation — start() created the + // run moments ago and no events have been written yet. The + // barrier is consumed by every downstream write (suspension + // handler, optimistic step_started, terminal run writes) so + // nothing is written before the run exists. + const startedPromise = world.events.create( runId, - { - eventType: 'run_started', - // Use the spec version from the original start() call - // when available, so the resilient start path creates - // the run with the correct version (not always current). - specVersion: - runInput?.specVersion ?? SPEC_VERSION_CURRENT, - // Pass run input from queue so the server can - // create the run if run_created was missed. - // Uint8Array values survive the queue natively - // (CBOR on world-vercel, JSON reviver on world-local). - ...(runInput - ? 
{ - eventData: { - input: runInput.input, - deploymentId: runInput.deploymentId, - workflowName: runInput.workflowName, - executionContext: runInput.executionContext, - attributes: runInput.attributes, - allowReservedAttributes: - runInput.allowReservedAttributes, - }, - } - : {}), - }, + runStartedEvent, { requestId } ); - if (!result.run) { - throw new WorkflowRuntimeError( - `Event creation for 'run_started' did not return the run entity for run "${runId}"` + runReadyBarrier = startedPromise; + // Attach a no-op rejection handler so an early failure + // never surfaces as an unhandledRejection before a consumer + // (await/then) is attached; consumers still observe it. + startedPromise.catch(() => {}); + // Skip the initial events.list: nothing has been written to + // the log yet on a first delivery (run_started is still in + // flight). An empty preloaded set routes iteration 1 through + // the no-load preloaded branch; iteration 2 then takes the + // existing post-preloaded full reload to pick up a cursor + // (no spurious "cursor missing" warning). `[]` is + // intentionally truthy here — do not change the load + // branches' `if (preloadedEvents)` checks to test length. + preloadedEvents = []; + const now = new Date(); + workflowRun = { + runId, + status: 'running', + deploymentId: runInput.deploymentId, + workflowName: runInput.workflowName, + specVersion: runInput.specVersion, + executionContext: runInput.executionContext, + input: runInput.input, + attributes: runInput.attributes ?? 
{}, + startedAt: now, + createdAt: now, + updatedAt: now, + }; + workflowStartedAt = +now; + span?.setAttributes({ + ...Attribute.WorkflowRunStatus('running'), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); + } else { + try { + const result = await world.events.create( + runId, + runStartedEvent, + { requestId } ); - } - workflowRun = result.run; + if (!result.run) { + throw new WorkflowRuntimeError( + `Event creation for 'run_started' did not return the run entity for run "${runId}"` + ); + } + workflowRun = result.run; - // If the response includes events, use them to skip - // the initial events.list call and reduce TTFB. - if ( - result.events && - result.events.length > 0 && - result.hasMore !== true - ) { - preloadedEvents = result.events; - preloadedEventsCursor = result.cursor; - } + // If the response includes events, use them to skip + // the initial events.list call and reduce TTFB. + if ( + result.events && + result.events.length > 0 && + result.hasMore !== true + ) { + preloadedEvents = result.events; + preloadedEventsCursor = result.cursor; + } - if (!workflowRun.startedAt) { - throw new WorkflowRuntimeError( - `Workflow run "${runId}" has no "startedAt" timestamp` - ); - } - } catch (err) { - // Run was concurrently completed/failed/cancelled - if ( - EntityConflictError.is(err) || - RunExpiredError.is(err) - ) { - // EntityConflictError: run was concurrently - // completed/failed/cancelled during setup. - // RunExpiredError: run already in terminal state. - // In both cases, skip processing this message. 
- runtimeLogger.info( - 'Run already finished during setup, skipping', - { workflowRunId: runId, message: err.message } - ); - return; - } else { - const errorCode = getWorkflowSetupErrorCode(err); - if (!errorCode) { - throw err; + if (!workflowRun.startedAt) { + throw new WorkflowRuntimeError( + `Workflow run "${runId}" has no "startedAt" timestamp` + ); + } + } catch (err) { + // Run was concurrently completed/failed/cancelled + if ( + EntityConflictError.is(err) || + RunExpiredError.is(err) + ) { + // EntityConflictError: run was concurrently + // completed/failed/cancelled during setup. + // RunExpiredError: run already in terminal state. + // In both cases, skip processing this message. + runtimeLogger.info( + 'Run already finished during setup, skipping', + { workflowRunId: runId, message: err.message } + ); + return; + } else { + const errorCode = getWorkflowSetupErrorCode(err); + if (!errorCode) { + throw err; + } + await recordFatalRunError({ + world, + workflowRun, + runId, + requestId, + err, + errorCode, + logMessage: + 'Fatal runtime error during workflow setup', + }); + return; } - await recordFatalRunError({ - world, - workflowRun, - runId, - requestId, - err, - errorCode, - logMessage: - 'Fatal runtime error during workflow setup', - }); - return; } - } - workflowStartedAt = +workflowRun.startedAt; + workflowStartedAt = +workflowRun.startedAt; - span?.setAttributes({ - ...Attribute.WorkflowRunStatus(workflowRun.status), - ...Attribute.WorkflowStartedAt(workflowStartedAt), - }); + span?.setAttributes({ + ...Attribute.WorkflowRunStatus(workflowRun.status), + ...Attribute.WorkflowStartedAt(workflowStartedAt), + }); - if (workflowRun.status !== 'running') { - // Workflow has already completed or failed, so we can skip it - runtimeLogger.info( - 'Workflow already completed or failed, skipping', - { - workflowRunId: runId, - status: workflowRun.status, - } - ); + if (workflowRun.status !== 'running') { + // Workflow has already completed or failed, so we 
can skip it + runtimeLogger.info( + 'Workflow already completed or failed, skipping', + { + workflowRunId: runId, + status: workflowRun.status, + } + ); - // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event - // inside the workflow context so the user can gracefully exit. this is SIGTERM - // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL - // so that we actually exit here without replaying the workflow at all, in the case - // the replaying the workflow is itself failing. + // TODO: for `cancel`, we actually want to propagate a WorkflowCancelled event + // inside the workflow context so the user can gracefully exit. this is SIGTERM + // TODO: furthermore, there should be a timeout or a way to force cancel SIGKILL + // so that we actually exit here without replaying the workflow at all, in the case + // the replaying the workflow is itself failing. - return; - } + return; + } + } // end else (non-turbo run_started) } // end if (!workflowRun) // Resolve the encryption key for this run's deployment. @@ -1085,6 +1180,10 @@ export function workflowEntrypoint( // Workflow completed try { + // Turbo: a workflow that finishes with no steps reaches + // here before the backgrounded run_started; order the + // terminal write after it so the run exists. + await awaitRunReady(); await world.events.create( runId, { @@ -1144,6 +1243,7 @@ export function workflowEntrypoint( run: workflowRun, span, requestId, + runReadyBarrier, }); } catch (suspensionError) { if (!FatalError.is(suspensionError)) { @@ -1170,6 +1270,9 @@ export function workflowEntrypoint( } ); try { + // Turbo: order the terminal write after the + // backgrounded run_started so the run exists. + await awaitRunReady(); await world.events.create( runId, { @@ -1419,6 +1522,23 @@ export function workflowEntrypoint( !suspensionResult.waitTimeout && !hasOpenHookOrWait(cachedEvents ?? 
[]); + // Turbo mode forces optimistic inline start for this + // batch — but only while the run is still "clean" (a pure + // step suspension). The moment a hook or wait (or attr) is + // created, later resume/parallel invocations become + // possible, so the single-handler guarantee that makes + // forced optimistic start safe no longer holds — turbo + // exits and the steps take the normal (env-gated) + // await-then-run path. The hook-conflict / attr cases + // already returned early above and the awaited-hook case + // emptied lazyInlineSteps; the checks below are defensive. + const forceOptimisticStart = + turbo && + !suspensionResult.waitTimeout && + !suspensionResult.hasHookEvents && + !suspensionResult.hasAttributeEvents && + !suspensionResult.hasAwaitedHookCreation; + // Execute the inline steps in parallel. The replay // budget is paused for the whole batch — step duration is // bounded by the platform's function maxDuration, not the @@ -1444,6 +1564,12 @@ export function workflowEntrypoint( // input on step_started so the world creates the // step on the fly. lazyStepInput: s.dehydratedInput, + // Turbo: force optimistic start and hold the lazy + // step_started until the backgrounded run_started + // lands (the body still runs immediately). Both + // are undefined/false outside turbo. + forceOptimisticStart, + runReadyBarrier, ...(requestInlineDelta && preInlineWriteCursor ? { inlineDeltaSinceCursor: @@ -1736,6 +1862,9 @@ export function workflowEntrypoint( // type identity and custom properties round-trip // through the event log. try { + // Turbo: order the terminal write after the + // backgrounded run_started so the run exists. 
+ await awaitRunReady(); await world.events.create( runId, { diff --git a/packages/core/src/runtime/constants.test.ts b/packages/core/src/runtime/constants.test.ts index 441e19d547..bcfb1205bc 100644 --- a/packages/core/src/runtime/constants.test.ts +++ b/packages/core/src/runtime/constants.test.ts @@ -5,6 +5,7 @@ import { getMaxInlineSteps, getReplayTimeoutMs, isOptimisticInlineStartEnabled, + isTurboEnabled, MAX_INLINE_STEPS, MAX_MAX_INLINE_STEPS, MAX_REPLAY_TIMEOUT_MS, @@ -204,3 +205,42 @@ describe('isOptimisticInlineStartEnabled', () => { expect(isOptimisticInlineStartEnabled()).toBe(false); }); }); + +describe('isTurboEnabled', () => { + const originalEnv = process.env.WORKFLOW_TURBO; + + afterEach(() => { + if (originalEnv === undefined) { + delete process.env.WORKFLOW_TURBO; + } else { + process.env.WORKFLOW_TURBO = originalEnv; + } + }); + + it('defaults to enabled when unset', () => { + delete process.env.WORKFLOW_TURBO; + expect(isTurboEnabled()).toBe(true); + }); + + it('defaults to enabled when empty', () => { + process.env.WORKFLOW_TURBO = ''; + expect(isTurboEnabled()).toBe(true); + }); + + it('is disabled by an explicit "0"', () => { + process.env.WORKFLOW_TURBO = '0'; + expect(isTurboEnabled()).toBe(false); + }); + + it('is disabled by "false" (case-insensitive)', () => { + process.env.WORKFLOW_TURBO = 'FALSE'; + expect(isTurboEnabled()).toBe(false); + }); + + it('stays enabled for "1" and other truthy values', () => { + process.env.WORKFLOW_TURBO = '1'; + expect(isTurboEnabled()).toBe(true); + process.env.WORKFLOW_TURBO = 'yes'; + expect(isTurboEnabled()).toBe(true); + }); +}); diff --git a/packages/core/src/runtime/constants.ts b/packages/core/src/runtime/constants.ts index 92bdf96a7e..1982fbe514 100644 --- a/packages/core/src/runtime/constants.ts +++ b/packages/core/src/runtime/constants.ts @@ -202,6 +202,29 @@ export function isOptimisticInlineStartEnabled(): boolean { return raw === '1' || raw.toLowerCase() === 'true'; } +/** + * Whether "turbo 
mode" is enabled. Turbo mode fast-paths the *first delivery of + * the first invocation* of a run (detected by the entrypoint via `runInput` + * presence + `metadata.attempt === 1`): it backgrounds the `run_started` event + * creation, skips the initial event-log load (nothing has been written yet), + * and forces optimistic inline step start for that invocation — independent of + * `WORKFLOW_OPTIMISTIC_INLINE_START`. + * + * Forcing optimistic start is safe here because the first delivery has no + * concurrent peer handler to race the step create-claim, so a step body runs + * exactly once. That single-handler guarantee ends as soon as the run creates a + * hook or wait (which introduce resume/parallel invocations), so the runtime + * exits turbo at that point. + * + * Reads `process.env.WORKFLOW_TURBO` lazily. Default **ON**; disabled only by an + * explicit `'0'` / `'false'` (case-insensitive). + */ +export function isTurboEnabled(): boolean { + const raw = process.env.WORKFLOW_TURBO; + if (raw === undefined || raw === '') return true; + return !(raw === '0' || raw.toLowerCase() === 'false'); +} + // A replay-consumer mismatch can be caused by a transient divergent replay // rather than an invalid persisted history. Queue bounded recovery replays // before recording terminal corruption for a run that cannot replay. diff --git a/packages/core/src/runtime/step-executor.ts b/packages/core/src/runtime/step-executor.ts index 96386c3032..c2fc3473c2 100644 --- a/packages/core/src/runtime/step-executor.ts +++ b/packages/core/src/runtime/step-executor.ts @@ -99,6 +99,23 @@ export interface StepExecutorParams { * handler is the sole inline writer for the run on this iteration. */ inlineDeltaSinceCursor?: string; + /** + * Force optimistic inline start regardless of + * `WORKFLOW_OPTIMISTIC_INLINE_START`. 
Set by turbo mode on the first delivery + * of the first invocation, where forcing it is safe: there is no concurrent + * peer handler to race the create-claim, so the body runs exactly once. Only + * meaningful together with `lazyStepInput` (a brand-new lazy step). + */ + forceOptimisticStart?: boolean; + /** + * Turbo mode only: a promise that resolves once the backgrounded + * `run_started` has landed. When set, the lazy/optimistic `step_started` is + * chained on it so the step is never created before its run exists. The body + * still runs immediately against locally-synthesized state — only the network + * write waits — so the `run_started` round-trip overlaps the body. `undefined` + * outside turbo, where `run_started` was already awaited up front. + */ + runReadyBarrier?: Promise<void>; } /** @@ -311,7 +328,9 @@ export async function executeStep( // execute a step more than once when handlers race — inline step bodies // must be idempotent; disable via WORKFLOW_OPTIMISTIC_INLINE_START=0. const optimisticStart = - params.lazyStepInput !== undefined && isOptimisticInlineStartEnabled(); + params.lazyStepInput !== undefined && + (params.forceOptimisticStart === true || + isOptimisticInlineStartEnabled()); let step: Step; // Settled outcome of the in-flight optimistic `step_started`. Handlers are @@ -338,12 +357,20 @@ export async function executeStep( }; if (optimisticStart) { - const startedPromise = world.events.create(workflowRunId, { - eventType: 'step_started', - specVersion: SPEC_VERSION_CURRENT, - correlationId: stepId, - eventData: { stepName, workflowName, input: params.lazyStepInput }, - }); + // Chain the lazy `step_started` on the run-ready barrier (turbo mode): + // the step can't be created before its run exists, but the body below + // runs immediately against synthesized state, so the `run_started` + // round-trip overlaps the body rather than blocking it. Outside turbo the + // barrier is undefined and this is a plain create.
+ const startedPromise = (params.runReadyBarrier ?? Promise.resolve()).then( + () => + world.events.create(workflowRunId, { + eventType: 'step_started', + specVersion: SPEC_VERSION_CURRENT, + correlationId: stepId, + eventData: { stepName, workflowName, input: params.lazyStepInput }, + }) + ); optimisticStartSettled = startedPromise.then( () => ({ ok: true as const }), (err) => ({ ok: false as const, err }) diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index b4cb64df43..d7322427e5 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -1294,4 +1294,67 @@ describe('executeStep optimistic inline start', () => { expect(result.type).toBe('skipped'); expect(mockStepFn).not.toHaveBeenCalled(); }); + + it('forces optimistic start via forceOptimisticStart even when the flag is off', async () => { + // Turbo passes forceOptimisticStart independent of the env var. + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '0'; + mockEventsCreate + .mockReset() + .mockImplementation((_runId: string, event: { eventType: string }) => { + if (event.eventType === 'step_started') { + return Promise.reject(new EntityConflictError('lost create race')); + } + return Promise.resolve({ event: {} }); + }); + + const world = await getWorld(); + const result = await executeStep({ + world: world as never, + ...baseParams, + forceOptimisticStart: true, + }); + + // Forced optimistic: the body ran before the (lost) create-claim resolved — + // unlike the env-disabled case above, where the body never runs. 
+ expect(mockStepFn).toHaveBeenCalledTimes(1); + expect(result.type).toBe('skipped'); + }); + + it('holds the optimistic step_started until runReadyBarrier resolves, but runs the body immediately', async () => { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '0'; + let release!: () => void; + const barrier = new Promise<void>((r) => { + release = r; + }); + const calls: string[] = []; + mockEventsCreate + .mockReset() + .mockImplementation((_runId: string, event: { eventType: string }) => { + calls.push(event.eventType); + return Promise.resolve({ event: {} }); + }); + mockStepFn.mockReset().mockImplementation(async () => { + calls.push('body'); + return 'step-result'; + }); + + const world = await getWorld(); + const resultPromise = executeStep({ + world: world as never, + ...baseParams, + forceOptimisticStart: true, + runReadyBarrier: barrier, + }); + + // The body runs immediately against synthesized state; step_started is NOT + // issued until the run-ready barrier resolves. + await vi.waitFor(() => expect(calls).toContain('body')); + expect(calls).not.toContain('step_started'); + + release(); + const result = await resultPromise; + expect(result.type).toBe('completed'); + expect(calls).toContain('step_started'); + expect(calls.indexOf('body')).toBeLessThan(calls.indexOf('step_started')); + }); }); diff --git a/packages/core/src/runtime/suspension-handler.ts b/packages/core/src/runtime/suspension-handler.ts index e31b8ba42a..bca1453693 100644 --- a/packages/core/src/runtime/suspension-handler.ts +++ b/packages/core/src/runtime/suspension-handler.ts @@ -35,6 +35,16 @@ export interface SuspensionHandlerParams { run: WorkflowRun; span?: Span; requestId?: string; + /** + * Turbo mode only: a promise that resolves once the backgrounded + * `run_started` has landed (the run exists).
When present, every world write + * this suspension performs (`hook_created`, `wait_created`, eager overflow + * `step_created`, …) is gated on it so the write never races ahead of the + * run's creation. The pure inline hot path defers all of its steps and writes + * nothing here, so it never awaits this barrier. `undefined` outside turbo, + * where `run_started` was already awaited up front. + */ + runReadyBarrier?: Promise<void>; } /** @@ -85,6 +95,14 @@ export interface SuspensionHandlerResult { hasAwaitedHookCreation: boolean; /** Whether native workflow attribute events were written for replay. */ hasAttributeEvents: boolean; + /** + * Whether this suspension created any hook (`hook_created`) events. Unlike + * `hasHookConflict` / `hasAwaitedHookCreation`, this is true even for a plain + * fire-and-forget hook with no conflict and no awaiter. Turbo mode uses it to + * detect "a hook was created this suspension" and stop forcing optimistic + * inline start (a hook introduces later resume invocations that could race). + */ + hasHookEvents: boolean; } async function createHookEvent({ @@ -164,8 +182,26 @@ export async function handleSuspension({ run, span, requestId, + runReadyBarrier, }: SuspensionHandlerParams): Promise<SuspensionHandlerResult> { const runId = run.runId; + // Turbo mode: hold every world write below until the backgrounded + // `run_started` has *settled*, so we never write a step/hook/wait event for a + // run that does not exist yet. A no-op outside turbo (barrier undefined) and + // on the pure inline hot path, which defers all steps and writes nothing. + // Awaiting the same (usually already-settled) promise more than once is cheap. + // A barrier rejection is swallowed for ordering only: if `run_started` truly + // failed the run does not exist, so the subsequent write surfaces the real + // error (run not found / gone) and the message redelivers.
+ const ensureRunReady = async (): Promise<void> => { + if (runReadyBarrier) { + try { + await runReadyBarrier; + } catch { + // intentional: ordering barrier only — see above. + } + } + }; // Separate queue items by type const stepItems = suspension.steps.filter( (item): item is StepInvocationQueueItem => item.type === 'step' @@ -234,6 +270,7 @@ export async function handleSuspension({ let hasAwaitedHookCreation = false; if (hookEvents.length > 0) { + await ensureRunReady(); const results = await Promise.all( hookEvents.map(({ hookEvent, queueItem }) => createHookEvent({ @@ -253,6 +290,7 @@ export async function handleSuspension({ // Process hook disposals — these release hook tokens for reuse by other workflows. if (hooksNeedingDisposal.length > 0) { + await ensureRunReady(); await Promise.all( hooksNeedingDisposal.map(async (queueItem) => { const hookDisposedEvent: CreateEventRequest = { @@ -306,6 +344,7 @@ export async function handleSuspension({ ); if (hooksNeedingAbort.length > 0) { + await ensureRunReady(); await Promise.all( hooksNeedingAbort.map(async (queueItem) => { try { @@ -459,6 +498,7 @@ export async function handleSuspension({ }, }; try { + await ensureRunReady(); await world.events.create(runId, stepEvent, { requestId }); createdStepCorrelationIds.add(queueItem.correlationId); } catch (err) { @@ -491,6 +531,7 @@ export async function handleSuspension({ }, }; try { + await ensureRunReady(); await world.events.create(runId, waitEvent, { requestId }); } catch (err) { if (EntityConflictError.is(err)) { @@ -512,6 +553,7 @@ export async function handleSuspension({ ops.push( (async () => { try { + await ensureRunReady(); await world.events.create( runId, { @@ -612,6 +654,7 @@ export async function handleSuspension({ hasHookConflict, hasAwaitedHookCreation, hasAttributeEvents: attributeItems.length > 0, + hasHookEvents: hookEvents.length > 0, }; } From d446c4d0f7789c317a217bb10f550e671a57f127 Mon Sep 17 00:00:00 2001 From: "vercel[bot]"
<35613825+vercel[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 21:57:46 +0000 Subject: [PATCH 2/9] Fix: Turbo's `forceOptimisticStart` re-enables on later pure-step iterations of the same delivery after a hook/wait was already opened, because it is recomputed solely from the current batch's `suspensionResult` with no latch over the cumulative event log. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit fixes the issue reported at packages/core/src/runtime.ts:1534 ## Bug In `packages/core/src/runtime.ts` (~line 1534), `forceOptimisticStart` is computed **per replay-loop iteration** from the *current* batch only: ```ts const forceOptimisticStart = turbo && !suspensionResult.waitTimeout && !suspensionResult.hasHookEvents && !suspensionResult.hasAttributeEvents && !suspensionResult.hasAwaitedHookCreation; ``` All `suspensionResult.*` flags reflect **only what the current suspension batch created**. Confirmed in `suspension-handler.ts`: * `hasHookEvents: hookEvents.length > 0` where `hookEvents` is built from `hooksNeedingCreation = allHookItems.filter(item => !item.hasCreatedEvent)` — i.e. only hooks that do **not** yet have a `hook_created` event. A hook created in an *earlier* iteration already has its created event, so it is excluded and `hasHookEvents` is `false` on subsequent iterations. `turbo` is computed once per delivery (~line 516), and the replay loop runs many iterations within a single delivery. ### Reachability A fire-and-forget hook (`createHook('h')` not awaited) writes `hook_created` but does **not** block the workflow, so the replay loop continues to later pure-step suspensions in the **same delivery**: * Iteration A: `hook_created` written → `hasHookEvents = true` → `forceOptimisticStart = false`. ✅ * Iteration B (a later pure-step suspension): the hook already has its created event, so it is not in `hooksNeedingCreation` → `hasHookEvents = false` → `forceOptimisticStart = true` again. 
❌ This directly contradicts the invariant documented in the surrounding comment ("The moment a hook or wait ... is created ... the single-handler guarantee that makes forced optimistic start safe no longer holds — turbo exits"). Because the hook is open, a concurrent resume handler can be triggered and race the inline create-claim, and `forceOptimisticStart` overrides the user's `WORKFLOW_OPTIMISTIC_INLINE_START=0` kill switch (see `step-executor.ts:332`), risking double-execution of a non-idempotent step body. ## Fix Latch turbo off permanently once any hook or wait is open anywhere in the cumulative event log, using the existing `hasOpenHookOrWait` helper over `cachedEvents` (the cumulative replay log, set to `events` each iteration at ~line 1148): ```ts const forceOptimisticStart = turbo && !suspensionResult.waitTimeout && !suspensionResult.hasHookEvents && !suspensionResult.hasAttributeEvents && !suspensionResult.hasAwaitedHookCreation && !hasOpenHookOrWait(cachedEvents ?? []); ``` This mirrors the exact gate already applied to `requestInlineDelta` a few lines above (line 1522), where `!hasOpenHookOrWait(cachedEvents ?? [])` is used for the same "no out-of-band concurrent writer" safety reasoning. By keying off the cumulative log rather than the current batch, turbo now exits the moment a hook/wait exists and stays off for the remainder of the run, matching the documented single-handler guarantee. Co-authored-by: Vercel Co-authored-by: VaguelySerious --- packages/core/src/runtime.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index c9e84758bc..96bffa6402 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -1532,12 +1532,24 @@ export function workflowEntrypoint( // await-then-run path. The hook-conflict / attr cases // already returned early above and the awaited-hook case // emptied lazyInlineSteps; the checks below are defensive. 
+ // + // The `suspensionResult.*` flags only reflect what THIS + // batch created, so they do not catch a hook/wait opened + // in an earlier iteration of the same delivery (e.g. a + // fire-and-forget `createHook(...)` that doesn't block the + // workflow, letting the replay loop continue to later pure + // step suspensions). Once any hook or wait is open in the + // cumulative log, resume/parallel invocations are possible + // for the rest of the run, so turbo must latch off + // permanently — checked here via `hasOpenHookOrWait` over + // the cumulative `cachedEvents`. const forceOptimisticStart = turbo && !suspensionResult.waitTimeout && !suspensionResult.hasHookEvents && !suspensionResult.hasAttributeEvents && - !suspensionResult.hasAwaitedHookCreation; + !suspensionResult.hasAwaitedHookCreation && + !hasOpenHookOrWait(cachedEvents ?? []); // Execute the inline steps in parallel. The replay // budget is paused for the whole batch — step duration is From e3c462a08415a72a6a38da033900363224ad09c4 Mon Sep 17 00:00:00 2001 From: "vercel[bot]" <35613825+vercel[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 23:28:30 +0000 Subject: [PATCH 3/9] Fix: Correlation IDs are derived from `ulid(+startedAt)`, but under turbo mode the first delivery synthesizes `startedAt` from the runtime-local clock while later non-turbo deliveries load the server-canonical `startedAt`, so replay regenerates different correlation IDs and throws `ReplayDivergenceError`. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit fixes the issue reported at packages/core/src/runtime.ts:763 ## The bug In `packages/core/src/workflow.ts` (`runWorkflow`), the workflow orchestrator context exposed: ```ts generateUlid: () => ulid(+startedAt), ``` where `startedAt = workflowRun.startedAt`. 
Every durable correlation ID is derived from this: - `step.ts:25` → `step_${ctx.generateUlid()}` - `workflow/hook.ts:73` → `hook_${ctx.generateUlid()}` - `workflow/sleep.ts:18` → `wait_${ctx.generateUlid()}` - `workflow/attribute-dispatcher.ts:20` → `attr_${ctx.generateUlid()}` The 48-bit time prefix of every correlation ID therefore equals `+startedAt`. For replay to succeed, the value fed to `ulid()` **must be identical on every delivery** — otherwise `EventsConsumer.onUnconsumedEvent` fires and rejects with `ReplayDivergenceError`. ## Why turbo breaks it `startedAt` is **not** replay-stable under turbo: - **Turbo first delivery** (`runtime.ts` ~L753): the run is synthesized locally with `startedAt: now`, where `now = new Date()` is the runtime-local clock. The first delivery's `generateUlid` thus encodes the local `now`, and any `step_started`/`wait`/`hook_created` events persisted in this delivery carry correlation IDs encoding that local `now`. - **Backend persistence**: the backgrounded `run_started` write records the storage layer's own clock as the canonical `startedAt` (`world-local events-storage` uses `currentRun.startedAt ?? now`), which differs from the runtime's local `now`. - **Next (non-turbo) delivery**: the run is loaded from the backend with the server-canonical `startedAt`. `generateUlid()` now produces ULIDs with a different time prefix, so the regenerated correlation IDs no longer match the persisted ones → `ReplayDivergenceError`. The divergence only requires a ≥1 ms difference between the two ms-resolution clocks, so it is intermittent but real — and turbo is on by default. This was already a known hazard: the RNG `seed` and the VM clock `fixedTimestamp` were *deliberately* decoupled from `startedAt`/`createdAt` (see the comment "Dropping the timestamp means the seed no longer depends on startedAt/createdAt, so it ... can be computed before any server round-trip"). `generateUlid` was simply missed in that refactor. 
## The fix Feed `generateUlid` the same replay-stable value already used for the seed and VM clock: ```ts generateUlid: () => ulid(fixedTimestamp), ``` where `fixedTimestamp = runIdCreatedAt(workflowRun.runId) ?? +workflowRun.createdAt`. Production run IDs are always `wrun_<ulid>` (minted client-side in `start()`), so `runIdCreatedAt` recovers the same epoch-ms value the instant the queue message arrives — identical on turbo and non-turbo deliveries alike. Correlation IDs become replay-stable in all delivery paths. `workflowStartedAt` (line 296, a user-facing `Date` exposed to workflow code) intentionally keeps using `startedAt` — it is not a correlation ID and is not part of replay matching. ## Test compatibility The two integration tests that compute expected correlation IDs through the real `runWorkflow` path use non-ULID run IDs (`wrun_stale_wait_replay`, `wrun_test`). For those, `runIdCreatedAt` returns `undefined` and `fixedTimestamp` falls back to `+createdAt`, which equals `+startedAt` in those fixtures — so `ulid(fixedTimestamp)` yields the same IDs as before and the assertions still hold. The unit-test fixtures that hand-build their own `generateUlid: () => ulid(workflowStartedAt)` do not go through `runWorkflow` and are unaffected.
Co-authored-by: Vercel Co-authored-by: VaguelySerious --- packages/core/src/workflow.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index d342f90b0c..7ba888ce87 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -211,7 +211,16 @@ export async function runWorkflow( globalThis: vmGlobalThis, onWorkflowError: workflowDiscontinuation.reject, eventsConsumer, - generateUlid: () => ulid(+startedAt), + // Correlation IDs (step_/wait_/hook_) are derived from `generateUlid`, so + // the time prefix fed to `ulid()` MUST be replay-stable across every + // delivery — otherwise a redelivery regenerates different correlation IDs + // and replay throws ReplayDivergenceError. `startedAt` is NOT safe here: + // under turbo the first delivery synthesizes `startedAt` from the local + // clock, but later (non-turbo) deliveries load the server-canonical + // `startedAt`, which differs by >=1ms. Use the same replay-stable value + // that already seeds the RNG and the VM clock (`fixedTimestamp`, recovered + // from the run ID's ULID and known the instant the message arrives). + generateUlid: () => ulid(fixedTimestamp), generateNanoid, invocationsQueue: new Map(), // Use getter/setter so the EventsConsumer's getPromiseQueue() always From dd43273d343d498335627e24eb6192f4ff88fcd2 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 18 Jun 2026 16:33:23 -0700 Subject: [PATCH 4/9] docs(changelog): record run-ahead exploration and why turbo awaits each step_completed Turbo overlaps start round-trips with step bodies but still awaits each step_completed before advancing. Documents the considered "run-ahead" extension (defer step writes to a background queue, run sequential steps ahead) and why it was not pursued: crash re-execution blast radius, and divergent branches when a step runs against a non-durable result a redelivery can re-decide. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/content/docs/v5/changelog/turbo-mode.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/content/docs/v5/changelog/turbo-mode.md b/docs/content/docs/v5/changelog/turbo-mode.md index 7513f7a682..d33617f493 100644 --- a/docs/content/docs/v5/changelog/turbo-mode.md +++ b/docs/content/docs/v5/changelog/turbo-mode.md @@ -54,3 +54,14 @@ The event log therefore still reads `run_created → run_started → step_create Turbo mode is **on by default**. Set `WORKFLOW_TURBO=0` (or `false`) to disable it — every invocation then takes the existing awaited path. This is a useful kill-switch for deployments whose first-step bodies are not idempotent and stream-safe (the same caveat as optimistic inline start), or for isolating behavior while debugging. Turbo mode is purely client-side and builds on the lazy/optimistic inline start support already shipped — it requires no world or backend changes. + +## Considered: running ahead of durable writes (not implemented) + +Turbo overlaps the *start* round-trips with a step's body, but it still **awaits each `step_completed` before advancing** to the next step. We explored going further — "run-ahead": within a single invocation, execute the workflow forward across a sequential chain *without* awaiting each step's event writes, draining `step_started`/`step_completed` through a background FIFO queue and only blocking on a full drain before acking. A run of three sub-millisecond steps would then fire all the bodies back-to-back while the six event posts caught up in the background, turning per-step latency into `max(Σ body, Σ post)` instead of `Σ(body + post)`. + +We decided **not** to ship it, for two reasons: + +1. **Re-execution blast radius on failure.** Awaiting each completion means a crash re-runs essentially one in-flight step. 
Running ahead leaves many completions undrained at once, so a crash or `maxDuration` SIGTERM re-runs *all* of them on redelivery — a much larger at-least-once blast radius, precisely on the latency-sensitive runs most likely to pack many steps into one invocation. +2. **Divergent branches from non-durable results.** Advancing past a step before its result is durable lets the workflow commit to a forward path that a crash-and-redeliver can re-decide differently. A `Promise.race([B, C])` resolved by local timing can pick `B`, run `D(B)`, then crash before `step_completed_B` is durable — and the redelivery may re-resolve to `C`, so `D` executed against a winner the durable history never records. The same shape appears for a branch on a non-deterministic step output (`B(v1)` runs, crash, redelivery commits `B(v2)`). Idempotency doesn't cover these — `D(B)`/`D(C)` and `B(v1)`/`B(v2)` are *different* operations, not retries of one. A "run ahead only while at most one result is undurable" gate would contain the race case (a race needs ≥2 concurrent undurable steps) but not the non-deterministic-output case, and that residual hazard plus the re-execution blast radius outweighed the gain. + +So turbo deliberately stops at forced-optimistic *start* and awaits each `step_completed` before moving on: re-execution after a crash stays deterministic (each step re-runs against the same durable inputs) and bounded (roughly one step, not the whole chain). The idea is recorded here in case a future change (e.g. a determinism signal on steps, or deterministic race resolution) makes run-ahead safe enough to revisit. 
From a3e0abea76d2981e585acec0d10563c889808c61 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 18 Jun 2026 16:43:10 -0700 Subject: [PATCH 5/9] test(core): update Promise.race fixture for runId-derived correlation IDs The generateUlid fix (correlation IDs keyed on the replay-stable fixedTimestamp = runIdCreatedAt(runId) instead of startedAt) changes the ULID time prefix for fixtures whose ULID runId encodes a different time than their startedAt. This race-replay fixture used a 2025 ULID runId with a 2024 startedAt, so its step_ correlation ID prefixes move from the startedAt-derived 01HK153X00 to the runId-derived 01K75533W5 (suffixes, seed-derived, are unchanged). Realigns the fixture; no behavior change. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/core/src/workflow.test.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index 3a51969164..04042ba057 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -978,7 +978,7 @@ describe('runWorkflow', () => { const events: Event[] = [ { eventType: 'step_started', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7M', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7M', runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K755385N02MMWXYHFCQSP9P0', eventData: { @@ -988,7 +988,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_started', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7N', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7N', runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K755386GHGAFYYDC58V17E3T', eventData: { @@ -998,7 +998,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_started', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7P', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7P', runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K75538D4Q4X8PJ1ZNDZD5R0W', eventData: { @@ -1008,7 +1008,7 @@ 
describe('runWorkflow', () => { }, { eventType: 'step_started', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7Q', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7Q', runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K75538Y9GEHXJQXT3JB89M4C', eventData: { @@ -1018,7 +1018,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_started', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7R', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7R', runId: 'wrun_01K75533W56DAE35VY3082DN3P', eventId: 'evnt_01K75539CD2PAH419SKJ2X5V5T', eventData: { @@ -1028,7 +1028,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_completed', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7R', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7R', eventData: { stepName: 'promiseRaceStressTestDelayStep', result: await dehydrateStepReturnValue( @@ -1044,7 +1044,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_completed', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7Q', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7Q', eventData: { stepName: 'promiseRaceStressTestDelayStep', result: await dehydrateStepReturnValue( @@ -1060,7 +1060,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_completed', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7P', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7P', eventData: { stepName: 'promiseRaceStressTestDelayStep', result: await dehydrateStepReturnValue( @@ -1076,7 +1076,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_completed', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7N', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7N', eventData: { stepName: 'promiseRaceStressTestDelayStep', result: await dehydrateStepReturnValue( @@ -1092,7 +1092,7 @@ describe('runWorkflow', () => { }, { eventType: 'step_completed', - correlationId: 'step_01HK153X00WAVWBK9YGJQC6R7M', + correlationId: 'step_01K75533W5WAVWBK9YGJQC6R7M', eventData: { stepName: 'promiseRaceStressTestDelayStep', result: await 
dehydrateStepReturnValue( From 53e4029870485be8b24e454aee7ac2a05d00f044 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 18 Jun 2026 16:54:08 -0700 Subject: [PATCH 6/9] fix(core): address turbo review feedback (optimistic opt-out, unregistered-step barrier, docs) - forceOptimisticStart now defers to an explicit WORKFLOW_OPTIMISTIC_INLINE_START=0: turbo still forces optimistic start when the flag is unset, but an operator's explicit opt-out (the "body runs before start is confirmed" property) wins. Adds isOptimisticInlineStartExplicitlyDisabled(). - Gate the unregistered-step ("step not found") lazy step_started on runReadyBarrier so it never precedes the backgrounded run_started under turbo. - Document that the forced-optimistic first step body's stream/ops writes run before run_started (stream-safety caveat + WORKFLOW_TURBO=0), and that a run cancelled/expired before its first delivery still runs the first step body (reconciled away) under turbo. Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/content/docs/v5/changelog/turbo-mode.md | 8 ++++ packages/core/src/runtime/constants.test.ts | 38 +++++++++++++++++++ packages/core/src/runtime/constants.ts | 15 ++++++++ packages/core/src/runtime/step-executor.ts | 24 ++++++++++-- .../core/src/runtime/step-handler.test.ts | 34 +++++++++++++++-- 5 files changed, 112 insertions(+), 7 deletions(-) diff --git a/docs/content/docs/v5/changelog/turbo-mode.md b/docs/content/docs/v5/changelog/turbo-mode.md index d33617f493..0e721d39d0 100644 --- a/docs/content/docs/v5/changelog/turbo-mode.md +++ b/docs/content/docs/v5/changelog/turbo-mode.md @@ -49,10 +49,18 @@ Because `run_started` is backgrounded, every event write is gated on a run-ready The event log therefore still reads `run_created → run_started → step_created → step_started → step_completed`. If the backgrounded `run_started` genuinely fails (e.g. 
the run was cancelled in the meantime), the chained writes surface the real error (`gone` / run-not-found) and the message redelivers as a normal, non-turbo attempt. +The barrier orders **event** writes. The forced-optimistic first step **body** runs immediately, so any side effects it performs *before* the terminal write — stream writes via `getWritable()` and the per-step ops flush — are **not** gated on the barrier and can reach the world before the backgrounded `run_started` lands (and are orphaned if it ultimately fails). This is the same exposure as optimistic inline start and is covered by the stream-safety caveat below; deployments whose first step writes to the workflow stream and require strict `run_created → run_started` ordering of stream data should set `WORKFLOW_TURBO=0`. + +### A run cancelled before its first delivery still runs the first step body + +The non-turbo path awaits `run_started` up front and, if the run was cancelled or expired between `start()` and this delivery, returns before any workflow/step code runs. Turbo synthesizes `status: 'running'` and runs the first step body optimistically, so such a cancellation is only observed when the backgrounded `run_started` (and the barrier-chained `step_started`) rejects — *after* the body's side effects have executed (they are then discarded via reconciliation). For non-idempotent first steps this is the same "body runs before ownership is confirmed" tradeoff as optimistic inline start; `WORKFLOW_TURBO=0` restores the up-front skip. + ## Configuration Turbo mode is **on by default**. Set `WORKFLOW_TURBO=0` (or `false`) to disable it — every invocation then takes the existing awaited path. This is a useful kill-switch for deployments whose first-step bodies are not idempotent and stream-safe (the same caveat as optimistic inline start), or for isolating behavior while debugging. 
+Turbo forces optimistic inline start on the first invocation regardless of `WORKFLOW_OPTIMISTIC_INLINE_START` (its single-handler guarantee removes the double-execution race that flag guards against). It does, however, **honor an explicit `WORKFLOW_OPTIMISTIC_INLINE_START=0`**: because forced optimistic start still runs the body before `step_started`/`run_started` is confirmed, an operator who has explicitly disabled optimistic start keeps the await-then-run path even under turbo (the rest of turbo — backgrounded `run_started`, skipped initial load — still applies). With the flag unset (the default), turbo forces it on. + Turbo mode is purely client-side and builds on the lazy/optimistic inline start support already shipped — it requires no world or backend changes. ## Considered: running ahead of durable writes (not implemented) diff --git a/packages/core/src/runtime/constants.test.ts b/packages/core/src/runtime/constants.test.ts index bcfb1205bc..d7ffc7fde9 100644 --- a/packages/core/src/runtime/constants.test.ts +++ b/packages/core/src/runtime/constants.test.ts @@ -5,6 +5,7 @@ import { getMaxInlineSteps, getReplayTimeoutMs, isOptimisticInlineStartEnabled, + isOptimisticInlineStartExplicitlyDisabled, isTurboEnabled, MAX_INLINE_STEPS, MAX_MAX_INLINE_STEPS, @@ -206,6 +207,43 @@ describe('isOptimisticInlineStartEnabled', () => { }); }); +describe('isOptimisticInlineStartExplicitlyDisabled', () => { + const originalEnv = process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + + afterEach(() => { + if (originalEnv === undefined) { + delete process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + } else { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = originalEnv; + } + }); + + it('is false when unset (off-by-default, but not an explicit opt-out)', () => { + delete process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + expect(isOptimisticInlineStartExplicitlyDisabled()).toBe(false); + }); + + it('is false when empty', () => { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = ''; + 
expect(isOptimisticInlineStartExplicitlyDisabled()).toBe(false); + }); + + it('is true for an explicit "0"', () => { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '0'; + expect(isOptimisticInlineStartExplicitlyDisabled()).toBe(true); + }); + + it('is true for "false" (case-insensitive)', () => { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = 'False'; + expect(isOptimisticInlineStartExplicitlyDisabled()).toBe(true); + }); + + it('is false when enabled', () => { + process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '1'; + expect(isOptimisticInlineStartExplicitlyDisabled()).toBe(false); + }); +}); + describe('isTurboEnabled', () => { const originalEnv = process.env.WORKFLOW_TURBO; diff --git a/packages/core/src/runtime/constants.ts b/packages/core/src/runtime/constants.ts index 1982fbe514..c8fddcf69d 100644 --- a/packages/core/src/runtime/constants.ts +++ b/packages/core/src/runtime/constants.ts @@ -202,6 +202,21 @@ export function isOptimisticInlineStartEnabled(): boolean { return raw === '1' || raw.toLowerCase() === 'true'; } +/** + * Whether an operator has **explicitly disabled** optimistic inline start via + * `WORKFLOW_OPTIMISTIC_INLINE_START=0` / `=false`. Distinct from "unset": unset + * leaves the optimization off by default but lets turbo force it on; an explicit + * `0`/`false` is an operator opt-out that turbo must honor (turbo's forced + * optimistic start still runs a step body before `step_started`/`run_started` is + * confirmed, the property such an operator is opting out of), so + * `forceOptimisticStart` defers to this. Reads the env var lazily. + */ +export function isOptimisticInlineStartExplicitlyDisabled(): boolean { + const raw = process.env.WORKFLOW_OPTIMISTIC_INLINE_START; + if (raw === undefined || raw === '') return false; + return raw === '0' || raw.toLowerCase() === 'false'; +} + /** * Whether "turbo mode" is enabled. 
Turbo mode fast-paths the *first delivery of * the first invocation* of a run (detected by the entrypoint via `runInput` diff --git a/packages/core/src/runtime/step-executor.ts b/packages/core/src/runtime/step-executor.ts index c2fc3473c2..2b74e9ed2d 100644 --- a/packages/core/src/runtime/step-executor.ts +++ b/packages/core/src/runtime/step-executor.ts @@ -33,7 +33,10 @@ import { promoteAbortErrorToFatal, } from '../types.js'; -import { isOptimisticInlineStartEnabled } from './constants.js'; +import { + isOptimisticInlineStartEnabled, + isOptimisticInlineStartExplicitlyDisabled, +} from './constants.js'; import { getPortLazy } from './get-port-lazy.js'; import { memoizeEncryptionKey } from './helpers.js'; import { safeWaitUntil } from './wait-until.js'; @@ -218,6 +221,13 @@ export async function executeStep( // return `skipped` and never write the failure twice. if (params.lazyStepInput !== undefined) { try { + // Turbo: this lazy `step_started` must not precede the backgrounded + // `run_started`. Order it after the run-ready barrier (best-effort — + // a barrier rejection means the run doesn't exist, and the create + // below surfaces the real error). No-op outside turbo. + if (params.runReadyBarrier) { + await params.runReadyBarrier.catch(() => {}); + } await world.events.create(workflowRunId, { eventType: 'step_started', specVersion: SPEC_VERSION_CURRENT, @@ -327,10 +337,18 @@ export async function executeStep( // discard the body result. Running the body before confirming ownership can // execute a step more than once when handlers race — inline step bodies // must be idempotent; disable via WORKFLOW_OPTIMISTIC_INLINE_START=0. + // + // Turbo mode passes `forceOptimisticStart` to enable this regardless of the + // env flag (its single-handler guarantee removes the race). 
But it still + // defers to an *explicit* `WORKFLOW_OPTIMISTIC_INLINE_START=0`: forced + // optimistic start runs the body before `step_started`/`run_started` is + // confirmed, which is exactly the property an operator opts out of with that + // flag, so an explicit opt-out wins over turbo's force. const optimisticStart = params.lazyStepInput !== undefined && - (params.forceOptimisticStart === true || - isOptimisticInlineStartEnabled()); + (isOptimisticInlineStartEnabled() || + (params.forceOptimisticStart === true && + !isOptimisticInlineStartExplicitlyDisabled())); let step: Step; // Settled outcome of the in-flight optimistic `step_started`. Handlers are diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index d7322427e5..2825df04b4 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -1295,9 +1295,10 @@ describe('executeStep optimistic inline start', () => { expect(mockStepFn).not.toHaveBeenCalled(); }); - it('forces optimistic start via forceOptimisticStart even when the flag is off', async () => { - // Turbo passes forceOptimisticStart independent of the env var. - process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '0'; + it('forces optimistic start via forceOptimisticStart when the flag is unset', async () => { + // Turbo passes forceOptimisticStart; with the env var UNSET (off by default + // but not an explicit opt-out), turbo forces optimistic start on. 
+ delete process.env.WORKFLOW_OPTIMISTIC_INLINE_START; mockEventsCreate .mockReset() .mockImplementation((_runId: string, event: { eventType: string }) => { @@ -1320,8 +1321,33 @@ describe('executeStep optimistic inline start', () => { expect(result.type).toBe('skipped'); }); - it('holds the optimistic step_started until runReadyBarrier resolves, but runs the body immediately', async () => { + it('forceOptimisticStart defers to an EXPLICIT WORKFLOW_OPTIMISTIC_INLINE_START=0', async () => { + // An operator who explicitly set the flag to 0 has opted out of "body runs + // before start is confirmed"; that opt-out wins over turbo's force, so the + // step takes the await-then-run path and the body never runs on a 409. process.env.WORKFLOW_OPTIMISTIC_INLINE_START = '0'; + mockEventsCreate + .mockReset() + .mockImplementation((_runId: string, event: { eventType: string }) => { + if (event.eventType === 'step_started') { + return Promise.reject(new EntityConflictError('already running')); + } + return Promise.resolve({ event: {} }); + }); + + const world = await getWorld(); + const result = await executeStep({ + world: world as never, + ...baseParams, + forceOptimisticStart: true, + }); + + expect(result.type).toBe('skipped'); + expect(mockStepFn).not.toHaveBeenCalled(); + }); + + it('holds the optimistic step_started until runReadyBarrier resolves, but runs the body immediately', async () => { + delete process.env.WORKFLOW_OPTIMISTIC_INLINE_START; let release!: () => void; const barrier = new Promise((r) => { release = r; From 5dcc7d529082342ebf3c099e62f01ce2aff31a53 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Thu, 18 Jun 2026 17:02:55 -0700 Subject: [PATCH 7/9] test(core): widen vi.waitFor timeout in turbo body-runs assertions The two turbo tests that wait for the inline step body to run before releasing the run-ready barrier used vi.waitFor's 1s default, which the full VM replay can exceed on cold Windows CI (intermittent "expected [] to include 'body'"). 
Bump to 15s, matching the existing queue_dispatch_start waitFor in the same suite. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/core/src/runtime.test.ts | 6 +++++- packages/core/src/runtime/step-handler.test.ts | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index 37363c4e2c..3fa05f6c7f 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -1521,7 +1521,11 @@ describe('workflowEntrypoint turbo mode', () => { // The body runs while run_started is still in flight — proving run_started // was backgrounded AND optimistic start was forced (the env flag is off). - await vi.waitFor(() => expect(order).toContain('body')); + // The full VM replay leading up to the body can exceed vi.waitFor's default + // 1s timeout on slow CI runners (notably Windows), so widen it. + await vi.waitFor(() => expect(order).toContain('body'), { + timeout: 15_000, + }); expect(order).not.toContain('run_started_resolved'); // The lazy step_started is chained on the run-ready barrier, so it is not // even issued until run_started lands. diff --git a/packages/core/src/runtime/step-handler.test.ts b/packages/core/src/runtime/step-handler.test.ts index 2825df04b4..dc5a4bd670 100644 --- a/packages/core/src/runtime/step-handler.test.ts +++ b/packages/core/src/runtime/step-handler.test.ts @@ -1373,8 +1373,11 @@ describe('executeStep optimistic inline start', () => { }); // The body runs immediately against synthesized state; step_started is NOT - // issued until the run-ready barrier resolves. - await vi.waitFor(() => expect(calls).toContain('body')); + // issued until the run-ready barrier resolves. Widen the default 1s + // vi.waitFor timeout — reaching the body can be slow on cold CI (Windows). 
+ await vi.waitFor(() => expect(calls).toContain('body'), { + timeout: 15_000, + }); expect(calls).not.toContain('step_started'); release(); From ddcb5aabe78cf161dd45d02e07c55c268a39baf0 Mon Sep 17 00:00:00 2001 From: Peter Wielander Date: Fri, 19 Jun 2026 11:29:57 -0700 Subject: [PATCH 8/9] fix(core): turbo re-invokes must not re-trigger turbo (async-queue wedge) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Turbo's immediate re-invoke exits returned `{ timeoutSeconds: 0 }`, which makes the queue reschedule the CURRENT delivery's message. That message carries `runInput`, and on async queues (graphile-worker / world-vercel) a reschedule comes back as delivery attempt 1 — so turbo re-engaged, skipped the event-log load again, replayed against an empty log, never observed the hook/attr event it had just written, re-suspended, and rescheduled forever. The run wedged (every hook + experimental_setAttributes e2e test timed out on world-postgres and world-vercel; world-local's reschedule increments the attempt, so it was unaffected). Turbo now re-invokes via an explicit continuation that carries NO `runInput` (`reinvoke()`), so the next delivery is a normal non-turbo load-and-replay that observes the committed events and makes progress. Applies to the hasHookConflict, hasAttributeEvents, hasAwaitedHookCreation, and throttle re-invoke exits. Verified against world-postgres: hook.getConflict() + experimental_setAttributes workflows that previously wedged now complete. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/core/src/runtime.ts | 39 ++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 96bffa6402..b081cd7d2f 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -547,6 +547,37 @@ export function workflowEntrypoint( } }; + // Re-invoke the orchestrator. 
Outside turbo this returns + // `{ timeoutSeconds }`, which makes the queue reschedule the + // CURRENT delivery's message. In turbo that is a trap: the + // current message carries `runInput`, and on async queues + // (e.g. graphile-worker) a reschedule comes back as delivery + // attempt 1 — so turbo re-engages, skips the event-log load + // again, replays against an empty log, never observes the + // hook/attr event this invocation just wrote, and re-suspends + // forever (the run wedges). Under turbo we instead enqueue an + // explicit continuation that carries NO `runInput`, so the + // next delivery is a normal (non-turbo) load-and-replay that + // observes the committed events and makes progress; we then + // return `undefined` so the queue treats this delivery as done + // rather than also rescheduling it. + const reinvoke = async ( + delaySeconds: number + ): Promise<{ timeoutSeconds: number } | undefined> => { + if (!turbo) return { timeoutSeconds: delaySeconds }; + await queueMessage( + world, + getWorkflowQueueName(workflowName, namespace), + { + runId, + traceCarrier: await nextTraceCarrier(), + requestedAt: new Date(), + }, + delaySeconds > 0 ? { delaySeconds } : undefined + ); + return undefined; + }; + // If incoming message has a stepId, this is a background step // execution. Execute the step, then check if all parallel steps // from the batch are done. If so, replay inline (saving a queue @@ -1335,7 +1366,7 @@ export function workflowEntrypoint( // Hook conflict: break loop, re-invoke via queue if (suspensionResult.hasHookConflict) { - return { timeoutSeconds: 0 }; + return await reinvoke(0); } // Native workflow attribute events are resolved through @@ -1344,7 +1375,7 @@ export function workflowEntrypoint( // durable attribute event can win without executing // the losing step. 
if (suspensionResult.hasAttributeEvents) { - return { timeoutSeconds: 0 }; + return await reinvoke(0); } const pendingSteps = suspensionResult.pendingSteps; @@ -1467,7 +1498,7 @@ export function workflowEntrypoint( // queued or none pending) the run would sit idle // until some unrelated message woke it. if (suspensionResult.hasAwaitedHookCreation) { - return { timeoutSeconds: 0 }; + return await reinvoke(0); } return; } @@ -1674,7 +1705,7 @@ export function workflowEntrypoint( // the in-invocation flush window (<= 500ms + waitUntil), // so ops settle before the post-backoff redelivery // replays and reads them. - return { timeoutSeconds: throttleTimeout }; + return await reinvoke(throttleTimeout); } if (toRetry.length > 0) { From 0cdbdfdb8670e58b58bfd585bb197a9f7ada0087 Mon Sep 17 00:00:00 2001 From: Pranay Prakash Date: Sat, 20 Jun 2026 13:11:34 -0700 Subject: [PATCH 9/9] test(core): guard turbo replay-stability + doc workflowStartedAt caveat Add a workflow.test.ts regression that replays a recorded step under a startedAt that diverges from createdAt, proving step correlation IDs are regenerated from the run-ID-derived fixedTimestamp (not startedAt) and so stay stable across deliveries. Reverting generateUlid to ulid(+startedAt) fails this test. Document in the turbo-mode changelog that getWorkflowMetadata().workflowStartedAt reflects the first delivery's clock under turbo (local on the first delivery, server-canonical on later ones) and must not drive replayed control flow. Co-Authored-By: Claude Opus 4.8 --- docs/content/docs/v5/changelog/turbo-mode.md | 4 + packages/core/src/workflow.test.ts | 79 ++++++++++++++++++++ 2 files changed, 83 insertions(+) diff --git a/docs/content/docs/v5/changelog/turbo-mode.md b/docs/content/docs/v5/changelog/turbo-mode.md index 0e721d39d0..0369787b70 100644 --- a/docs/content/docs/v5/changelog/turbo-mode.md +++ b/docs/content/docs/v5/changelog/turbo-mode.md @@ -55,6 +55,10 @@ The barrier orders **event** writes. 
The forced-optimistic first step **body** r The non-turbo path awaits `run_started` up front and, if the run was cancelled or expired between `start()` and this delivery, returns before any workflow/step code runs. Turbo synthesizes `status: 'running'` and runs the first step body optimistically, so such a cancellation is only observed when the backgrounded `run_started` (and the barrier-chained `step_started`) rejects — *after* the body's side effects have executed (they are then discarded via reconciliation). For non-idempotent first steps this is the same "body runs before ownership is confirmed" tradeoff as optimistic inline start; `WORKFLOW_TURBO=0` restores the up-front skip. +### `workflowStartedAt` reflects the first delivery's clock + +Replay matching — step/wait/hook correlation IDs, the VM seed, and the in-VM `Date.now()` — is derived from a replay-stable timestamp recovered from the run ID, so it does **not** depend on `startedAt` and is identical on every delivery. The one value that still tracks `startedAt` is the user-facing `getWorkflowMetadata().workflowStartedAt`: under turbo the first delivery synthesizes it from the local clock, while a later (non-turbo) delivery loads the server-canonical `startedAt`, so the two can differ by the start→first-delivery latency. Treat `workflowStartedAt` as an approximate, human-facing timestamp — do **not** branch workflow control flow on it (e.g. `Date.now() - +workflowStartedAt > threshold`), since that can take different paths across deliveries and diverge on replay. For timing logic that must survive replay, use the in-VM `Date.now()` / `new Date()`, which is replay-stable. + ## Configuration Turbo mode is **on by default**. Set `WORKFLOW_TURBO=0` (or `false`) to disable it — every invocation then takes the existing awaited path. 
This is a useful kill-switch for deployments whose first-step bodies are not idempotent and stream-safe (the same caveat as optimistic inline start), or for isolating behavior while debugging. diff --git a/packages/core/src/workflow.test.ts b/packages/core/src/workflow.test.ts index 04042ba057..12aebe329a 100644 --- a/packages/core/src/workflow.test.ts +++ b/packages/core/src/workflow.test.ts @@ -219,6 +219,85 @@ describe('runWorkflow', () => { ).toEqual(3); }); + it('regenerates step correlation IDs independent of startedAt (turbo replay-stability)', async () => { + // Turbo's first delivery synthesizes `startedAt` from the local clock, + // while later (non-turbo) deliveries load the server-canonical `startedAt`. + // Replay matching must NOT depend on `startedAt`: correlation IDs come from + // `generateUlid`, keyed off the run-ID-recovered `fixedTimestamp`, not + // `startedAt`. Here the recorded `add` event uses the createdAt-derived + // correlation ID, but `startedAt` is months away — replay must still + // regenerate the same ID and consume the completion rather than throwing + // ReplayDivergenceError. Reverting `generateUlid` to `ulid(+startedAt)` + // fails this test. + const ops: Promise[] = []; + const workflowRunId = 'wrun_123'; + const workflowRun: WorkflowRun = { + runId: workflowRunId, + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + [], + 'wrun_123', + noEncryptionKey, + ops + ), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + // Diverges from createdAt by months — replay must ignore it. 
+ startedAt: new Date('2024-06-01T12:34:56.000Z'), + deploymentId: 'test-deployment', + }; + + const events: Event[] = [ + { + eventId: 'event-0', + runId: workflowRunId, + eventType: 'step_started', + correlationId: 'step_01HK153X00SFW49DWMQP3J810S', + eventData: { + stepName: 'add', + }, + createdAt: new Date('2024-01-01T00:00:01.000Z'), + }, + { + eventId: 'event-1', + runId: workflowRunId, + eventType: 'step_completed', + correlationId: 'step_01HK153X00SFW49DWMQP3J810S', + eventData: { + stepName: 'add', + result: await dehydrateStepReturnValue( + 3, + 'wrun_123', + noEncryptionKey, + ops + ), + }, + createdAt: new Date('2024-01-01T00:00:02.000Z'), + }, + ]; + + const result = await runWorkflow( + `const add = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("add"); + async function workflow() { + const a = await add(1, 2); + return a; + }${getWorkflowTransformCode('workflow')}`, + workflowRun, + events, + noEncryptionKey + ); + + expect( + await hydrateWorkflowReturnValue( + result as any, + 'wrun_123', + noEncryptionKey, + ops + ) + ).toEqual(3); + }); + // Test that timestamps update correctly as events are consumed it('should update the timestamp in the vm context as events are replayed', async () => { const ops: Promise[] = [];