Skip to content

Commit 249de21

Browse files
fix: persist WebSocket streaming messages to disk (#174)
* fix: persist WebSocket streaming messages to disk The WebSocket streaming path (get_chatbot_reply_stream) updates the in-memory session history but never persists it to disk. The REST endpoint already does this via BackgroundTasks, but the WebSocket handler was missing the equivalent call. Added persist_session() after each WebSocket exchange completes, using asyncio.to_thread() to avoid blocking the event loop. Fixes #173 * test: add unit test for WebSocket session persistence --------- Co-authored-by: Bervianto Leo Pratama <[email protected]>
1 parent 665373b commit 249de21

3 files changed

Lines changed: 31 additions & 0 deletions

File tree

chatbot-core/api/routes/chatbot.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ async def chatbot_stream(websocket: WebSocket, session_id: str):
134134
json.dumps({"end": True})
135135
)
136136

137+
asyncio.create_task(asyncio.to_thread(persist_session, session_id))
138+
137139
except WebSocketDisconnect:
138140
logger.info(
139141
"WebSocket disconnected for session %s",

chatbot-core/tests/unit/mocks/test_env.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ def mock_get_chatbot_reply_stream(mocker):
6262
"""Mock the get_chatbot_reply_stream function."""
6363
return mocker.patch("api.routes.chatbot.get_chatbot_reply_stream")
6464

65+
@pytest.fixture
66+
def mock_persist_session(mocker):
67+
"""Mock the persist_session function."""
68+
return mocker.patch("api.routes.chatbot.persist_session")
69+
6570

6671
@pytest.fixture
6772
def mock_process_uploaded_file(mocker):

chatbot-core/tests/unit/routes/test_chatbot.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,3 +156,27 @@ def test_websocket_invalid_session_returns_error_and_closes(
156156
with client.websocket_connect("/sessions/bad-session/stream") as ws:
157157
error = ws.receive_json()
158158
assert error == {"error": "Session not found"}
159+
160+
161+
def test_websocket_persist_session_called_after_streaming(
162+
client, mock_session_exists, mock_get_chatbot_reply_stream, mock_persist_session
163+
):
164+
"""persist_session must be called after a WebSocket streaming exchange completes.
165+
166+
Before fix: the WebSocket path updated in-memory history but never
167+
persisted to disk — messages were lost on server restart.
168+
After fix: asyncio.create_task runs persist_session in the background.
169+
"""
170+
mock_session_exists.return_value = True
171+
172+
async def fake_stream(_session_id, _message):
173+
yield "token"
174+
175+
mock_get_chatbot_reply_stream.side_effect = fake_stream
176+
177+
with client.websocket_connect("/sessions/test-session-id/stream") as ws:
178+
ws.send_json({"message": "Hello"})
179+
ws.receive_json() # token
180+
ws.receive_json() # end marker
181+
182+
mock_persist_session.assert_called_with("test-session-id")

0 commit comments

Comments
 (0)