-
Notifications
You must be signed in to change notification settings - Fork 1.5k
fix: make recording completion idempotent #1836
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -339,11 +339,18 @@ pub async fn signal_recording_complete( | |
| .map_err(|err| format!("api/signal_recording_complete/request: {err}"))?; | ||
|
|
||
| if !resp.status().is_success() { | ||
| let status = resp.status().as_u16(); | ||
| let status = resp.status(); | ||
| let error_body = resp | ||
| .text() | ||
| .await | ||
| .unwrap_or_else(|_| "<no response body>".to_string()); | ||
| if status == reqwest::StatusCode::CONFLICT | ||
| && error_body.contains("Muxing already in progress") | ||
|
Comment on lines
+342
to
+348
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The check Prompt To Fix With AIThis is a comment left during a code review.
Path: apps/desktop/src-tauri/src/api.rs
Line: 342-348
Comment:
**Brittle backward-compat string match**
The check `error_body.contains("Muxing already in progress")` hard-codes the exact server error message. If that message is ever edited or the server returns a localized/structured JSON body instead of plain text, the condition silently stops matching: the response is treated as an ordinary error, the client retries three times, and the upload fails. Since the updated server now returns HTTP 200 for this case, this path is only exercised against older deployments — but it would be worth at minimum adding a comment that documents what server version this bridges, so a future maintainer knows when it can be removed.
How can I resolve this? If you propose a fix, please make it concise. |
||
| { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let status = status.as_u16(); | ||
| return Err(format!("api/signal_recording_complete/{status}: {error_body}").into()); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| import { describe, expect, it } from "vitest"; | ||
| import { | ||
| getRecordingCompleteIdempotentResult, | ||
| isMaterializedDesktopRecordingSource, | ||
| isSegmentedRecordingSource, | ||
| RECORDING_COMPLETE_UNCLAIMABLE_PHASES, | ||
| } from "@/app/api/upload/[...route]/recording-complete-utils"; | ||
|
|
||
| describe("recording complete utils", () => { | ||
| it("treats active mux phases as successful idempotent finalization", () => { | ||
| expect( | ||
| getRecordingCompleteIdempotentResult({ phase: "processing" }), | ||
| ).toEqual({ | ||
| success: true, | ||
| alreadyProcessing: true, | ||
| phase: "processing", | ||
| }); | ||
| expect( | ||
| getRecordingCompleteIdempotentResult({ | ||
| phase: "generating_thumbnail", | ||
| }), | ||
| ).toEqual({ | ||
| success: true, | ||
| alreadyProcessing: true, | ||
| phase: "generating_thumbnail", | ||
| }); | ||
| }); | ||
|
|
||
| it("treats complete uploads as successful idempotent finalization", () => { | ||
| expect(getRecordingCompleteIdempotentResult({ phase: "complete" })).toEqual( | ||
| { | ||
| success: true, | ||
| alreadyComplete: true, | ||
| phase: "complete", | ||
| }, | ||
| ); | ||
| }); | ||
|
|
||
| it("does not hide claimable or failed upload phases", () => { | ||
| expect(getRecordingCompleteIdempotentResult({ phase: "uploading" })).toBe( | ||
| null, | ||
| ); | ||
| expect(getRecordingCompleteIdempotentResult({ phase: "error" })).toBe(null); | ||
| expect(getRecordingCompleteIdempotentResult(null)).toBe(null); | ||
| }); | ||
|
|
||
| it("keeps active and complete phases out of the mux claim update", () => { | ||
| expect(RECORDING_COMPLETE_UNCLAIMABLE_PHASES).toEqual([ | ||
| "processing", | ||
| "generating_thumbnail", | ||
| "complete", | ||
| ]); | ||
| }); | ||
|
|
||
| it("recognizes segmented and already materialized desktop recordings", () => { | ||
| expect(isSegmentedRecordingSource({ type: "desktopSegments" })).toBe(true); | ||
| expect(isSegmentedRecordingSource({ type: "desktopMP4" })).toBe(false); | ||
| expect(isMaterializedDesktopRecordingSource({ type: "desktopMP4" })).toBe( | ||
| true, | ||
| ); | ||
| expect( | ||
| isMaterializedDesktopRecordingSource({ type: "desktopSegments" }), | ||
| ).toBe(false); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,58 @@ | ||||||||||||||
| type RecordingCompletePhase = | ||||||||||||||
| | "uploading" | ||||||||||||||
| | "processing" | ||||||||||||||
| | "generating_thumbnail" | ||||||||||||||
| | "complete" | ||||||||||||||
| | "error"; | ||||||||||||||
|
|
||||||||||||||
| export const RECORDING_COMPLETE_UNCLAIMABLE_PHASES = [ | ||||||||||||||
| "processing", | ||||||||||||||
| "generating_thumbnail", | ||||||||||||||
| "complete", | ||||||||||||||
| ] satisfies RecordingCompletePhase[]; | ||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small TS typing nit:
Suggested change
|
||||||||||||||
|
|
||||||||||||||
| export type RecordingCompleteIdempotentResult = | ||||||||||||||
| | { | ||||||||||||||
| success: true; | ||||||||||||||
| alreadyProcessing: true; | ||||||||||||||
| phase: "processing" | "generating_thumbnail"; | ||||||||||||||
| } | ||||||||||||||
| | { | ||||||||||||||
| success: true; | ||||||||||||||
| alreadyComplete: true; | ||||||||||||||
| phase: "complete"; | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| export function isSegmentedRecordingSource( | ||||||||||||||
| source: { type: string } | null | undefined, | ||||||||||||||
| ) { | ||||||||||||||
| return source?.type === "desktopSegments"; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| export function isMaterializedDesktopRecordingSource( | ||||||||||||||
| source: { type: string } | null | undefined, | ||||||||||||||
| ) { | ||||||||||||||
| return source?.type === "desktopMP4"; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| export function getRecordingCompleteIdempotentResult( | ||||||||||||||
| upload: { phase: RecordingCompletePhase } | null | undefined, | ||||||||||||||
| ): RecordingCompleteIdempotentResult | null { | ||||||||||||||
| switch (upload?.phase) { | ||||||||||||||
| case "processing": | ||||||||||||||
| case "generating_thumbnail": | ||||||||||||||
| return { | ||||||||||||||
| success: true, | ||||||||||||||
| alreadyProcessing: true, | ||||||||||||||
| phase: upload.phase, | ||||||||||||||
| }; | ||||||||||||||
| case "complete": | ||||||||||||||
| return { | ||||||||||||||
| success: true, | ||||||||||||||
| alreadyComplete: true, | ||||||||||||||
| phase: upload.phase, | ||||||||||||||
| }; | ||||||||||||||
| default: | ||||||||||||||
| return null; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,12 @@ import { invalidateGoogleDriveStorageQuotaCache } from "@/lib/google-drive-stora | |
| import { runPromise } from "@/lib/server"; | ||
| import { decodeStorageVideo } from "@/lib/video-storage"; | ||
| import { withAuth } from "../../utils"; | ||
| import { | ||
| getRecordingCompleteIdempotentResult, | ||
| isMaterializedDesktopRecordingSource, | ||
| isSegmentedRecordingSource, | ||
| RECORDING_COMPLETE_UNCLAIMABLE_PHASES, | ||
| } from "./recording-complete-utils"; | ||
|
|
||
| const MEDIA_SERVER_PRESIGNED_GET_EXPIRES_SECONDS = 3 * 60 * 60; | ||
| const MEDIA_SERVER_PRESIGNED_PUT_EXPIRES_SECONDS = 3 * 60 * 60; | ||
|
|
@@ -39,7 +45,15 @@ export const app = new Hono().post( | |
| return c.json({ error: "Video not found" }, 404); | ||
| } | ||
|
|
||
| if (video.source?.type !== "desktopSegments") { | ||
| if (isMaterializedDesktopRecordingSource(video.source)) { | ||
| return c.json({ | ||
| success: true, | ||
| alreadyComplete: true, | ||
| source: "desktopMP4", | ||
| }); | ||
| } | ||
|
|
||
| if (!isSegmentedRecordingSource(video.source)) { | ||
| return c.json({ error: "Video is not a segmented recording" }, 400); | ||
| } | ||
|
|
||
|
|
@@ -59,6 +73,66 @@ export const app = new Hono().post( | |
| } | ||
|
|
||
| try { | ||
| const claimResult = await db() | ||
| .update(Db.videoUploads) | ||
| .set({ | ||
| phase: "processing", | ||
| processingProgress: 0, | ||
| processingMessage: "Muxing segments into MP4...", | ||
| updatedAt: new Date(), | ||
| }) | ||
| .where( | ||
| and( | ||
| eq(Db.videoUploads.videoId, videoId), | ||
| notInArray( | ||
| Db.videoUploads.phase, | ||
| RECORDING_COMPLETE_UNCLAIMABLE_PHASES, | ||
| ), | ||
| ), | ||
| ); | ||
|
|
||
| if ((claimResult[0]?.affectedRows ?? 0) === 0) { | ||
| const [existing] = await db() | ||
| .select({ phase: Db.videoUploads.phase }) | ||
| .from(Db.videoUploads) | ||
| .where(eq(Db.videoUploads.videoId, videoId)); | ||
| const idempotentResult = getRecordingCompleteIdempotentResult(existing); | ||
|
|
||
| if (idempotentResult) { | ||
| return c.json(idempotentResult); | ||
| } | ||
|
|
||
| if (!existing) { | ||
| try { | ||
| await db().insert(Db.videoUploads).values({ | ||
| videoId, | ||
| phase: "processing", | ||
| processingProgress: 0, | ||
| processingMessage: "Muxing segments into MP4...", | ||
| }); | ||
| } catch (error) { | ||
| const [existingAfterRace] = await db() | ||
| .select({ phase: Db.videoUploads.phase }) | ||
| .from(Db.videoUploads) | ||
| .where(eq(Db.videoUploads.videoId, videoId)); | ||
| const raceResult = | ||
| getRecordingCompleteIdempotentResult(existingAfterRace); | ||
|
|
||
| if (raceResult) { | ||
| return c.json(raceResult); | ||
| } | ||
|
|
||
| console.error( | ||
| "[recording-complete] Failed to claim video upload for muxing:", | ||
| error, | ||
| ); | ||
| return c.json({ error: "Failed to claim recording complete" }, 500); | ||
| } | ||
| } else { | ||
| return c.json({ error: "Failed to claim recording complete" }, 500); | ||
| } | ||
|
Comment on lines
+131
to
+133
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This Prompt To Fix With AIThis is a comment left during a code review.
Path: apps/web/app/api/upload/[...route]/recording-complete.ts
Line: 131-133
Comment:
**Unreachable-in-theory branch returns an opaque 500**
This `else` arm fires only when the `UPDATE` returned 0 affected rows yet the follow-up `SELECT` found a row whose phase is neither idempotent (`processing` / `generating_thumbnail` / `complete`) nor absent. In normal operation that means the row transitioned from `processing` → `error` between the UPDATE and the SELECT (a tight race with the media-server webhook). Because `error` is claimable, the Rust client's next retry attempt would successfully claim and re-mux — but the 500 response here is indistinguishable from a genuine server failure. Adding a log with the actual `existing.phase` would make this edge case much easier to diagnose in production.
How can I resolve this? If you propose a fix, please make it concise. |
||
| } | ||
|
|
||
| const muxPayload = await Effect.gen(function* () { | ||
| const [bucket] = yield* Storage.getAccessForVideo( | ||
| decodeStorageVideo(video), | ||
|
|
@@ -191,46 +265,6 @@ export const app = new Hono().post( | |
| }).pipe(runPromise); | ||
| await invalidateGoogleDriveStorageQuotaCache(video.storageIntegrationId); | ||
|
|
||
| const claimResult = await db() | ||
| .update(Db.videoUploads) | ||
| .set({ | ||
| phase: "processing", | ||
| processingProgress: 0, | ||
| processingMessage: "Muxing segments into MP4...", | ||
| updatedAt: new Date(), | ||
| }) | ||
| .where( | ||
| and( | ||
| eq(Db.videoUploads.videoId, videoId), | ||
| notInArray(Db.videoUploads.phase, [ | ||
| "processing", | ||
| "generating_thumbnail", | ||
| ]), | ||
| ), | ||
| ); | ||
|
|
||
| if (claimResult[0].affectedRows === 0) { | ||
| const [existing] = await db() | ||
| .select({ phase: Db.videoUploads.phase }) | ||
| .from(Db.videoUploads) | ||
| .where(eq(Db.videoUploads.videoId, videoId)); | ||
|
|
||
| if (existing) { | ||
| return c.json({ error: "Muxing already in progress" }, 409); | ||
| } | ||
|
|
||
| try { | ||
| await db().insert(Db.videoUploads).values({ | ||
| videoId, | ||
| phase: "processing", | ||
| processingProgress: 0, | ||
| processingMessage: "Muxing segments into MP4...", | ||
| }); | ||
| } catch { | ||
| return c.json({ error: "Muxing already in progress" }, 409); | ||
| } | ||
| } | ||
|
|
||
| const webhookBaseUrl = | ||
| serverEnv().MEDIA_SERVER_WEBHOOK_URL || serverEnv().WEB_URL; | ||
| const webhookUrl = `${webhookBaseUrl}/api/webhooks/media-server/progress`; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String-matching the error body feels a bit brittle here (any wording change would make desktop start erroring again). If
409is only used to signal the idempotent "already processing" case, I’d treat all409s as OK.