From 721977add2f9435218adcf035f8b9c5f89e93f79 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 10 Apr 2026 04:12:37 +0000 Subject: [PATCH 1/3] Add zoom live assistant scaffold with API and CLI Co-authored-by: Kanika Gupta --- .gitignore | 9 ++ zoom-live-assistant/README.md | 112 ++++++++++++++++++ zoom-live-assistant/pyproject.toml | 37 ++++++ .../src/zoom_live_assistant/__init__.py | 0 .../src/zoom_live_assistant/api.py | 57 +++++++++ .../src/zoom_live_assistant/assistant.py | 36 ++++++ .../src/zoom_live_assistant/cli.py | 69 +++++++++++ .../src/zoom_live_assistant/config.py | 33 ++++++ .../src/zoom_live_assistant/llm_client.py | 48 ++++++++ .../src/zoom_live_assistant/models.py | 19 +++ .../zoom_live_assistant/question_detector.py | 50 ++++++++ zoom-live-assistant/tests/test_assistant.py | 36 ++++++ .../tests/test_question_detector.py | 19 +++ 13 files changed, 525 insertions(+) create mode 100644 .gitignore create mode 100644 zoom-live-assistant/README.md create mode 100644 zoom-live-assistant/pyproject.toml create mode 100644 zoom-live-assistant/src/zoom_live_assistant/__init__.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/api.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/assistant.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/cli.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/config.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/llm_client.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/models.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/question_detector.py create mode 100644 zoom-live-assistant/tests/test_assistant.py create mode 100644 zoom-live-assistant/tests/test_question_detector.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8e39591 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +__pycache__/ +*.py[cod] +.pytest_cache/ +.mypy_cache/ +.venv/ +.env +dist/ +build/ +*.egg-info/ diff 
--git a/zoom-live-assistant/README.md b/zoom-live-assistant/README.md new file mode 100644 index 0000000..015fb08 --- /dev/null +++ b/zoom-live-assistant/README.md @@ -0,0 +1,112 @@ +# Zoom Live Assistant + +A starter tool for real-time call assistance: + +1. Ingest transcript events from Zoom (or any source). +2. Detect when a customer asks a question. +3. Send recent transcript context + question to ChatGPT. +4. Return a concise suggested answer you can read to the customer. + +## Important note about Zoom integration + +This repo includes the core runtime assistant and API/CLI interfaces. +Direct Zoom call audio capture/transcription depends on your Zoom account type and selected integration method (Zoom Apps/SDK/webhooks or external audio capture + ASR). +Use this service as the central "brain" that receives transcript lines and returns answers. + +## Quick start + +### 1) Install + +```bash +cd /workspace/zoom-live-assistant +python -m venv .venv +source .venv/bin/activate +pip install -e ".[dev]" +``` + +### 2) Configure environment + +Create `.env`: + +```env +OPENAI_API_KEY=your_openai_api_key +OPENAI_MODEL=gpt-4o-mini +MIN_QUESTION_LENGTH=12 +MAX_CONTEXT_MESSAGES=30 +API_HOST=0.0.0.0 +API_PORT=8080 +``` + +### 3) Run API mode + +```bash +zoom-live-assistant-api +``` + +Ingest transcript line: + +```bash +curl -X POST "http://localhost:8080/ingest" \ + -H "Content-Type: application/json" \ + -d '{ + "speaker":"customer", + "text":"Can you explain how your pricing scales with usage?", + "source":"zoom" + }' +``` + +If a question is detected, response includes: + +```json +{ + "accepted": true, + "answer": { + "question": "...", + "answer": "...suggested response...", + "speaker": "customer", + "timestamp": "..." 
+ }, + "context_size": 14 +} +``` + +### 4) Run CLI mode (stdin) + +Plain text mode: + +```bash +printf "Hello team\nCan you describe your SLA commitments?\n" | zoom-live-assistant-cli +``` + +JSONL mode: + +```bash +printf '{"speaker":"customer","text":"How fast can we migrate?"}\n' | zoom-live-assistant-cli --jsonl +``` + +## How to connect this to Zoom in practice + +Common production pattern: + +- Zoom transcript source -> your "bridge" service +- Bridge sends each transcript line to `POST /ingest` +- On non-null `answer`, display in desktop overlay or private chat panel + +You can build the bridge with: + +- Zoom meeting transcript webhooks/events (if available in your plan) +- Zoom SDK app capturing transcript events +- Audio capture pipeline + speech-to-text (Whisper/Deepgram/Azure) then forwarding text here + +## Safety guidance + +- Keep a human in the loop; do not auto-send answers to customers. +- Log all suggestions for QA and compliance. +- Add domain constraints into `ASSISTANT_SYSTEM_PROMPT`. +- Mask sensitive data before sending transcripts to external APIs where required. 
+ +## Tests + +```bash +pytest +``` diff --git a/zoom-live-assistant/pyproject.toml b/zoom-live-assistant/pyproject.toml new file mode 100644 index 0000000..f5506a7 --- /dev/null +++ b/zoom-live-assistant/pyproject.toml @@ -0,0 +1,37 @@ +[project] +name = "zoom-live-assistant" +version = "0.1.0" +description = "Real-time Zoom transcript assistant powered by OpenAI" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "fastapi", + "httpx", + "openai", + "pydantic-settings", + "python-dotenv", + "uvicorn[standard]", +] + +[project.optional-dependencies] +dev = [ + "pytest", +] + +[tool.pytest.ini_options] +pythonpath = ["src"] +testpaths = ["tests"] + +[project.scripts] +zoom-live-assistant-cli = "zoom_live_assistant.cli:main" +zoom-live-assistant-api = "zoom_live_assistant.api:run" + +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/zoom-live-assistant/src/zoom_live_assistant/__init__.py b/zoom-live-assistant/src/zoom_live_assistant/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zoom-live-assistant/src/zoom_live_assistant/api.py b/zoom-live-assistant/src/zoom_live_assistant/api.py new file mode 100644 index 0000000..b07c7ee --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/api.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +import uvicorn + +from zoom_live_assistant.assistant import LiveAssistant +from zoom_live_assistant.config import get_settings +from zoom_live_assistant.models import AssistantAnswer, TranscriptEvent + +settings = get_settings() +assistant = LiveAssistant(settings) +app = FastAPI( + title="Zoom Live Assistant API", + description="Accepts transcript events and returns suggested live answers when a question is detected.", +) + + +class 
IngestResponse(BaseModel): + accepted: bool = True + answer: AssistantAnswer | None = None + context_size: int + + +class ZoomTranscriptEvent(BaseModel): + speaker: str = "customer" + text: str + timestamp: str | None = None + + +@app.get("/health") +def health() -> dict[str, str]: + return {"status": "ok"} + + +@app.post("/ingest", response_model=IngestResponse) +def ingest(event: TranscriptEvent) -> IngestResponse: + try: + answer = assistant.ingest_event(event) + return IngestResponse(answer=answer, context_size=assistant.context_size()) + except Exception as exc: # pragma: no cover + raise HTTPException(status_code=500, detail=f"Failed to ingest event: {exc}") from exc + + +@app.post("/zoom/transcript", response_model=IngestResponse) +def ingest_zoom_transcript(event: ZoomTranscriptEvent) -> IngestResponse: + normalized = TranscriptEvent(speaker=event.speaker, text=event.text, source="zoom") + return ingest(normalized) + + +def run() -> None: + uvicorn.run( + "zoom_live_assistant.api:app", + host=settings.api_host, + port=settings.api_port, + reload=False, + ) diff --git a/zoom-live-assistant/src/zoom_live_assistant/assistant.py b/zoom-live-assistant/src/zoom_live_assistant/assistant.py new file mode 100644 index 0000000..94e9d5c --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/assistant.py @@ -0,0 +1,36 @@ +from __future__ import annotations + +from collections import deque +from typing import Deque, Optional + +from zoom_live_assistant.config import Settings +from zoom_live_assistant.llm_client import LLMClient +from zoom_live_assistant.models import AssistantAnswer, TranscriptEvent +from zoom_live_assistant.question_detector import is_question + + +class LiveAssistant: + def __init__(self, settings: Settings, llm_client: Optional[LLMClient] = None): + self._settings = settings + self._llm = llm_client or LLMClient(settings) + self._events: Deque[TranscriptEvent] = deque(maxlen=max(200, settings.max_context_messages)) + 
self._last_answered_question: Optional[str] = None + + def ingest_event(self, event: TranscriptEvent) -> Optional[AssistantAnswer]: + self._events.append(event) + text = event.text.strip() + if not is_question(text, min_len=self._settings.min_question_length): + return None + + # Avoid repeating answers if transcript stream re-sends identical lines. + normalized_question = " ".join(text.split()).lower() + if normalized_question == self._last_answered_question: + return None + + answer = self._llm.answer_question(question=text, context=list(self._events)) + self._last_answered_question = normalized_question + return AssistantAnswer(question=text, answer=answer, speaker=event.speaker) + + def context_size(self) -> int: + return len(self._events) + diff --git a/zoom-live-assistant/src/zoom_live_assistant/cli.py b/zoom-live-assistant/src/zoom_live_assistant/cli.py new file mode 100644 index 0000000..6397560 --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/cli.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import argparse +import json +import sys +from datetime import datetime + +from dotenv import load_dotenv + +from zoom_live_assistant.assistant import LiveAssistant +from zoom_live_assistant.config import get_settings +from zoom_live_assistant.models import TranscriptEvent + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Read transcript lines from stdin and print live answer suggestions." + ) + parser.add_argument( + "--speaker", + default="customer", + help="Default speaker name when plain text lines are provided.", + ) + parser.add_argument( + "--jsonl", + action="store_true", + help=( + "Expect JSON lines with keys: speaker, text, optional timestamp, optional source. " + "Without this flag, each input line is treated as transcript text." 
+ ), + ) + return parser.parse_args() + + +def main() -> None: + load_dotenv() + args = parse_args() + settings = get_settings() + assistant = LiveAssistant(settings=settings) + + for raw in sys.stdin: + line = raw.strip() + if not line: + continue + + if args.jsonl: + payload = json.loads(line) + event = TranscriptEvent( + speaker=payload.get("speaker", args.speaker), + text=payload["text"], + timestamp=datetime.fromisoformat(payload["timestamp"]) + if payload.get("timestamp") + else datetime.utcnow(), + source=payload.get("source", "manual"), + ) + else: + event = TranscriptEvent(speaker=args.speaker, text=line, source="manual") + + answer = assistant.ingest_event(event) + if answer: + print("=== QUESTION DETECTED ===") + print(answer.question) + print("=== SUGGESTED ANSWER ===") + print(answer.answer) + print() + + +if __name__ == "__main__": + main() diff --git a/zoom-live-assistant/src/zoom_live_assistant/config.py b/zoom-live-assistant/src/zoom_live_assistant/config.py new file mode 100644 index 0000000..aaec3f9 --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/config.py @@ -0,0 +1,33 @@ +from functools import lru_cache + +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + openai_api_key: str = Field(..., alias="OPENAI_API_KEY") + openai_model: str = Field(default="gpt-4o-mini", alias="OPENAI_MODEL") + system_prompt: str = Field( + default=( + "You are an expert sales engineer assistant in a live customer call. " + "Provide concise, accurate suggestions the host can say out loud. " + "If uncertain, clearly state assumptions and offer a safe clarifying question." 
+ ), + alias="ASSISTANT_SYSTEM_PROMPT", + ) + max_context_messages: int = Field(default=30, alias="MAX_CONTEXT_MESSAGES") + min_question_length: int = Field(default=12, alias="MIN_QUESTION_LENGTH") + api_host: str = Field(default="0.0.0.0", alias="API_HOST") + api_port: int = Field(default=8080, alias="API_PORT") + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False, + extra="ignore", + ) + + +@lru_cache(maxsize=1) +def get_settings() -> Settings: + return Settings() diff --git a/zoom-live-assistant/src/zoom_live_assistant/llm_client.py b/zoom-live-assistant/src/zoom_live_assistant/llm_client.py new file mode 100644 index 0000000..d0ecb77 --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/llm_client.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from typing import Sequence + +from openai import OpenAI + +from zoom_live_assistant.config import Settings +from zoom_live_assistant.models import TranscriptEvent + + +class LLMClient: + def __init__(self, settings: Settings): + self._settings = settings + self._client = OpenAI(api_key=settings.openai_api_key) + + def answer_question(self, question: str, context: Sequence[TranscriptEvent]) -> str: + context_lines = [ + f"[{event.timestamp.isoformat()}] {event.speaker}: {event.text}" for event in context + ] + context_block = "\n".join(context_lines[-self._settings.max_context_messages :]) + + user_prompt = ( + "You are helping the meeting host answer a customer in real time.\n" + "Use the transcript context below, then answer the latest customer question.\n" + "Keep it concise (max 5 bullet points) and practical.\n\n" + f"Transcript context:\n{context_block or '(no prior context)'}\n\n" + f"Latest question:\n{question}\n" + ) + + response = self._client.responses.create( + model=self._settings.openai_model, + input=[ + {"role": "system", "content": self._settings.system_prompt}, + {"role": "user", "content": user_prompt}, + ], + ) + text = 
getattr(response, "output_text", None) + if text: + return text.strip() + + # Backward-compatible extraction in case output_text is unavailable. + fragments: list[str] = [] + for item in getattr(response, "output", []): + for content in getattr(item, "content", []): + if getattr(content, "type", "") == "output_text": + fragments.append(getattr(content, "text", "")) + return "\n".join(part.strip() for part in fragments if part.strip()) + diff --git a/zoom-live-assistant/src/zoom_live_assistant/models.py b/zoom-live-assistant/src/zoom_live_assistant/models.py new file mode 100644 index 0000000..fbead49 --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/models.py @@ -0,0 +1,19 @@ +from datetime import datetime +from typing import Literal, Optional + +from pydantic import BaseModel, Field + + +class TranscriptEvent(BaseModel): + speaker: str = Field(..., description="Speaker name or identifier") + text: str = Field(..., min_length=1, description="Transcribed utterance") + timestamp: datetime = Field(default_factory=datetime.utcnow) + source: Literal["zoom", "manual", "api"] = "api" + + +class AssistantAnswer(BaseModel): + question: str + answer: str + speaker: Optional[str] = None + timestamp: datetime = Field(default_factory=datetime.utcnow) + diff --git a/zoom-live-assistant/src/zoom_live_assistant/question_detector.py b/zoom-live-assistant/src/zoom_live_assistant/question_detector.py new file mode 100644 index 0000000..72a565d --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/question_detector.py @@ -0,0 +1,50 @@ +from __future__ import annotations + +import re +from typing import Iterable, Optional + +QUESTION_STARTERS = ( + "what", + "why", + "how", + "when", + "where", + "who", + "which", + "can", + "could", + "would", + "should", + "do", + "does", + "did", + "is", + "are", + "will", + "have", + "has", +) + + +def _normalize(text: str) -> str: + return re.sub(r"\s+", " ", text.strip()) + + +def is_question(text: str, min_len: int = 12) 
-> bool: + cleaned = _normalize(text) + if len(cleaned) < min_len: + return False + lowered = cleaned.lower() + if lowered.endswith("?"): + return True + if any(lowered.startswith(f"{starter} ") for starter in QUESTION_STARTERS): + return True + return "can you" in lowered or "could you" in lowered + + +def newest_question(texts: Iterable[str], min_len: int = 12) -> Optional[str]: + for text in reversed(list(texts)): + if is_question(text, min_len=min_len): + return _normalize(text) + return None + diff --git a/zoom-live-assistant/tests/test_assistant.py b/zoom-live-assistant/tests/test_assistant.py new file mode 100644 index 0000000..7ac9b0b --- /dev/null +++ b/zoom-live-assistant/tests/test_assistant.py @@ -0,0 +1,36 @@ +from zoom_live_assistant.assistant import LiveAssistant +from zoom_live_assistant.config import Settings +from zoom_live_assistant.models import TranscriptEvent + + +class StubLLM: + def __init__(self): + self.calls = 0 + + def answer_question(self, question: str, context): + self.calls += 1 + return f"Answer for: {question}" + + +def _settings() -> Settings: + return Settings( + OPENAI_API_KEY="test-key", + MIN_QUESTION_LENGTH=5, + ) + + +def test_assistant_answers_new_question_and_dedupes(): + llm = StubLLM() + assistant = LiveAssistant(settings=_settings(), llm_client=llm) + + first = assistant.ingest_event( + TranscriptEvent(speaker="customer", text="Can you share your SLA details?") + ) + second = assistant.ingest_event( + TranscriptEvent(speaker="customer", text="Can you share your SLA details?") + ) + + assert first is not None + assert "SLA details" in first.answer + assert second is None + assert llm.calls == 1 diff --git a/zoom-live-assistant/tests/test_question_detector.py b/zoom-live-assistant/tests/test_question_detector.py new file mode 100644 index 0000000..5e82e6a --- /dev/null +++ b/zoom-live-assistant/tests/test_question_detector.py @@ -0,0 +1,19 @@ +from zoom_live_assistant.question_detector import is_question, newest_question 
+ + +def test_is_question_marks_explicit_question(): + assert is_question("Can you explain your pricing tiers?") + + +def test_is_question_ignores_short_text(): + assert not is_question("Why?", min_len=10) + + +def test_newest_question_returns_latest_detected(): + texts = [ + "Hello everyone", + "This is the architecture overview", + "How do you handle failover in Redis Enterprise?", + "Great, thanks", + ] + assert newest_question(texts) == "How do you handle failover in Redis Enterprise?" From 8bb7d1d37163e096a0520da7f078bce94365f9c5 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 10 Apr 2026 04:14:06 +0000 Subject: [PATCH 2/3] Use timezone-aware UTC timestamps Co-authored-by: Kanika Gupta --- zoom-live-assistant/src/zoom_live_assistant/cli.py | 4 ++-- zoom-live-assistant/src/zoom_live_assistant/models.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/zoom-live-assistant/src/zoom_live_assistant/cli.py b/zoom-live-assistant/src/zoom_live_assistant/cli.py index 6397560..20f9a67 100644 --- a/zoom-live-assistant/src/zoom_live_assistant/cli.py +++ b/zoom-live-assistant/src/zoom_live_assistant/cli.py @@ -3,7 +3,7 @@ import argparse import json import sys -from datetime import datetime +from datetime import UTC, datetime from dotenv import load_dotenv @@ -50,7 +50,7 @@ def main() -> None: text=payload["text"], timestamp=datetime.fromisoformat(payload["timestamp"]) if payload.get("timestamp") - else datetime.utcnow(), + else datetime.now(UTC), source=payload.get("source", "manual"), ) else: diff --git a/zoom-live-assistant/src/zoom_live_assistant/models.py b/zoom-live-assistant/src/zoom_live_assistant/models.py index fbead49..f93bcf8 100644 --- a/zoom-live-assistant/src/zoom_live_assistant/models.py +++ b/zoom-live-assistant/src/zoom_live_assistant/models.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import UTC, datetime from typing import Literal, Optional from pydantic import BaseModel, Field @@ -7,7 +7,7 @@ class 
TranscriptEvent(BaseModel): speaker: str = Field(..., description="Speaker name or identifier") text: str = Field(..., min_length=1, description="Transcribed utterance") - timestamp: datetime = Field(default_factory=datetime.utcnow) + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) source: Literal["zoom", "manual", "api"] = "api" @@ -15,5 +15,5 @@ class AssistantAnswer(BaseModel): question: str answer: str speaker: Optional[str] = None - timestamp: datetime = Field(default_factory=datetime.utcnow) + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) From 44e95958bd196ef92729ca582e5864b3848de207 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 10 Apr 2026 04:53:53 +0000 Subject: [PATCH 3/3] Add Zoom webhook adapter and desktop overlay UI Co-authored-by: Kanika Gupta --- zoom-live-assistant/README.md | 45 ++++++ zoom-live-assistant/pyproject.toml | 2 + .../src/zoom_live_assistant/api.py | 114 +++++++++++++- .../src/zoom_live_assistant/config.py | 2 + .../src/zoom_live_assistant/overlay.py | 122 +++++++++++++++ .../zoom_webhook_adapter.py | 142 ++++++++++++++++++ .../tests/test_zoom_webhook_adapter.py | 46 ++++++ 7 files changed, 469 insertions(+), 4 deletions(-) create mode 100644 zoom-live-assistant/src/zoom_live_assistant/overlay.py create mode 100644 zoom-live-assistant/src/zoom_live_assistant/zoom_webhook_adapter.py create mode 100644 zoom-live-assistant/tests/test_zoom_webhook_adapter.py diff --git a/zoom-live-assistant/README.md b/zoom-live-assistant/README.md index 015fb08..8f73b37 100644 --- a/zoom-live-assistant/README.md +++ b/zoom-live-assistant/README.md @@ -35,6 +35,8 @@ MIN_QUESTION_LENGTH=12 MAX_CONTEXT_MESSAGES=30 API_HOST=0.0.0.0 API_PORT=8080 +ZOOM_WEBHOOK_SECRET=your_zoom_webhook_secret +VERIFY_ZOOM_SIGNATURES=true ``` ### 3) Run API mode @@ -55,6 +57,24 @@ curl -X POST "http://localhost:8080/ingest" \ }' ``` +Or send a Zoom webhook envelope: + +```bash +curl -X POST 
"http://localhost:8080/zoom/webhook" \ + -H "Content-Type: application/json" \ + -d '{ + "event":"meeting.transcript_received", + "event_ts":1739923528123, + "payload":{ + "content":{ + "transcript_segments":[ + {"user_name":"customer","text":"Can you explain your pricing model?"} + ] + } + } + }' +``` + If a question is detected, response includes: ```json @@ -92,6 +112,31 @@ Common production pattern: - Bridge sends each transcript line to `POST /ingest` - On non-null `answer`, display in desktop overlay or private chat panel +Concrete Zoom webhook path supported in this project: + +- Set Zoom Event Notification endpoint to: `https://<your-public-host>/zoom/webhook` +- Supports Zoom challenge event `endpoint.url_validation` +- Supports signature verification via `x-zm-signature` and `x-zm-request-timestamp` +- Parses transcript payload variants from: + - `payload.content.transcript_segments[]` + - `payload.content.text` / `payload.content.transcript` + - `payload.object.transcript[]` / `payload.object.transcript_entries[]` + - `payload.object.text` + +### Desktop overlay UI + +Run the overlay window (always on top) to see answers instantly: + +```bash +zoom-live-assistant-overlay --ws-url ws://127.0.0.1:8080/ws/answers +``` + +How it works: + +- API publishes each generated answer to websocket endpoint `ws://.../ws/answers` +- Overlay subscribes and updates question + suggested answer in near-real-time +- Use this while you are on a call; keep human-in-the-loop and read/adjust responses + You can build the bridge with: - Zoom meeting transcript webhooks/events (if available in your plan) diff --git a/zoom-live-assistant/pyproject.toml b/zoom-live-assistant/pyproject.toml index f5506a7..bc2fe24 100644 --- a/zoom-live-assistant/pyproject.toml +++ b/zoom-live-assistant/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "openai", "pydantic-settings", "python-dotenv", + "websocket-client", "uvicorn[standard]", ] @@ -25,6 +26,7 @@ testpaths = ["tests"] [project.scripts] 
zoom-live-assistant-cli = "zoom_live_assistant.cli:main" zoom-live-assistant-api = "zoom_live_assistant.api:run" +zoom-live-assistant-overlay = "zoom_live_assistant.overlay:main" [build-system] requires = ["setuptools>=68", "wheel"] diff --git a/zoom-live-assistant/src/zoom_live_assistant/api.py b/zoom-live-assistant/src/zoom_live_assistant/api.py index b07c7ee..501c643 100644 --- a/zoom-live-assistant/src/zoom_live_assistant/api.py +++ b/zoom-live-assistant/src/zoom_live_assistant/api.py @@ -1,15 +1,26 @@ from __future__ import annotations -from fastapi import FastAPI, HTTPException +import json +from collections import deque +from typing import Deque + +from fastapi import FastAPI, Header, HTTPException, Request, WebSocket, WebSocketDisconnect from pydantic import BaseModel import uvicorn from zoom_live_assistant.assistant import LiveAssistant from zoom_live_assistant.config import get_settings from zoom_live_assistant.models import AssistantAnswer, TranscriptEvent +from zoom_live_assistant.zoom_webhook_adapter import ( + build_zoom_validation_response, + extract_transcript_events, + verify_zoom_signature, +) settings = get_settings() assistant = LiveAssistant(settings) +recent_answers: Deque[AssistantAnswer] = deque(maxlen=100) +connected_clients: set[WebSocket] = set() app = FastAPI( title="Zoom Live Assistant API", description="Accepts transcript events and returns suggested live answers when a question is detected.", @@ -28,24 +39,119 @@ class ZoomTranscriptEvent(BaseModel): timestamp: str | None = None +class ZoomWebhookEnvelope(BaseModel): + event: str + payload: dict + event_ts: int | None = None + + +class ZoomWebhookResponse(BaseModel): + accepted: bool = True + validation: dict | None = None + ingested_events: int = 0 + answers: list[AssistantAnswer] = [] + message: str | None = None + + @app.get("/health") def health() -> dict[str, str]: return {"status": "ok"} @app.post("/ingest", response_model=IngestResponse) -def ingest(event: TranscriptEvent) -> 
IngestResponse: +async def ingest(event: TranscriptEvent) -> IngestResponse: try: answer = assistant.ingest_event(event) + if answer: + await _store_answer(answer) return IngestResponse(answer=answer, context_size=assistant.context_size()) except Exception as exc: # pragma: no cover raise HTTPException(status_code=500, detail=f"Failed to ingest event: {exc}") from exc @app.post("/zoom/transcript", response_model=IngestResponse) -def ingest_zoom_transcript(event: ZoomTranscriptEvent) -> IngestResponse: +async def ingest_zoom_transcript(event: ZoomTranscriptEvent) -> IngestResponse: normalized = TranscriptEvent(speaker=event.speaker, text=event.text, source="zoom") - return ingest(normalized) + return await ingest(normalized) + + +@app.post("/zoom/webhook", response_model=ZoomWebhookResponse) +async def zoom_webhook( + request: Request, + x_zm_request_timestamp: str | None = Header(default=None), + x_zm_signature: str | None = Header(default=None), +) -> ZoomWebhookResponse: + raw = await request.body() + try: + body = json.loads(raw.decode("utf-8")) + except json.JSONDecodeError as exc: + raise HTTPException(status_code=400, detail=f"Invalid JSON payload: {exc}") from exc + + envelope = ZoomWebhookEnvelope.model_validate(body) + if envelope.event == "endpoint.url_validation": + plain_token = envelope.payload.get("plainToken", "") + if not settings.zoom_webhook_secret: + raise HTTPException( + status_code=500, + detail="ZOOM_WEBHOOK_SECRET must be configured for endpoint.url_validation.", + ) + validation = build_zoom_validation_response( + plain_token=plain_token, webhook_secret=settings.zoom_webhook_secret + ) + return ZoomWebhookResponse(validation=validation.model_dump(), message="url validated") + + if settings.verify_zoom_signatures: + if not settings.zoom_webhook_secret: + raise HTTPException( + status_code=500, + detail="ZOOM_WEBHOOK_SECRET must be configured when VERIFY_ZOOM_SIGNATURES is true.", + ) + if not verify_zoom_signature( + raw_body=raw, + 
request_timestamp=x_zm_request_timestamp, + zoom_signature=x_zm_signature, + webhook_secret=settings.zoom_webhook_secret, + ): + raise HTTPException(status_code=401, detail="Invalid Zoom webhook signature.") + + events = extract_transcript_events(body) + answers: list[AssistantAnswer] = [] + for event in events: + answer = assistant.ingest_event(event) + if answer: + answers.append(answer) + await _store_answer(answer) + return ZoomWebhookResponse(ingested_events=len(events), answers=answers) + + +@app.websocket("/ws/answers") +async def answers_ws(websocket: WebSocket) -> None: + await websocket.accept() + connected_clients.add(websocket) + try: + for answer in recent_answers: + await websocket.send_json(answer.model_dump(mode="json")) + while True: + # Keep alive; clients do not need to send data. + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + connected_clients.discard(websocket) + + +async def _store_answer(answer: AssistantAnswer) -> None: + recent_answers.append(answer) + payload = answer.model_dump(mode="json") + stale_clients: list[WebSocket] = [] + for client in list(connected_clients): + try: + # Best effort push to live overlays. 
+ await client.send_json(payload) + except Exception: + stale_clients.append(client) + for stale in stale_clients: + connected_clients.discard(stale) def run() -> None: diff --git a/zoom-live-assistant/src/zoom_live_assistant/config.py b/zoom-live-assistant/src/zoom_live_assistant/config.py index aaec3f9..89b904c 100644 --- a/zoom-live-assistant/src/zoom_live_assistant/config.py +++ b/zoom-live-assistant/src/zoom_live_assistant/config.py @@ -19,6 +19,8 @@ class Settings(BaseSettings): min_question_length: int = Field(default=12, alias="MIN_QUESTION_LENGTH") api_host: str = Field(default="0.0.0.0", alias="API_HOST") api_port: int = Field(default=8080, alias="API_PORT") + zoom_webhook_secret: str = Field(default="", alias="ZOOM_WEBHOOK_SECRET") + verify_zoom_signatures: bool = Field(default=True, alias="VERIFY_ZOOM_SIGNATURES") model_config = SettingsConfigDict( env_file=".env", diff --git a/zoom-live-assistant/src/zoom_live_assistant/overlay.py b/zoom-live-assistant/src/zoom_live_assistant/overlay.py new file mode 100644 index 0000000..be8c26f --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/overlay.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import argparse +import json +import queue +import threading +import tkinter as tk +from tkinter import ttk + +import websocket + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Desktop overlay for live answer suggestions." 
+ ) + parser.add_argument( + "--ws-url", + default="ws://127.0.0.1:8080/ws/answers", + help="WebSocket URL served by zoom-live-assistant-api", + ) + parser.add_argument( + "--title", + default="Live Call Assistant", + help="Window title text.", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + incoming: "queue.Queue[dict]" = queue.Queue() + status = {"connected": False, "error": ""} + + def on_open(_: websocket.WebSocketApp) -> None: + status["connected"] = True + status["error"] = "" + + def on_message(_: websocket.WebSocketApp, message: str) -> None: + try: + payload = json.loads(message) + if isinstance(payload, dict): + incoming.put(payload) + except json.JSONDecodeError: + pass + + def on_close(_: websocket.WebSocketApp, __, ___) -> None: + status["connected"] = False + + def on_error(_: websocket.WebSocketApp, error: Exception) -> None: + status["connected"] = False + status["error"] = str(error) + + ws_app = websocket.WebSocketApp( + args.ws_url, + on_open=on_open, + on_message=on_message, + on_close=on_close, + on_error=on_error, + ) + ws_thread = threading.Thread( + target=lambda: ws_app.run_forever(ping_interval=20, ping_timeout=10), + daemon=True, + ) + ws_thread.start() + + root = tk.Tk() + root.title(args.title) + root.geometry("640x360") + root.attributes("-topmost", True) + + container = ttk.Frame(root, padding=12) + container.pack(fill=tk.BOTH, expand=True) + + connection_var = tk.StringVar(value=f"Connecting to {args.ws_url}...") + question_var = tk.StringVar(value="Question: (waiting)") + + ttk.Label(container, textvariable=connection_var).pack(anchor="w") + ttk.Label(container, textvariable=question_var, wraplength=600).pack( + anchor="w", pady=(8, 8) + ) + answer_box = tk.Text(container, height=12, wrap=tk.WORD) + answer_box.pack(fill=tk.BOTH, expand=True) + answer_box.insert("1.0", "Waiting for suggested answers...") + answer_box.config(state=tk.DISABLED) + + def set_answer(text: str) -> None: + 
answer_box.config(state=tk.NORMAL) + answer_box.delete("1.0", tk.END) + answer_box.insert("1.0", text) + answer_box.config(state=tk.DISABLED) + + def tick() -> None: + if status["connected"]: + connection_var.set(f"Connected: {args.ws_url}") + elif status["error"]: + connection_var.set(f"Disconnected: {status['error']}") + else: + connection_var.set(f"Disconnected: {args.ws_url}") + + while not incoming.empty(): + payload = incoming.get() + question = payload.get("question", "") + answer = payload.get("answer", "") + speaker = payload.get("speaker", "customer") + question_var.set(f"Question ({speaker}): {question or '(n/a)'}") + set_answer(answer or "(empty answer)") + root.after(200, tick) + + def on_close_window() -> None: + try: + ws_app.close() + finally: + root.destroy() + + root.protocol("WM_DELETE_WINDOW", on_close_window) + root.after(200, tick) + root.mainloop() + + +if __name__ == "__main__": + main() diff --git a/zoom-live-assistant/src/zoom_live_assistant/zoom_webhook_adapter.py b/zoom-live-assistant/src/zoom_live_assistant/zoom_webhook_adapter.py new file mode 100644 index 0000000..a43e17f --- /dev/null +++ b/zoom-live-assistant/src/zoom_live_assistant/zoom_webhook_adapter.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import hashlib +import hmac +from datetime import UTC, datetime +from typing import Any + +from pydantic import BaseModel + +from zoom_live_assistant.models import TranscriptEvent + + +class ZoomValidationResponse(BaseModel): + plainToken: str + encryptedToken: str + + +def build_zoom_validation_response(plain_token: str, webhook_secret: str) -> ZoomValidationResponse: + encrypted_token = hmac.new( + webhook_secret.encode("utf-8"), + plain_token.encode("utf-8"), + hashlib.sha256, + ).hexdigest() + return ZoomValidationResponse(plainToken=plain_token, encryptedToken=encrypted_token) + + +def verify_zoom_signature( + raw_body: bytes, + request_timestamp: str | None, + zoom_signature: str | None, + webhook_secret: str, +) -> 
bool: + if not request_timestamp or not zoom_signature or not webhook_secret: + return False + # The signature format is documented by Zoom: v0=HMAC_SHA256("v0:{ts}:{raw_body}") + message = b"v0:" + request_timestamp.encode("utf-8") + b":" + raw_body + expected = "v0=" + hmac.new( + webhook_secret.encode("utf-8"), + message, + hashlib.sha256, + ).hexdigest() + return hmac.compare_digest(expected, zoom_signature) + + +def _parse_timestamp(value: Any) -> datetime: + if isinstance(value, str) and value: + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + pass + if isinstance(value, (int, float)): + # Zoom sometimes emits milliseconds timestamps. + ts = value / 1000 if value > 10_000_000_000 else value + return datetime.fromtimestamp(ts, tz=UTC) + return datetime.now(UTC) + + +def extract_transcript_events(body: dict[str, Any]) -> list[TranscriptEvent]: + payload = body.get("payload", {}) + obj = payload.get("object", {}) + content = payload.get("content", {}) + events: list[TranscriptEvent] = [] + + # RTMS-style transcript chunks (common in real-time streams) + for segment in content.get("transcript_segments", []) or []: + text = ( + segment.get("text") + or segment.get("transcript") + or segment.get("utterance") + or "" + ).strip() + if not text: + continue + speaker = ( + segment.get("user_name") + or segment.get("speaker_name") + or segment.get("speaker") + or "customer" + ) + events.append( + TranscriptEvent( + speaker=speaker, + text=text, + timestamp=_parse_timestamp( + segment.get("timestamp") + or segment.get("ts") + or body.get("event_ts") + ), + source="zoom", + ) + ) + + # RTMS callback style where transcript text may exist directly in content. 
+ direct_text = ( + content.get("text") + or content.get("transcript") + or content.get("utterance") + or "" + ).strip() + if direct_text: + direct_speaker = ( + content.get("user_name") + or content.get("speaker_name") + or content.get("speaker") + or "customer" + ) + events.append( + TranscriptEvent( + speaker=direct_speaker, + text=direct_text, + timestamp=_parse_timestamp(content.get("timestamp") or body.get("event_ts")), + source="zoom", + ) + ) + + # Alternate webhook payload style with nested object transcription arrays. + for segment in obj.get("transcript", []) or obj.get("transcript_entries", []) or []: + text = (segment.get("text") or "").strip() + if not text: + continue + events.append( + TranscriptEvent( + speaker=segment.get("speaker") or "customer", + text=text, + timestamp=_parse_timestamp(segment.get("timestamp") or body.get("event_ts")), + source="zoom", + ) + ) + + # Fallback object-level text style. + obj_text = (obj.get("text") or obj.get("transcript") or "").strip() + if obj_text: + events.append( + TranscriptEvent( + speaker=obj.get("participant_name") or obj.get("speaker") or "customer", + text=obj_text, + timestamp=_parse_timestamp(obj.get("timestamp") or body.get("event_ts")), + source="zoom", + ) + ) + + return events diff --git a/zoom-live-assistant/tests/test_zoom_webhook_adapter.py b/zoom-live-assistant/tests/test_zoom_webhook_adapter.py new file mode 100644 index 0000000..925a177 --- /dev/null +++ b/zoom-live-assistant/tests/test_zoom_webhook_adapter.py @@ -0,0 +1,46 @@ +import hashlib +import hmac +import json + +from zoom_live_assistant.zoom_webhook_adapter import ( + build_zoom_validation_response, + extract_transcript_events, + verify_zoom_signature, +) + + +def test_build_zoom_validation_response(): + response = build_zoom_validation_response("plain-token", "secret") + expected = hmac.new(b"secret", b"plain-token", hashlib.sha256).hexdigest() + assert response.plainToken == "plain-token" + assert response.encryptedToken == 
def test_verify_zoom_signature_matches_raw_body():
    """A signature over the exact raw bytes verifies; any mismatch is rejected."""
    body = {"event": "meeting.transcript_received", "payload": {"content": {"text": "hello"}}}
    raw = json.dumps(body, separators=(",", ":")).encode("utf-8")
    ts = "1739923528"
    digest = hmac.new(
        b"secret", b"v0:" + ts.encode("utf-8") + b":" + raw, hashlib.sha256
    ).hexdigest()
    signature = f"v0={digest}"
    assert verify_zoom_signature(raw, ts, signature, "secret")
    # The happy path alone cannot show that verification actually rejects.
    assert not verify_zoom_signature(raw + b" ", ts, signature, "secret")
    assert not verify_zoom_signature(raw, ts, signature, "other-secret")
    assert not verify_zoom_signature(raw, None, signature, "secret")
    assert not verify_zoom_signature(raw, ts, None, "secret")


def test_extract_transcript_events_from_rtms_segments():
    """Each RTMS segment becomes one event, in order, tagged with source zoom."""
    payload = {
        "event": "meeting.transcript_received",
        "event_ts": 1739923528123,
        "payload": {
            "content": {
                "transcript_segments": [
                    {"user_name": "Alice", "text": "Can you explain your DR story?"},
                    {"user_name": "Bob", "text": "Sure, absolutely."},
                ]
            }
        },
    }
    events = extract_transcript_events(payload)
    assert len(events) == 2
    assert events[0].speaker == "Alice"
    assert events[0].text == "Can you explain your DR story?"
    assert events[0].source == "zoom"
    # Also pin the second segment so ordering and per-segment speakers are covered.
    assert events[1].speaker == "Bob"
    assert events[1].text == "Sure, absolutely."
    assert events[1].source == "zoom"