Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ EVA_MODEL__STT=cartesia

#i STT provider parameters. Must include "api_key" and "model". Use "urls" for round-robin load balancing.
#i Some providers also accept extra provider-specific tuning parameters here.
#i AssemblyAI example (Universal-3 Pro family — conversation-context carryover is applied automatically
#i so the agent's last reply improves transcription of the user's next turn; set "previous_context_n_turns": 0 to disable).
#i Tuning fields (vad_threshold, min_turn_silence, max_turn_silence) forward to AssemblyAISTTService.Settings;
#i vad_force_turn_endpoint (default true = Pipecat forces the endpoint on VAD stop) is a constructor arg:
#i EVA_MODEL__STT_PARAMS='{"api_key": "your_assemblyai_api_key", "model": "universal-3-5-pro", "vad_threshold": 0.1, "min_turn_silence": 100, "max_turn_silence": 100, "vad_force_turn_endpoint": true}'
#d json_object
#x pipeline_mode=LLM
EVA_MODEL__STT_PARAMS='{"api_key": "your_cartesia_api_key", "model": "ink-whisper"}'
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ license = "MIT"
dependencies = [
"pydantic>=2.0",
"pydantic-settings>=2.0",
"pipecat-ai>=1.0.0",
"pipecat-ai>=1.4.0",
"elevenlabs>=1.0.0",
"openai>=2.36.0",
"anthropic>=0.83.0",
Expand Down
13 changes: 10 additions & 3 deletions src/eva/assistant/pipecat_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
create_realtime_llm_service,
create_stt_service,
create_tts_service,
update_stt_agent_context,
)
from eva.assistant.pipeline.turn_config import (
create_turn_start_strategy,
Expand Down Expand Up @@ -408,9 +409,15 @@ async def on_user_transcription(text: str, timestamp: str, turn_id: int | None)
llm_client=llm_client,
output_dir=self.output_dir,
)
agent_processor.on_assistant_response = lambda msg: self._save_transcript_message_from_turn(
role="assistant", content=msg, timestamp=self._current_iso_timestamp()
)
async def on_assistant_response(msg: str) -> None:
await self._save_transcript_message_from_turn(
role="assistant", content=msg, timestamp=self._current_iso_timestamp()
)
# Carry the agent's reply into STT as conversation context so it improves
# transcription of the user's next turn (AssemblyAI Universal-3 Pro; no-op otherwise).
await update_stt_agent_context(stt, msg)

agent_processor.on_assistant_response = on_assistant_response
self.agentic_system = agent_processor.agentic_system

