From 77e858cce98e696ebfcad59c25564794551c1ba0 Mon Sep 17 00:00:00 2001 From: "wei.zhong1" Date: Tue, 9 Jun 2026 19:34:16 -0700 Subject: [PATCH 1/5] Add Deepgram Voice Agent framework support Adds a `deepgram` assistant-server framework backed by Deepgram's Voice Agent API (unified STT->LLM->TTS over a single WebSocket), so it can be benchmarked like the existing S2S frameworks. - New DeepgramAssistantServer (src/eva/assistant/deepgram_server.py), modeled on the Gemini Live server and the assistant_server_contract. - Register `deepgram` in worker._get_server_class and the framework Literal. - Parse raw WebSocket JSON by event `type` rather than the SDK's typed iterator, which in deepgram-sdk 6.1.x mis-deserializes every agent event as the same model (dropping transcripts and tool-call requests). - KeepAlive task to prevent Deepgram's ~10s input-audio timeout from closing the session while the (half-duplex) agent is speaking. - Compute model_response latency from the server-side receipt time of user_speech_stop (the simulator emits it on a monotonic clock). - Unit tests for settings/tool conversion + framework dispatch test. - Docs section in assistant_server_contract.md; .env.example framework enum. - Bump simulation_version 2.0.0 -> 2.1.0 (affects benchmark outputs). --- .env.example | 2 +- docs/assistant_server_contract.md | 28 + src/eva/__init__.py | 2 +- src/eva/assistant/deepgram_server.py | 532 ++++++++++++++++++ src/eva/models/config.py | 3 +- src/eva/orchestrator/worker.py | 6 +- tests/unit/assistant/test_deepgram_server.py | 103 ++++ .../orchestrator/test_framework_dispatch.py | 6 + 8 files changed, 678 insertions(+), 4 deletions(-) create mode 100644 src/eva/assistant/deepgram_server.py create mode 100644 tests/unit/assistant/test_deepgram_server.py diff --git a/.env.example b/.env.example index dd78e6ef..d84bc222 100644 --- a/.env.example +++ b/.env.example @@ -110,7 +110,7 @@ EVA_MODEL__TTS_PARAMS='{"api_key": "your_cartesia_api_key", "model": "sonic"}' # --- Framework (S2S / AudioLLM) --- #i Base framework for S2S or AudioLLM pipelines. #d enum -#e pipecat,openai_realtime,gemini_live,elevenlabs,grok_voice +#e pipecat,openai_realtime,gemini_live,elevenlabs,grok_voice,deepgram #v EVA_FRAMEWORK=openai_realtime # ============================================== diff --git a/docs/assistant_server_contract.md b/docs/assistant_server_contract.md index 62124ac5..b2363572 100644 --- a/docs/assistant_server_contract.md +++ b/docs/assistant_server_contract.md @@ -535,3 +535,31 @@ the run to fail or produce `None` latency fields in the result. | `audio_assistant.wav` | Yes | TTS quality metrics | | `framework_logs.jsonl` | Yes | Turn boundary metrics | | `pipecat_metrics.jsonl` | Yes | `model_response_latency` in `ConversationResult` | + +--- + +## 13. Reference implementation: Deepgram Voice Agent + +`src/eva/assistant/deepgram_server.py` (`framework: deepgram`) bridges to Deepgram's +**Voice Agent API** (unified STT→LLM→TTS over one WebSocket) via the `deepgram-sdk` +`client.agent.v1.connect()` interface. It is the closest analogue to the Gemini Live +server and a good template for a new S2S framework. + +Notable points specific to Deepgram: + +- **Config.** `framework: deepgram`, `model: {s2s: deepgram, s2s_params: {...}}`. Recognised + `s2s_params`: `api_key` (required), `think_provider` (default `open_ai`), + `think_model` / `model` (LLM + metrics label, default `gpt-4o-mini`), + `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`), + `language` (default `en`). +- **Settings.** Sent once on connect via `send_settings(AgentV1Settings)`. Built from a plain + dict and validated with `AgentV1Settings.model_validate(...)`, which resolves the + discriminated provider unions. Audio is `linear16` @ 24 kHz both directions with output + `container: "none"` (raw PCM); `agent.greeting` carries `INITIAL_MESSAGE`. +- **Tools.** Configured under `agent.think.functions` (no `endpoint` ⇒ *client-side*), so the + agent emits `FunctionCallRequest` events; reply with `send_function_call_response`. +- **Events.** `async for message in connection` yields raw `bytes` (TTS audio) or typed events + (`ConversationText`, `UserStartedSpeaking`, `AgentStartedSpeaking`, `AgentAudioDone`, + `FunctionCallRequest`, `Error`, `Warning`). +- **Limitation.** The Voice Agent event stream exposes no token-usage event, so token usage is + not reported for this framework. Latency is still emitted on the first audio chunk per turn. diff --git a/src/eva/__init__.py b/src/eva/__init__.py index ecc5f2a8..94accf13 100644 --- a/src/eva/__init__.py +++ b/src/eva/__init__.py @@ -7,7 +7,7 @@ # Bump simulation_version when changes affect benchmark outputs (agent code, # user simulator, orchestrator, simulation prompts, agent configs, tool mocks). -simulation_version = "2.0.0" +simulation_version = "2.1.0" # Bump metrics_version when changes affect metric computation (metrics code, # judge prompts, pricing tables, postprocessor). diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py new file mode 100644 index 00000000..bd1343d2 --- /dev/null +++ b/src/eva/assistant/deepgram_server.py @@ -0,0 +1,532 @@ +"""Deepgram Voice Agent AssistantServer for EVA-Bench. + +Bridges between the Twilio-framed WebSocket (user simulator) and Deepgram's +**Voice Agent API** (a unified STT -> LLM -> TTS agent over a single WebSocket) +via the ``deepgram-sdk`` ``client.agent.v1.connect()`` interface. Audio flows: + + User simulator (8 kHz mulaw) + -> 24 kHz PCM16 -> Deepgram agent input + Deepgram agent output (24 kHz PCM16) + -> 8 kHz mulaw -> User simulator + +All tool calls are executed locally via ``ToolExecutor`` (the agent is configured +with *client-side* functions, so Deepgram emits ``FunctionCallRequest`` events and +we reply with ``send_function_call_response``). ``ConversationText`` events populate +the audit log. + +Note: the Voice Agent event stream does not expose token usage, so token usage is +not reported for this framework (latency is still emitted on the first audio chunk +of each turn). +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +import time +from typing import Any + +import uvicorn +from deepgram import AsyncDeepgramClient +from deepgram.agent.v1.types.agent_v1send_function_call_response import AgentV1SendFunctionCallResponse +from deepgram.agent.v1.types.agent_v1settings import AgentV1Settings +from fastapi import FastAPI, WebSocket, WebSocketDisconnect + +from eva.assistant.audio_bridge import ( + FrameworkLogWriter, + MetricsLogWriter, + create_twilio_media_message, + mulaw_8k_to_pcm16_24k, + parse_twilio_media_message, + pcm16_24k_to_mulaw_8k, + sync_buffer_to_position, +) +from eva.assistant.base_server import INITIAL_MESSAGE, AbstractAssistantServer +from eva.models.agents import AgentConfig +from eva.utils.logging import get_logger +from eva.utils.prompt_manager import PromptManager + +logger = get_logger(__name__) + +# Deepgram agent runs at 24 kHz PCM16 in both directions (matches the recording rate). +_RECORDING_SAMPLE_RATE = 24000 + +# Audio output pacing: send 160-byte mulaw chunks (20ms at 8kHz) at real-time rate +# so the user simulator's silence detection works correctly. +MULAW_CHUNK_SIZE = 160 # bytes per chunk (20ms at 8kHz, 1 byte per sample) +MULAW_CHUNK_DURATION_S = 0.02 # 20ms per chunk + +# Send a KeepAlive at least this often so Deepgram's ~10s input-audio timeout never +# fires during user silence (e.g. while the agent is speaking). +KEEPALIVE_INTERVAL_S = 5.0 + +# Defaults for the Voice Agent listen/think/speak providers (overridable via s2s_params). +_DEFAULT_LISTEN_MODEL = "nova-3" +_DEFAULT_THINK_PROVIDER = "open_ai" +_DEFAULT_THINK_MODEL = "gpt-4o-mini" +_DEFAULT_SPEAK_MODEL = "aura-2-thalia-en" +_DEFAULT_LANGUAGE = "en" + + +def _agent_tools_to_deepgram(agent: AgentConfig) -> list[dict[str, Any]] | None: + """Convert EVA AgentConfig tools to Deepgram ``think.functions`` (client-side). + + Omitting ``endpoint`` marks each function as client-side, so the agent emits a + ``FunctionCallRequest`` event instead of calling an HTTP endpoint itself. + """ + if not agent.tools: + return None + + functions: list[dict[str, Any]] = [] + for tool in agent.tools: + functions.append( + { + "name": tool.function_name, + "description": f"{tool.name}: {tool.description}", + "parameters": { + "type": "object", + "properties": tool.get_parameter_properties(), + "required": tool.get_required_param_names(), + }, + } + ) + return functions or None + + +class DeepgramAssistantServer(AbstractAssistantServer): + """Bridges Twilio WebSocket <-> Deepgram Voice Agent API for EVA-Bench evaluation.""" + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + # Recording sample rate (Deepgram agent runs at 24 kHz) + self._audio_sample_rate = _RECORDING_SAMPLE_RATE + + s2s_params = self.pipeline_config.s2s_params or {} + self._api_key: str = s2s_params.get("api_key", "") + # ``think_model`` is the LLM driving the agent; used as the metrics label. + # Accept ``model`` as an alias for the contract's "model required" convention. + self._think_model: str = s2s_params.get("think_model") or s2s_params.get("model") or _DEFAULT_THINK_MODEL + self._model = self._think_model + self._think_provider: str = s2s_params.get("think_provider", _DEFAULT_THINK_PROVIDER) + self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) + self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) + self._language: str = s2s_params.get("language", _DEFAULT_LANGUAGE) + + # Build system prompt (same pattern as the other realtime/S2S servers) + prompt_manager = PromptManager() + self._system_prompt = prompt_manager.get_prompt( + "realtime_agent.system_prompt", + agent_personality=self.agent.description, + agent_instructions=self.agent.instructions, + datetime=self.current_date_time, + ) + + self._functions = _agent_tools_to_deepgram(self.agent) + + # ------------------------------------------------------------------ + # Server lifecycle + # ------------------------------------------------------------------ + + async def start(self) -> None: + """Start the FastAPI WebSocket server (non-blocking).""" + if self._running: + logger.warning("Server already running") + return + + if not self._api_key: + raise ValueError("API key required for Deepgram Voice Agent (set s2s_params.api_key)") + + self.output_dir.mkdir(parents=True, exist_ok=True) + self._fw_log = FrameworkLogWriter(self.output_dir) + self._metrics_log = MetricsLogWriter(self.output_dir) + + self._app = FastAPI() + + @self._app.websocket("/ws") + async def websocket_endpoint(websocket: WebSocket) -> None: + await websocket.accept() + await self._handle_session(websocket) + + @self._app.websocket("/") + async def websocket_root(websocket: WebSocket) -> None: + await websocket.accept() + await self._handle_session(websocket) + + config = uvicorn.Config( + self._app, + host="0.0.0.0", + port=self.port, + log_level="warning", + lifespan="off", + ) + self._server = uvicorn.Server(config) + self._running = True + self._server_task = asyncio.create_task(self._server.serve()) + + while not self._server.started: + await asyncio.sleep(0.01) + + logger.info(f"Deepgram agent server started on ws://localhost:{self.port}") + + async def _shutdown(self) -> None: + """Stop the Deepgram agent server.""" + if not self._running: + return + self._running = False + + if self._server: + self._server.should_exit = True + if self._server_task: + try: + await asyncio.wait_for(self._server_task, timeout=5.0) + except TimeoutError: + self._server_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._server_task + except (asyncio.CancelledError, KeyboardInterrupt): + pass + self._server = None + self._server_task = None + + logger.info(f"Deepgram agent server stopped on port {self.port}") + + # ------------------------------------------------------------------ + # Settings + # ------------------------------------------------------------------ + + def _build_settings(self) -> AgentV1Settings: + """Build the Voice Agent ``Settings`` message. + + Constructed from a plain dict and validated into the typed model; pydantic + resolves the discriminated provider unions and produces the correct wire JSON. + """ + think: dict[str, Any] = { + "provider": {"type": self._think_provider, "model": self._think_model}, + "prompt": self._system_prompt, + } + if self._functions: + think["functions"] = self._functions + + settings_dict: dict[str, Any] = { + "type": "Settings", + "audio": { + "input": {"encoding": "linear16", "sample_rate": self._audio_sample_rate}, + "output": {"encoding": "linear16", "sample_rate": self._audio_sample_rate, "container": "none"}, + }, + "agent": { + "language": self._language, + "greeting": INITIAL_MESSAGE, + "listen": {"provider": {"type": "deepgram", "model": self._listen_model}}, + "think": think, + "speak": {"provider": {"type": "deepgram", "model": self._speak_model}}, + }, + } + return AgentV1Settings.model_validate(settings_dict) + + # ------------------------------------------------------------------ + # Session handler + # ------------------------------------------------------------------ + + async def _handle_session(self, websocket: WebSocket) -> None: + """Bridge a single Twilio WebSocket session with the Deepgram Voice Agent.""" + logger.info("Client connected to Deepgram agent server") + # start() always instantiates these before a session can connect; bind to + # locals so the narrowed (non-None) type is visible inside the nested tasks. + assert self._fw_log is not None and self._metrics_log is not None + fw_log = self._fw_log + metrics_log = self._metrics_log + + stream_sid: str = self.conversation_id + twilio_connected = True + + # Per-turn assistant text accumulated from ConversationText(role=assistant) + _assistant_turn_text: list[str] = [] + + _in_model_turn = False + _user_speaking = False + _user_speech_start_ts: str | None = None # From the simulator's VAD + _user_speech_stop_ts: str | None = None # From the simulator's VAD + _assistant_turn_start_ts: str | None = None # Wall-clock ms of first audio chunk + + # Outbound mulaw chunks; drained by the pacer at real-time rate. + audio_output_queue: asyncio.Queue[bytes] = asyncio.Queue() + + client = AsyncDeepgramClient(api_key=self._api_key) + settings = self._build_settings() + + try: + async with client.agent.v1.connect() as connection: + logger.info(f"Deepgram agent session connected (think_model={self._think_model})") + await connection.send_settings(settings) + fw_log.turn_start() + + # ----- Concurrent tasks ----- + async def _forward_user_audio() -> None: + """Read Twilio WS messages, convert audio, send to Deepgram.""" + nonlocal stream_sid, twilio_connected, _user_speech_start_ts, _user_speech_stop_ts + try: + while twilio_connected and self._running: + try: + raw = await asyncio.wait_for(websocket.receive_text(), timeout=1.0) + except TimeoutError: + continue + + try: + msg = json.loads(raw) + except json.JSONDecodeError: + continue + + event = msg.get("event") + if event == "start": + stream_sid = msg.get("start", {}).get("streamSid", stream_sid) + logger.info(f"Twilio stream started: {stream_sid}") + elif event == "stop": + logger.info("Twilio stream stopped") + twilio_connected = False + break + elif event == "user_speech_start": + _user_speech_start_ts = msg.get("timestamp_ms") + elif event == "user_speech_stop": + # Record our own wall-clock receipt time rather than the event's + # timestamp_ms: the simulator sends user_speech_stop on a monotonic + # clock (unlike the wall-clock user_speech_start), so its value can't + # be diffed against the wall-clock first-audio time. The event arrives + # in ~real time over the local socket, so receipt time is accurate. + _user_speech_stop_ts = str(int(time.time() * 1000)) + elif event == "media": + mulaw_bytes = parse_twilio_media_message(raw) + if mulaw_bytes is None: + continue + pcm_24k = mulaw_8k_to_pcm16_24k(mulaw_bytes) + if not _in_model_turn: + sync_buffer_to_position(self.assistant_audio_buffer, len(self.user_audio_buffer)) + self.user_audio_buffer.extend(pcm_24k) + await connection.send_media(pcm_24k) + except WebSocketDisconnect: + logger.info("Twilio WebSocket disconnected") + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in user audio forwarder: {e}", exc_info=True) + finally: + twilio_connected = False + + async def _pace_audio_output() -> None: + """Drain audio_output_queue and forward chunks at real-time rate.""" + nonlocal twilio_connected + next_send_time = time.monotonic() + try: + while self._running: + try: + chunk = await asyncio.wait_for(audio_output_queue.get(), timeout=1.0) + except TimeoutError: + continue + + twilio_msg = create_twilio_media_message(stream_sid, chunk) + try: + await websocket.send_text(twilio_msg) + except Exception: + twilio_connected = False + return + + now = time.monotonic() + if next_send_time <= now: + next_send_time = now + next_send_time += MULAW_CHUNK_DURATION_S + sleep_duration = next_send_time - time.monotonic() + if sleep_duration > 0: + await asyncio.sleep(sleep_duration) + except asyncio.CancelledError: + pass + + def _flush_assistant_turn(interrupted: bool) -> None: + nonlocal _assistant_turn_text, _in_model_turn, _assistant_turn_start_ts + full_text = " ".join(_assistant_turn_text).strip() + if full_text: + text = f"{full_text} [interrupted]" if interrupted else full_text + self.audit_log.append_assistant_output(text, timestamp_ms=_assistant_turn_start_ts) + if interrupted: + fw_log.s2s_transcript(full_text) + else: + fw_log.llm_response(full_text) + fw_log.turn_end(was_interrupted=interrupted) + _in_model_turn = False + _assistant_turn_text = [] + _assistant_turn_start_ts = None + + def _drain_audio_queue() -> None: + while not audio_output_queue.empty(): + with contextlib.suppress(asyncio.QueueEmpty): + audio_output_queue.get_nowait() + + async def _process_deepgram_events() -> None: + """Consume events from the Deepgram agent session. + + We iterate the underlying websocket directly and dispatch on the + raw ``type`` field instead of the SDK's typed iterator. In + deepgram-sdk 6.1.x the agent response-union deserialization is not + discriminated by ``type``: it mis-constructs every JSON event as the + same model, so isinstance-based dispatch silently drops transcripts + and tool-call requests. Parsing the JSON ourselves is deterministic. + Binary frames (TTS audio) are delivered as ``bytes`` unchanged. + """ + nonlocal _assistant_turn_text, _in_model_turn, _user_speaking + nonlocal _user_speech_start_ts, _user_speech_stop_ts, _assistant_turn_start_ts + try: + async for raw in connection._websocket: + if not self._running: + break + + # --- Raw TTS audio output (24 kHz PCM16) --- + if isinstance(raw, bytes): + if not raw: + continue + if not _in_model_turn: + _in_model_turn = True + _user_speaking = False + _assistant_turn_start_ts = str(int(round(time.time() * 1000))) + fw_log.turn_start() + + # model_response latency: user speech end -> first audio. + # Absent on the initial greeting (model-initiated) turn. + if _user_speech_stop_ts: + latency_ms = int(_assistant_turn_start_ts) - int(_user_speech_stop_ts) + if 0 < latency_ms < 30_000: + metrics_log.write_latency("model_response", latency_ms / 1000, self._model) + _user_speech_stop_ts = None + + if not _user_speaking: + sync_buffer_to_position(self.user_audio_buffer, len(self.assistant_audio_buffer)) + self.assistant_audio_buffer.extend(raw) + + if twilio_connected: + try: + mulaw = pcm16_24k_to_mulaw_8k(raw) + except Exception as conv_err: + logger.warning(f"Audio conversion error ({len(raw)} bytes): {conv_err}") + continue + offset = 0 + while offset < len(mulaw): + await audio_output_queue.put(mulaw[offset : offset + MULAW_CHUNK_SIZE]) + offset += MULAW_CHUNK_SIZE + continue + + # --- JSON control / transcript events --- + try: + event = json.loads(raw) + except (json.JSONDecodeError, TypeError): + continue + event_type = event.get("type") + + # Conversation transcripts (final per turn) + if event_type == "ConversationText": + text = (event.get("content") or "").strip() + if not text: + continue + if event.get("role") == "user": + _user_speaking = False + logger.info(f"User transcription: {text}") + self.audit_log.append_user_input(text, timestamp_ms=_user_speech_start_ts) + _user_speech_start_ts = None + else: + _assistant_turn_text.append(text) + + # Agent finished speaking -> end of assistant turn + elif event_type == "AgentAudioDone": + logger.debug("Deepgram agent audio done") + _flush_assistant_turn(interrupted=False) + + # User barge-in + elif event_type == "UserStartedSpeaking": + if _in_model_turn: + logger.debug("User barge-in during agent turn") + _user_speaking = True + _drain_audio_queue() + _flush_assistant_turn(interrupted=True) + + # Client-side tool calls + elif event_type == "FunctionCallRequest": + for fn in event.get("functions", []): + raw_args = fn.get("arguments") + try: + arguments = json.loads(raw_args) if raw_args else {} + except json.JSONDecodeError: + arguments = {} + fn_name = fn.get("name", "") + logger.info(f"Tool call: {fn_name}({json.dumps(arguments)})") + result = await self.execute_tool(fn_name, arguments) + await connection.send_function_call_response( + AgentV1SendFunctionCallResponse( + type="FunctionCallResponse", + id=fn.get("id"), + name=fn_name, + content=json.dumps(result), + ) + ) + + elif event_type in ("Error", "FatalError"): + logger.error(f"Deepgram agent error: {event.get('description')}") + elif event_type == "Warning": + logger.warning(f"Deepgram agent warning: {event.get('description')}") + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in Deepgram event processor: {e}", exc_info=True) + + async def _send_keepalives() -> None: + """Keep the Deepgram input stream alive during user silence. + + The user simulator is half-duplex and stops sending mic audio while + the agent is speaking. Without input, Deepgram closes the session with + a "did not receive audio within our timeout" error (~10s). Periodic + KeepAlive messages reset that timer; they are no-ops when audio flows. + """ + try: + while self._running and twilio_connected: + await asyncio.sleep(KEEPALIVE_INTERVAL_S) + try: + await connection.send_keep_alive() + except Exception: + break + except asyncio.CancelledError: + pass + + user_task = asyncio.create_task(_forward_user_audio()) + events_task = asyncio.create_task(_process_deepgram_events()) + pacer_task = asyncio.create_task(_pace_audio_output()) + keepalive_task = asyncio.create_task(_send_keepalives()) + + done, pending = await asyncio.wait( + [user_task, events_task, pacer_task, keepalive_task], + return_when=asyncio.FIRST_COMPLETED, + ) + + def _task_name(t: asyncio.Task[None]) -> str: + if t is user_task: + return "user_audio" + if t is events_task: + return "deepgram_events" + if t is keepalive_task: + return "keepalive" + return "audio_pacer" + + for task in done: + exc = task.exception() + if exc: + logger.error(f"Task '{_task_name(task)}' failed: {exc}", exc_info=exc) + else: + logger.info(f"Task '{_task_name(task)}' completed normally") + + for task in pending: + logger.info(f"Cancelling pending task '{_task_name(task)}'") + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + except Exception as e: + logger.error(f"Deepgram agent session error: {e}", exc_info=True) + finally: + logger.info("Client disconnected from Deepgram agent server") diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 6752d54b..0f6451bd 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -379,7 +379,7 @@ class ModelDeployment(DeploymentTypedDict): ) # Framework selection - framework: Literal["pipecat", "openai_realtime", "gemini_live", "elevenlabs", "grok_voice"] = Field( + framework: Literal["pipecat", "openai_realtime", "gemini_live", "elevenlabs", "grok_voice", "deepgram"] = Field( "pipecat", description=( "Agent framework to use for the assistant server." @@ -388,6 +388,7 @@ class ModelDeployment(DeploymentTypedDict): "'gemini_live': Gemini Live API via google-genai." "'elevenlabs': ElevenLabs Conversational AI API." "'grok_voice': xAI Grok voice realtime API." + "'deepgram': Deepgram Voice Agent API." ), ) diff --git a/src/eva/orchestrator/worker.py b/src/eva/orchestrator/worker.py index 61b9db53..7c26c6d9 100644 --- a/src/eva/orchestrator/worker.py +++ b/src/eva/orchestrator/worker.py @@ -46,10 +46,14 @@ def _get_server_class(framework: str) -> type[AbstractAssistantServer]: from eva.assistant.grok_voice_server import GrokVoiceAssistantServer return GrokVoiceAssistantServer + elif framework == "deepgram": + from eva.assistant.deepgram_server import DeepgramAssistantServer + + return DeepgramAssistantServer else: raise ValueError( f"Unknown framework: {framework!r}. " - "Supported: pipecat, openai_realtime, gemini_live, elevenlabs, grok_voice" + "Supported: pipecat, openai_realtime, gemini_live, elevenlabs, grok_voice, deepgram" ) diff --git a/tests/unit/assistant/test_deepgram_server.py b/tests/unit/assistant/test_deepgram_server.py new file mode 100644 index 00000000..b5993f92 --- /dev/null +++ b/tests/unit/assistant/test_deepgram_server.py @@ -0,0 +1,103 @@ +"""Tests for DeepgramAssistantServer settings + tool conversion helpers.""" + +from unittest.mock import MagicMock + +from deepgram.agent.v1.types.agent_v1settings import AgentV1Settings + +from eva.assistant.deepgram_server import ( + INITIAL_MESSAGE, + DeepgramAssistantServer, + _agent_tools_to_deepgram, +) +from eva.models.agents import AgentConfig, AgentTool, AgentToolParameter + + +def _agent_with_tools() -> AgentConfig: + return AgentConfig( + id="a1", + name="Test Agent", + description="desc", + role="role", + instructions="be helpful", + tool_module_path="eva.assistant.tools.airline_tools", + tools=[ + AgentTool( + id="t1", + name="Lookup Booking", + description="Look up a booking", + required_parameters=[AgentToolParameter(name="booking_id", type="str", description="The booking id")], + optional_parameters=[AgentToolParameter(name="verbose", type="bool")], + ) + ], + ) + + +def _bare_server() -> DeepgramAssistantServer: + """Build a server without running __init__ (which needs file-backed tool config).""" + srv = object.__new__(DeepgramAssistantServer) + srv._audio_sample_rate = 24000 + srv._language = "en" + srv._listen_model = "nova-3" + srv._think_provider = "open_ai" + srv._think_model = "gpt-4o-mini" + srv._model = "gpt-4o-mini" + srv._speak_model = "aura-2-thalia-en" + srv._system_prompt = "you are a helpful assistant" + srv._functions = None + return srv + + +class TestToolConversion: + def test_no_tools_returns_none(self): + agent = MagicMock() + agent.tools = [] + assert _agent_tools_to_deepgram(agent) is None + + def test_tool_converted_to_client_side_function(self): + functions = _agent_tools_to_deepgram(_agent_with_tools()) + assert functions is not None + assert len(functions) == 1 + fn = functions[0] + # Client-side functions have no "endpoint" key. + assert "endpoint" not in fn + assert fn["name"] # function_name derived from the tool + assert "Lookup Booking" in fn["description"] + params = fn["parameters"] + assert params["type"] == "object" + assert "booking_id" in params["properties"] + assert params["required"] == ["booking_id"] + + +class TestBuildSettings: + def test_audio_encoding_and_sample_rate(self): + settings = _bare_server()._build_settings() + assert isinstance(settings, AgentV1Settings) + wire = settings.dict() + assert wire["audio"]["input"] == {"encoding": "linear16", "sample_rate": 24000} + assert wire["audio"]["output"]["encoding"] == "linear16" + assert wire["audio"]["output"]["sample_rate"] == 24000 + # Raw PCM output (no WAV header) so the pacer can stream it directly. + assert wire["audio"]["output"]["container"] == "none" + + def test_providers_and_greeting(self): + wire = _bare_server()._build_settings().dict() + agent = wire["agent"] + assert agent["greeting"] == INITIAL_MESSAGE + assert agent["language"] == "en" + assert agent["listen"]["provider"]["model"] == "nova-3" + assert agent["think"]["provider"]["type"] == "open_ai" + assert agent["think"]["provider"]["model"] == "gpt-4o-mini" + assert agent["think"]["prompt"] == "you are a helpful assistant" + assert agent["speak"]["provider"]["model"] == "aura-2-thalia-en" + + def test_functions_omitted_when_no_tools(self): + wire = _bare_server()._build_settings().dict() + assert "functions" not in wire["agent"]["think"] + + def test_functions_included_when_present(self): + srv = _bare_server() + srv._functions = _agent_tools_to_deepgram(_agent_with_tools()) + wire = srv._build_settings().dict() + functions = wire["agent"]["think"]["functions"] + assert len(functions) == 1 + assert functions[0]["parameters"]["required"] == ["booking_id"] diff --git a/tests/unit/orchestrator/test_framework_dispatch.py b/tests/unit/orchestrator/test_framework_dispatch.py index 82884115..fbb010ce 100644 --- a/tests/unit/orchestrator/test_framework_dispatch.py +++ b/tests/unit/orchestrator/test_framework_dispatch.py @@ -2,6 +2,7 @@ import pytest +from eva.assistant.deepgram_server import DeepgramAssistantServer from eva.assistant.grok_voice_server import GrokVoiceAssistantServer from eva.assistant.openai_realtime_server import OpenAIRealtimeAssistantServer from eva.orchestrator.worker import _get_server_class @@ -12,6 +13,11 @@ def test_grok_voice_dispatch_returns_grok_class(): assert cls is GrokVoiceAssistantServer +def test_deepgram_dispatch_returns_deepgram_class(): + cls = _get_server_class("deepgram") + assert cls is DeepgramAssistantServer + + def test_grok_voice_is_subclass_of_openai_realtime(): assert issubclass(GrokVoiceAssistantServer, OpenAIRealtimeAssistantServer) From bba4c1201e4c4d7eef2d06a9ba0e38c829310e31 Mon Sep 17 00:00:00 2001 From: weiz9 Date: Wed, 10 Jun 2026 11:21:59 -0700 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Katrina Stankiewicz --- src/eva/assistant/deepgram_server.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py index 86061a67..b644a7a7 100644 --- a/src/eva/assistant/deepgram_server.py +++ b/src/eva/assistant/deepgram_server.py @@ -64,9 +64,7 @@ # Defaults for the Voice Agent listen/think/speak providers (overridable via s2s_params). _DEFAULT_LISTEN_MODEL = "nova-3" _DEFAULT_THINK_PROVIDER = "open_ai" -_DEFAULT_THINK_MODEL = "gpt-4o-mini" _DEFAULT_SPEAK_MODEL = "aura-2-thalia-en" -_DEFAULT_LANGUAGE = "en" def _agent_tools_to_deepgram(agent: AgentConfig) -> list[dict[str, Any]] | None: @@ -107,12 +105,11 @@ def __init__(self, **kwargs: Any) -> None: self._api_key: str = s2s_params.get("api_key", "") # ``think_model`` is the LLM driving the agent; used as the metrics label. # Accept ``model`` as an alias for the contract's "model required" convention. - self._think_model: str = s2s_params.get("think_model") or s2s_params.get("model") or _DEFAULT_THINK_MODEL + self._think_model: str = s2s_params["model"] self._model = self._think_model self._think_provider: str = s2s_params.get("think_provider", _DEFAULT_THINK_PROVIDER) self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) - self._language: str = s2s_params.get("language", _DEFAULT_LANGUAGE) # Build system prompt (same pattern as the other realtime/S2S servers) prompt_manager = PromptManager() @@ -216,7 +213,7 @@ def _build_settings(self) -> AgentV1Settings: "output": {"encoding": "linear16", "sample_rate": self._audio_sample_rate, "container": "none"}, }, "agent": { - "language": self._language, + "language": self.language, "greeting": self.initial_message, "listen": {"provider": {"type": "deepgram", "model": self._listen_model}}, "think": think, From 234d46a1de4b303fd1091c15d00010d288520701 Mon Sep 17 00:00:00 2001 From: "wei.zhong1" Date: Wed, 10 Jun 2026 11:48:42 -0700 Subject: [PATCH 3/5] Deepgram: cascade evaluation + think_label, fix review-suggestion test Builds on the code-review suggestions (model now required in s2s_params; use base self.language): - Evaluate Deepgram as a cascade pipeline (get_pipeline_type -> CASCADE) since it runs STT->LLM->TTS internally, so stt_wer / transcription_accuracy / speakability run; expose {stt, llm, tts} via pipeline_parts so the run_id folder shows the three component models. - Add optional `think_label` to decouple the short metrics/run_id label from the (long) Deepgram model id (Deepgram still receives `model`). - Fix test broken by the review suggestion: _bare_server set `_language` but the server now reads base `self.language`. - Docs updated to match. --- docs/assistant_server_contract.md | 13 +++++++++---- src/eva/assistant/deepgram_server.py | 7 ++++--- src/eva/models/config.py | 16 ++++++++++++++-- tests/unit/assistant/test_deepgram_server.py | 2 +- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/docs/assistant_server_contract.md b/docs/assistant_server_contract.md index b2363572..9394b6bc 100644 --- a/docs/assistant_server_contract.md +++ b/docs/assistant_server_contract.md @@ -548,10 +548,15 @@ server and a good template for a new S2S framework. Notable points specific to Deepgram: - **Config.** `framework: deepgram`, `model: {s2s: deepgram, s2s_params: {...}}`. Recognised - `s2s_params`: `api_key` (required), `think_provider` (default `open_ai`), - `think_model` / `model` (LLM + metrics label, default `gpt-4o-mini`), - `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`), - `language` (default `en`). + `s2s_params`: `api_key` and `model` (both **required**; `model` is the exact Deepgram LLM id, + e.g. `gpt-4o-mini` or `claude-haiku-4-5`), `think_provider` (default `open_ai`; use `anthropic` + for Claude models), `think_label` (optional short metrics/run_id label — Deepgram still receives + `model`), `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`). + The conversation language comes from the run's `language` (base server), not `s2s_params`. +- **Evaluation.** Although configured via `s2s`, Deepgram is scored as a **cascade** pipeline + (`get_pipeline_type` → `CASCADE`), since it runs STT→LLM→TTS internally — so STT/TTS metrics + (`stt_wer`, `transcription_accuracy_key_entities`, `speakability`) run. `pipeline_parts` exposes + `{stt, llm, tts}` so the run_id/folder shows the three component models. - **Settings.** Sent once on connect via `send_settings(AgentV1Settings)`. Built from a plain dict and validated with `AgentV1Settings.model_validate(...)`, which resolves the discriminated provider unions. Audio is `linear16` @ 24 kHz both directions with output diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py index b644a7a7..35878361 100644 --- a/src/eva/assistant/deepgram_server.py +++ b/src/eva/assistant/deepgram_server.py @@ -103,10 +103,11 @@ def __init__(self, **kwargs: Any) -> None: s2s_params = self.pipeline_config.s2s_params or {} self._api_key: str = s2s_params.get("api_key", "") - # ``think_model`` is the LLM driving the agent; used as the metrics label. - # Accept ``model`` as an alias for the contract's "model required" convention. + # ``model`` is the exact LLM id sent to Deepgram (required). self._think_model: str = s2s_params["model"] - self._model = self._think_model + # Metrics/run_id label, decoupled from the (often long) Deepgram model id: + # an explicit ``think_label`` if provided, else the model id itself. + self._model = s2s_params.get("think_label") or self._think_model self._think_provider: str = s2s_params.get("think_provider", _DEFAULT_THINK_PROVIDER) self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) diff --git a/src/eva/models/config.py b/src/eva/models/config.py index a6d29ec1..eb3bbdb8 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -209,6 +209,17 @@ def pipeline_parts(self) -> dict[str, str]: "s2s": _param_alias(self.s2s_params) or self.s2s, **_fetch_elevenlabs_agent_models(self.s2s_params), } + if self.s2s == "deepgram": + # Deepgram Voice Agent is a cascade internally (STT -> LLM -> TTS); + # expose its component models. The `llm` part uses the short + # `think_label` if provided (else the Deepgram model id), so the + # run_id/folder stays readable; defaults mirror deepgram_server.py. + p = self.s2s_params or {} + return { + "stt": p.get("listen_model", "nova-3"), + "llm": p.get("think_label") or p.get("model") or p.get("think_model", ""), + "tts": p.get("speak_model", "aura-2-thalia-en"), + } return {"s2s": _param_alias(self.s2s_params)} case PipelineType.CASCADE: return { @@ -249,8 +260,9 @@ def get_pipeline_type(model_data: dict) -> PipelineType: ``llm_model`` in a flat dict. """ if s2s_value := model_data.get("s2s"): - # ElevenLabs uses s2s_params for configuration but is a cascade pipeline internally - if s2s_value == "elevenlabs": + # ElevenLabs and Deepgram use s2s_params for configuration but are cascade + # pipelines internally (STT -> LLM -> TTS), so they're scored as cascade. + if s2s_value in ("elevenlabs", "deepgram"): return PipelineType.CASCADE # Ultravox uses s2s_params for plumbing but is an audio-LLM (audio in, text out, separate TTS) if s2s_value == "ultravox": diff --git a/tests/unit/assistant/test_deepgram_server.py b/tests/unit/assistant/test_deepgram_server.py index f4321a15..299147ed 100644 --- a/tests/unit/assistant/test_deepgram_server.py +++ b/tests/unit/assistant/test_deepgram_server.py @@ -37,7 +37,7 @@ def _bare_server() -> DeepgramAssistantServer: """Build a server without running __init__ (which needs file-backed tool config).""" srv = object.__new__(DeepgramAssistantServer) srv._audio_sample_rate = 24000 - srv._language = "en" + srv.language = "en" srv._listen_model = "nova-3" srv._think_provider = "open_ai" srv._think_model = "gpt-4o-mini" From 4f589dc4707404317cc6aef145dfa3f18b325c26 Mon Sep 17 00:00:00 2001 From: "wei.zhong1" Date: Tue, 16 Jun 2026 14:58:53 -0700 Subject: [PATCH 4/5] Deepgram agent: BYO think providers + fail-loud on think outages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add bring-your-own credentials/endpoint for the Voice Agent think step (s2s_params.think_credentials / think_endpoint / think_params / context_length / think_region) so the LLM can run on your own Bedrock/Anthropic/etc. quota instead of Deepgram's managed allowance — needed for long prompts that exceed managed limits and for an apples-to-apples comparison with the cascade pipeline. Managed config is unchanged; BYO is fully opt-in. For aws_bedrock, IAM/STS creds are built from AWS_* env vars when think_credentials is omitted. Note: Deepgram's aws_bedrock think integration only supports Claude 3.5-era models (e.g. anthropic/claude-3-5-haiku-20240307-v1:0). Haiku 4.5 is rejected with 'Invalid agent.think settings - model not available', so BYO Bedrock cannot match the cascade pipeline's Bedrock Haiku 4.5 — use managed or BYO Anthropic-direct (claude-haiku-4-5) for Haiku 4.5. Surface think outages loudly: a provider Error, or a greeting-only conversation (caller spoke but the model never replied), is now logged at ERROR with guidance instead of silently producing a clean-looking 1-turn run. Handle Welcome/SettingsApplied/History/AgentThinking events explicitly. Bumps simulation_version 2.0.2 -> 2.0.3. --- docs/assistant_server_contract.md | 35 ++++++-- src/eva/__init__.py | 2 +- src/eva/assistant/deepgram_server.py | 129 ++++++++++++++++++++++++++- uv.lock | 6 +- 4 files changed, 158 insertions(+), 14 deletions(-) diff --git a/docs/assistant_server_contract.md b/docs/assistant_server_contract.md index 9394b6bc..69d0072d 100644 --- a/docs/assistant_server_contract.md +++ b/docs/assistant_server_contract.md @@ -550,9 +550,29 @@ Notable points specific to Deepgram: - **Config.** `framework: deepgram`, `model: {s2s: deepgram, s2s_params: {...}}`. Recognised `s2s_params`: `api_key` and `model` (both **required**; `model` is the exact Deepgram LLM id, e.g. `gpt-4o-mini` or `claude-haiku-4-5`), `think_provider` (default `open_ai`; use `anthropic` - for Claude models), `think_label` (optional short metrics/run_id label — Deepgram still receives - `model`), `listen_model` (STT, default `nova-3`), `speak_model` (TTS, default `aura-2-thalia-en`). - The conversation language comes from the run's `language` (base server), not `s2s_params`. + for Claude models, `aws_bedrock`/`google`/`groq` for the others), `think_label` (optional short + metrics/run_id label — Deepgram still receives `model`), `listen_model` (STT, default `nova-3`), + `speak_model` (TTS, default `aura-2-thalia-en`). The conversation language comes from the run's + `language` (base server), not `s2s_params`. +- **Managed vs. BYO think.** By default Deepgram runs the LLM with its *managed* provider + (Deepgram's own credentials/quota). To drive think with **your own** credentials — for an + apples-to-apples comparison with the cascade pipeline (same Bedrock model) and to bypass + managed prompt-size limits — add any of these `s2s_params`: + - `think_credentials` (dict) → `think.provider.credentials`. Only `aws_bedrock` takes + credentials (`{type: iam|sts, region, access_key_id, secret_access_key, session_token?}`). + If omitted for `aws_bedrock`, the server builds IAM/STS creds from `AWS_ACCESS_KEY_ID` / + `AWS_SECRET_ACCESS_KEY` / `AWS_SESSION_TOKEN` env vars (region from `think_region`, + else `AWS_REGION`/`AWS_DEFAULT_REGION`, else `us-east-1`). + - `think_endpoint` (dict `{url, headers}`) → `think.endpoint`. This is how `anthropic`/ + `open_ai`/`google` BYO works (they have no `credentials` field): point at your own URL and + pass your key in `headers`. + - `think_params` (dict) → merged into `think.provider` (e.g. `temperature`, `version`, + `reasoning_mode`). + - `context_length` (`"max"` | int) → `think.context_length`, for long prompts. + - `think_region` (str) → region for the `aws_bedrock` env-credential builder. +- **Fail-loud.** A think outage (provider error, or no model reply after the caller speaks — + greeting only) is logged at ERROR with guidance, instead of silently producing a clean-looking + 1-turn conversation. Watch for `produced NO think response` / `conversation FAILED`. - **Evaluation.** Although configured via `s2s`, Deepgram is scored as a **cascade** pipeline (`get_pipeline_type` → `CASCADE`), since it runs STT→LLM→TTS internally — so STT/TTS metrics (`stt_wer`, `transcription_accuracy_key_entities`, `speakability`) run. `pipeline_parts` exposes @@ -563,8 +583,11 @@ Notable points specific to Deepgram: `container: "none"` (raw PCM); `agent.greeting` carries `INITIAL_MESSAGE`. - **Tools.** Configured under `agent.think.functions` (no `endpoint` ⇒ *client-side*), so the agent emits `FunctionCallRequest` events; reply with `send_function_call_response`. -- **Events.** `async for message in connection` yields raw `bytes` (TTS audio) or typed events - (`ConversationText`, `UserStartedSpeaking`, `AgentStartedSpeaking`, `AgentAudioDone`, - `FunctionCallRequest`, `Error`, `Warning`). +- **Events.** The session yields raw `bytes` (TTS audio) or JSON events: `Welcome`, + `SettingsApplied`, `ConversationText`, `History`, `UserStartedSpeaking`, `AgentThinking`, + `AgentStartedSpeaking`, `AgentAudioDone`, `FunctionCallRequest`, `Error`, `Warning`. Acted-on + events are `ConversationText` (transcript), `AgentAudioDone` (turn end), `UserStartedSpeaking` + (barge-in), `FunctionCallRequest` (tool call), `Error`/`FatalError` (fail-loud); the rest are + informational. - **Limitation.** The Voice Agent event stream exposes no token-usage event, so token usage is not reported for this framework. Latency is still emitted on the first audio chunk per turn. diff --git a/src/eva/__init__.py b/src/eva/__init__.py index 135922d9..fe052c43 100644 --- a/src/eva/__init__.py +++ b/src/eva/__init__.py @@ -7,7 +7,7 @@ # Bump simulation_version when changes affect benchmark outputs (agent code, # user simulator, orchestrator, simulation prompts, agent configs, tool mocks). -simulation_version = "2.0.2" +simulation_version = "2.0.3" # Bump metrics_version when changes affect metric computation (metrics code, # judge prompts, pricing tables, postprocessor). diff --git a/src/eva/assistant/deepgram_server.py b/src/eva/assistant/deepgram_server.py index 35878361..48be377d 100644 --- a/src/eva/assistant/deepgram_server.py +++ b/src/eva/assistant/deepgram_server.py @@ -112,6 +112,25 @@ def __init__(self, **kwargs: Any) -> None: self._listen_model: str = s2s_params.get("listen_model", _DEFAULT_LISTEN_MODEL) self._speak_model: str = s2s_params.get("speak_model", _DEFAULT_SPEAK_MODEL) + # --- BYO ("bring your own") think configuration (all optional) --- + # By default the think step uses Deepgram's *managed* provider. Supply any of + # the following to drive the LLM with your own credentials/endpoint instead — + # for an apples-to-apples comparison with the cascade pipeline (same Bedrock + # model) and to bypass managed-provider prompt-size limits: + # think_credentials: dict -> provider.credentials (aws_bedrock IAM/STS creds). + # If omitted for aws_bedrock, built from AWS_* env vars. + # think_endpoint: dict -> think.endpoint {url, headers} (BYO key for + # anthropic/open_ai/google, which have no credentials field). + # think_params: dict -> merged into provider (e.g. temperature, version, + # reasoning_mode). + # context_length: "max" | int -> think.context_length (long-prompt support). + # think_region: str -> region for the aws_bedrock env-credential builder. + self._think_credentials: dict[str, Any] | None = s2s_params.get("think_credentials") + self._think_endpoint: dict[str, Any] | None = s2s_params.get("think_endpoint") + self._think_params: dict[str, Any] = s2s_params.get("think_params") or {} + self._context_length: Any = s2s_params.get("context_length") + self._think_region: str | None = s2s_params.get("think_region") or s2s_params.get("region") + # Build system prompt (same pattern as the other realtime/S2S servers) prompt_manager = PromptManager() self._system_prompt = prompt_manager.get_prompt( @@ -194,6 +213,52 @@ async def _shutdown(self) -> None: # Settings # ------------------------------------------------------------------ + def _build_think_provider(self) -> dict[str, Any]: + """Build the think ``provider`` object: managed by default, BYO if configured.""" + provider: dict[str, Any] = {"type": self._think_provider, "model": self._think_model} + # Optional provider-level params (temperature, version, reasoning_mode, ...). + provider.update(self._think_params) + # BYO credentials. Only aws_bedrock carries provider.credentials; for + # anthropic/open_ai/google, BYO is done via think.endpoint instead. + if self._think_credentials is not None: + provider["credentials"] = self._think_credentials + elif self._think_provider == "aws_bedrock": + creds = self._aws_credentials_from_env() + if creds is not None: + provider["credentials"] = creds + return provider + + def _aws_credentials_from_env(self) -> dict[str, Any] | None: + """IAM/STS credentials for aws_bedrock BYO think, from the standard AWS_* env vars. + + Mirrors how the cascade pipeline reaches Bedrock, so the agent's think step can + use the *same* model and credentials for an apples-to-apples comparison. + """ + import os + + access_key = os.environ.get("AWS_ACCESS_KEY_ID") + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + if not (access_key and secret_key): + logger.warning( + "think_provider=aws_bedrock but AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY are not set; " + "pass think_credentials explicitly or set the AWS_* env vars" + ) + return None + region = self._think_region or os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") + if not region: + region = "us-east-1" + logger.warning("No region for aws_bedrock think; defaulting to us-east-1 (set s2s_params.think_region)") + session_token = os.environ.get("AWS_SESSION_TOKEN") + creds: dict[str, Any] = { + "type": "sts" if session_token else "iam", + "region": region, + "access_key_id": access_key, + "secret_access_key": secret_key, + } + if session_token: + creds["session_token"] = session_token + return creds + def _build_settings(self) -> AgentV1Settings: """Build the Voice Agent ``Settings`` message. @@ -201,11 +266,16 @@ def _build_settings(self) -> AgentV1Settings: resolves the discriminated provider unions and produces the correct wire JSON. """ think: dict[str, Any] = { - "provider": {"type": self._think_provider, "model": self._think_model}, + "provider": self._build_think_provider(), "prompt": self._system_prompt, } if self._functions: think["functions"] = self._functions + # BYO endpoint (anthropic/open_ai/google) and long-prompt context window. + if self._think_endpoint: + think["endpoint"] = self._think_endpoint + if self._context_length is not None: + think["context_length"] = self._context_length settings_dict: dict[str, Any] = { "type": "Settings", @@ -248,6 +318,13 @@ async def _handle_session(self, websocket: WebSocket) -> None: _user_speech_stop_ts: str | None = None # From the simulator's VAD _assistant_turn_start_ts: str | None = None # Wall-clock ms of first audio chunk + # Fail-loud bookkeeping: a healthy conversation has the model replying to user + # turns. If the user speaks but the agent only ever emits its greeting (or a + # fatal Error arrives), the think step failed and we surface it loudly. + _user_turn_count = 0 + _assistant_turn_count = 0 # includes the model-initiated greeting + _fatal_error: str | None = None + # Outbound mulaw chunks; drained by the pacer at real-time rate. audio_output_queue: asyncio.Queue[bytes] = asyncio.Queue() @@ -340,9 +417,10 @@ async def _pace_audio_output() -> None: pass def _flush_assistant_turn(interrupted: bool) -> None: - nonlocal _assistant_turn_text, _in_model_turn, _assistant_turn_start_ts + nonlocal _assistant_turn_text, _in_model_turn, _assistant_turn_start_ts, _assistant_turn_count full_text = " ".join(_assistant_turn_text).strip() if full_text: + _assistant_turn_count += 1 text = f"{full_text} [interrupted]" if interrupted else full_text self.audit_log.append_assistant_output(text, timestamp_ms=_assistant_turn_start_ts) if interrupted: @@ -372,6 +450,7 @@ async def _process_deepgram_events() -> None: """ nonlocal _assistant_turn_text, _in_model_turn, _user_speaking nonlocal _user_speech_start_ts, _user_speech_stop_ts, _assistant_turn_start_ts + nonlocal _user_turn_count, _fatal_error try: async for raw in connection._websocket: if not self._running: @@ -425,6 +504,7 @@ async def _process_deepgram_events() -> None: continue if event.get("role") == "user": _user_speaking = False + _user_turn_count += 1 logger.info(f"User transcription: {text}") self.audit_log.append_user_input(text, timestamp_ms=_user_speech_start_ts) _user_speech_start_ts = None @@ -464,10 +544,33 @@ async def _process_deepgram_events() -> None: ) ) + # Session-lifecycle / informational events — no action needed. + # ``History`` mirrors ConversationText (already recorded); + # the others confirm setup or signal the model is working. + elif event_type in ( + "Welcome", + "SettingsApplied", + "History", + "AgentThinking", + "AgentStartedSpeaking", + "PromptUpdated", + "SpeakUpdated", + "ThinkUpdated", + ): + logger.debug(f"Deepgram agent event: {event_type}") + elif event_type in ("Error", "FatalError"): - logger.error(f"Deepgram agent error: {event.get('description')}") + _fatal_error = event.get("description") or event_type + logger.error( + f"Deepgram agent {event_type} from the think/agent service: " + f"{_fatal_error} | full={json.dumps(event)}" + ) elif event_type == "Warning": - logger.warning(f"Deepgram agent warning: {event.get('description')}") + logger.warning( + f"Deepgram agent warning: {event.get('description')} | full={json.dumps(event)}" + ) + else: + logger.warning(f"Unhandled Deepgram event type={event_type}: {json.dumps(event)[:500]}") except asyncio.CancelledError: pass @@ -524,6 +627,24 @@ def _task_name(t: asyncio.Task[None]) -> str: with contextlib.suppress(asyncio.CancelledError): await task + # Fail loud: surface a think outage instead of silently producing a + # clean-looking greeting-only conversation. A healthy run has the model + # replying to user turns; greeting-only (count<=1) after the user spoke + # almost always means the think provider failed (credits/entitlement/ + # credentials) or returned nothing. + if _fatal_error: + logger.error( + f"Deepgram agent conversation FAILED: think/agent service error '{_fatal_error}'. " + f"(user_turns={_user_turn_count}, assistant_turns={_assistant_turn_count})" + ) + elif _user_turn_count >= 1 and _assistant_turn_count <= 1: + logger.error( + "Deepgram agent produced NO think response to the caller " + f"(user_turns={_user_turn_count}, assistant_turns={_assistant_turn_count}; greeting only). " + "The think step likely failed silently — check Deepgram credits/entitlement for the " + "think model, or supply BYO credentials (s2s_params.think_credentials / think_endpoint)." + ) + except Exception as e: logger.error(f"Deepgram agent session error: {e}", exc_info=True) finally: diff --git a/uv.lock b/uv.lock index 8d7e2839..e886c4e4 100644 --- a/uv.lock +++ b/uv.lock @@ -660,7 +660,7 @@ wheels = [ [[package]] name = "deepgram-sdk" -version = "7.3.0" +version = "7.3.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "httpx" }, @@ -669,9 +669,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "websockets" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/24/68/2c152f8a53198eddc952b33d3fac6f3230c55576f20d53ae4bef0e77cced/deepgram_sdk-7.3.0.tar.gz", hash = "sha256:a82401180d685436fc2f9e5c1468161d10ed6228e4f3d3294de5f29410eab9fe", size = 208421, upload-time = "2026-06-01T13:41:23.708Z" } +sdist = { url = "https://files.pythonhosted.org/packages/1e/f4/72418cc556fbf08216a1cdd66b11affb4077bbd749c1d6b0e77043b87fd2/deepgram_sdk-7.3.1.tar.gz", hash = "sha256:eaade4ea4bead9f009490cf20dcec09a0abd184af112e8a5ddb50cc82b7f98b9", size = 208270, upload-time = "2026-06-03T12:53:03.314Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f9/99/dbac65ef9db7041127fe9170a567c3a0e1c1879708b49c793f9aed4e9150/deepgram_sdk-7.3.0-py3-none-any.whl", hash = "sha256:32070574142354f698aa53a8d80b6b056f3a8256cb81a0fd111a854251e699b3", size = 560127, upload-time = "2026-06-01T13:41:22.062Z" }, + { url = "https://files.pythonhosted.org/packages/a2/25/12535afe1c5e70988bc385109d2161f35d5e125ef52bef4bff25e018ca56/deepgram_sdk-7.3.1-py3-none-any.whl", hash = "sha256:57b01059240ee8193116773a7b1cec7d36802cee8c26d69939601c2497d1aafa", size = 559927, upload-time = "2026-06-03T12:53:01.712Z" }, ] [[package]] From 52806056423cf98156d2d09a7449039b8b9a9dcd Mon Sep 17 00:00:00 2001 From: "wei.zhong1" Date: Wed, 17 Jun 2026 08:17:12 -0700 Subject: [PATCH 5/5] test(deepgram): fix bare-server helper for BYO attrs + cover BYO think settings The _bare_server() helper bypasses __init__, so it needed the new BYO attributes (_think_credentials/_think_endpoint/_think_params/_context_length/ _think_region) set explicitly. Also add coverage for BYO: bedrock credentials (explicit + AWS_* env-built), endpoint/params/context_length passthrough, and that the managed default is unchanged. --- tests/unit/assistant/test_deepgram_server.py | 56 ++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/unit/assistant/test_deepgram_server.py b/tests/unit/assistant/test_deepgram_server.py index 299147ed..01cae37a 100644 --- a/tests/unit/assistant/test_deepgram_server.py +++ b/tests/unit/assistant/test_deepgram_server.py @@ -46,6 +46,12 @@ def _bare_server() -> DeepgramAssistantServer: srv._system_prompt = "you are a helpful assistant" srv._functions = None srv.initial_message = INITIAL_MESSAGE + # BYO think config (all default to "off"; managed provider path). + srv._think_credentials = None + srv._think_endpoint = None + srv._think_params = {} + srv._context_length = None + srv._think_region = None return srv @@ -103,3 +109,53 @@ def test_functions_included_when_present(self): functions = wire["agent"]["think"]["functions"] assert len(functions) == 1 assert functions[0]["parameters"]["required"] == ["booking_id"] + + +class TestBuildSettingsByo: + """BYO ('bring your own') think configuration is opt-in and additive.""" + + def test_managed_default_has_no_credentials_or_endpoint(self): + # With no BYO params set, the managed path is unchanged. + think = _bare_server()._build_settings().dict()["agent"]["think"] + assert "credentials" not in think["provider"] + assert think.get("endpoint") is None + assert think.get("context_length") is None + + def test_bedrock_explicit_credentials_passthrough(self): + srv = _bare_server() + srv._think_provider = "aws_bedrock" + srv._think_model = "anthropic/claude-3-5-haiku-20240307-v1:0" + srv._think_credentials = { + "type": "iam", + "region": "us-east-1", + "access_key_id": "AK", + "secret_access_key": "SK", + } + provider = srv._build_settings().dict()["agent"]["think"]["provider"] + assert provider["type"] == "aws_bedrock" + assert provider["credentials"]["region"] == "us-east-1" + assert provider["credentials"]["access_key_id"] == "AK" + + def test_endpoint_params_and_context_length_passthrough(self): + srv = _bare_server() + srv._think_params = {"temperature": 0.5} + srv._think_endpoint = {"url": "https://example.test/v1", "headers": {"x-api-key": "KEY"}} + srv._context_length = "max" + think = srv._build_settings().dict()["agent"]["think"] + assert think["provider"]["temperature"] == 0.5 + assert think["endpoint"]["url"] == "https://example.test/v1" + assert think["endpoint"]["headers"]["x-api-key"] == "KEY" + assert think["context_length"] == "max" + + def test_bedrock_credentials_built_from_env(self, monkeypatch): + srv = _bare_server() + srv._think_provider = "aws_bedrock" + srv._think_model = "anthropic/claude-3-5-haiku-20240307-v1:0" + srv._think_region = "us-west-2" + monkeypatch.setenv("AWS_ACCESS_KEY_ID", "envAK") + monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "envSK") + monkeypatch.delenv("AWS_SESSION_TOKEN", raising=False) + creds = srv._build_settings().dict()["agent"]["think"]["provider"]["credentials"] + assert creds["type"] == "iam" + assert creds["region"] == "us-west-2" + assert creds["access_key_id"] == "envAK"