From bfa8cd3dde43c191aa510615567a0f358bda5ab0 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 12 Mar 2025 11:56:34 +0100 Subject: [PATCH 1/7] initial implementation --- examples/e2e/app-router/open-next.config.ts | 5 +- examples/e2e/app-router/wrangler.jsonc | 16 ++++ .../cloudflare/src/api/cloudflare-context.ts | 3 + .../src/api/durable-objects/queue.ts | 73 +++++++++++++++++++ packages/cloudflare/src/api/durable-queue.ts | 22 ++++++ .../cloudflare/src/cli/build/bundle-server.ts | 22 +++++- .../cloudflare/src/cli/templates/worker.ts | 3 + 7 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 packages/cloudflare/src/api/durable-objects/queue.ts create mode 100644 packages/cloudflare/src/api/durable-queue.ts diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index 00db54284..e4913befa 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -1,10 +1,11 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare"; import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache"; import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache"; -import memoryQueue from "@opennextjs/cloudflare/memory-queue"; +// import memoryQueue from "@opennextjs/cloudflare/memory-queue"; +import doQueue from "@opennextjs/cloudflare/durable-queue"; export default defineCloudflareConfig({ incrementalCache: kvIncrementalCache, tagCache: d1TagCache, - queue: memoryQueue, + queue: doQueue, }); diff --git a/examples/e2e/app-router/wrangler.jsonc b/examples/e2e/app-router/wrangler.jsonc index 25be2dde5..cc5646e11 100644 --- a/examples/e2e/app-router/wrangler.jsonc +++ b/examples/e2e/app-router/wrangler.jsonc @@ -8,6 +8,22 @@ "directory": ".open-next/assets", "binding": "ASSETS" }, + "durable_objects": { + "bindings": [ + { + "name": "NEXT_CACHE_REVALIDATION_DURABLE_OBJECT", + "class_name": "DurableObjectQueueHandler" + } + ] + }, + "migrations": [ + { + "tag": "v1", + "new_classes": [ + "DurableObjectQueueHandler" + ] + } + ], "kv_namespaces": [ { "binding": "NEXT_CACHE_WORKERS_KV", diff --git a/packages/cloudflare/src/api/cloudflare-context.ts b/packages/cloudflare/src/api/cloudflare-context.ts index f13a6a5b6..43526bb39 100644 --- a/packages/cloudflare/src/api/cloudflare-context.ts +++ b/packages/cloudflare/src/api/cloudflare-context.ts @@ -1,5 +1,7 @@ import type { Context, RunningCodeOptions } from "node:vm"; +import type { DurableObjectQueueHandler } from "./durable-objects/queue"; + declare global { interface CloudflareEnv { NEXT_CACHE_WORKERS_KV?: KVNamespace; @@ -7,6 +9,7 @@ declare global { NEXT_CACHE_D1_TAGS_TABLE?: string; NEXT_CACHE_D1_REVALIDATIONS_TABLE?: string; NEXT_CACHE_REVALIDATION_WORKER?: Service; + NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace; ASSETS?: Fetcher; } } diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts new file mode 100644 index 000000000..6473ce5b3 --- /dev/null +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -0,0 +1,73 @@ +import { error } from "@opennextjs/aws/adapters/logger.js"; +import type { QueueMessage } from "@opennextjs/aws/types/overrides"; +import { IgnorableError } from "@opennextjs/aws/utils/error.js"; +import { DurableObject } from "cloudflare:workers"; + +const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; +const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000; + +interface ExtendedQueueMessage extends QueueMessage { + previewModeId: string; +} + +export class DurableObjectQueueHandler extends DurableObject { + // Ongoing revalidations are deduped by the deduplication id + // Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation + // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top) + ongoingRevalidations = new Map>(); + + service: NonNullable; + + // TODO: allow this to be configurable + maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT; + + constructor(ctx: DurableObjectState, env: CloudflareEnv) { + super(ctx, env); + const service = env.NEXT_CACHE_REVALIDATION_WORKER; + // If there is no service binding, we throw an error because we can't revalidate without it + if (!service) throw new IgnorableError("No service binding for cache revalidation worker"); + this.service = service; + + } + + async revalidate(msg: ExtendedQueueMessage) { + // If there is already an ongoing revalidation, we don't need to revalidate again + if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; + + if(this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { + const ongoingRevalidations = this.ongoingRevalidations.values() + await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations)); + } + + const revalidationPromise = this.executeRevalidation(msg); + + // We store the promise to dedupe the revalidation + this.ongoingRevalidations.set( + msg.MessageDeduplicationId, + revalidationPromise + ); + + this.ctx.waitUntil(revalidationPromise); + } + + private async executeRevalidation({MessageBody: {host, url}, MessageDeduplicationId, previewModeId}: ExtendedQueueMessage) { + try { + const protocol = host.includes("localhost") ? "http" : "https"; + + //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc) + await this.service.fetch(`${protocol}://${host}${url}`, { + method: "HEAD", + headers: { + "x-prerender-revalidate": previewModeId, + "x-isr": "1", + }, + signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS) + }) + } catch (e) { + error(e); + } finally { + this.ongoingRevalidations.delete(MessageDeduplicationId); + } + } + +} \ No newline at end of file diff --git a/packages/cloudflare/src/api/durable-queue.ts b/packages/cloudflare/src/api/durable-queue.ts new file mode 100644 index 000000000..f91e47884 --- /dev/null +++ b/packages/cloudflare/src/api/durable-queue.ts @@ -0,0 +1,22 @@ +import type { Queue, QueueMessage } from "@opennextjs/aws/types/overrides"; +import { IgnorableError } from "@opennextjs/aws/utils/error.js"; + +import { getCloudflareContext } from "./cloudflare-context"; + + + +export default { + name: "durable-queue", + send: async (msg: QueueMessage) => { + const durableObject = getCloudflareContext().env.NEXT_CACHE_REVALIDATION_DURABLE_OBJECT; + if (!durableObject) throw new IgnorableError("No durable object binding for cache revalidation"); + + const id = durableObject.idFromName(msg.MessageGroupId); + const stub = durableObject.get(id); + const previewModeId = process.env.__NEXT_PREVIEW_MODE_ID!; + await stub.revalidate({ + ...msg, + previewModeId, + }); + } +} satisfies Queue; \ No newline at end of file diff --git a/packages/cloudflare/src/cli/build/bundle-server.ts b/packages/cloudflare/src/cli/build/bundle-server.ts index 580bdc9e5..a04a8bbf2 100644 --- a/packages/cloudflare/src/cli/build/bundle-server.ts +++ b/packages/cloudflare/src/cli/build/bundle-server.ts @@ -49,14 +49,30 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { patches.copyPackageCliFiles(packageDistDir, buildOpts); const { appPath, outputDir, monorepoRoot } = buildOpts; - const serverFiles = path.join( + const baseManifestPath = path.join( outputDir, "server-functions/default", getPackagePath(buildOpts), - ".next/required-server-files.json" + ".next" + ) + const serverFiles = path.join( + baseManifestPath, + "required-server-files.json" ); const nextConfig = JSON.parse(fs.readFileSync(serverFiles, "utf-8")).config; + // TODO: This is a temporary solution to get the previewModeId from the prerender-manifest.json + // We should find a better way to get this value, probably directly provided from aws + // probably in an env variable exactly as for BUILD_ID + const prerenderManifest = path.join( + baseManifestPath, + "prerender-manifest.json" + ); + const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); + const prerenderManifestJson = JSON.parse(prerenderManifestContent); + const previewModeId = prerenderManifestJson.preview.previewModeId; + + console.log(`\x1b[35m⚙️ Bundling the OpenNext server...\n\x1b[0m`); await patchWebpackRuntime(buildOpts); @@ -144,6 +160,8 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { "process.env.TURBOPACK": "false", // This define should be safe to use for Next 14.2+, earlier versions (13.5 and less) will cause trouble "process.env.__NEXT_EXPERIMENTAL_REACT": `${needsExperimentalReact(nextConfig)}`, + // Used for the durable object queue handler + "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, }, platform: "node", banner: { diff --git a/packages/cloudflare/src/cli/templates/worker.ts b/packages/cloudflare/src/cli/templates/worker.ts index 57504b863..4e36ceb3e 100644 --- a/packages/cloudflare/src/cli/templates/worker.ts +++ b/packages/cloudflare/src/cli/templates/worker.ts @@ -17,6 +17,9 @@ Object.defineProperty(globalThis, Symbol.for("__cloudflare-context__"), { }, }); +//@ts-expect-error: Will be resolved by wrangler build +export { DurableObjectQueueHandler } from "@opennextjs/cloudflare/durable-objects/queue"; + // Populate process.env on the first request let processEnvPopulated = false; From e823060e71f43f31540d0ca7c9756f455555ea32 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 10:20:37 +0100 Subject: [PATCH 2/7] Handle failure and add some test --- .../src/api/durable-objects/queue.spec.ts | 274 ++++++++++++++++++ .../src/api/durable-objects/queue.ts | 94 +++++- 2 files changed, 363 insertions(+), 5 deletions(-) create mode 100644 packages/cloudflare/src/api/durable-objects/queue.spec.ts diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts new file mode 100644 index 000000000..1a0f6398c --- /dev/null +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -0,0 +1,274 @@ +import { describe, expect, it, vi } from "vitest"; + +import { DurableObjectQueueHandler } from "./queue"; + +vi.mock('cloudflare:workers', () => ({ + DurableObject: class { + ctx: DurableObjectState; + env: CloudflareEnv; + constructor(ctx: DurableObjectState, env: CloudflareEnv) { + this.ctx = ctx; + this.env = env; + } + } +})) + +const createDurableObjectQueue = ({fetchDuration, statusCode, headers}: { fetchDuration: number, statusCode?: number, headers?: Headers }) => { + const mockState = { + waitUntil: vi.fn(), + blockConcurrencyWhile: vi.fn().mockImplementation(async(fn) => fn()), + storage: { + setAlarm: vi.fn(), + getAlarm: vi.fn(), + } + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return new DurableObjectQueueHandler(mockState as any, { + NEXT_CACHE_REVALIDATION_WORKER: { + fetch: vi.fn().mockReturnValue(new Promise((res) => setTimeout(() => res(new Response(null, { + status: statusCode, + headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]) + })), fetchDuration))), + connect: vi.fn(), + } + }); +}; + +const createMessage = (dedupId: string) => ({ + MessageBody: { host: "test.local", url: "/test" }, + MessageGroupId: "test.local/test", + MessageDeduplicationId: dedupId, + previewModeId: "test", +}) + +describe('DurableObjectQueue', () => { + describe('successful revalidation', () => { + it("should process a single revalidation", async () => { + const queue = createDurableObjectQueue({fetchDuration:10}); + const firstRequest = await queue.revalidate(createMessage("id")); + expect(firstRequest).toBeUndefined(); + expect(queue.ongoingRevalidations.size).toBe(1); + expect(queue.ongoingRevalidations.has("id")).toBe(true); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.ongoingRevalidations.size).toBe(0); + expect(queue.ongoingRevalidations.has("id")).toBe(false); + expect(queue.service.fetch).toHaveBeenCalledWith("https://test.local/test", { + method: "HEAD", + headers: { + "x-prerender-revalidate": "test", + "x-isr": "1", + }, + signal: expect.any(AbortSignal) + }); + }) + + it('should dedupe revalidations', async () => { + const queue = createDurableObjectQueue({fetchDuration:10}); + await queue.revalidate(createMessage("id")); + await queue.revalidate(createMessage("id")); + expect(queue.ongoingRevalidations.size).toBe(1); + expect(queue.ongoingRevalidations.has("id")).toBe(true); + }); + + it('should block concurrency', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + await queue.revalidate(createMessage("id")); + await queue.revalidate(createMessage("id2")); + await queue.revalidate(createMessage("id3")); + await queue.revalidate(createMessage("id4")); + await queue.revalidate(createMessage("id5")); + // the next one should block until one of the previous ones finishes + const blockedReq = queue.revalidate(createMessage("id6")); + + expect(queue.ongoingRevalidations.size).toBe(5); + expect(queue.ongoingRevalidations.has("id6")).toBe(false); + expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); + + // @ts-expect-error + expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1); + + // Here we await the blocked request to ensure it's resolved + await blockedReq; + // We then need to await for the actual revalidation to finish + await Promise.all(Array.from(queue.ongoingRevalidations.values())); + expect(queue.ongoingRevalidations.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(6); + }); + }) + + describe('failed revalidation', () => { + it('should not put it in failed state for an incorrect 200', async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + statusCode: 200, + headers: new Headers([["x-nextjs-cache", "MISS"]]) + }); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(0); + }) + + it('should not put it in failed state for a failed revalidation with 404', async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + statusCode: 404, + }); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + + await queue.revalidate(createMessage("id")); + + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(2); + }); + + it('should put it in failed state if revalidation fails with 500', async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + statusCode: 500, + }); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.routeInFailedState.has("id")).toBe(true); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + + await queue.revalidate(createMessage("id")); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + }); + + it('should put it in failed state if revalidation fetch throw', async () => { + const queue = createDurableObjectQueue({ + fetchDuration: 10, + }); + // @ts-expect-error - This is mocked above + queue.service.fetch.mockImplementationOnce(() => Promise.reject(new Error("fetch error"))); + await queue.revalidate(createMessage("id")); + + await queue.ongoingRevalidations.get("id"); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.routeInFailedState.has("id")).toBe(true); + expect(queue.ongoingRevalidations.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + + await queue.revalidate(createMessage("id")); + + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + }); + + }) + + describe('addAlarm', () => { + const getStorage = (queue: DurableObjectQueueHandler) : DurableObjectStorage => { + // @ts-expect-error - ctx is a protected field + return queue.ctx.storage + } + + it('should not add an alarm if there are no failed states', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).not.toHaveBeenCalled(); + }); + + it('should add an alarm if there are failed states', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: 1000}); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000); + }); + + it('should not add an alarm if there is already an alarm set', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: 1000}); + // @ts-expect-error + queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).not.toHaveBeenCalled(); + }); + + it('should set the alarm to the lowest nextAlarm', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: 1000}); + queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: 500}); + await queue.addAlarm(); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500); + }); + }); + + + describe('addToFailedState', () => { + it('should add a failed state', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.routeInFailedState.has("id")).toBe(true); + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1); + }); + + it('should add a failed state with the correct nextAlarm', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now()); + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1) + }); + + it('should add a failed state with the correct nextAlarm for a retry', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + await queue.addToFailedState(createMessage("id")); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now()); + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2) + }); + + it('should not add a failed state if it has been retried 6 times', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 6, nextAlarm: 1000}); + await queue.addToFailedState(createMessage("id")); + expect(queue.routeInFailedState.size).toBe(0); + }) + }) + + describe('alarm', () => { + it('should execute revalidations for expired events', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: Date.now()-1000}); + queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: Date.now()-1000}); + await queue.alarm(); + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(2); + }); + + it('should execute revalidations for the next event to retry', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: Date.now() + 1000}); + queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: Date.now() + 500}); + await queue.alarm(); + expect(queue.routeInFailedState.size).toBe(1); + expect(queue.service.fetch).toHaveBeenCalledTimes(1); + expect(queue.routeInFailedState.has("id2")).toBe(false); + }); + + it('should execute revalidations for the next event to retry and expired events', async () => { + const queue = createDurableObjectQueue({fetchDuration: 10}); + queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: Date.now() + 1000}); + queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: Date.now()-1000}); + await queue.alarm(); + expect(queue.routeInFailedState.size).toBe(0); + expect(queue.service.fetch).toHaveBeenCalledTimes(2); + }); + }) +}) \ No newline at end of file diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 6473ce5b3..e472d5433 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -1,6 +1,6 @@ import { error } from "@opennextjs/aws/adapters/logger.js"; import type { QueueMessage } from "@opennextjs/aws/types/overrides"; -import { IgnorableError } from "@opennextjs/aws/utils/error.js"; +import { FatalError, IgnorableError, isOpenNextError, RecoverableError } from "@opennextjs/aws/utils/error.js"; import { DurableObject } from "cloudflare:workers"; const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; @@ -16,9 +16,12 @@ export class DurableObjectQueueHandler extends DurableObject { // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top) ongoingRevalidations = new Map>(); + // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage + routeInFailedState = new Map(); + service: NonNullable; - // TODO: allow this to be configurable + // TODO: allow this to be configurable - How do we want todo that? env variable? passed down from the queue override ? maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT; constructor(ctx: DurableObjectState, env: CloudflareEnv) { @@ -34,8 +37,14 @@ export class DurableObjectQueueHandler extends DurableObject { // If there is already an ongoing revalidation, we don't need to revalidate again if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; + // The route is already in a failed state, it will be retried later + if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; + if(this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { const ongoingRevalidations = this.ongoingRevalidations.values() + // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes + // We still await the promise to ensure the revalidation is completed + // This is fine because the queue itself run inside a waitUntil await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations)); } @@ -47,15 +56,17 @@ export class DurableObjectQueueHandler extends DurableObject { revalidationPromise ); + // TODO: check if the object stays up during waitUntil so that the internal state is maintained this.ctx.waitUntil(revalidationPromise); } - private async executeRevalidation({MessageBody: {host, url}, MessageDeduplicationId, previewModeId}: ExtendedQueueMessage) { + private async executeRevalidation(msg: ExtendedQueueMessage) { try { + const {MessageBody: {host, url}, previewModeId} = msg; const protocol = host.includes("localhost") ? "http" : "https"; //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc) - await this.service.fetch(`${protocol}://${host}${url}`, { + const response = await this.service.fetch(`${protocol}://${host}${url}`, { method: "HEAD", headers: { "x-prerender-revalidate": previewModeId, @@ -63,11 +74,84 @@ export class DurableObjectQueueHandler extends DurableObject { }, signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS) }) + // Now we need to handle errors from the fetch + if(response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { + // Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either) + throw new FatalError(`The revalidation for ${host}${url} cannot be done. This error should never happen.`); + } else if (response.status === 404) { + // The page is not found, we should not revalidate it + throw new IgnorableError(`The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself`); + } else if (response.status === 500) { + // A server error occurred, we should retry + + await this.addToFailedState(msg); + + throw new IgnorableError(`Something went wrong while revalidating ${host}${url}`); + } else if (response.status !== 200) { + // TODO: check if we need to handle cloudflare specific status codes/errors + // An unknown error occurred, most likely from something in user code like missing auth in the middleware + throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`); + } } catch (e) { + // Do we want to propagate the error to the calling worker? + if(!isOpenNextError(e)) { + await this.addToFailedState(msg); + } error(e); } finally { - this.ongoingRevalidations.delete(MessageDeduplicationId); + this.ongoingRevalidations.delete(msg.MessageDeduplicationId); + } + } + + override async alarm() { + // We fetch the first event that needs to be retried or if the date is expired + const nextEventToRetry = Array.from(this.routeInFailedState.values()).filter((failing) => failing.nextAlarm > Date.now()).sort(({nextAlarm: a}, {nextAlarm: b}) => a - b)[0]; + // We also have to check if there are expired events, if the revalidation takes too long, or if the + const expiredEvents = Array.from(this.routeInFailedState.values()).filter(({nextAlarm}) => nextAlarm <= Date.now()); + const allEventsToRetry = (nextEventToRetry && nextEventToRetry.nextAlarm > Date.now()) ? [nextEventToRetry, ...expiredEvents] : expiredEvents; + for(const event of allEventsToRetry) { + await this.executeRevalidation(event.msg); + this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } + + async addToFailedState(msg: ExtendedQueueMessage) { + const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); + let nextAlarm = Date.now() + 2_000; + + if(existingFailedState) { + if(existingFailedState.retryCount >= 6) { + // We give up after 6 retries and log the error + error(`The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.`); + this.routeInFailedState.delete(msg.MessageDeduplicationId); + return; + } + nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; + this.routeInFailedState.set(msg.MessageDeduplicationId, { + ...existingFailedState, + retryCount: existingFailedState.retryCount + 1, + nextAlarm + }); + } else { + this.routeInFailedState.set(msg.MessageDeduplicationId, { + msg, + retryCount: 1, + nextAlarm + }); + } + // We probably want to do something if routeInFailedState is becoming too big, at least log it + await this.addAlarm(); + + } + + async addAlarm() { + const existingAlarm = await this.ctx.storage.getAlarm({allowConcurrency: false}); + if(existingAlarm) return; + if(this.routeInFailedState.size === 0) return; + + const nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce((acc, {nextAlarm}) => Math.min(acc, nextAlarm), Infinity); + await this.ctx.storage.setAlarm(nextAlarmToSetup); + } + } \ No newline at end of file From eaf8bf52728668a2fd97d8733f248557dce3c39f Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 10:59:23 +0100 Subject: [PATCH 3/7] Fix linting --- examples/e2e/app-router/wrangler.jsonc | 4 +- .../src/api/durable-objects/queue.spec.ts | 212 +++++++++++------- .../src/api/durable-objects/queue.ts | 96 ++++---- packages/cloudflare/src/api/durable-queue.ts | 6 +- .../cloudflare/src/cli/build/bundle-server.ts | 11 +- 5 files changed, 189 insertions(+), 140 deletions(-) diff --git a/examples/e2e/app-router/wrangler.jsonc b/examples/e2e/app-router/wrangler.jsonc index cc5646e11..8440d8c5e 100644 --- a/examples/e2e/app-router/wrangler.jsonc +++ b/examples/e2e/app-router/wrangler.jsonc @@ -19,9 +19,7 @@ "migrations": [ { "tag": "v1", - "new_classes": [ - "DurableObjectQueueHandler" - ] + "new_classes": ["DurableObjectQueueHandler"] } ], "kv_namespaces": [ diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index 1a0f6398c..a7d3ecb08 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -2,7 +2,7 @@ import { describe, expect, it, vi } from "vitest"; import { DurableObjectQueueHandler } from "./queue"; -vi.mock('cloudflare:workers', () => ({ +vi.mock("cloudflare:workers", () => ({ DurableObject: class { ctx: DurableObjectState; env: CloudflareEnv; @@ -10,27 +10,45 @@ vi.mock('cloudflare:workers', () => ({ this.ctx = ctx; this.env = env; } - } -})) - -const createDurableObjectQueue = ({fetchDuration, statusCode, headers}: { fetchDuration: number, statusCode?: number, headers?: Headers }) => { + }, +})); + +const createDurableObjectQueue = ({ + fetchDuration, + statusCode, + headers, +}: { + fetchDuration: number; + statusCode?: number; + headers?: Headers; +}) => { const mockState = { waitUntil: vi.fn(), - blockConcurrencyWhile: vi.fn().mockImplementation(async(fn) => fn()), + blockConcurrencyWhile: vi.fn().mockImplementation(async (fn) => fn()), storage: { setAlarm: vi.fn(), getAlarm: vi.fn(), - } + }, }; // eslint-disable-next-line @typescript-eslint/no-explicit-any return new DurableObjectQueueHandler(mockState as any, { NEXT_CACHE_REVALIDATION_WORKER: { - fetch: vi.fn().mockReturnValue(new Promise((res) => setTimeout(() => res(new Response(null, { - status: statusCode, - headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]) - })), fetchDuration))), + fetch: vi.fn().mockReturnValue( + new Promise((res) => + setTimeout( + () => + res( + new Response(null, { + status: statusCode, + headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]), + }) + ), + fetchDuration + ) + ) + ), connect: vi.fn(), - } + }, }); }; @@ -39,19 +57,19 @@ const createMessage = (dedupId: string) => ({ MessageGroupId: "test.local/test", MessageDeduplicationId: dedupId, previewModeId: "test", -}) +}); -describe('DurableObjectQueue', () => { - describe('successful revalidation', () => { +describe("DurableObjectQueue", () => { + describe("successful revalidation", () => { it("should process a single revalidation", async () => { - const queue = createDurableObjectQueue({fetchDuration:10}); + const queue = createDurableObjectQueue({ fetchDuration: 10 }); const firstRequest = await queue.revalidate(createMessage("id")); expect(firstRequest).toBeUndefined(); expect(queue.ongoingRevalidations.size).toBe(1); expect(queue.ongoingRevalidations.has("id")).toBe(true); - + await queue.ongoingRevalidations.get("id"); - + expect(queue.ongoingRevalidations.size).toBe(0); expect(queue.ongoingRevalidations.has("id")).toBe(false); expect(queue.service.fetch).toHaveBeenCalledWith("https://test.local/test", { @@ -60,20 +78,20 @@ describe('DurableObjectQueue', () => { "x-prerender-revalidate": "test", "x-isr": "1", }, - signal: expect.any(AbortSignal) + signal: expect.any(AbortSignal), }); - }) - - it('should dedupe revalidations', async () => { - const queue = createDurableObjectQueue({fetchDuration:10}); + }); + + it("should dedupe revalidations", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.revalidate(createMessage("id")); await queue.revalidate(createMessage("id")); expect(queue.ongoingRevalidations.size).toBe(1); expect(queue.ongoingRevalidations.has("id")).toBe(true); }); - - it('should block concurrency', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); + + it("should block concurrency", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.revalidate(createMessage("id")); await queue.revalidate(createMessage("id2")); await queue.revalidate(createMessage("id3")); @@ -81,14 +99,14 @@ describe('DurableObjectQueue', () => { await queue.revalidate(createMessage("id5")); // the next one should block until one of the previous ones finishes const blockedReq = queue.revalidate(createMessage("id6")); - + expect(queue.ongoingRevalidations.size).toBe(5); expect(queue.ongoingRevalidations.has("id6")).toBe(false); expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); - + // @ts-expect-error expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1); - + // Here we await the blocked request to ensure it's resolved await blockedReq; // We then need to await for the actual revalidation to finish @@ -96,23 +114,23 @@ describe('DurableObjectQueue', () => { expect(queue.ongoingRevalidations.size).toBe(0); expect(queue.service.fetch).toHaveBeenCalledTimes(6); }); - }) + }); - describe('failed revalidation', () => { - it('should not put it in failed state for an incorrect 200', async () => { + describe("failed revalidation", () => { + it("should not put it in failed state for an incorrect 200", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10, statusCode: 200, - headers: new Headers([["x-nextjs-cache", "MISS"]]) + headers: new Headers([["x-nextjs-cache", "MISS"]]), }); await queue.revalidate(createMessage("id")); await queue.ongoingRevalidations.get("id"); expect(queue.routeInFailedState.size).toBe(0); - }) + }); - it('should not put it in failed state for a failed revalidation with 404', async () => { + it("should not put it in failed state for a failed revalidation with 404", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10, statusCode: 404, @@ -130,7 +148,7 @@ describe('DurableObjectQueue', () => { expect(queue.service.fetch).toHaveBeenCalledTimes(2); }); - it('should put it in failed state if revalidation fails with 500', async () => { + it("should put it in failed state if revalidation fails with 500", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10, statusCode: 500, @@ -149,7 +167,7 @@ describe('DurableObjectQueue', () => { expect(queue.service.fetch).toHaveBeenCalledTimes(1); }); - it('should put it in failed state if revalidation fetch throw', async () => { + it("should put it in failed state if revalidation fetch throw", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10, }); @@ -169,106 +187,128 @@ describe('DurableObjectQueue', () => { expect(queue.routeInFailedState.size).toBe(1); expect(queue.service.fetch).toHaveBeenCalledTimes(1); }); + }); - }) - - describe('addAlarm', () => { - const getStorage = (queue: DurableObjectQueueHandler) : DurableObjectStorage => { + describe("addAlarm", () => { + const getStorage = (queue: DurableObjectQueueHandler): DurableObjectStorage => { // @ts-expect-error - ctx is a protected field - return queue.ctx.storage - } + return queue.ctx.storage; + }; - it('should not add an alarm if there are no failed states', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); + it("should not add an alarm if there are no failed states", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.addAlarm(); expect(getStorage(queue).setAlarm).not.toHaveBeenCalled(); }); - it('should add an alarm if there are failed states', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: 1000}); + it("should add an alarm if there are failed states", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 }); await queue.addAlarm(); expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000); }); - it('should not add an alarm if there is already an alarm set', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: 1000}); + it("should not add an alarm if there is already an alarm set", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 }); // @ts-expect-error queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000); await queue.addAlarm(); expect(getStorage(queue).setAlarm).not.toHaveBeenCalled(); }); - it('should set the alarm to the lowest nextAlarm', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: 1000}); - queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: 500}); + it("should set the alarm to the lowest nextAlarm", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 }); + queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarm: 500 }); await queue.addAlarm(); expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500); }); }); - - describe('addToFailedState', () => { - it('should add a failed state', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); + describe("addToFailedState", () => { + it("should add a failed state", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.addToFailedState(createMessage("id")); expect(queue.routeInFailedState.size).toBe(1); expect(queue.routeInFailedState.has("id")).toBe(true); expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1); }); - it('should add a failed state with the correct nextAlarm', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); + it("should add a failed state with the correct nextAlarm", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.addToFailedState(createMessage("id")); expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now()); - expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1) + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1); }); - it('should add a failed state with the correct nextAlarm for a retry', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); + it("should add a failed state with the correct nextAlarm for a retry", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.addToFailedState(createMessage("id")); await queue.addToFailedState(createMessage("id")); expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now()); - expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2) + expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2); }); - it('should not add a failed state if it has been retried 6 times', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 6, nextAlarm: 1000}); + it("should not add a failed state if it has been retried 6 times", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarm: 1000 }); await queue.addToFailedState(createMessage("id")); expect(queue.routeInFailedState.size).toBe(0); - }) - }) - - describe('alarm', () => { - it('should execute revalidations for expired events', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: Date.now()-1000}); - queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: Date.now()-1000}); + }); + }); + + describe("alarm", () => { + it("should execute revalidations for expired events", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { + msg: createMessage("id"), + retryCount: 0, + nextAlarm: Date.now() - 1000, + }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarm: Date.now() - 1000, + }); await queue.alarm(); expect(queue.routeInFailedState.size).toBe(0); expect(queue.service.fetch).toHaveBeenCalledTimes(2); }); - it('should execute revalidations for the next event to retry', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: Date.now() + 1000}); - queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: Date.now() + 500}); + it("should execute revalidations for the next event to retry", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { + msg: createMessage("id"), + retryCount: 0, + nextAlarm: Date.now() + 1000, + }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarm: Date.now() + 500, + }); await queue.alarm(); expect(queue.routeInFailedState.size).toBe(1); expect(queue.service.fetch).toHaveBeenCalledTimes(1); expect(queue.routeInFailedState.has("id2")).toBe(false); }); - it('should execute revalidations for the next event to retry and expired events', async () => { - const queue = createDurableObjectQueue({fetchDuration: 10}); - queue.routeInFailedState.set("id", {msg: createMessage("id"), retryCount: 0, nextAlarm: Date.now() + 1000}); - queue.routeInFailedState.set("id2", {msg: createMessage("id2"), retryCount: 0, nextAlarm: Date.now()-1000}); + it("should execute revalidations for the next event to retry and expired events", async () => { + const queue = createDurableObjectQueue({ fetchDuration: 10 }); + queue.routeInFailedState.set("id", { + msg: createMessage("id"), + retryCount: 0, + nextAlarm: Date.now() + 1000, + }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarm: Date.now() - 1000, + }); await queue.alarm(); expect(queue.routeInFailedState.size).toBe(0); expect(queue.service.fetch).toHaveBeenCalledTimes(2); }); - }) -}) \ No newline at end of file + }); +}); diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index e472d5433..184e53d34 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -1,6 +1,11 @@ import { error } from "@opennextjs/aws/adapters/logger.js"; import type { QueueMessage } from "@opennextjs/aws/types/overrides"; -import { FatalError, IgnorableError, isOpenNextError, RecoverableError } from "@opennextjs/aws/utils/error.js"; +import { + FatalError, + IgnorableError, + isOpenNextError, + RecoverableError, +} from "@opennextjs/aws/utils/error.js"; import { DurableObject } from "cloudflare:workers"; const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; @@ -17,7 +22,10 @@ export class DurableObjectQueueHandler extends DurableObject { ongoingRevalidations = new Map>(); // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage - routeInFailedState = new Map(); + routeInFailedState = new Map< + string, + { msg: ExtendedQueueMessage; retryCount: number; nextAlarm: number } + >(); service: NonNullable; @@ -30,9 +38,8 @@ export class DurableObjectQueueHandler extends DurableObject { // If there is no service binding, we throw an error because we can't revalidate without it if (!service) throw new IgnorableError("No service binding for cache revalidation worker"); this.service = service; - } - + async revalidate(msg: ExtendedQueueMessage) { // If there is already an ongoing revalidation, we don't need to revalidate again if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; @@ -40,8 +47,8 @@ export class DurableObjectQueueHandler extends DurableObject { // The route is already in a failed state, it will be retried later if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; - if(this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { - const ongoingRevalidations = this.ongoingRevalidations.values() + if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { + const ongoingRevalidations = this.ongoingRevalidations.values(); // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes // We still await the promise to ensure the revalidation is completed // This is fine because the queue itself run inside a waitUntil @@ -51,10 +58,7 @@ export class DurableObjectQueueHandler extends DurableObject { const revalidationPromise = this.executeRevalidation(msg); // We store the promise to dedupe the revalidation - this.ongoingRevalidations.set( - msg.MessageDeduplicationId, - revalidationPromise - ); + this.ongoingRevalidations.set(msg.MessageDeduplicationId, revalidationPromise); // TODO: check if the object stays up during waitUntil so that the internal state is maintained this.ctx.waitUntil(revalidationPromise); @@ -62,7 +66,10 @@ export class DurableObjectQueueHandler extends DurableObject { private async executeRevalidation(msg: ExtendedQueueMessage) { try { - const {MessageBody: {host, url}, previewModeId} = msg; + const { + MessageBody: { host, url }, + previewModeId, + } = msg; const protocol = host.includes("localhost") ? "http" : "https"; //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc) @@ -72,19 +79,23 @@ export class DurableObjectQueueHandler extends DurableObject { "x-prerender-revalidate": previewModeId, "x-isr": "1", }, - signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS) - }) + signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS), + }); // Now we need to handle errors from the fetch - if(response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { + if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { // Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either) - throw new FatalError(`The revalidation for ${host}${url} cannot be done. This error should never happen.`); + throw new FatalError( + `The revalidation for ${host}${url} cannot be done. This error should never happen.` + ); } else if (response.status === 404) { // The page is not found, we should not revalidate it - throw new IgnorableError(`The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself`); + throw new IgnorableError( + `The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself` + ); } else if (response.status === 500) { // A server error occurred, we should retry - await this.addToFailedState(msg); + await this.addToFailedState(msg); throw new IgnorableError(`Something went wrong while revalidating ${host}${url}`); } else if (response.status !== 200) { @@ -94,7 +105,7 @@ export class DurableObjectQueueHandler extends DurableObject { } } catch (e) { // Do we want to propagate the error to the calling worker? - if(!isOpenNextError(e)) { + if (!isOpenNextError(e)) { await this.addToFailedState(msg); } error(e); @@ -105,25 +116,33 @@ export class DurableObjectQueueHandler extends DurableObject { override async alarm() { // We fetch the first event that needs to be retried or if the date is expired - const nextEventToRetry = Array.from(this.routeInFailedState.values()).filter((failing) => failing.nextAlarm > Date.now()).sort(({nextAlarm: a}, {nextAlarm: b}) => a - b)[0]; - // We also have to check if there are expired events, if the revalidation takes too long, or if the - const expiredEvents = Array.from(this.routeInFailedState.values()).filter(({nextAlarm}) => nextAlarm <= Date.now()); - const allEventsToRetry = (nextEventToRetry && nextEventToRetry.nextAlarm > Date.now()) ? [nextEventToRetry, ...expiredEvents] : expiredEvents; - for(const event of allEventsToRetry) { + const nextEventToRetry = Array.from(this.routeInFailedState.values()) + .filter((failing) => failing.nextAlarm > Date.now()) + .sort(({ nextAlarm: a }, { nextAlarm: b }) => a - b)[0]; + // We also have to check if there are expired events, if the revalidation takes too long, or if the + const expiredEvents = Array.from(this.routeInFailedState.values()).filter( + ({ nextAlarm }) => nextAlarm <= Date.now() + ); + const allEventsToRetry = + nextEventToRetry && nextEventToRetry.nextAlarm > Date.now() + ? [nextEventToRetry, ...expiredEvents] + : expiredEvents; + for (const event of allEventsToRetry) { await this.executeRevalidation(event.msg); this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } - async addToFailedState(msg: ExtendedQueueMessage) { const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); let nextAlarm = Date.now() + 2_000; - - if(existingFailedState) { - if(existingFailedState.retryCount >= 6) { + + if (existingFailedState) { + if (existingFailedState.retryCount >= 6) { // We give up after 6 retries and log the error - error(`The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.`); + error( + `The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.` + ); this.routeInFailedState.delete(msg.MessageDeduplicationId); return; } @@ -131,27 +150,28 @@ export class DurableObjectQueueHandler extends DurableObject { this.routeInFailedState.set(msg.MessageDeduplicationId, { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, - nextAlarm + nextAlarm, }); } else { this.routeInFailedState.set(msg.MessageDeduplicationId, { msg, retryCount: 1, - nextAlarm + nextAlarm, }); } // We probably want to do something if routeInFailedState is becoming too big, at least log it await this.addAlarm(); - } async addAlarm() { - const existingAlarm = await this.ctx.storage.getAlarm({allowConcurrency: false}); - if(existingAlarm) return; - if(this.routeInFailedState.size === 0) return; - - const nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce((acc, {nextAlarm}) => Math.min(acc, nextAlarm), Infinity); + const existingAlarm = await this.ctx.storage.getAlarm({ allowConcurrency: false }); + if (existingAlarm) return; + if (this.routeInFailedState.size === 0) return; + + const nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce( + (acc, { nextAlarm }) => Math.min(acc, nextAlarm), + Infinity + ); await this.ctx.storage.setAlarm(nextAlarmToSetup); } - -} \ No newline at end of file +} diff --git a/packages/cloudflare/src/api/durable-queue.ts b/packages/cloudflare/src/api/durable-queue.ts index f91e47884..52dd3bdcc 100644 --- a/packages/cloudflare/src/api/durable-queue.ts +++ b/packages/cloudflare/src/api/durable-queue.ts @@ -3,8 +3,6 @@ import { IgnorableError } from "@opennextjs/aws/utils/error.js"; import { getCloudflareContext } from "./cloudflare-context"; - - export default { name: "durable-queue", send: async (msg: QueueMessage) => { @@ -18,5 +16,5 @@ export default { ...msg, previewModeId, }); - } -} satisfies Queue; \ No newline at end of file + }, +} satisfies Queue; diff --git a/packages/cloudflare/src/cli/build/bundle-server.ts b/packages/cloudflare/src/cli/build/bundle-server.ts index a04a8bbf2..425ee9e02 100644 --- a/packages/cloudflare/src/cli/build/bundle-server.ts +++ b/packages/cloudflare/src/cli/build/bundle-server.ts @@ -54,25 +54,18 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { "server-functions/default", getPackagePath(buildOpts), ".next" - ) - const serverFiles = path.join( - baseManifestPath, - "required-server-files.json" ); + const serverFiles = path.join(baseManifestPath, "required-server-files.json"); const nextConfig = JSON.parse(fs.readFileSync(serverFiles, "utf-8")).config; // TODO: This is a temporary solution to get the previewModeId from the prerender-manifest.json // We should find a better way to get this value, probably directly provided from aws // probably in an env variable exactly as for BUILD_ID - const prerenderManifest = path.join( - baseManifestPath, - "prerender-manifest.json" - ); + const prerenderManifest = path.join(baseManifestPath, "prerender-manifest.json"); const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); const prerenderManifestJson = JSON.parse(prerenderManifestContent); const previewModeId = prerenderManifestJson.preview.previewModeId; - console.log(`\x1b[35m⚙️ Bundling the OpenNext server...\n\x1b[0m`); await patchWebpackRuntime(buildOpts); From 5e289072621927ca409c2b65cd16e68d9fb288a0 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 11:34:50 +0100 Subject: [PATCH 4/7] update aws dep --- packages/cloudflare/package.json | 2 +- .../src/api/durable-objects/queue.spec.ts | 4 ++-- .../cloudflare/src/api/memory-queue.spec.ts | 19 +++++++++++++------ pnpm-lock.yaml | 10 +++++----- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/packages/cloudflare/package.json b/packages/cloudflare/package.json index a6bc9ded1..615addcf4 100644 --- a/packages/cloudflare/package.json +++ b/packages/cloudflare/package.json @@ -73,7 +73,7 @@ "dependencies": { "@ast-grep/napi": "^0.36.1", "@dotenvx/dotenvx": "catalog:", - "@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@7e23eee", + "@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@773", "enquirer": "^2.4.1", "glob": "catalog:", "yaml": "^2.7.0" diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index a7d3ecb08..20efc40e4 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -52,8 +52,8 @@ const createDurableObjectQueue = ({ }); }; -const createMessage = (dedupId: string) => ({ - MessageBody: { host: "test.local", url: "/test" }, +const createMessage = (dedupId: string, lastModified = Date.now()) => ({ + MessageBody: { host: "test.local", url: "/test", eTag: "test", lastModified }, MessageGroupId: "test.local/test", MessageDeduplicationId: dedupId, previewModeId: "test", diff --git a/packages/cloudflare/src/api/memory-queue.spec.ts b/packages/cloudflare/src/api/memory-queue.spec.ts index 52876d6f3..27c5e3d16 100644 --- a/packages/cloudflare/src/api/memory-queue.spec.ts +++ b/packages/cloudflare/src/api/memory-queue.spec.ts @@ -12,6 +12,13 @@ vi.mock("./cloudflare-context", () => ({ }), })); +const generateMessageBody = ({ host, url }: { host: string; url: string }) => ({ + host, + url, + eTag: "etag", + lastModified: Date.now(), +}); + describe("MemoryQueue", () => { beforeAll(() => { vi.useFakeTimers(); @@ -22,7 +29,7 @@ describe("MemoryQueue", () => { it("should process revalidations for a path", async () => { const firstRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }); @@ -31,7 +38,7 @@ describe("MemoryQueue", () => { expect(mockServiceWorkerFetch).toHaveBeenCalledTimes(1); const secondRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }); @@ -42,7 +49,7 @@ describe("MemoryQueue", () => { it("should process revalidations for multiple paths", async () => { const firstRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }); @@ -51,7 +58,7 @@ describe("MemoryQueue", () => { expect(mockServiceWorkerFetch).toHaveBeenCalledTimes(1); const secondRequest = cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/other"), MessageDeduplicationId: "", }); @@ -63,12 +70,12 @@ describe("MemoryQueue", () => { it("should de-dupe revalidations", async () => { const requests = [ cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }), cache.send({ - MessageBody: { host: "test.local", url: "/test" }, + MessageBody: generateMessageBody({ host: "test.local", url: "/test" }), MessageGroupId: generateMessageGroupId("/test"), MessageDeduplicationId: "", }), diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 379a94fdf..7b31632af 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -898,8 +898,8 @@ importers: specifier: 'catalog:' version: 1.31.0 '@opennextjs/aws': - specifier: https://pkg.pr.new/@opennextjs/aws@7e23eee - version: https://pkg.pr.new/@opennextjs/aws@7e23eee + specifier: https://pkg.pr.new/@opennextjs/aws@773 + version: https://pkg.pr.new/@opennextjs/aws@773 enquirer: specifier: ^2.4.1 version: 2.4.1 @@ -3809,8 +3809,8 @@ packages: resolution: {integrity: sha512-8H4FeoxeLb24N2iWO9H3Tp8ln16YG1V3c+gIzwi+5lc+PRie/5TEjNOd1x1LLc/O9s0P2i4JjEQiDk8MFBI4TA==} hasBin: true - '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@7e23eee': - resolution: {tarball: https://pkg.pr.new/@opennextjs/aws@7e23eee} + '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@773': + resolution: {tarball: https://pkg.pr.new/@opennextjs/aws@773} version: 3.5.2 hasBin: true @@ -13044,7 +13044,7 @@ snapshots: - aws-crt - supports-color - '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@7e23eee': + '@opennextjs/aws@https://pkg.pr.new/@opennextjs/aws@773': dependencies: '@aws-sdk/client-cloudfront': 3.398.0 '@aws-sdk/client-dynamodb': 3.699.0 From 0a49f3cd3827f80a3dfd0ea787e4e15e4e3797b1 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 16:23:26 +0100 Subject: [PATCH 5/7] add comment to env --- packages/cloudflare/src/api/cloudflare-context.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/cloudflare/src/api/cloudflare-context.ts b/packages/cloudflare/src/api/cloudflare-context.ts index 43526bb39..51aa29606 100644 --- a/packages/cloudflare/src/api/cloudflare-context.ts +++ b/packages/cloudflare/src/api/cloudflare-context.ts @@ -4,12 +4,19 @@ import type { DurableObjectQueueHandler } from "./durable-objects/queue"; declare global { interface CloudflareEnv { + // KV used for the incremental cache NEXT_CACHE_WORKERS_KV?: KVNamespace; + // D1 db used for the tag cache NEXT_CACHE_D1?: D1Database; + // D1 table to use for the tag cache for the tag/path mapping NEXT_CACHE_D1_TAGS_TABLE?: string; + // D1 table to use for the tag cache for storing the tag and their associated revalidation times NEXT_CACHE_D1_REVALIDATIONS_TABLE?: string; + // Service binding for the worker itself to be able to call itself from within the worker NEXT_CACHE_REVALIDATION_WORKER?: Service; + // Durable Object namespace to use for the durable object queue handler NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace; + // Asset binding ASSETS?: Fetcher; } } From e7de1bbd0069d3f13f3269b8a102b1af7fe53648 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Tue, 18 Mar 2025 21:46:59 +0100 Subject: [PATCH 6/7] review fix --- examples/e2e/app-router/open-next.config.ts | 1 - .../src/api/durable-objects/queue.spec.ts | 38 +++++++++---------- .../src/api/durable-objects/queue.ts | 32 +++++++--------- 3 files changed, 32 insertions(+), 39 deletions(-) diff --git a/examples/e2e/app-router/open-next.config.ts b/examples/e2e/app-router/open-next.config.ts index e4913befa..fe53a284f 100644 --- a/examples/e2e/app-router/open-next.config.ts +++ b/examples/e2e/app-router/open-next.config.ts @@ -1,7 +1,6 @@ import { defineCloudflareConfig } from "@opennextjs/cloudflare"; import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache"; import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache"; -// import memoryQueue from "@opennextjs/cloudflare/memory-queue"; import doQueue from "@opennextjs/cloudflare/durable-queue"; export default defineCloudflareConfig({ diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index 20efc40e4..e877d2f92 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -4,12 +4,10 @@ import { DurableObjectQueueHandler } from "./queue"; vi.mock("cloudflare:workers", () => ({ DurableObject: class { - ctx: DurableObjectState; - env: CloudflareEnv; - constructor(ctx: DurableObjectState, env: CloudflareEnv) { - this.ctx = ctx; - this.env = env; - } + constructor( + public ctx: DurableObjectState, + public env: CloudflareEnv + ) {} }, })); @@ -100,7 +98,7 @@ describe("DurableObjectQueue", () => { // the next one should block until one of the previous ones finishes const blockedReq = queue.revalidate(createMessage("id6")); - expect(queue.ongoingRevalidations.size).toBe(5); + expect(queue.ongoingRevalidations.size).toBe(queue.maxRevalidations); expect(queue.ongoingRevalidations.has("id6")).toBe(false); expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); @@ -203,14 +201,14 @@ describe("DurableObjectQueue", () => { it("should add an alarm if there are failed states", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); await queue.addAlarm(); expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000); }); it("should not add an alarm if there is already an alarm set", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); // @ts-expect-error queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000); await queue.addAlarm(); @@ -219,8 +217,8 @@ describe("DurableObjectQueue", () => { it("should set the alarm to the lowest nextAlarm", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarm: 1000 }); - queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarm: 500 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); + queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 }); await queue.addAlarm(); expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500); }); @@ -238,7 +236,7 @@ describe("DurableObjectQueue", () => { it("should add a failed state with the correct nextAlarm", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.addToFailedState(createMessage("id")); - expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now()); + expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now()); expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1); }); @@ -246,13 +244,13 @@ describe("DurableObjectQueue", () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); await queue.addToFailedState(createMessage("id")); await queue.addToFailedState(createMessage("id")); - expect(queue.routeInFailedState.get("id")?.nextAlarm).toBeGreaterThan(Date.now()); + expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now()); expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2); }); it("should not add a failed state if it has been retried 6 times", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarm: 1000 }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarmMs: 1000 }); await queue.addToFailedState(createMessage("id")); expect(queue.routeInFailedState.size).toBe(0); }); @@ -264,12 +262,12 @@ describe("DurableObjectQueue", () => { queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, - nextAlarm: Date.now() - 1000, + nextAlarmMs: Date.now() - 1000, }); queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, - nextAlarm: Date.now() - 1000, + nextAlarmMs: Date.now() - 1000, }); await queue.alarm(); expect(queue.routeInFailedState.size).toBe(0); @@ -281,12 +279,12 @@ describe("DurableObjectQueue", () => { queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, - nextAlarm: Date.now() + 1000, + nextAlarmMs: Date.now() + 1000, }); queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, - nextAlarm: Date.now() + 500, + nextAlarmMs: Date.now() + 500, }); await queue.alarm(); expect(queue.routeInFailedState.size).toBe(1); @@ -299,12 +297,12 @@ describe("DurableObjectQueue", () => { queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, - nextAlarm: Date.now() + 1000, + nextAlarmMs: Date.now() + 1000, }); queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, - nextAlarm: Date.now() - 1000, + nextAlarmMs: Date.now() - 1000, }); await queue.alarm(); expect(queue.routeInFailedState.size).toBe(0); diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 184e53d34..acd9e315b 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -24,7 +24,7 @@ export class DurableObjectQueueHandler extends DurableObject { // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage routeInFailedState = new Map< string, - { msg: ExtendedQueueMessage; retryCount: number; nextAlarm: number } + { msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number } >(); service: NonNullable; @@ -34,10 +34,9 @@ export class DurableObjectQueueHandler extends DurableObject { constructor(ctx: DurableObjectState, env: CloudflareEnv) { super(ctx, env); - const service = env.NEXT_CACHE_REVALIDATION_WORKER; + this.service = env.NEXT_CACHE_REVALIDATION_WORKER!; // If there is no service binding, we throw an error because we can't revalidate without it - if (!service) throw new IgnorableError("No service binding for cache revalidation worker"); - this.service = service; + if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker"); } async revalidate(msg: ExtendedQueueMessage) { @@ -115,18 +114,16 @@ export class DurableObjectQueueHandler extends DurableObject { } override async alarm() { + const currentDateTime = Date.now(); // We fetch the first event that needs to be retried or if the date is expired const nextEventToRetry = Array.from(this.routeInFailedState.values()) - .filter((failing) => failing.nextAlarm > Date.now()) - .sort(({ nextAlarm: a }, { nextAlarm: b }) => a - b)[0]; + .filter(({ nextAlarmMs }) => nextAlarmMs > currentDateTime) + .sort(({ nextAlarmMs: a }, { nextAlarmMs: b }) => a - b)[0]; // We also have to check if there are expired events, if the revalidation takes too long, or if the const expiredEvents = Array.from(this.routeInFailedState.values()).filter( - ({ nextAlarm }) => nextAlarm <= Date.now() + ({ nextAlarmMs }) => nextAlarmMs <= currentDateTime ); - const allEventsToRetry = - nextEventToRetry && nextEventToRetry.nextAlarm > Date.now() - ? [nextEventToRetry, ...expiredEvents] - : expiredEvents; + const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents; for (const event of allEventsToRetry) { await this.executeRevalidation(event.msg); this.routeInFailedState.delete(event.msg.MessageDeduplicationId); @@ -135,7 +132,6 @@ export class DurableObjectQueueHandler extends DurableObject { async addToFailedState(msg: ExtendedQueueMessage) { const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); - let nextAlarm = Date.now() + 2_000; if (existingFailedState) { if (existingFailedState.retryCount >= 6) { @@ -146,17 +142,18 @@ export class DurableObjectQueueHandler extends DurableObject { this.routeInFailedState.delete(msg.MessageDeduplicationId); return; } - nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; + const nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; this.routeInFailedState.set(msg.MessageDeduplicationId, { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, - nextAlarm, + nextAlarmMs: nextAlarm, }); } else { + const nextAlarm = Date.now() + 2_000; this.routeInFailedState.set(msg.MessageDeduplicationId, { msg, retryCount: 1, - nextAlarm, + nextAlarmMs: nextAlarm, }); } // We probably want to do something if routeInFailedState is becoming too big, at least log it @@ -168,9 +165,8 @@ export class DurableObjectQueueHandler extends DurableObject { if (existingAlarm) return; if (this.routeInFailedState.size === 0) return; - const nextAlarmToSetup = Array.from(this.routeInFailedState.values()).reduce( - (acc, { nextAlarm }) => Math.min(acc, nextAlarm), - Infinity + const nextAlarmToSetup = Math.min( + ...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs) ); await this.ctx.storage.setAlarm(nextAlarmToSetup); } From b18d4fe69bb8a423385b957aa4895a9fbb52a784 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 19 Mar 2025 10:43:12 +0100 Subject: [PATCH 7/7] review fix --- packages/cloudflare/src/api/durable-objects/queue.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index acd9e315b..ca6bb961a 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -142,18 +142,17 @@ export class DurableObjectQueueHandler extends DurableObject { this.routeInFailedState.delete(msg.MessageDeduplicationId); return; } - const nextAlarm = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; + const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; this.routeInFailedState.set(msg.MessageDeduplicationId, { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, - nextAlarmMs: nextAlarm, + nextAlarmMs, }); } else { - const nextAlarm = Date.now() + 2_000; this.routeInFailedState.set(msg.MessageDeduplicationId, { msg, retryCount: 1, - nextAlarmMs: nextAlarm, + nextAlarmMs: Date.now() + 2_000, }); } // We probably want to do something if routeInFailedState is becoming too big, at least log it