diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs index 9511723a4a3..49a23a48eca 100644 --- a/apps/desktop/src-tauri/src/api.rs +++ b/apps/desktop/src-tauri/src/api.rs @@ -339,11 +339,18 @@ pub async fn signal_recording_complete( .map_err(|err| format!("api/signal_recording_complete/request: {err}"))?; if !resp.status().is_success() { - let status = resp.status().as_u16(); + let status = resp.status(); let error_body = resp .text() .await .unwrap_or_else(|_| "".to_string()); + if status == reqwest::StatusCode::CONFLICT + && error_body.contains("Muxing already in progress") + { + return Ok(()); + } + + let status = status.as_u16(); return Err(format!("api/signal_recording_complete/{status}: {error_body}").into()); } diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 9e9beda2044..3ab260174c9 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -1472,6 +1472,7 @@ impl SegmentUploader { { let mut signal_ok = false; + let mut last_signal_error = None::; for attempt in 0..3u32 { match api::signal_recording_complete(&app, &video_id).await { Ok(()) => { @@ -1479,10 +1480,12 @@ impl SegmentUploader { break; } Err(e) => { + let error = e.to_string(); warn!( attempt = attempt + 1, - "Failed to signal recording complete: {e}" + "Failed to signal recording complete: {error}" ); + last_signal_error = Some(error); if attempt < 2 { tokio::time::sleep(Duration::from_millis(1000 * (1 << attempt) as u64)) .await; @@ -1504,8 +1507,11 @@ impl SegmentUploader { emit_upload_complete(&app, &video_id); + let detail = last_signal_error + .map(|error| format!(": {error}")) + .unwrap_or_default(); return Err(format!( - "Failed to signal recording complete for {video_id} after 3 attempts" + "Failed to signal recording complete for {video_id} after 3 attempts{detail}" ) .into()); } diff --git a/apps/web/__tests__/unit/recording-complete-utils.test.ts b/apps/web/__tests__/unit/recording-complete-utils.test.ts new file mode 100644 index 00000000000..23f809124f8 --- /dev/null +++ b/apps/web/__tests__/unit/recording-complete-utils.test.ts @@ -0,0 +1,65 @@ +import { describe, expect, it } from "vitest"; +import { + getRecordingCompleteIdempotentResult, + isMaterializedDesktopRecordingSource, + isSegmentedRecordingSource, + RECORDING_COMPLETE_UNCLAIMABLE_PHASES, +} from "@/app/api/upload/[...route]/recording-complete-utils"; + +describe("recording complete utils", () => { + it("treats active mux phases as successful idempotent finalization", () => { + expect( + getRecordingCompleteIdempotentResult({ phase: "processing" }), + ).toEqual({ + success: true, + alreadyProcessing: true, + phase: "processing", + }); + expect( + getRecordingCompleteIdempotentResult({ + phase: "generating_thumbnail", + }), + ).toEqual({ + success: true, + alreadyProcessing: true, + phase: "generating_thumbnail", + }); + }); + + it("treats complete uploads as successful idempotent finalization", () => { + expect(getRecordingCompleteIdempotentResult({ phase: "complete" })).toEqual( + { + success: true, + alreadyComplete: true, + phase: "complete", + }, + ); + }); + + it("does not hide claimable or failed upload phases", () => { + expect(getRecordingCompleteIdempotentResult({ phase: "uploading" })).toBe( + null, + ); + expect(getRecordingCompleteIdempotentResult({ phase: "error" })).toBe(null); + expect(getRecordingCompleteIdempotentResult(null)).toBe(null); + }); + + it("keeps active and complete phases out of the mux claim update", () => { + expect(RECORDING_COMPLETE_UNCLAIMABLE_PHASES).toEqual([ + "processing", + "generating_thumbnail", + "complete", + ]); + }); + + it("recognizes segmented and already materialized desktop recordings", () => { + expect(isSegmentedRecordingSource({ type: "desktopSegments" })).toBe(true); + expect(isSegmentedRecordingSource({ type: "desktopMP4" })).toBe(false); + expect(isMaterializedDesktopRecordingSource({ type: "desktopMP4" })).toBe( + true, + ); + expect( + isMaterializedDesktopRecordingSource({ type: "desktopSegments" }), + ).toBe(false); + }); +}); diff --git a/apps/web/app/api/upload/[...route]/recording-complete-utils.ts b/apps/web/app/api/upload/[...route]/recording-complete-utils.ts new file mode 100644 index 00000000000..0158c3afde8 --- /dev/null +++ b/apps/web/app/api/upload/[...route]/recording-complete-utils.ts @@ -0,0 +1,58 @@ +type RecordingCompletePhase = + | "uploading" + | "processing" + | "generating_thumbnail" + | "complete" + | "error"; + +export const RECORDING_COMPLETE_UNCLAIMABLE_PHASES = [ + "processing", + "generating_thumbnail", + "complete", +] satisfies RecordingCompletePhase[]; + +export type RecordingCompleteIdempotentResult = + | { + success: true; + alreadyProcessing: true; + phase: "processing" | "generating_thumbnail"; + } + | { + success: true; + alreadyComplete: true; + phase: "complete"; + }; + +export function isSegmentedRecordingSource( + source: { type: string } | null | undefined, +) { + return source?.type === "desktopSegments"; +} + +export function isMaterializedDesktopRecordingSource( + source: { type: string } | null | undefined, +) { + return source?.type === "desktopMP4"; +} + +export function getRecordingCompleteIdempotentResult( + upload: { phase: RecordingCompletePhase } | null | undefined, +): RecordingCompleteIdempotentResult | null { + switch (upload?.phase) { + case "processing": + case "generating_thumbnail": + return { + success: true, + alreadyProcessing: true, + phase: upload.phase, + }; + case "complete": + return { + success: true, + alreadyComplete: true, + phase: upload.phase, + }; + default: + return null; + } +} diff --git a/apps/web/app/api/upload/[...route]/recording-complete.ts b/apps/web/app/api/upload/[...route]/recording-complete.ts index 463b21b6e38..e037dd862b2 100644 --- a/apps/web/app/api/upload/[...route]/recording-complete.ts +++ b/apps/web/app/api/upload/[...route]/recording-complete.ts @@ -12,6 +12,12 @@ import { invalidateGoogleDriveStorageQuotaCache } from "@/lib/google-drive-stora import { runPromise } from "@/lib/server"; import { decodeStorageVideo } from "@/lib/video-storage"; import { withAuth } from "../../utils"; +import { + getRecordingCompleteIdempotentResult, + isMaterializedDesktopRecordingSource, + isSegmentedRecordingSource, + RECORDING_COMPLETE_UNCLAIMABLE_PHASES, +} from "./recording-complete-utils"; const MEDIA_SERVER_PRESIGNED_GET_EXPIRES_SECONDS = 3 * 60 * 60; const MEDIA_SERVER_PRESIGNED_PUT_EXPIRES_SECONDS = 3 * 60 * 60; @@ -39,7 +45,15 @@ export const app = new Hono().post( return c.json({ error: "Video not found" }, 404); } - if (video.source?.type !== "desktopSegments") { + if (isMaterializedDesktopRecordingSource(video.source)) { + return c.json({ + success: true, + alreadyComplete: true, + source: "desktopMP4", + }); + } + + if (!isSegmentedRecordingSource(video.source)) { return c.json({ error: "Video is not a segmented recording" }, 400); } @@ -59,6 +73,66 @@ export const app = new Hono().post( } try { + const claimResult = await db() + .update(Db.videoUploads) + .set({ + phase: "processing", + processingProgress: 0, + processingMessage: "Muxing segments into MP4...", + updatedAt: new Date(), + }) + .where( + and( + eq(Db.videoUploads.videoId, videoId), + notInArray( + Db.videoUploads.phase, + RECORDING_COMPLETE_UNCLAIMABLE_PHASES, + ), + ), + ); + + if ((claimResult[0]?.affectedRows ?? 0) === 0) { + const [existing] = await db() + .select({ phase: Db.videoUploads.phase }) + .from(Db.videoUploads) + .where(eq(Db.videoUploads.videoId, videoId)); + const idempotentResult = getRecordingCompleteIdempotentResult(existing); + + if (idempotentResult) { + return c.json(idempotentResult); + } + + if (!existing) { + try { + await db().insert(Db.videoUploads).values({ + videoId, + phase: "processing", + processingProgress: 0, + processingMessage: "Muxing segments into MP4...", + }); + } catch (error) { + const [existingAfterRace] = await db() + .select({ phase: Db.videoUploads.phase }) + .from(Db.videoUploads) + .where(eq(Db.videoUploads.videoId, videoId)); + const raceResult = + getRecordingCompleteIdempotentResult(existingAfterRace); + + if (raceResult) { + return c.json(raceResult); + } + + console.error( + "[recording-complete] Failed to claim video upload for muxing:", + error, + ); + return c.json({ error: "Failed to claim recording complete" }, 500); + } + } else { + return c.json({ error: "Failed to claim recording complete" }, 500); + } + } + const muxPayload = await Effect.gen(function* () { const [bucket] = yield* Storage.getAccessForVideo( decodeStorageVideo(video), @@ -191,46 +265,6 @@ export const app = new Hono().post( }).pipe(runPromise); await invalidateGoogleDriveStorageQuotaCache(video.storageIntegrationId); - const claimResult = await db() - .update(Db.videoUploads) - .set({ - phase: "processing", - processingProgress: 0, - processingMessage: "Muxing segments into MP4...", - updatedAt: new Date(), - }) - .where( - and( - eq(Db.videoUploads.videoId, videoId), - notInArray(Db.videoUploads.phase, [ - "processing", - "generating_thumbnail", - ]), - ), - ); - - if (claimResult[0].affectedRows === 0) { - const [existing] = await db() - .select({ phase: Db.videoUploads.phase }) - .from(Db.videoUploads) - .where(eq(Db.videoUploads.videoId, videoId)); - - if (existing) { - return c.json({ error: "Muxing already in progress" }, 409); - } - - try { - await db().insert(Db.videoUploads).values({ - videoId, - phase: "processing", - processingProgress: 0, - processingMessage: "Muxing segments into MP4...", - }); - } catch { - return c.json({ error: "Muxing already in progress" }, 409); - } - } - const webhookBaseUrl = serverEnv().MEDIA_SERVER_WEBHOOK_URL || serverEnv().WEB_URL; const webhookUrl = `${webhookBaseUrl}/api/webhooks/media-server/progress`;