Skip to content

Commit aa36c2b

Browse files
authored
Docker: Fix recording shutdown for node docker (#3105)
Signed-off-by: Viet Nguyen Duc <[email protected]>
1 parent fa9a905 commit aa36c2b

2 files changed

Lines changed: 31 additions & 6 deletions

File tree

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,7 +1158,7 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta
11581158
echo VIDEO_TAG=$(FFMPEG_TAG_VERSION)-$(BUILD_DATE) >> .env ; \
11591159
echo TEST_DRAIN_AFTER_SESSION_COUNT=$(or $(TEST_DRAIN_AFTER_SESSION_COUNT), 0) >> .env ; \
11601160
echo TEST_PARALLEL_HARDENING=$(or $(TEST_PARALLEL_HARDENING), "false") >> .env ; \
1161-
echo LOG_LEVEL=$(or $(LOG_LEVEL), "FINE") >> .env ; \
1161+
echo LOG_LEVEL=$(or $(LOG_LEVEL), "INFO") >> .env ; \
11621162
echo REQUEST_TIMEOUT=$(or $(REQUEST_TIMEOUT), 300) >> .env ; \
11631163
echo SELENIUM_ENABLE_MANAGED_DOWNLOADS=$(or $(SELENIUM_ENABLE_MANAGED_DOWNLOADS), "false") >> .env ; \
11641164
echo TEST_DELAY_AFTER_TEST=$(or $(TEST_DELAY_AFTER_TEST), 2) >> .env ; \
@@ -1175,14 +1175,14 @@ test_node_docker: hub standalone_docker standalone_chrome standalone_firefox sta
11751175
else \
11761176
echo HOST_IP=127.0.0.1 >> .env ; \
11771177
fi; \
1178+
BASIC_AUTH_USER=admin ; \
1179+
BASIC_AUTH_PASSWORD=admin ; \
11781180
if [ "$(PLATFORMS)" = "linux/amd64" ]; then \
11791181
NODE_EDGE=edge ; \
11801182
NODE_CHROME=chrome ; \
11811183
else \
11821184
NODE_EDGE=chromium ; \
11831185
NODE_CHROME=chromium ; \
1184-
BASIC_AUTH_USER=admin ; \
1185-
BASIC_AUTH_PASSWORD=admin ; \
11861186
fi; \
11871187
echo BASIC_AUTH_USER=$${BASIC_AUTH_USER} >> .env ; \
11881188
echo BASIC_AUTH_PASSWORD=$${BASIC_AUTH_PASSWORD} >> .env ; \

Video/video_service.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ def __init__(self):
163163
self.upload_opts = os.environ.get("SE_UPLOAD_OPTS", "-P --cutoff-mode SOFT --metadata --inplace")
164164
self.retain_local = os.environ.get("SE_UPLOAD_RETAIN_LOCAL_FILE", "false").lower() == "true"
165165
self.upload_batch_size = int(os.environ.get("SE_VIDEO_UPLOAD_BATCH_CHECK", "10"))
166+
self.upload_timeout = int(os.environ.get("SE_VIDEO_UPLOAD_TIMEOUT", "300"))
166167
self.upload_failure_only = os.environ.get("SE_UPLOAD_FAILURE_SESSION_ONLY", "false").lower() == "true"
167168
default_failure_events = [":failure", ":failed"]
168169
custom_failure_events = os.environ.get("SE_UPLOAD_FAILURE_SESSION_EVENTS", "").lower()
@@ -225,6 +226,9 @@ def __init__(self):
225226
self.recorder_done = asyncio.Event()
226227
self.uploader_done = asyncio.Event()
227228

229+
# Tracked delayed-cleanup tasks so they can be cancelled on shutdown
230+
self._cleanup_tasks: List[asyncio.Task] = []
231+
228232
# Rename SE_RCLONE_* env vars
229233
self._rename_rclone_env()
230234

@@ -588,7 +592,13 @@ async def process_upload(self, task: UploadTask) -> None:
588592
)
589593
self.active_uploads.append(proc)
590594
try:
591-
stdout, stderr = await proc.communicate()
595+
try:
596+
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=self.upload_timeout)
597+
except asyncio.TimeoutError:
598+
logger.warning(f"Upload timed out after {self.upload_timeout}s: {task.video_file}, killing process")
599+
proc.kill()
600+
await proc.communicate()
601+
return
592602
finally:
593603
try:
594604
self.active_uploads.remove(proc)
@@ -713,8 +723,11 @@ async def handle_session_closed(self, data: dict) -> None:
713723
await self.stop_recording(session)
714724
await self.queue_upload(session)
715725

716-
# Clean up session after a delay (keep for potential late events)
717-
asyncio.create_task(self._cleanup_session_delayed(session_id, delay=60))
726+
# Clean up session after a delay (keep for potential late events).
727+
# Tracked so cleanup() can cancel these on shutdown instead of waiting 60s.
728+
t = asyncio.create_task(self._cleanup_session_delayed(session_id, delay=60))
729+
self._cleanup_tasks.append(t)
730+
t.add_done_callback(lambda fut: self._cleanup_tasks.remove(fut) if fut in self._cleanup_tasks else None)
718731

719732
# Check drain condition
720733
if self.max_sessions > 0 and self.recorded_count >= self.max_sessions:
@@ -814,6 +827,10 @@ async def subscribe_events(self) -> None:
814827
if await self.subscriber.poll(timeout=1000):
815828
frames = await self.subscriber.recv_multipart()
816829

830+
# Re-check shutdown before spending time processing the event
831+
if self.shutdown_event.is_set():
832+
break
833+
817834
if len(frames) < 4:
818835
continue
819836

@@ -892,6 +909,14 @@ async def cleanup(self) -> None:
892909
"""Cleanup all resources."""
893910
logger.info("Shutting down...")
894911

912+
# Cancel delayed session-cleanup tasks immediately — they have a 60s
913+
# sleep that would keep the event loop alive long after shutdown.
914+
for t in list(self._cleanup_tasks):
915+
t.cancel()
916+
if self._cleanup_tasks:
917+
await asyncio.gather(*self._cleanup_tasks, return_exceptions=True)
918+
self._cleanup_tasks.clear()
919+
895920
# Snapshot active sessions outside the lock so we don't hold
896921
# sessions_lock across slow awaits (stop_recording can take up to 10s).
897922
async with self.sessions_lock:

0 commit comments

Comments
 (0)