diff --git a/.changeset/nitro-auto-world-start.md b/.changeset/nitro-auto-world-start.md new file mode 100644 index 0000000000..0cc0fd12d4 --- /dev/null +++ b/.changeset/nitro-auto-world-start.md @@ -0,0 +1,5 @@ +--- +'@workflow/nitro': minor +--- + +Start the workflow World automatically at server boot via a generated Nitro plugin, so self-hosted Nitro apps (Nitro v2/v3, Nuxt, Express/Hono/Fastify on Nitro) recover in-flight runs after a restart with no manual wiring. Skipped on Vercel deploys. diff --git a/.changeset/world-local-bundled-version.md b/.changeset/world-local-bundled-version.md new file mode 100644 index 0000000000..fd2d52d940 --- /dev/null +++ b/.changeset/world-local-bundled-version.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-local': patch +--- + +Skip data-dir version-compat enforcement when the package version is the `bundled` sentinel (framework server bundles), so `world.start()` at server startup no longer throws `Invalid version string: "bundled"`. diff --git a/.changeset/world-start-recovery-core.md b/.changeset/world-start-recovery-core.md new file mode 100644 index 0000000000..d638d69a3c --- /dev/null +++ b/.changeset/world-start-recovery-core.md @@ -0,0 +1,6 @@ +--- +'workflow': minor +'@workflow/core': minor +--- + +Add `ensureWorldStarted()` (exported from `workflow/runtime`) which starts the World once per process at server startup, running boot-time recovery of in-flight runs for self-hosted worlds. Call it from your framework's startup hook (e.g. a Next.js `instrumentation.ts`). diff --git a/.changeset/world-start-recovery-vercel.md b/.changeset/world-start-recovery-vercel.md new file mode 100644 index 0000000000..85dbba1cdd --- /dev/null +++ b/.changeset/world-start-recovery-vercel.md @@ -0,0 +1,5 @@ +--- +'@workflow/world-vercel': minor +--- + +Add a no-op `start()` for World-interface compliance. The Vercel World is push-based (VQS redelivery), so it needs no boot-time recovery. 
diff --git a/.changeset/world-start-recovery-world.md b/.changeset/world-start-recovery-world.md new file mode 100644 index 0000000000..51b0b9df2c --- /dev/null +++ b/.changeset/world-start-recovery-world.md @@ -0,0 +1,5 @@ +--- +'@workflow/world': patch +--- + +Document the `start()` contract: it must be idempotent and may be a no-op for push-based/serverless worlds, and is where queue-backed worlds run boot-time recovery. diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 1665b31aaa..1dc3a7828c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -931,11 +931,114 @@ jobs: env-vars: ${{ matrix.world.env-vars }} secrets: inherit + # Restart recovery: prove an in-flight (sleeping) run resumes after a hard + # restart with NO workflow operation — i.e. server startup wiring + # (instrumentation.ts -> ensureWorldStarted) runs boot-time recovery. Unlike + # the other e2e jobs, this one does NOT pre-start the server; the test owns + # the server lifecycle (spawn, SIGKILL, respawn). Covers local + postgres on + # nextjs-turbopack. + e2e-restart-recovery: + name: E2E Restart Recovery (nextjs-turbopack - ${{ matrix.world }}) + runs-on: ubuntu-latest + timeout-minutes: 30 + if: ${{ needs.ci-scope.outputs.fast-path != 'true' && !contains(github.event.pull_request.labels.*.name, 'workflow-server-test') }} + needs: [ci-scope, e2e-package-build] + strategy: + fail-fast: false + matrix: + world: [local, postgres] + + # Defined unconditionally; the local matrix entry simply does not use it. 
+ services: + postgres: + image: postgres:18-alpine + env: + POSTGRES_USER: world + POSTGRES_PASSWORD: world + POSTGRES_DB: world + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} + TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' + APP_NAME: nextjs-turbopack + RESTART_RECOVERY_TEST: '1' + DEPLOYMENT_URL: 'http://localhost:3000' + # Empty for the local matrix entry -> resolveWorkflowTargetWorld() falls + # back to the local world. + WORKFLOW_TARGET_WORLD: ${{ matrix.world == 'postgres' && '@workflow/world-postgres' || '' }} + WORKFLOW_POSTGRES_URL: ${{ matrix.world == 'postgres' && 'postgres://world:world@localhost:5432/world' || '' }} + + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + + - name: Setup environment + uses: ./.github/actions/setup-workflow-dev + with: + install-dependencies: 'false' + build-packages: 'false' + + - name: Install Dependencies + run: pnpm install --frozen-lockfile + + - name: Download shared package builds + uses: actions/download-artifact@v4 + with: + name: e2e-package-build-artifacts + path: packages + + - name: Setup PostgreSQL Database + if: ${{ matrix.world == 'postgres' }} + run: ./packages/world-postgres/bin/setup.js + + - name: Prepare workbench path + id: prepare-workbench + uses: ./.github/actions/prepare-workbench-path + with: + app-name: nextjs-turbopack + + - name: Build workbench + run: pnpm vitest run packages/core/e2e/local-build.test.ts + env: + APP_NAME: nextjs-turbopack + WORKBENCH_APP_PATH: ${{ steps.prepare-workbench.outputs.workbench_app_path }} + + - name: Run Restart Recovery Test + run: | + pnpm vitest run packages/core/e2e/restart-recovery.test.ts --reporter=default --reporter=json --reporter=./packages/core/e2e/github-reporter.ts --outputFile=e2e-restart-recovery-${{ matrix.world 
}}.json + env: + NODE_OPTIONS: "--enable-source-maps" + APP_NAME: nextjs-turbopack + WORKBENCH_APP_PATH: ${{ steps.prepare-workbench.outputs.workbench_app_path }} + + - name: Generate E2E summary + if: always() + run: node .github/scripts/aggregate-e2e-results.js . --job-name "E2E Restart Recovery (nextjs-turbopack - ${{ matrix.world }})" >> $GITHUB_STEP_SUMMARY || true + + - name: Upload E2E results + if: always() + uses: actions/upload-artifact@v4 + with: + name: e2e-results-restart-recovery-${{ matrix.world }} + path: e2e-restart-recovery-${{ matrix.world }}.json + retention-days: 7 + if-no-files-found: ignore + # Final job: Aggregate all E2E results and update PR comment summary: name: E2E Summary runs-on: ubuntu-latest - needs: [ci-scope, e2e-vercel-prod, e2e-local-dev, e2e-local-prod, e2e-local-postgres, e2e-windows] + needs: [ci-scope, e2e-vercel-prod, e2e-local-dev, e2e-local-prod, e2e-local-postgres, e2e-restart-recovery, e2e-windows] if: always() && !cancelled() && needs.ci-scope.outputs.fast-path != 'true' timeout-minutes: 10 @@ -966,15 +1069,17 @@ jobs: LOCAL_DEV_STATUS="${{ needs.e2e-local-dev.result }}" LOCAL_PROD_STATUS="${{ needs.e2e-local-prod.result }}" POSTGRES_STATUS="${{ needs.e2e-local-postgres.result }}" + RESTART_RECOVERY_STATUS="${{ needs.e2e-restart-recovery.result }}" WINDOWS_STATUS="${{ needs.e2e-windows.result }}" echo "vercel=$VERCEL_STATUS" >> $GITHUB_OUTPUT echo "local-dev=$LOCAL_DEV_STATUS" >> $GITHUB_OUTPUT echo "local-prod=$LOCAL_PROD_STATUS" >> $GITHUB_OUTPUT echo "postgres=$POSTGRES_STATUS" >> $GITHUB_OUTPUT + echo "restart-recovery=$RESTART_RECOVERY_STATUS" >> $GITHUB_OUTPUT echo "windows=$WINDOWS_STATUS" >> $GITHUB_OUTPUT - if [[ "$VERCEL_STATUS" == "failure" || "$LOCAL_DEV_STATUS" == "failure" || "$LOCAL_PROD_STATUS" == "failure" || "$POSTGRES_STATUS" == "failure" || "$WINDOWS_STATUS" == "failure" ]]; then + if [[ "$VERCEL_STATUS" == "failure" || "$LOCAL_DEV_STATUS" == "failure" || "$LOCAL_PROD_STATUS" == "failure" || 
"$POSTGRES_STATUS" == "failure" || "$RESTART_RECOVERY_STATUS" == "failure" || "$WINDOWS_STATUS" == "failure" ]]; then echo "has_failures=true" >> $GITHUB_OUTPUT else echo "has_failures=false" >> $GITHUB_OUTPUT @@ -1001,6 +1106,7 @@ jobs: - Local Dev: ${{ needs.e2e-local-dev.result }} - Local Prod: ${{ needs.e2e-local-prod.result }} - Local Postgres: ${{ needs.e2e-local-postgres.result }} + - Restart Recovery: ${{ needs.e2e-restart-recovery.result }} - Windows: ${{ needs.e2e-windows.result }} Check the [workflow run](${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}) for details. @@ -1010,7 +1116,7 @@ jobs: e2e-required-check: name: E2E Required Check runs-on: ubuntu-latest - needs: [ci-scope, unit, e2e-package-build, e2e-vercel-prod, e2e-local-dev, e2e-local-prod, e2e-local-postgres, e2e-windows] + needs: [ci-scope, unit, e2e-package-build, e2e-vercel-prod, e2e-local-dev, e2e-local-prod, e2e-local-postgres, e2e-restart-recovery, e2e-windows] if: always() timeout-minutes: 5 @@ -1023,6 +1129,7 @@ jobs: LOCAL_DEV_STATUS: ${{ needs.e2e-local-dev.result }} LOCAL_PROD_STATUS: ${{ needs.e2e-local-prod.result }} POSTGRES_STATUS: ${{ needs.e2e-local-postgres.result }} + RESTART_RECOVERY_STATUS: ${{ needs.e2e-restart-recovery.result }} WINDOWS_STATUS: ${{ needs.e2e-windows.result }} FAST_PATH: ${{ needs.ci-scope.outputs.fast-path }} VALIDATION_FAST_PATH: ${{ needs.ci-scope.outputs.validation-fast-path }} @@ -1050,6 +1157,7 @@ jobs: [[ "$LOCAL_DEV_STATUS" == "skipped" ]] || echo "Warning: e2e-local-dev was not skipped ($LOCAL_DEV_STATUS)" [[ "$LOCAL_PROD_STATUS" == "skipped" ]] || echo "Warning: e2e-local-prod was not skipped ($LOCAL_PROD_STATUS)" [[ "$POSTGRES_STATUS" == "skipped" ]] || echo "Warning: e2e-local-postgres was not skipped ($POSTGRES_STATUS)" + [[ "$RESTART_RECOVERY_STATUS" == "skipped" ]] || echo "Warning: e2e-restart-recovery was not skipped ($RESTART_RECOVERY_STATUS)" [[ "$WINDOWS_STATUS" == "skipped" ]] || echo "Warning: 
e2e-windows was not skipped ($WINDOWS_STATUS)" else echo "Standard PR - checking all jobs" @@ -1059,6 +1167,7 @@ jobs: [[ "$LOCAL_DEV_STATUS" == "success" ]] || FAILED_JOBS+=("e2e-local-dev ($LOCAL_DEV_STATUS)") [[ "$LOCAL_PROD_STATUS" == "success" ]] || FAILED_JOBS+=("e2e-local-prod ($LOCAL_PROD_STATUS)") [[ "$POSTGRES_STATUS" == "success" ]] || FAILED_JOBS+=("e2e-local-postgres ($POSTGRES_STATUS)") + [[ "$RESTART_RECOVERY_STATUS" == "success" ]] || FAILED_JOBS+=("e2e-restart-recovery ($RESTART_RECOVERY_STATUS)") [[ "$WINDOWS_STATUS" == "success" ]] || FAILED_JOBS+=("e2e-windows ($WINDOWS_STATUS)") fi diff --git a/docs/content/docs/v4/deploying/meta.json b/docs/content/docs/v4/deploying/meta.json index d9d363f308..0616229cf7 100644 --- a/docs/content/docs/v4/deploying/meta.json +++ b/docs/content/docs/v4/deploying/meta.json @@ -1,4 +1,4 @@ { "title": "Deploying", - "pages": ["...deploying", "building-a-world"] + "pages": ["...deploying", "recovering-in-flight-runs", "building-a-world"] } diff --git a/docs/content/docs/v4/deploying/recovering-in-flight-runs.mdx b/docs/content/docs/v4/deploying/recovering-in-flight-runs.mdx new file mode 100644 index 0000000000..2ef555aea7 --- /dev/null +++ b/docs/content/docs/v4/deploying/recovering-in-flight-runs.mdx @@ -0,0 +1,88 @@ +--- +title: Recovering in-flight runs +description: Start the World at server boot so runs that were in flight when the process stopped resume after a restart. +type: guide +summary: Call the World's start() at server boot to recover in-flight runs after a restart. 
+prerequisites: + - /docs/deploying +related: + - /docs/deploying/world/local-world + - /docs/deploying/world/postgres-world + - /docs/deploying/world/vercel-world +--- + +When you self-host a workflow app on a long-lived server (the [local](/docs/deploying/world/local-world) and [Postgres](/docs/deploying/world/postgres-world) worlds), a run can be mid-flight — sleeping, waiting on a hook, or between steps — when the process stops or crashes. To resume those runs, the World's `start()` method runs **boot-time recovery**: it re-enqueues every `pending`/`running` run so execution continues. + +Recovery only happens if `start()` is actually called, and it must be called **once at server startup** — not in response to a request. Otherwise an idle server that restarted with in-flight runs would never pick them back up. + +## `ensureWorldStarted()` + +Call `ensureWorldStarted()` from `workflow/runtime` in your framework's server-startup hook: + +```ts +import { ensureWorldStarted } from 'workflow/runtime'; + +await ensureWorldStarted(); +``` + +It is **idempotent** — it starts the World at most once per process, so it is safe to call from a hook that may run more than once. Re-enqueuing a run that is already progressing is harmless: the workflow handler is replay-idempotent, so duplicate enqueues converge rather than double-execute. + +You can call this regardless of which World you target. On the [Vercel World](/docs/deploying/world/vercel-world) it is a no-op — delivery is push-based and the queue redelivers in-flight messages on its own, so there is no long-lived process to recover. + +## Wiring it per framework + +### Next.js + +Add an `instrumentation.ts` at your project root. 
Guard on the Node.js runtime — `instrumentation.ts` also runs in the Edge runtime, which can't load the world modules: + +```ts title="instrumentation.ts" +export async function register() { + if (process.env.NEXT_RUNTIME === 'nodejs') { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + } +} +``` + +### Nitro, Nuxt, Express, Hono, Fastify (Nitro) + +No action required — the `@workflow/nitro` integration registers a Nitro server plugin that starts the World at boot for you. (Not on Vercel deploys, where the push-based Vercel World needs no boot recovery.) + +### SvelteKit + +Use the [`init`](https://svelte.dev/docs/kit/hooks#Shared-hooks-init) server hook: + +```ts title="src/hooks.server.ts" +import type { ServerInit } from '@sveltejs/kit'; + +export const init: ServerInit = async () => { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); +}; +``` + +### NestJS + +Call it in your `bootstrap()` before listening: + +```ts title="src/main.ts" +async function bootstrap() { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + // ...create and listen +} +``` + +### Astro + +Astro has no startup hook that works across all adapters, so start the World from middleware. 
`ensureWorldStarted()` is idempotent, so it only does real work on the first request. Note that this defers boot-time recovery until that first request arrives, so an idle server will not resume in-flight runs until it receives traffic: + +```ts title="src/middleware.ts" +import { defineMiddleware } from 'astro:middleware'; + +export const onRequest = defineMiddleware(async (_context, next) => { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + return next(); +}); +``` diff --git a/docs/content/docs/v4/deploying/world/local-world.mdx b/docs/content/docs/v4/deploying/world/local-world.mdx index 126bb3589f..b1d0e46093 100644 --- a/docs/content/docs/v4/deploying/world/local-world.mdx +++ b/docs/content/docs/v4/deploying/world/local-world.mdx @@ -18,6 +18,18 @@ To explicitly use the local world in any environment, set the environment variab WORKFLOW_TARGET_WORLD=local ``` +## Starting the World + +The Local World keeps its queue in memory, so a run that is in flight (for example, sleeping) when the process stops only resumes if the World is started again on boot. Start it once at server startup so in-flight runs recover after a restart: + +```ts +import { ensureWorldStarted } from 'workflow/runtime'; + +await ensureWorldStarted(); +``` + +Where to call this depends on your framework — see [Recovering in-flight runs](/docs/deploying/recovering-in-flight-runs). + ## Observability The `workflow` CLI uses the local world by default. Running these commands inside your workflow project will show your local development workflows: diff --git a/docs/content/docs/v4/deploying/world/postgres-world.mdx b/docs/content/docs/v4/deploying/world/postgres-world.mdx index bc8f6a98c9..829ac9afa6 100644 --- a/docs/content/docs/v4/deploying/world/postgres-world.mdx +++ b/docs/content/docs/v4/deploying/world/postgres-world.mdx @@ -41,82 +41,15 @@ The migration is idempotent and can safely be run as a post-deployment lifecycle ## Starting the World -To subscribe to the graphile-worker queue, your workflow app needs to start the world on server start. 
Here are examples for a few frameworks: +The Postgres World runs a graphile-worker queue and performs boot-time recovery of in-flight runs, so your app **must start the World once at server startup**: - +```ts +import { ensureWorldStarted } from 'workflow/runtime'; - - -Create an `instrumentation.ts` file in your project root: - -```ts title="instrumentation.ts" lineNumbers -export async function register() { - if (process.env.NEXT_RUNTIME !== "edge") { - const { getWorld } = await import("workflow/runtime"); - const world = await getWorld(); - await world.start?.(); - } -} -``` - - -Learn more about [Next.js Instrumentation](https://nextjs.org/docs/app/guides/instrumentation). - - - - - - -Create a `src/hooks.server.ts` file: - -```ts title="src/hooks.server.ts" lineNumbers -import type { ServerInit } from "@sveltejs/kit"; - -export const init: ServerInit = async () => { - const { getWorld } = await import("workflow/runtime"); - const world = await getWorld(); - await world.start?.(); -}; -``` - - -Learn more about [SvelteKit Hooks](https://svelte.dev/docs/kit/hooks). - - - - - - -Create a plugin to start the world on server initialization: - -```ts title="plugins/start-pg-world.ts" lineNumbers -import { defineNitroPlugin } from "nitro/~internal/runtime/plugin"; - -export default defineNitroPlugin(async () => { - const { getWorld } = await import("workflow/runtime"); - const world = await getWorld(); - await world.start?.(); -}); +await ensureWorldStarted(); ``` -Register the plugin in your config: - -```ts title="nitro.config.ts" -import { defineNitroConfig } from "nitropack"; - -export default defineNitroConfig({ - modules: ["workflow/nitro"], - plugins: ["plugins/start-pg-world.ts"], -}); -``` - - -Learn more about [Nitro Plugins](https://v3.nitro.build/docs/plugins). 
- - - - - +Where to call this depends on your framework (Next.js `instrumentation.ts`, SvelteKit `init`, etc.; Nitro and Nuxt apps start it automatically) — see [Recovering in-flight runs](/docs/deploying/recovering-in-flight-runs). The Postgres World requires a long-lived worker process that polls the database for jobs. This does not work on serverless environments. For Vercel deployments, use the [Vercel World](/worlds/vercel) instead. diff --git a/docs/content/docs/v5/deploying/meta.json b/docs/content/docs/v5/deploying/meta.json index d9d363f308..0616229cf7 100644 --- a/docs/content/docs/v5/deploying/meta.json +++ b/docs/content/docs/v5/deploying/meta.json @@ -1,4 +1,4 @@ { "title": "Deploying", - "pages": ["...deploying", "building-a-world"] + "pages": ["...deploying", "recovering-in-flight-runs", "building-a-world"] } diff --git a/docs/content/docs/v5/deploying/recovering-in-flight-runs.mdx b/docs/content/docs/v5/deploying/recovering-in-flight-runs.mdx new file mode 100644 index 0000000000..2ef555aea7 --- /dev/null +++ b/docs/content/docs/v5/deploying/recovering-in-flight-runs.mdx @@ -0,0 +1,88 @@ +--- +title: Recovering in-flight runs +description: Start the World at server boot so runs that were in flight when the process stopped resume after a restart. +type: guide +summary: Call the World's start() at server boot to recover in-flight runs after a restart. +prerequisites: + - /docs/deploying +related: + - /docs/deploying/world/local-world + - /docs/deploying/world/postgres-world + - /docs/deploying/world/vercel-world +--- + +When you self-host a workflow app on a long-lived server (the [local](/docs/deploying/world/local-world) and [Postgres](/docs/deploying/world/postgres-world) worlds), a run can be mid-flight — sleeping, waiting on a hook, or between steps — when the process stops or crashes. To resume those runs, the World's `start()` method runs **boot-time recovery**: it re-enqueues every `pending`/`running` run so execution continues. 
+ +Recovery only happens if `start()` is actually called, and it must be called **once at server startup** — not in response to a request. Otherwise an idle server that restarted with in-flight runs would never pick them back up. + +## `ensureWorldStarted()` + +Call `ensureWorldStarted()` from `workflow/runtime` in your framework's server-startup hook: + +```ts +import { ensureWorldStarted } from 'workflow/runtime'; + +await ensureWorldStarted(); +``` + +It is **idempotent** — it starts the World at most once per process, so it is safe to call from a hook that may run more than once. Re-enqueuing a run that is already progressing is harmless: the workflow handler is replay-idempotent, so duplicate enqueues converge rather than double-execute. + +You can call this regardless of which World you target. On the [Vercel World](/docs/deploying/world/vercel-world) it is a no-op — delivery is push-based and the queue redelivers in-flight messages on its own, so there is no long-lived process to recover. + +## Wiring it per framework + +### Next.js + +Add an `instrumentation.ts` at your project root. Guard on the Node.js runtime — `instrumentation.ts` also runs in the Edge runtime, which can't load the world modules: + +```ts title="instrumentation.ts" +export async function register() { + if (process.env.NEXT_RUNTIME === 'nodejs') { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + } +} +``` + +### Nitro, Nuxt, Express, Hono, Fastify (Nitro) + +No action required — the `@workflow/nitro` integration registers a Nitro server plugin that starts the World at boot for you. (Not on Vercel deploys, where the push-based Vercel World needs no boot recovery.) 
+ +### SvelteKit + +Use the [`init`](https://svelte.dev/docs/kit/hooks#Shared-hooks-init) server hook: + +```ts title="src/hooks.server.ts" +import type { ServerInit } from '@sveltejs/kit'; + +export const init: ServerInit = async () => { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); +}; +``` + +### NestJS + +Call it in your `bootstrap()` before listening: + +```ts title="src/main.ts" +async function bootstrap() { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + // ...create and listen +} +``` + +### Astro + +Astro has no startup hook that works across all adapters, so start the World from middleware. `ensureWorldStarted()` is idempotent, so it only does real work on the first request. Note that this defers boot-time recovery until that first request arrives, so an idle server will not resume in-flight runs until it receives traffic: + +```ts title="src/middleware.ts" +import { defineMiddleware } from 'astro:middleware'; + +export const onRequest = defineMiddleware(async (_context, next) => { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + return next(); +}); +``` diff --git a/docs/content/docs/v5/deploying/world/local-world.mdx b/docs/content/docs/v5/deploying/world/local-world.mdx index 126bb3589f..b1d0e46093 100644 --- a/docs/content/docs/v5/deploying/world/local-world.mdx +++ b/docs/content/docs/v5/deploying/world/local-world.mdx @@ -18,6 +18,18 @@ To explicitly use the local world in any environment, set the environment variab WORKFLOW_TARGET_WORLD=local ``` +## Starting the World + +The Local World keeps its queue in memory, so a run that is in flight (for example, sleeping) when the process stops only resumes if the World is started again on boot. Start it once at server startup so in-flight runs recover after a restart: + +```ts +import { ensureWorldStarted } from 'workflow/runtime'; + +await ensureWorldStarted(); +``` + +Where to call this depends on your framework — see [Recovering in-flight runs](/docs/deploying/recovering-in-flight-runs). 
+ ## Observability The `workflow` CLI uses the local world by default. Running these commands inside your workflow project will show your local development workflows: diff --git a/docs/content/docs/v5/deploying/world/postgres-world.mdx b/docs/content/docs/v5/deploying/world/postgres-world.mdx index bc8f6a98c9..829ac9afa6 100644 --- a/docs/content/docs/v5/deploying/world/postgres-world.mdx +++ b/docs/content/docs/v5/deploying/world/postgres-world.mdx @@ -41,82 +41,15 @@ The migration is idempotent and can safely be run as a post-deployment lifecycle ## Starting the World -To subscribe to the graphile-worker queue, your workflow app needs to start the world on server start. Here are examples for a few frameworks: +The Postgres World runs a graphile-worker queue and performs boot-time recovery of in-flight runs, so your app **must start the World once at server startup**: - +```ts +import { ensureWorldStarted } from 'workflow/runtime'; - - -Create an `instrumentation.ts` file in your project root: - -```ts title="instrumentation.ts" lineNumbers -export async function register() { - if (process.env.NEXT_RUNTIME !== "edge") { - const { getWorld } = await import("workflow/runtime"); - const world = await getWorld(); - await world.start?.(); - } -} -``` - - -Learn more about [Next.js Instrumentation](https://nextjs.org/docs/app/guides/instrumentation). - - - - - - -Create a `src/hooks.server.ts` file: - -```ts title="src/hooks.server.ts" lineNumbers -import type { ServerInit } from "@sveltejs/kit"; - -export const init: ServerInit = async () => { - const { getWorld } = await import("workflow/runtime"); - const world = await getWorld(); - await world.start?.(); -}; -``` - - -Learn more about [SvelteKit Hooks](https://svelte.dev/docs/kit/hooks). 
- - - - - - -Create a plugin to start the world on server initialization: - -```ts title="plugins/start-pg-world.ts" lineNumbers -import { defineNitroPlugin } from "nitro/~internal/runtime/plugin"; - -export default defineNitroPlugin(async () => { - const { getWorld } = await import("workflow/runtime"); - const world = await getWorld(); - await world.start?.(); -}); +await ensureWorldStarted(); ``` -Register the plugin in your config: - -```ts title="nitro.config.ts" -import { defineNitroConfig } from "nitropack"; - -export default defineNitroConfig({ - modules: ["workflow/nitro"], - plugins: ["plugins/start-pg-world.ts"], -}); -``` - - -Learn more about [Nitro Plugins](https://v3.nitro.build/docs/plugins). - - - - - +Where to call this depends on your framework (Next.js `instrumentation.ts`, SvelteKit `init`, etc.; Nitro and Nuxt apps start it automatically) — see [Recovering in-flight runs](/docs/deploying/recovering-in-flight-runs). The Postgres World requires a long-lived worker process that polls the database for jobs. This does not work on serverless environments. For Vercel deployments, use the [Vercel World](/worlds/vercel) instead. diff --git a/packages/core/e2e/restart-recovery.test.ts b/packages/core/e2e/restart-recovery.test.ts new file mode 100644 index 0000000000..ef26bdf919 --- /dev/null +++ b/packages/core/e2e/restart-recovery.test.ts @@ -0,0 +1,370 @@ +/** + * Kill/restart recovery e2e test. + * + * Proves that a workflow run that is in flight (sleeping) when the server is + * hard-killed resumes to completion after the server is restarted — WITHOUT + * issuing any workflow operation against the restarted server. The only thing + * that can revive the run is the server's startup wiring calling + * `ensureWorldStarted()` (e.g. Next.js `instrumentation.ts`), which runs + * boot-time recovery (`reenqueueActiveRuns`) for the self-hosted worlds. 
+ * + * Why this fails without the startup wiring: + * - local world: the in-memory queue (and its pending sleep timer) dies with + * the process. Only `reenqueueActiveRuns` re-enqueues the run on boot. + * - postgres world: the sleep is a durable graphile-worker job, but the + * worker only auto-starts on the next enqueue. With no post-restart + * operation, nothing enqueues — so the worker only boots (and drains the + * durable job) if `world.start()` is called at startup. + * + * The test drives the workflow start through the server's own + * `/api/workflows/start` route so the SERVER process owns the queue/worker; the + * test process is a pure observer that only reads run status from shared + * storage (filesystem for local, the shared DB for postgres). This is essential + * for postgres: if the test process called `start()` directly it would + * auto-boot a graphile-worker in the test process that would drain the queue + * regardless of the server, masking the behavior under test. + * + * Only runs when RESTART_RECOVERY_TEST=1 and against a local server + * (local or postgres world). Targets the nextjs-turbopack workbench. + */ +import { type ChildProcess, execSync, spawn } from 'node:child_process'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { afterEach, beforeAll, describe, expect, test } from 'vitest'; +import { getWorld } from '../src/runtime'; +import { getWorkbenchAppPath, isLocalDeployment, setupWorld } from './utils'; + +const enabled = + process.env.RESTART_RECOVERY_TEST === '1' && isLocalDeployment(); + +const deploymentUrl = process.env.DEPLOYMENT_URL ?? 'http://localhost:3000'; +const port = Number(new URL(deploymentUrl).port || '3000'); + +// How long the workflow sleeps. Long enough that the run is reliably still +// sleeping when we detect it and kill the server, short enough that by the time +// the restarted server recovers it, the sleep deadline has typically already +// passed (so it completes promptly on replay). 
+const SLEEP_MS = 8_000;
+
+// How long the single step in `longStepWorkflow` runs. Long enough that the
+// step's queue job is reliably still locked when we detect it and kill the
+// server.
+const LONG_STEP_MS = 12_000;
+
+// Upper bounds for the polling helpers in this file (server readiness,
+// reaching the in-flight state, and post-restart recovery respectively).
+const SERVER_READY_TIMEOUT_MS = 90_000;
+const WAIT_CREATED_TIMEOUT_MS = 60_000;
+const RECOVERY_TIMEOUT_MS = 120_000;
+
+let server: ChildProcess | undefined;
+
+const T0 = Date.now();
+
+/** Log `msg` to stderr, prefixed with the milliseconds elapsed since this module loaded. */
+function log(msg: string): void {
+  console.error(`[restart-recovery +${Date.now() - T0}ms] ${msg}`);
+}
+
+/**
+ * Spawn the workbench server (`pnpm start`) in the workbench app directory.
+ *
+ * The child inherits this process's stdio and environment, with `PORT` and
+ * `WORKFLOW_PUBLIC_MANIFEST` overridden.
+ *
+ * @returns The spawned child process handle.
+ */
+function spawnServer(): ChildProcess {
+  return spawn('pnpm', ['start'], {
+    cwd: getWorkbenchAppPath(),
+    // detached so we can SIGKILL the whole process group (`next start` may
+    // spawn child processes) and simulate a hard crash.
+    detached: true,
+    stdio: 'inherit',
+    env: {
+      ...process.env,
+      PORT: String(port),
+      WORKFLOW_PUBLIC_MANIFEST: '1',
+    },
+  });
+}
+
+/**
+ * Poll the deployment URL every 500ms until the server answers HTTP.
+ *
+ * @param child The server process being waited on; used to fail fast if it
+ *   dies before becoming ready.
+ * @throws If `child` exits or is terminated by a signal before it answers, or
+ *   if nothing answers within `SERVER_READY_TIMEOUT_MS`.
+ */
+async function waitForServerReady(child: ChildProcess): Promise<void> {
+  const deadline = Date.now() + SERVER_READY_TIMEOUT_MS;
+  while (Date.now() < deadline) {
+    if (child.exitCode !== null) {
+      throw new Error(
+        `Server process exited early with code ${child.exitCode}`
+      );
+    }
+    // A signal-terminated child reports `exitCode === null`; without this
+    // check a crashed server would be polled until the full timeout elapsed.
+    if (child.signalCode !== null) {
+      throw new Error(
+        `Server process was terminated early by signal ${child.signalCode}`
+      );
+    }
+    try {
+      const res = await fetch(deploymentUrl, { method: 'GET' });
+      // Any HTTP response means the server is accepting connections.
+      if (res.status > 0) return;
+    } catch {
+      // not up yet
+    }
+    await sleep(500);
+  }
+  throw new Error(
+    `Server did not become ready within ${SERVER_READY_TIMEOUT_MS}ms`
+  );
+}
+
+/** True while anything still answers HTTP on the deployment URL. */
+async function isServerUp(): Promise<boolean> {
+  try {
+    const res = await fetch(deploymentUrl, { method: 'GET' });
+    return res.status > 0;
+  } catch {
+    return false;
+  }
+}
+
+/**
+ * Hard-kill the server AND wait until the port is actually free.
`pnpm start`
+ * wraps `next start`, so SIGKILLing only the wrapper can leave `next` alive on
+ * the port — which would let the "killed" server finish the run itself and
+ * defeat the test. We kill the process group, then poll until nothing answers,
+ * escalating to a port-based kill (`lsof`) as a backstop.
+ *
+ * @param child The server process to kill.
+ * @throws If something still answers on the port 20s after the kill.
+ */
+async function killServer(child: ChildProcess): Promise<void> {
+  // A child that was terminated by a signal has `exitCode === null` but a
+  // non-null `signalCode`; treat both as "already exited" so we don't wait for
+  // an 'exit' event that has already fired.
+  const alreadyExited = child.exitCode !== null || child.signalCode !== null;
+  const exited = alreadyExited
+    ? Promise.resolve()
+    : new Promise<void>((resolve) => child.once('exit', () => resolve()));
+  try {
+    // Negative pid kills the whole process group (hard crash, no graceful
+    // drain — the in-memory queue is lost).
+    if (child.pid) process.kill(-child.pid, 'SIGKILL');
+  } catch {
+    try {
+      child.kill('SIGKILL');
+    } catch {
+      // already gone
+    }
+  }
+  await Promise.race([exited, sleep(5_000)]);
+
+  // Ensure the port is truly free before the test restarts on it.
+  const deadline = Date.now() + 20_000;
+  while (Date.now() < deadline) {
+    if (!(await isServerUp())) return;
+    // Backstop: kill whatever is still LISTENING on the port. Restricting to
+    // listening sockets matters: a bare `lsof -i tcp:PORT` also matches client
+    // connections to that port, which would include this test process.
+    try {
+      execSync(`lsof -ti tcp:${port} -sTCP:LISTEN | xargs kill -9`, {
+        stdio: 'ignore',
+      });
+    } catch {
+      // lsof found nothing or isn't available
+    }
+    await sleep(500);
+  }
+  throw new Error(
+    `Server still responding on port ${port} after kill — cannot guarantee a true restart`
+  );
+}
+
+/** Run ids of in-flight (pending/running) runs whose name matches `pattern`.
*/ +async function inFlightRunIds(pattern: RegExp): Promise> { + const world = await getWorld(); + const ids = new Set(); + for (const status of ['pending', 'running'] as const) { + const { data } = await world.runs.list({ status, resolveData: 'none' }); + for (const r of data) { + if (pattern.test(r.workflowName)) ids.add(r.runId); + } + } + return ids; +} + +/** + * Start a workflow on the SERVER (server-side `start()` so the SERVER process + * owns the queue/sleep timer/worker — essential: if the test process started + * it, the test process would own the timer/worker and killing the server + * wouldn't matter). + * + * The `/api/workflows/start` route streams until the workflow completes and only + * flushes the `X-Workflow-Run-Id` header with the body, so we must NOT await it + * (that would block, defeating the "kill mid-flight" goal). Instead we fire the + * request and discover the new run id by diffing shared storage — a read that + * never triggers delivery. + */ +async function startWorkflowOnServer( + workflowName: string, + args: unknown[], + pattern: RegExp +): Promise { + const before = await inFlightRunIds(pattern); + + // Fire-and-forget: the request reaches the server (which starts the run) but + // we never read the streaming body. Killing the server later rejects it. + void fetch(new URL('/api/workflows/start', deploymentUrl), { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ workflowName, args }), + }).catch(() => {}); + + const deadline = Date.now() + 30_000; + while (Date.now() < deadline) { + for (const id of await inFlightRunIds(pattern)) { + if (!before.has(id)) return id; + } + await sleep(250); + } + throw new Error(`Server did not start a new ${workflowName} run within 30s`); +} + +/** Wait until the run has a `step_created` or `step_started` event, i.e. a step has been created or has begun executing (its queue job may not be held/locked yet).
*/ +async function waitForStepStarted(runId: string): Promise { + const world = await getWorld(); + const deadline = Date.now() + WAIT_CREATED_TIMEOUT_MS; + while (Date.now() < deadline) { + try { + const { data } = await world.events.list({ runId }); + if ( + data.some( + (e) => + e.eventType === 'step_started' || e.eventType === 'step_created' + ) + ) { + return; + } + } catch { + // run/events not visible yet + } + await sleep(250); + } + throw new Error( + `Run ${runId} did not start a step within ${WAIT_CREATED_TIMEOUT_MS}ms` + ); +} + +async function waitForWaitCreated(runId: string): Promise { + const world = await getWorld(); + const deadline = Date.now() + WAIT_CREATED_TIMEOUT_MS; + while (Date.now() < deadline) { + try { + const { data } = await world.events.list({ runId }); + if (data.some((e) => e.eventType === 'wait_created')) return; + } catch { + // run/events not visible yet + } + await sleep(500); + } + throw new Error( + `Run ${runId} did not reach a sleeping (wait_created) state within ${WAIT_CREATED_TIMEOUT_MS}ms` + ); +} + +async function waitForCompleted(runId: string): Promise { + const world = await getWorld(); + const deadline = Date.now() + RECOVERY_TIMEOUT_MS; + let lastStatus = 'unknown'; + while (Date.now() < deadline) { + try { + const run = await world.runs.get(runId); + if (run.status !== lastStatus) log(`run status -> ${run.status}`); + lastStatus = run.status; + if (run.status === 'completed') return; + if (run.status === 'failed' || run.status === 'cancelled') { + throw new Error( + `Run ${runId} ended in unexpected status: ${run.status}` + ); + } + } catch (err) { + if (err instanceof Error && /unexpected status/.test(err.message)) + throw err; + // run not readable yet + } + await sleep(1_000); + } + throw new Error( + `Run ${runId} did not recover to 'completed' within ${RECOVERY_TIMEOUT_MS}ms (last status: ${lastStatus}). 
` + + `This indicates server startup did not start the World (ensureWorldStarted).` + ); +} + +describe.skipIf(!enabled)('restart recovery', () => { + beforeAll(() => { + setupWorld(deploymentUrl); + }); + + afterEach(async () => { + if (server) { + await killServer(server); + server = undefined; + } + }); + + test( + 'in-flight sleeping run resumes after a hard restart with no workflow op', + { + timeout: + RECOVERY_TIMEOUT_MS + + WAIT_CREATED_TIMEOUT_MS + + SERVER_READY_TIMEOUT_MS * 2, + }, + async () => { + // 1. Boot the server and start a sleeping workflow on it. + server = spawnServer(); + await waitForServerReady(server); + log('server #1 ready'); + const runId = await startWorkflowOnServer( + 'sleepingWorkflow', + [SLEEP_MS], + /sleepingWorkflow/ + ); + log(`started run ${runId}`); + + // 2. Wait until the run is durably sleeping (server scheduled the wait). + await waitForWaitCreated(runId); + log('run is sleeping (wait_created)'); + + // 3. Hard-kill the server mid-sleep (loses the in-memory queue timer; + // stops the postgres worker). + await killServer(server); + server = undefined; + log('server #1 killed (port free)'); + + // 4. Restart the server. Crucially, issue NO workflow operation against + // it — startup alone must trigger recovery. + server = spawnServer(); + await waitForServerReady(server); + log('server #2 ready'); + + // 5. The run should resume and complete purely from boot-time recovery. + await waitForCompleted(runId); + log('run completed'); + } + ); + + test( + 'in-flight run killed mid-step resumes after a hard restart with no workflow op', + { + timeout: + RECOVERY_TIMEOUT_MS + + WAIT_CREATED_TIMEOUT_MS + + SERVER_READY_TIMEOUT_MS * 2, + }, + async () => { + // Unlike the sleeping case (a delayed, unlocked queue job), this kills the + // server WHILE A STEP IS EXECUTING — so the step's queue job is held/locked + // by the worker at crash time. 
For postgres this exercises whether boot + // recovery can re-drive a run whose step job is still locked (graphile's + // stale-lock), since the re-dispatched step reuses the same correlationId + // job key. See https://github.com/vercel/workflow/issues/679. + server = spawnServer(); + await waitForServerReady(server); + log('server #1 ready'); + const runId = await startWorkflowOnServer( + 'longStepWorkflow', + [LONG_STEP_MS], + /longStepWorkflow/ + ); + log(`started run ${runId}`); + + await waitForStepStarted(runId); + // Give the worker a beat to actually lock the step job and enter the step. + await sleep(1_000); + log('run is mid-step (step job locked)'); + + await killServer(server); + server = undefined; + log('server #1 killed (port free)'); + + server = spawnServer(); + await waitForServerReady(server); + log('server #2 ready'); + + await waitForCompleted(runId); + log('run completed'); + } + ); +}); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index da1f7ca615..ca52eca7b0 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -114,6 +114,7 @@ export { // filesystem operations into the flow route bundle. export { createWorld, + ensureWorldStarted, getWorld, getWorldHandlers, setWorld, diff --git a/packages/core/src/runtime/world.ts b/packages/core/src/runtime/world.ts index fcece0c860..0320604da6 100644 --- a/packages/core/src/runtime/world.ts +++ b/packages/core/src/runtime/world.ts @@ -26,12 +26,14 @@ const WorldCachePromise = Symbol.for('@workflow/world//cachePromise'); const StubbedWorldCachePromise = Symbol.for( '@workflow/world//stubbedCachePromise' ); +const WorldStartPromise = Symbol.for('@workflow/world//startPromise'); const globalSymbols: typeof globalThis & { [WorldCache]?: World; [StubbedWorldCache]?: World; [WorldCachePromise]?: Promise; [StubbedWorldCachePromise]?: Promise; + [WorldStartPromise]?: Promise; } = globalThis; // Dynamic import for custom world modules. 
Uses a standard import() @@ -178,6 +180,33 @@ export const getWorld = async (): Promise => { return globalSymbols[WorldCache]; }; +/** + * Ensure the World's background tasks are started exactly once per process, + * and that boot-time recovery (`reenqueueActiveRuns` for queue-backed Worlds) + * runs. Framework integrations call this at server startup — e.g. a Next.js + * `instrumentation.ts`, a Nitro server plugin, a SvelteKit `init` hook — so + * that in-flight runs resume after a restart WITHOUT requiring a workflow + * operation to wake the process. + * + * Idempotent: the start promise is cached on `globalThis` and reused, so + * repeated calls (e.g. Next.js invoking `register()` for multiple runtimes) + * start the World only once. On failure the cached promise is cleared so a + * later call can retry. Safe to call regardless of the target World — for + * push-based Worlds (Vercel) `world.start()` is a no-op. + */ +export const ensureWorldStarted = async (): Promise => { + if (!globalSymbols[WorldStartPromise]) { + globalSymbols[WorldStartPromise] = (async () => { + const world = await getWorld(); + await world.start?.(); + })().catch((err) => { + globalSymbols[WorldStartPromise] = undefined; + throw err; + }); + } + await globalSymbols[WorldStartPromise]; +}; + /** * Reset the cached world instance. This should be called when environment * variables change and you need to reinitialize the world with new config. @@ -187,6 +216,10 @@ export const setWorld = (world: World | undefined): void => { globalSymbols[StubbedWorldCache] = world; globalSymbols[WorldCachePromise] = undefined; globalSymbols[StubbedWorldCachePromise] = undefined; + // Clear the start guard too: a freshly injected world has not been started, + // so a subsequent ensureWorldStarted() should start it rather than no-op on + // the previous world's cached promise. 
+ globalSymbols[WorldStartPromise] = undefined; }; // Register getWorld on globalThis so getWorldLazy can call it directly when diff --git a/packages/nitro/src/index.ts b/packages/nitro/src/index.ts index 3d2989bf46..6f46124c4d 100644 --- a/packages/nitro/src/index.ts +++ b/packages/nitro/src/index.ts @@ -1,5 +1,5 @@ -import { createRequire } from 'node:module'; import { mkdirSync, readFileSync, writeFileSync } from 'node:fs'; +import { createRequire } from 'node:module'; import { fileURLToPath, pathToFileURL } from 'node:url'; import { WORKFLOW_QUEUE_TRIGGER } from '@workflow/builders'; import { workflowTransformPlugin } from '@workflow/rollup'; @@ -229,6 +229,14 @@ export default { 'workflow/workflows.mjs' ); + // Start the World once at server boot (Nitro server plugin) so in-flight + // runs recover after a restart without needing a workflow operation. + // Covers self-hosted Nitro apps (Nitro v2/v3, Nuxt). Skipped on Vercel: + // the Vercel World's start() is a no-op (push-based — VQS redelivers). + if (!isVercelDeploy) { + addStartupPlugin(nitro); + } + // Nitro v3+ Vercel deploy: configure function rules for the combined // flow handler so it gets the queue triggers + max duration that the // workflow runtime needs. Workflow-required fields (`maxDuration`, @@ -292,6 +300,46 @@ export default { }, } satisfies NitroModule; +/** + * Auto-register a Nitro server plugin that starts the World once at app boot, + * so boot-time recovery (`reenqueueActiveRuns` for queue-backed self-hosted + * Worlds) runs after a restart without requiring a workflow operation to wake + * the process. + * + * The plugin is emitted as a real file in the build dir and imports + * `workflow/runtime` via a *bare* dynamic import (resolved by the bundler) — + * mirroring a hand-written Nitro plugin. This shares the same runtime module + * the flow handler loads, avoiding the CJS/ESM dual-load that a build-time + * `file://` import would trigger against the bundled flow handler. 
+ * `ensureWorldStarted()` caches its start promise on `globalThis`, so the World + * is started exactly once even though the flow handler also reaches the runtime. + * + * Not registered for Vercel deploys (the Vercel World's start() is a no-op, and + * there is nothing to recover at boot for a push-based world). + */ +function addStartupPlugin(nitro: Nitro) { + const dir = join(nitro.options.buildDir, 'workflow'); + mkdirSync(dir, { recursive: true }); + const pluginPath = join(dir, 'start-world-plugin.mjs'); + writeFileSync( + pluginPath, + /* js */ `// Auto-generated by @workflow/nitro — starts the workflow World at server boot. +export default () => { + import('workflow/runtime') + .then(({ ensureWorldStarted }) => ensureWorldStarted()) + .catch((error) => { + console.error('[workflow] Failed to start World on server startup:', error); + }); +}; +` + ); + + nitro.options.plugins ||= []; + if (!nitro.options.plugins.includes(pluginPath)) { + nitro.options.plugins.push(pluginPath); + } +} + const DASHBOARD_VIRTUAL_ID = '#workflow/dashboard-handler'; function addDashboardHandler(nitro: Nitro) { diff --git a/packages/workflow/src/runtime.ts b/packages/workflow/src/runtime.ts index 9d8fcb20fa..f729eb4949 100644 --- a/packages/workflow/src/runtime.ts +++ b/packages/workflow/src/runtime.ts @@ -1,11 +1,12 @@ export { createWorld, + ensureWorldStarted, getWorld, getWorldHandlers, - healthCheck, type HealthCheckEndpoint, type HealthCheckOptions, type HealthCheckResult, + healthCheck, setWorld, workflowEntrypoint, } from '@workflow/core/runtime'; diff --git a/packages/world-local/src/init.ts b/packages/world-local/src/init.ts index 5b0f583a0f..24846fcca6 100644 --- a/packages/world-local/src/init.ts +++ b/packages/world-local/src/init.ts @@ -329,6 +329,17 @@ export async function initDataDir(dataDir: string): Promise { await ensureDataDir(dataDir); const packageInfo = await getPackageInfo(); + + // In bundled contexts (e.g. 
a framework's server bundle) the package version + // can't be read and `getPackageInfo()` returns the 'bundled' sentinel, which + // isn't a parseable semver. Version-compatibility enforcement isn't + // meaningful without a real version, so skip it. Without this guard, + // `world.start()` (called at server startup via `ensureWorldStarted()`) + // would throw "Invalid version string: \"bundled\"" and crash boot. + if (packageInfo.version === 'bundled') { + return; + } + const currentVersion = parseVersion(packageInfo.version); // Read existing version file diff --git a/packages/world-vercel/src/index.ts b/packages/world-vercel/src/index.ts index 0bdaa91ed6..2474a49ec7 100644 --- a/packages/world-vercel/src/index.ts +++ b/packages/world-vercel/src/index.ts @@ -39,6 +39,13 @@ export function createVercelWorld(config?: APIConfig): World { // `process.exit(1)` is an acceptable response to an exhausted replay // budget. processExitTriggersQueueRedelivery: true, + // No-op for interface compliance. On Vercel, delivery is push-based: the + // queue (VQS) holds in-flight continuations and redelivers them via fresh + // function invocations, so there is no long-lived process to "start" and no + // boot-time recovery to perform (cf. `processExitTriggersQueueRedelivery`). + // Framework integrations call `world.start()` unconditionally at startup; on + // Vercel this is intentionally a no-op rather than re-enqueuing active runs. + async start() {}, ...createQueue(config), ...createStorage(config), ...instrumentObject('world.streams', createStreamer(config)), diff --git a/packages/world/src/interfaces.ts b/packages/world/src/interfaces.ts index a30dae2ab9..2d2711211e 100644 --- a/packages/world/src/interfaces.ts +++ b/packages/world/src/interfaces.ts @@ -310,6 +310,22 @@ export interface World extends Queue, Streamer, Storage { /** * A function that will be called to start any background tasks needed by the World implementation. 
* For example, in the case of a queue backed World, this would start the queue processing. + * + * Framework integrations are expected to call this exactly once at server + * startup (e.g. from a Next.js `instrumentation.ts`, a Nitro server plugin, + * a SvelteKit `init` hook). Beyond starting background workers, this is also + * where a World performs **restart recovery**: re-enqueuing `pending`/`running` + * runs that were in flight when the process last stopped (see + * `reenqueueActiveRuns`). Because of this, `start()`: + * + * - MUST be idempotent. The shared `ensureWorldStarted()` helper guards + * against repeated invocation per process, but implementations should also + * tolerate being called more than once, and recovery re-enqueues must be + * safe to duplicate (the workflow handler is idempotent via event-log replay). + * - MAY be a no-op. Push-based / serverless Worlds (e.g. the Vercel World) + * need no boot recovery — durability comes from the queue's at-least-once + * redelivery, not from a long-lived process re-scanning storage — so they + * implement an empty `start()` purely for interface compliance. */ start?(): Promise; diff --git a/workbench/astro/src/middleware.ts b/workbench/astro/src/middleware.ts new file mode 100644 index 0000000000..b410202d21 --- /dev/null +++ b/workbench/astro/src/middleware.ts @@ -0,0 +1,14 @@ +import { defineMiddleware } from 'astro:middleware'; + +// Astro has no server-startup hook that works across all adapters, so start the +// World from middleware on the first request instead. `ensureWorldStarted()` is +// idempotent (it starts the World at most once per process), so this only does +// real work on the first request and is a cheap resolved-promise await +// thereafter. Starting the World runs boot-time recovery +// (`reenqueueActiveRuns`) for the local/postgres worlds so in-flight runs +// resume after a restart; it is a no-op on the Vercel World. 
+export const onRequest = defineMiddleware(async (_context, next) => { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + return next(); +}); diff --git a/workbench/example/workflows/99_e2e.ts b/workbench/example/workflows/99_e2e.ts index 304578eebb..c35bb9e8a1 100644 --- a/workbench/example/workflows/99_e2e.ts +++ b/workbench/example/workflows/99_e2e.ts @@ -206,6 +206,23 @@ export async function sleepingWorkflow(durationMs = 10_000) { return { startTime, endTime }; } +// A workflow whose only work is one long-running STEP (not a workflow sleep). +// While the step runs, its queue job is held/locked by the worker — used by the +// restart-recovery e2e to simulate a crash mid-step (a locked job). +async function longRunningStep(durationMs: number) { + 'use step'; + await new Promise((resolve) => setTimeout(resolve, durationMs)); + return durationMs; +} + +export async function longStepWorkflow(durationMs = 12_000) { + 'use workflow'; + const startTime = Date.now(); + await longRunningStep(durationMs); + const endTime = Date.now(); + return { startTime, endTime }; +} + export async function parallelSleepWorkflow() { 'use workflow'; const startTime = Date.now(); diff --git a/workbench/express/.gitignore b/workbench/express/.gitignore index fd7aafb07d..b2443e8290 100644 --- a/workbench/express/.gitignore +++ b/workbench/express/.gitignore @@ -10,3 +10,4 @@ _workflows.ts # Nitro .output +/.swc diff --git a/workbench/nest/src/main.ts b/workbench/nest/src/main.ts index b0c22548c6..e8b8b3d39d 100644 --- a/workbench/nest/src/main.ts +++ b/workbench/nest/src/main.ts @@ -4,15 +4,11 @@ import type { NestExpressApplication } from '@nestjs/platform-express'; import { AppModule } from './app.module.js'; async function bootstrap() { - // Start the Postgres World if configured - if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { - const { getWorld } = await import('workflow/runtime'); - const world = await 
getWorld(); - if (world.start) { - console.log('Starting World workers...'); - await world.start(); - } - } + // Start the World once at server boot so in-flight runs are recovered after a + // restart without needing a workflow operation. No-op on the Vercel World; + // runs recovery for the local/postgres worlds. + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); const app = await NestFactory.create(AppModule, { bodyParser: false, diff --git a/workbench/nextjs-turbopack/.gitignore b/workbench/nextjs-turbopack/.gitignore index 16abee95e3..68477b5423 100644 --- a/workbench/nextjs-turbopack/.gitignore +++ b/workbench/nextjs-turbopack/.gitignore @@ -43,3 +43,4 @@ next-env.d.ts # workflow _workflows.ts +/.swc diff --git a/workbench/nextjs-turbopack/instrumentation.ts b/workbench/nextjs-turbopack/instrumentation.ts new file mode 100644 index 0000000000..5946631d22 --- /dev/null +++ b/workbench/nextjs-turbopack/instrumentation.ts @@ -0,0 +1,14 @@ +import { registerOTel } from '@vercel/otel'; + +export async function register() { + registerOTel({ serviceName: 'nextjs-turbopack-workflow' }); + // Start the workflow World once at server boot so in-flight runs are + // recovered after a restart without needing a workflow operation. Only in the + // Node.js runtime (the Edge runtime can't load the world modules and doesn't + // own the queue/recovery loop). No-op on the Vercel World; runs recovery for + // the local/postgres worlds. 
+ if (process.env.NEXT_RUNTIME === 'nodejs') { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + } +} diff --git a/workbench/nextjs-webpack/instrumentation.ts b/workbench/nextjs-webpack/instrumentation.ts new file mode 100644 index 0000000000..c7c4e11675 --- /dev/null +++ b/workbench/nextjs-webpack/instrumentation.ts @@ -0,0 +1,14 @@ +import { registerOTel } from '@vercel/otel'; + +export async function register() { + registerOTel({ serviceName: 'nextjs-webpack-workflow' }); + // Start the workflow World once at server boot so in-flight runs are + // recovered after a restart without needing a workflow operation. Only in the + // Node.js runtime (the Edge runtime can't load the world modules and doesn't + // own the queue/recovery loop). No-op on the Vercel World; runs recovery for + // the local/postgres worlds. + if (process.env.NEXT_RUNTIME === 'nodejs') { + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); + } +} diff --git a/workbench/nitro-v3/nitro.config.ts b/workbench/nitro-v3/nitro.config.ts index 946bbbb53c..5e800f3a10 100644 --- a/workbench/nitro-v3/nitro.config.ts +++ b/workbench/nitro-v3/nitro.config.ts @@ -3,5 +3,4 @@ import { defineConfig } from 'nitro'; export default defineConfig({ modules: ['workflow/nitro'], serverDir: './', - plugins: ['plugins/start-pg-world.ts'], }); diff --git a/workbench/nitro-v3/plugins/start-pg-world.ts b/workbench/nitro-v3/plugins/start-pg-world.ts deleted file mode 100644 index 88f5ef3b7c..0000000000 --- a/workbench/nitro-v3/plugins/start-pg-world.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { definePlugin } from 'nitro'; - -// Start the Postgres World -// Needed since we test this in CI -export default definePlugin(async () => { - if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { - import('workflow/runtime').then(async ({ getWorld }) => { - const world = await getWorld(); - if (world.start) { - 
console.log('Starting World workers...'); - await world.start(); - } - }); - } -}); diff --git a/workbench/nuxt/server/plugins/start-pg-world.ts b/workbench/nuxt/server/plugins/start-pg-world.ts deleted file mode 100644 index 613d8e3110..0000000000 --- a/workbench/nuxt/server/plugins/start-pg-world.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { defineNitroPlugin } from '#imports'; - -// Start the Postgres World -// Needed since we test this in CI -export default defineNitroPlugin(async () => { - if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { - import('workflow/runtime').then(async ({ getWorld }) => { - const world = await getWorld(); - if (world.start) { - console.log('Starting World workers...'); - await world.start(); - } - }); - } -}); diff --git a/workbench/sveltekit/src/hooks.server.ts b/workbench/sveltekit/src/hooks.server.ts index 619b303f8b..f952279ea4 100644 --- a/workbench/sveltekit/src/hooks.server.ts +++ b/workbench/sveltekit/src/hooks.server.ts @@ -1,14 +1,9 @@ import type { ServerInit } from '@sveltejs/kit'; export const init: ServerInit = async () => { - // Start the Postgres World - // Needed since we test this in CI - if (process.env.WORKFLOW_TARGET_WORLD === '@workflow/world-postgres') { - const { getWorld } = await import('workflow/runtime'); - const world = await getWorld(); - if (world.start) { - console.log('Starting World workers...'); - await world.start(); - } - } + // Start the World once at server boot so in-flight runs are recovered after a + // restart without needing a workflow operation. No-op on the Vercel World; + // runs recovery for the local/postgres worlds. + const { ensureWorldStarted } = await import('workflow/runtime'); + await ensureWorldStarted(); };