From 66b277ea08c7213e618aa139a32be22d8de82cc0 Mon Sep 17 00:00:00 2001 From: David Lange Date: Wed, 17 Jun 2026 18:35:24 -0400 Subject: [PATCH 1/2] Add AssemblyAI universal-3-5-pro with conversation-context carryover MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Upgrade pipecat-ai to >=1.4.0, which adds the universal-3-5-pro streaming model (Universal-3 Pro family) and AssemblyAI conversation-context carryover (`agent_context` Settings seed + `AssemblyAISTTService.update_agent_context()`). In a standard pipecat bot, carryover is automatic: the assistant context aggregator emits `LLMContextAssistantTurnFrame`, the upstream STT picks it up via `_process_assistant_turn()`, and the AssemblyAI override forwards it to `update_agent_context()`. EVA's cascade pipeline drives the agent turn through a custom `BenchmarkAgentProcessor` that pushes `TTSSpeakFrame` directly and never emits the standard LLM response frames, so the aggregation is empty and that frame is not produced. We therefore trigger carryover explicitly. - services.py: add `update_stt_agent_context()` helper — forwards the agent's reply to STT when it exposes `update_agent_context` (AssemblyAI U3 Pro), no-op otherwise. The existing Settings-forwarding already passes `model`, `agent_context`, and `previous_context_n_turns` (opt-out) through from config. - pipecat_server.py: call the helper from the cascade `on_assistant_response` hook so each agent reply seeds STT before the user's next turn. Calling it alongside the auto-path is idempotent (update_agent_context replaces). - .env.example: document the AssemblyAI universal-3-5-pro config + carryover. - tests: AssemblyAI tests use universal-3-5-pro, assert carryover Settings forwarding, and cover the helper (forward / no-op / empty / None). Verified: full unit suite passes on pipecat 1.4.0 (1765 passed); universal-3-5-pro is recognized as a U3 Pro model (U3_PRO_MODEL_PREFIXES) so carryover applies. Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 3 ++ pyproject.toml | 2 +- src/eva/assistant/pipecat_server.py | 13 +++++-- src/eva/assistant/pipeline/services.py | 22 ++++++++++++ tests/unit/assistant/test_services.py | 49 +++++++++++++++++++++++--- uv.lock | 24 ++++++++++--- 6 files changed, 100 insertions(+), 13 deletions(-) diff --git a/.env.example b/.env.example index 8ba38784..0affb423 100644 --- a/.env.example +++ b/.env.example @@ -75,6 +75,9 @@ EVA_MODEL__STT=cartesia #i STT provider parameters. Must include "api_key" and "model". Use "urls" for round-robin load balancing. #i Some providers also accept extra provider-specific tuning parameters here. +#i AssemblyAI example (Universal-3 Pro family — conversation-context carryover is applied automatically +#i so the agent's last reply improves transcription of the user's next turn; set "previous_context_n_turns": 0 to disable): +#i EVA_MODEL__STT_PARAMS='{"api_key": "your_assemblyai_api_key", "model": "universal-3-5-pro"}' #d json_object #x pipeline_mode=LLM EVA_MODEL__STT_PARAMS='{"api_key": "your_cartesia_api_key", "model": "ink-whisper"}' diff --git a/pyproject.toml b/pyproject.toml index 53568336..2b2ce35e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ license = "MIT" dependencies = [ "pydantic>=2.0", "pydantic-settings>=2.0", - "pipecat-ai>=1.0.0", + "pipecat-ai>=1.4.0", "elevenlabs>=1.0.0", "openai>=2.36.0", "anthropic>=0.83.0", diff --git a/src/eva/assistant/pipecat_server.py b/src/eva/assistant/pipecat_server.py index 58698825..b84df840 100644 --- a/src/eva/assistant/pipecat_server.py +++ b/src/eva/assistant/pipecat_server.py @@ -53,6 +53,7 @@ create_realtime_llm_service, create_stt_service, create_tts_service, + update_stt_agent_context, ) from eva.assistant.pipeline.turn_config import ( create_turn_start_strategy, @@ -408,9 +409,15 @@ async def on_user_transcription(text: str, timestamp: str, turn_id: int | None) llm_client=llm_client, output_dir=self.output_dir, ) - agent_processor.on_assistant_response = lambda msg: self._save_transcript_message_from_turn( - role="assistant", content=msg, timestamp=self._current_iso_timestamp() - ) + async def on_assistant_response(msg: str) -> None: + await self._save_transcript_message_from_turn( + role="assistant", content=msg, timestamp=self._current_iso_timestamp() + ) + # Carry the agent's reply into STT as conversation context so it improves + # transcription of the user's next turn (AssemblyAI Universal-3 Pro; no-op otherwise). + await update_stt_agent_context(stt, msg) + + agent_processor.on_assistant_response = on_assistant_response self.agentic_system = agent_processor.agentic_system # Create pipeline diff --git a/src/eva/assistant/pipeline/services.py b/src/eva/assistant/pipeline/services.py index 798f7101..838e9fef 100644 --- a/src/eva/assistant/pipeline/services.py +++ b/src/eva/assistant/pipeline/services.py @@ -285,6 +285,28 @@ def create_stt_service( ) +async def update_stt_agent_context(stt: STTService | None, text: str) -> None: + """Feed the agent's latest spoken reply back to the STT service as conversation context. + + AssemblyAI's Universal-3 Pro streaming models (e.g. ``u3-rt-pro``, ``universal-3-5-pro``) + support *context carryover*: seeding the agent's most recent reply improves transcription + of the user's next turn (short answers, spelled-out entities, disambiguation). Pipecat + exposes this via ``AssemblyAISTTService.update_agent_context()``. + + In a standard pipecat bot this fires automatically (the assistant context aggregator emits + ``LLMContextAssistantTurnFrame`` and the upstream STT picks it up). EVA's cascade pipeline + drives the agent turn through a custom processor that pushes ``TTSSpeakFrame`` directly and + does not emit the standard LLM response frames, so we trigger the update explicitly from the + assistant-response hook instead. + + No-op for STT services without the capability (Deepgram, Cartesia, …) and for non-U3-Pro + AssemblyAI models, where pipecat treats the call as a safe no-op. + """ + update = getattr(stt, "update_agent_context", None) + if update is not None and text: + await update(text) + + def create_tts_service( model: str | None, params: dict[str, Any] | None = None, diff --git a/tests/unit/assistant/test_services.py b/tests/unit/assistant/test_services.py index 7ed97c93..eea2cdd2 100644 --- a/tests/unit/assistant/test_services.py +++ b/tests/unit/assistant/test_services.py @@ -1,6 +1,6 @@ """Tests for assistant/pipeline/services.py — service factory functions.""" -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest @@ -9,6 +9,7 @@ _resolve_url, create_stt_service, create_tts_service, + update_stt_agent_context, ) @@ -54,16 +55,16 @@ def test_unknown_model_raises_with_available_list(self): create_stt_service("nonexistent_provider", params={"api_key": "k"}) def test_assemblyai_service_created(self): - svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "u3-rt-pro"}) + svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "universal-3-5-pro"}) assert "AssemblyAI" in type(svc).__name__ - assert svc._settings.model == "u3-rt-pro" + assert svc._settings.model == "universal-3-5-pro" def test_assemblyai_forwards_optional_settings(self): svc = create_stt_service( "assemblyai", params={ "api_key": "k", - "model": "u3-rt-pro", + "model": "universal-3-5-pro", "vad_threshold": 0.1, "min_turn_silence": 120, }, @@ -71,9 +72,23 @@ def test_assemblyai_forwards_optional_settings(self): assert svc._settings.vad_threshold == 0.1 assert svc._settings.min_turn_silence == 120 + def test_assemblyai_forwards_context_carryover_settings(self): + """Conversation-context carryover settings (pipecat >= 1.4.0) forward through.""" + svc = create_stt_service( + "assemblyai", + params={ + "api_key": "k", + "model": "universal-3-5-pro", + "agent_context": "Booking confirmed for flight AA123.", + "previous_context_n_turns": 0, + }, + ) + assert svc._settings.agent_context == "Booking confirmed for flight AA123." + assert svc._settings.previous_context_n_turns == 0 + def test_assemblyai_ignores_unspecified_settings(self): """Keys absent from params must not be forwarded, so library defaults apply.""" - svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "u3-rt-pro"}) + svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "universal-3-5-pro"}) assert svc._settings.vad_threshold is None def test_nvidia_requires_url(self): @@ -127,6 +142,30 @@ def test_cartesia_service_created(self): assert "Cartesia" in type(svc).__name__ +class TestUpdateSttAgentContext: + """Conversation-context carryover hook (AssemblyAI Universal-3 Pro).""" + + async def test_forwards_text_when_supported(self): + stt = MagicMock() + stt.update_agent_context = AsyncMock() + await update_stt_agent_context(stt, "The agent's latest reply.") + stt.update_agent_context.assert_awaited_once_with("The agent's latest reply.") + + async def test_noop_when_capability_absent(self): + """STT services without the method (Deepgram, Cartesia, …) are skipped silently.""" + stt = MagicMock(spec=[]) # no attributes → getattr returns None + await update_stt_agent_context(stt, "ignored") # must not raise + + async def test_noop_for_empty_text(self): + stt = MagicMock() + stt.update_agent_context = AsyncMock() + await update_stt_agent_context(stt, "") + stt.update_agent_context.assert_not_awaited() + + async def test_noop_for_none_stt(self): + await update_stt_agent_context(None, "anything") # must not raise + + class TestCreateTtsService: def test_none_disables_tts(self): assert create_tts_service(None) is None diff --git a/uv.lock b/uv.lock index 8d7e2839..0eb4c1f0 100644 --- a/uv.lock +++ b/uv.lock @@ -803,7 +803,7 @@ requires-dist = [ { name = "openai", specifier = ">=2.36.0" }, { name = "pandas", specifier = ">=2.0" }, { name = "pandas", marker = "extra == 'apps'", specifier = ">=2.0" }, - { name = "pipecat-ai", specifier = ">=1.0.0" }, + { name = "pipecat-ai", specifier = ">=1.4.0" }, { name = "plotly", marker = "extra == 'apps'", specifier = ">=5.0" }, { name = "pre-commit", marker = "extra == 'dev'", specifier = ">=4.0" }, { name = "pydantic", specifier = ">=2.0" }, @@ -2135,7 +2135,7 @@ wheels = [ [[package]] name = "pipecat-ai" -version = "1.3.0" +version = "1.4.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiofiles" }, @@ -2153,13 +2153,17 @@ dependencies = [ { name = "protobuf" }, { name = "pydantic" }, { name = "pyloudnorm" }, + { name = "pyyaml" }, + { name = "pyyaml-include" }, { name = "resampy" }, { name = "soxr" }, + { name = "typing-extensions" }, { name = "wait-for2", marker = "python_full_version < '3.12'" }, + { name = "websockets" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/92/cc/8352e30c47ee4fd075b9a0f3d91183294582578f07289180a30fc8228409/pipecat_ai-1.3.0.tar.gz", hash = "sha256:abeb9d95b1df2f35b855334cda1899fc603124d5448967c82ba08a94c604318f", size = 11260868, upload-time = "2026-05-29T01:03:00.532Z" } +sdist = { url = "https://files.pythonhosted.org/packages/d5/ed/4bcf67703996ac6a0896dc64232e79fdfcb23eccff36929a7a234587a085/pipecat_ai-1.4.0.tar.gz", hash = "sha256:4d1e68da79e0632dcdaf66581fbf07e840249dfb715dc6e57b423d20fc3520d7", size = 11505429, upload-time = "2026-06-17T03:27:48.83Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b6/73/fff17b48cd254ad1c358d612ce92fcc342838e0fc43cc235aca3c70527fb/pipecat_ai-1.3.0-py3-none-any.whl", hash = "sha256:59d4950a61a0a201cf551354d8cdec038c1bdf457df080cd41d29d7ff53e0b2e", size = 10905336, upload-time = "2026-05-29T01:02:57.764Z" }, + { url = "https://files.pythonhosted.org/packages/7e/ab/9acf81effd93f6f6d1ededbe5db8fedb50d196363d75188dc21d8aa71d5a/pipecat_ai-1.4.0-py3-none-any.whl", hash = "sha256:d7e1f2ee6f2646720daed3a028835d0e26e5e93787d0424ab00f4c7b6034a170", size = 11171447, upload-time = "2026-06-17T03:27:46.367Z" }, ] [[package]] @@ -2650,6 +2654,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/e8/2bdf3ca2090f68bb3d75b44da7bbc71843b19c9f2b9cb9b0f4ab7a5a4329/pyyaml-6.0.3-cp313-cp313-win_arm64.whl", hash = "sha256:5498cd1645aa724a7c71c8f378eb29ebe23da2fc0d7a08071d89469bf1d2defb", size = 140246, upload-time = "2025-09-25T21:32:34.663Z" }, ] +[[package]] +name = "pyyaml-include" +version = "1.4.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pyyaml" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7f/be/2d07ad85e3d593d69640876a8686eae2c533db8cb7bf298d25c421b4d2d5/pyyaml-include-1.4.1.tar.gz", hash = "sha256:1a96e33a99a3e56235f5221273832464025f02ff3d8539309a3bf00dec624471", size = 20592, upload-time = "2024-03-25T14:56:43.748Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d5/ca/6a2cc3a73170d10b5af1f1613baa2ed1f8f46f62dd0bfab2bffd2c2fe260/pyyaml_include-1.4.1-py3-none-any.whl", hash = "sha256:323c7f3a19c82fbc4d73abbaab7ef4f793e146a13383866831631b26ccc7fb00", size = 19079, upload-time = "2024-03-25T14:56:41.274Z" }, +] + [[package]] name = "rapidfuzz" version = "3.14.3" From 495841ab4f681874d15b9a69237c88fe5c2659d6 Mon Sep 17 00:00:00 2001 From: David Lange Date: Wed, 17 Jun 2026 20:15:16 -0400 Subject: [PATCH 2/2] Plumb vad_force_turn_endpoint for AssemblyAI STT vad_force_turn_endpoint is an AssemblyAISTTService constructor arg, not a Settings field, so the dataclass-field forwarding in create_stt_service() does not carry it and it was stuck at the pipecat default. Thread it explicitly from EVA_MODEL__STT_PARAMS (default True = Pipecat-mode: force the endpoint on Silero VAD stop; False lets AssemblyAI's server-side min/max_turn_silence decide). The Settings-level tuning fields (vad_threshold, min_turn_silence, max_turn_silence) already forward via the existing dataclass introspection. - .env.example: document the tuned AssemblyAI example (vad_threshold=0.1, min_turn_silence=100, max_turn_silence=100, vad_force_turn_endpoint=true). - tests: assert vad_force_turn_endpoint defaults True and is overridable, and that vad_threshold/min_turn_silence/max_turn_silence forward. Co-Authored-By: Claude Opus 4.8 (1M context) --- .env.example | 6 ++++-- src/eva/assistant/pipeline/services.py | 5 +++++ tests/unit/assistant/test_services.py | 18 ++++++++++++++++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index 0affb423..b5355331 100644 --- a/.env.example +++ b/.env.example @@ -76,8 +76,10 @@ EVA_MODEL__STT=cartesia #i STT provider parameters. Must include "api_key" and "model". Use "urls" for round-robin load balancing. #i Some providers also accept extra provider-specific tuning parameters here. #i AssemblyAI example (Universal-3 Pro family — conversation-context carryover is applied automatically -#i so the agent's last reply improves transcription of the user's next turn; set "previous_context_n_turns": 0 to disable): -#i EVA_MODEL__STT_PARAMS='{"api_key": "your_assemblyai_api_key", "model": "universal-3-5-pro"}' +#i so the agent's last reply improves transcription of the user's next turn; set "previous_context_n_turns": 0 to disable). +#i Tuning fields (vad_threshold, min_turn_silence, max_turn_silence) forward to AssemblyAISTTService.Settings; +#i vad_force_turn_endpoint (default true = Pipecat forces the endpoint on VAD stop) is a constructor arg: +#i EVA_MODEL__STT_PARAMS='{"api_key": "your_assemblyai_api_key", "model": "universal-3-5-pro", "vad_threshold": 0.1, "min_turn_silence": 100, "max_turn_silence": 100, "vad_force_turn_endpoint": true}' #d json_object #x pipeline_mode=LLM EVA_MODEL__STT_PARAMS='{"api_key": "your_cartesia_api_key", "model": "ink-whisper"}' diff --git a/src/eva/assistant/pipeline/services.py b/src/eva/assistant/pipeline/services.py index 838e9fef..7b7f87df 100644 --- a/src/eva/assistant/pipeline/services.py +++ b/src/eva/assistant/pipeline/services.py @@ -143,9 +143,14 @@ def create_stt_service( assemblyai_settings_kwargs = { k: params[k] for f in dataclasses.fields(AssemblyAISTTService.Settings) if (k := f.name) in params } + # vad_force_turn_endpoint is a constructor arg, not a Settings field, so the dataclass + # forwarding above won't carry it — thread it explicitly. Default True = Pipecat-mode + # (force the endpoint on Silero VAD stop); set False to let AssemblyAI's own server-side + # turn detection (min_turn_silence/max_turn_silence) decide turn ends. return AssemblyAISTTService( api_key=api_key, sample_rate=SAMPLE_RATE, + vad_force_turn_endpoint=params.get("vad_force_turn_endpoint", True), settings=AssemblyAISTTService.Settings( language=_to_language_enum(language_code), **assemblyai_settings_kwargs, diff --git a/tests/unit/assistant/test_services.py b/tests/unit/assistant/test_services.py index eea2cdd2..b7f4395e 100644 --- a/tests/unit/assistant/test_services.py +++ b/tests/unit/assistant/test_services.py @@ -66,11 +66,25 @@ def test_assemblyai_forwards_optional_settings(self): "api_key": "k", "model": "universal-3-5-pro", "vad_threshold": 0.1, - "min_turn_silence": 120, + "min_turn_silence": 100, + "max_turn_silence": 100, }, ) assert svc._settings.vad_threshold == 0.1 - assert svc._settings.min_turn_silence == 120 + assert svc._settings.min_turn_silence == 100 + assert svc._settings.max_turn_silence == 100 + + def test_assemblyai_vad_force_turn_endpoint_defaults_true(self): + """Constructor arg (not a Settings field) — defaults to pipecat-mode (True).""" + svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "universal-3-5-pro"}) + assert svc._vad_force_turn_endpoint is True + + def test_assemblyai_vad_force_turn_endpoint_overridable(self): + svc = create_stt_service( + "assemblyai", + params={"api_key": "k", "model": "universal-3-5-pro", "vad_force_turn_endpoint": False}, + ) + assert svc._vad_force_turn_endpoint is False def test_assemblyai_forwards_context_carryover_settings(self): """Conversation-context carryover settings (pipecat >= 1.4.0) forward through."""