# Create pipeline
Expand Down
27 changes: 27 additions & 0 deletions src/eva/assistant/pipeline/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,14 @@ def create_stt_service(
assemblyai_settings_kwargs = {
k: params[k] for f in dataclasses.fields(AssemblyAISTTService.Settings) if (k := f.name) in params
}
# vad_force_turn_endpoint is a constructor arg, not a Settings field, so the dataclass
# forwarding above won't carry it — thread it explicitly. Default True = Pipecat-mode
# (force the endpoint on Silero VAD stop); set False to let AssemblyAI's own server-side
# turn detection (min_turn_silence/max_turn_silence) decide turn ends.
return AssemblyAISTTService(
api_key=api_key,
sample_rate=SAMPLE_RATE,
vad_force_turn_endpoint=params.get("vad_force_turn_endpoint", True),
settings=AssemblyAISTTService.Settings(
language=_to_language_enum(language_code),
**assemblyai_settings_kwargs,
Expand Down Expand Up @@ -285,6 +290,28 @@ def create_stt_service(
)


async def update_stt_agent_context(stt: STTService | None, text: str) -> None:
"""Feed the agent's latest spoken reply back to the STT service as conversation context.

AssemblyAI's Universal-3 Pro streaming models (e.g. ``u3-rt-pro``, ``universal-3-5-pro``)
support *context carryover*: seeding the agent's most recent reply improves transcription
of the user's next turn (short answers, spelled-out entities, disambiguation). Pipecat
exposes this via ``AssemblyAISTTService.update_agent_context()``.

In a standard pipecat bot this fires automatically (the assistant context aggregator emits
``LLMContextAssistantTurnFrame`` and the upstream STT picks it up). EVA's cascade pipeline
drives the agent turn through a custom processor that pushes ``TTSSpeakFrame`` directly and
does not emit the standard LLM response frames, so we trigger the update explicitly from the
assistant-response hook instead.

No-op for STT services without the capability (Deepgram, Cartesia, …) and for non-U3-Pro
AssemblyAI models, where pipecat treats the call as a safe no-op.
"""
update = getattr(stt, "update_agent_context", None)
if update is not None and text:
await update(text)


def create_tts_service(
model: str | None,
params: dict[str, Any] | None = None,
Expand Down
67 changes: 60 additions & 7 deletions tests/unit/assistant/test_services.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Tests for assistant/pipeline/services.py — service factory functions."""

from unittest.mock import MagicMock
from unittest.mock import AsyncMock, MagicMock

import pytest

Expand All @@ -9,6 +9,7 @@
_resolve_url,
create_stt_service,
create_tts_service,
update_stt_agent_context,
)


Expand Down Expand Up @@ -54,26 +55,54 @@ def test_unknown_model_raises_with_available_list(self):
create_stt_service("nonexistent_provider", params={"api_key": "k"})

def test_assemblyai_service_created(self):
svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "u3-rt-pro"})
svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "universal-3-5-pro"})
assert "AssemblyAI" in type(svc).__name__
assert svc._settings.model == "u3-rt-pro"
assert svc._settings.model == "universal-3-5-pro"

def test_assemblyai_forwards_optional_settings(self):
svc = create_stt_service(
"assemblyai",
params={
"api_key": "k",
"model": "u3-rt-pro",
"model": "universal-3-5-pro",
"vad_threshold": 0.1,
"min_turn_silence": 120,
"min_turn_silence": 100,
"max_turn_silence": 100,
},
)
assert svc._settings.vad_threshold == 0.1
assert svc._settings.min_turn_silence == 120
assert svc._settings.min_turn_silence == 100
assert svc._settings.max_turn_silence == 100

def test_assemblyai_vad_force_turn_endpoint_defaults_true(self):
"""Constructor arg (not a Settings field) — defaults to pipecat-mode (True)."""
svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "universal-3-5-pro"})
assert svc._vad_force_turn_endpoint is True

def test_assemblyai_vad_force_turn_endpoint_overridable(self):
svc = create_stt_service(
"assemblyai",
params={"api_key": "k", "model": "universal-3-5-pro", "vad_force_turn_endpoint": False},
)
assert svc._vad_force_turn_endpoint is False

def test_assemblyai_forwards_context_carryover_settings(self):
"""Conversation-context carryover settings (pipecat >= 1.4.0) forward through."""
svc = create_stt_service(
"assemblyai",
params={
"api_key": "k",
"model": "universal-3-5-pro",
"agent_context": "Booking confirmed for flight AA123.",
"previous_context_n_turns": 0,
},
)
assert svc._settings.agent_context == "Booking confirmed for flight AA123."
assert svc._settings.previous_context_n_turns == 0

def test_assemblyai_ignores_unspecified_settings(self):
"""Keys absent from params must not be forwarded, so library defaults apply."""
svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "u3-rt-pro"})
svc = create_stt_service("assemblyai", params={"api_key": "k", "model": "universal-3-5-pro"})
assert svc._settings.vad_threshold is None

def test_nvidia_requires_url(self):
Expand Down Expand Up @@ -127,6 +156,30 @@ def test_cartesia_service_created(self):
assert "Cartesia" in type(svc).__name__


class TestUpdateSttAgentContext:
"""Conversation-context carryover hook (AssemblyAI Universal-3 Pro)."""

async def test_forwards_text_when_supported(self):
stt = MagicMock()
stt.update_agent_context = AsyncMock()
await update_stt_agent_context(stt, "The agent's latest reply.")
stt.update_agent_context.assert_awaited_once_with("The agent's latest reply.")

async def test_noop_when_capability_absent(self):
"""STT services without the method (Deepgram, Cartesia, …) are skipped silently."""
stt = MagicMock(spec=[]) # no attributes → getattr returns None
await update_stt_agent_context(stt, "ignored") # must not raise

async def test_noop_for_empty_text(self):
stt = MagicMock()
stt.update_agent_context = AsyncMock()
await update_stt_agent_context(stt, "")
stt.update_agent_context.assert_not_awaited()

async def test_noop_for_none_stt(self):
await update_stt_agent_context(None, "anything") # must not raise


class TestCreateTtsService:
def test_none_disables_tts(self):
assert create_tts_service(None) is None
Expand Down
24 changes: 20 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.