From 6bb5789c698a123620a05c0fc9384102d045aff6 Mon Sep 17 00:00:00 2001 From: Ivan Cheung Date: Sun, 7 Jun 2026 22:45:44 +0000 Subject: [PATCH] feat(adk-agent): standalone Google-ADK agent that listens + responds in the console MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds integrations/adk-agent/ — a standalone Python (Google ADK) agent that long-polls one termchart viewer console scope and, on each human message, renders/updates the diagram AND posts a short text reply into the log. An alternative to driving termchart from a coding harness via the CLI. - Speaks the existing viewer HTTP API only (no changes under packages/*): GET/POST /inbox, push/patch/suggest/status/focus. - Loop-back guard: records the seq of each posted reply and skips it on read (mirrors the browser console's seenSeqs) so the agent never answers itself. - CLI-parity resilience: long-poll from cursor, backoff 1s->15s on transient errors, exit on 4xx, forward-progress guard, clean SIGINT. - Real (non-peek) reads, so the human sees "Read by the agent ✓". - Vertex AI via ADC by default; GCP project read from env only, never hardcoded. - Tests are hermetic (mocked HTTP + scripted runner; no network/LLM/sleep): 43 passing, ruff + mypy clean. New path-filtered CI workflow. --- .github/workflows/adk-agent.yml | 36 +++ AGENTS.md | 2 + README.md | 4 +- integrations/adk-agent/.env.example | 27 ++ integrations/adk-agent/.gitignore | 9 + integrations/adk-agent/README.md | 96 ++++++++ integrations/adk-agent/pyproject.toml | 57 +++++ .../adk-agent/src/termchart_adk/__init__.py | 20 ++ .../adk-agent/src/termchart_adk/agent.py | 72 ++++++ .../adk-agent/src/termchart_adk/client.py | 232 ++++++++++++++++++ .../adk-agent/src/termchart_adk/config.py | 89 +++++++ .../adk-agent/src/termchart_adk/loop.py | 109 ++++++++ .../adk-agent/src/termchart_adk/main.py | 109 ++++++++ .../adk-agent/src/termchart_adk/models.py | 52 ++++ .../adk-agent/src/termchart_adk/prompts.py | 66 +++++ .../adk-agent/src/termchart_adk/py.typed | 0 .../adk-agent/src/termchart_adk/tools.py | 109 ++++++++ integrations/adk-agent/tests/__init__.py | 0 integrations/adk-agent/tests/conftest.py | 40 +++ integrations/adk-agent/tests/fakes.py | 191 ++++++++++++++ .../adk-agent/tests/test_agent_turn.py | 55 +++++ integrations/adk-agent/tests/test_client.py | 123 ++++++++++ integrations/adk-agent/tests/test_config.py | 79 ++++++ .../adk-agent/tests/test_loop_dedupe.py | 49 ++++ .../adk-agent/tests/test_loop_resilience.py | 77 ++++++ integrations/adk-agent/tests/test_tools.py | 88 +++++++ 26 files changed, 1790 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/adk-agent.yml create mode 100644 integrations/adk-agent/.env.example create mode 100644 integrations/adk-agent/.gitignore create mode 100644 integrations/adk-agent/README.md create mode 100644 integrations/adk-agent/pyproject.toml create mode 100644 integrations/adk-agent/src/termchart_adk/__init__.py create mode 100644 integrations/adk-agent/src/termchart_adk/agent.py create mode 100644 integrations/adk-agent/src/termchart_adk/client.py create mode 100644 integrations/adk-agent/src/termchart_adk/config.py create mode 100644 integrations/adk-agent/src/termchart_adk/loop.py create mode 100644 integrations/adk-agent/src/termchart_adk/main.py create mode 100644 integrations/adk-agent/src/termchart_adk/models.py create mode 100644 integrations/adk-agent/src/termchart_adk/prompts.py create mode 100644 integrations/adk-agent/src/termchart_adk/py.typed create mode 100644 integrations/adk-agent/src/termchart_adk/tools.py create mode 100644 integrations/adk-agent/tests/__init__.py create mode 100644 integrations/adk-agent/tests/conftest.py create mode 100644 integrations/adk-agent/tests/fakes.py create mode 100644 integrations/adk-agent/tests/test_agent_turn.py create mode 100644 integrations/adk-agent/tests/test_client.py create mode 100644 integrations/adk-agent/tests/test_config.py create mode 100644 integrations/adk-agent/tests/test_loop_dedupe.py create mode 100644 integrations/adk-agent/tests/test_loop_resilience.py create mode 100644 integrations/adk-agent/tests/test_tools.py diff --git a/.github/workflows/adk-agent.yml b/.github/workflows/adk-agent.yml new file mode 100644 index 0000000..1883fb6 --- /dev/null +++ b/.github/workflows/adk-agent.yml @@ -0,0 +1,36 @@ +name: adk-agent + +# Lint + type-check + unit-test the standalone Python ADK agent. Separate from the Node CI +# (ci.yml) and the corpus job (corpus.yml); path-filtered so it only runs when the agent changes. +# Unit-only: the tests mock HTTP and the runner, so there's no network or model call. +on: + pull_request: + paths: ["integrations/adk-agent/**", ".github/workflows/adk-agent.yml"] + workflow_dispatch: + +jobs: + lint-type-test: + runs-on: ubuntu-latest + timeout-minutes: 10 # unit-only; cap so a hung runner fails fast instead of the 360m default + defaults: + run: + working-directory: integrations/adk-agent + steps: + - uses: actions/checkout@v4 + + - name: Set up uv + Python 3.11 + uses: astral-sh/setup-uv@v5 + with: + python-version: "3.11" + + - name: Install + run: uv sync + + - name: Lint (ruff) + run: uv run ruff check . + + - name: Type-check (mypy) + run: uv run mypy src + + - name: Test (pytest) + run: uv run pytest -q diff --git a/AGENTS.md b/AGENTS.md index 5b14412..9af0fd4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -61,3 +61,5 @@ sequence/ER/class/state graphs) from the formats + copy-paste examples here: | a live rich visual on a screen / iPad | `termchart push --type …` | | show activity the instant work starts | `termchart begin …` (placeholder + focus + loader) | | narrate long work (toasts / progress / loaders) | `termchart status …` | +| read the human's console feedback / clicks | `termchart inbox --follow` (or `--wait`) | +| auto-respond to a scope's console without a harness | run the standalone Python agent in [`integrations/adk-agent/`](integrations/adk-agent/) (Google ADK) | diff --git a/README.md b/README.md index 8f26387..6404a21 100644 --- a/README.md +++ b/README.md @@ -217,7 +217,9 @@ push failed: HTTP 400 — panes[0].content must be a JSON string (got object) route polylines, for location/route views right inside a dashboard. - **Experimental agent↔human console** (off by default) — flip the "Console (beta)" toggle to send the agent feedback / commands and click agent-pushed suggestion chips; the agent reads them back - with `termchart inbox` and offers choices with `termchart suggest`. + with `termchart inbox` and offers choices with `termchart suggest`. Instead of a coding harness, + you can also run a **standalone Python agent** that listens to a scope's console and responds by + rendering + replying — see [`integrations/adk-agent/`](integrations/adk-agent/) (Google ADK). - Flows re-fit on resize (iPad rotation), carry legends, and use measured layout; a server-side **geometry lint** flags edges that run over or too close to nodes so tangled graphs get fixed. - **Durable & retrievable.** Views live server-side, so a *new* agent session can pick up where the diff --git a/integrations/adk-agent/.env.example b/integrations/adk-agent/.env.example new file mode 100644 index 0000000..b441432 --- /dev/null +++ b/integrations/adk-agent/.env.example @@ -0,0 +1,27 @@ +# Copy to .env and fill in. Never commit a real .env (it's gitignored). + +# --- termchart viewer (same contract as the CLI) ----------------------------- +# Viewer base URL, INCLUDING the /w/ workspace segment. +TERMCHART_VIEWER_URL=http://localhost:8080/w/me +# Bearer token for push/patch/suggest/status. If unset, the agent can still post +# text replies (the inbox POST is tokenless) but cannot render diagrams. +TERMCHART_VIEWER_TOKEN= +# The single scope this agent listens to and responds in. +TERMCHART_PROJECT=demo +TERMCHART_AGENT=driver + +# --- model backend (Vertex AI via ADC is the default) ------------------------ +# Run `gcloud auth application-default login` once for ADC. +GOOGLE_GENAI_USE_VERTEXAI=1 +# Read from YOUR environment / ADC — never hardcode or commit a real project id. +GOOGLE_CLOUD_PROJECT=your-gcp-project +GOOGLE_CLOUD_LOCATION=us-central1 +# Fallback if you set GOOGLE_GENAI_USE_VERTEXAI=0 (no GCP): +# GEMINI_API_KEY= + +# --- optional knobs ---------------------------------------------------------- +# Override the model (flash-tier is the default, for low latency). +# TERMCHART_ADK_MODEL=gemini-2.5-flash +# TERMCHART_ADK_POLL_WAIT_MS=25000 +# TERMCHART_ADK_REQUEST_TIMEOUT_MS=30000 +# TERMCHART_ADK_BACKOFF_MAX_MS=15000 diff --git a/integrations/adk-agent/.gitignore b/integrations/adk-agent/.gitignore new file mode 100644 index 0000000..754d2cb --- /dev/null +++ b/integrations/adk-agent/.gitignore @@ -0,0 +1,9 @@ +.env +.venv/ +__pycache__/ +*.pyc +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +uv.lock +dist/ diff --git a/integrations/adk-agent/README.md b/integrations/adk-agent/README.md new file mode 100644 index 0000000..03eebe9 --- /dev/null +++ b/integrations/adk-agent/README.md @@ -0,0 +1,96 @@ +# termchart ADK agent + +A standalone [Google ADK](https://github.com/google/adk-python) agent that **listens to one +termchart viewer console scope and responds** — by rendering/updating the diagram on the canvas +**and** posting a short text reply in the console log. + +It's an alternative to driving termchart from a coding harness (Claude Code / agy) via the +`termchart` CLI: instead of a harness reading the inbox and pushing diagrams, this is a small, +purpose-built Python process that does it for one scope. Use it as a runnable reference and +customize the prompt + tools for your own agent. + +## How it works + +``` +human types in the console ──▶ GET /inbox (long-poll) ──▶ ADK agent (Gemini) runs a turn + │ render_diagram / patch_diagram + │ suggest_chips / set_status ──▶ canvas + ▼ + incoming text reply ◀── POST /inbox (kind=message) ── the agent's final answer +``` + +- The listener long-polls `GET /inbox` for its `project/agent` scope (the same endpoint the CLI's + `inbox --follow` uses), with the CLI's resilience: back off 1s→2s→…→15s on transient errors, + exit on a 4xx, advance the cursor each healthy round. +- Each human message (or clicked chip) runs **one ADK turn**. The model calls tools that hit the + viewer's HTTP API to render/patch/suggest/status, then its final text is posted back into the + console as a reply (a tokenless `POST /inbox`). +- **Reading the inbox marks it read** (no `peek`), so the human sees *"Read by the agent ✓"*. +- **Loop-back guard:** the agent's own replies land in the same inbox it polls, so it records each + reply's `seq` and skips it on the next read — it never responds to itself. +- **Scope discipline:** it only ever acts on its one configured scope. + +## Prerequisites + +- Python 3.11+ and [`uv`](https://docs.astral.sh/uv/). +- A running termchart viewer (`termchart serve`, or a deployed `…/w/` workspace). +- For the default Vertex AI backend: Application Default Credentials — + `gcloud auth application-default login` — plus a GCP project/location. + +## Setup + +```bash +cd integrations/adk-agent +cp .env.example .env # then edit .env +uv sync +``` + +Fill in `.env`: + +| Variable | Required | Notes | +|---|---|---| +| `TERMCHART_VIEWER_URL` | yes | Viewer base **including** `/w/` | +| `TERMCHART_VIEWER_TOKEN` | for rendering | Bearer token; unset ⇒ the agent can only post text replies | +| `TERMCHART_PROJECT` / `TERMCHART_AGENT` | yes | The single scope to listen on | +| `GOOGLE_GENAI_USE_VERTEXAI` | `1` (default) | Use Vertex AI | +| `GOOGLE_CLOUD_PROJECT` | for Vertex | **Read from your env via ADC — never hardcode/commit a real id** | +| `GOOGLE_CLOUD_LOCATION` | for Vertex | e.g. `us-central1` | +| `GEMINI_API_KEY` | fallback | Used only if `GOOGLE_GENAI_USE_VERTEXAI=0` | +| `TERMCHART_ADK_MODEL` | no | Defaults to a flash-tier model (low latency); override freely | + +## Run + +```bash +uv run termchart-adk +``` + +You'll see a redacted startup line (`scope=… · Vertex AI · project=… · model=…`) and +`following …`. Open the viewer, toggle the console on, select the same scope, and type a +message — the diagram appears and a reply lands in the log. + +`--project` / `--agent` / `--model` override the env; `--once` runs a single poll round (for +smoke testing). + +## Customize + +- **`src/termchart_adk/prompts.py`** — the system instruction + the compact diagram cheat-sheet. +- **`src/termchart_adk/tools.py`** — the tools the model can call. Add your own (each is a typed + async function with a docstring) and they become available to the agent. + +## Develop + +```bash +uv run pytest # fast, hermetic (mocked HTTP + a scripted runner; no network, no LLM) +uv run ruff check . +uv run mypy src +``` + +## Troubleshooting + +- **"no TERMCHART_VIEWER_TOKEN configured"** — set the token to enable rendering; without it the + agent still posts text replies. +- **ADC errors at the first model call** — run `gcloud auth application-default login` and set + `GOOGLE_CLOUD_PROJECT` / `GOOGLE_CLOUD_LOCATION` (or switch to `GEMINI_API_KEY`). +- **`config: …` then exit** — a required variable is missing; the message names it. +- **`reconnecting in Ns…`** — the viewer was briefly unreachable; the listener retries with + backoff and recovers on its own. diff --git a/integrations/adk-agent/pyproject.toml b/integrations/adk-agent/pyproject.toml new file mode 100644 index 0000000..9134416 --- /dev/null +++ b/integrations/adk-agent/pyproject.toml @@ -0,0 +1,57 @@ +[project] +name = "termchart-adk-agent" +version = "0.1.0" +description = "A standalone Google-ADK agent that listens to a termchart viewer console and responds by rendering diagrams and posting text replies." +readme = "README.md" +requires-python = ">=3.11" +license = "MIT" +dependencies = [ + "google-adk>=1.26,<2", + "google-genai>=1.0", + "httpx>=0.27", + "pydantic>=2", + "pydantic-settings>=2", +] + +[project.scripts] +termchart-adk = "termchart_adk.main:main" + +[dependency-groups] +dev = [ + "pytest>=8", + "pytest-asyncio>=0.23", + "ruff>=0.6", + "mypy>=1.8", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/termchart_adk"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] + +[tool.ruff] +line-length = 120 +target-version = "py311" +src = ["src", "tests"] + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B", "ASYNC"] + +[tool.mypy] +python_version = "3.11" +mypy_path = "src" +plugins = ["pydantic.mypy"] +disallow_untyped_defs = true +warn_redundant_casts = true +warn_unused_ignores = true +no_implicit_optional = true + +[[tool.mypy.overrides]] +module = ["google.*"] +ignore_missing_imports = true diff --git a/integrations/adk-agent/src/termchart_adk/__init__.py b/integrations/adk-agent/src/termchart_adk/__init__.py new file mode 100644 index 0000000..276d6de --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/__init__.py @@ -0,0 +1,20 @@ +"""termchart_adk — a standalone Google-ADK agent that listens to a termchart viewer +console scope and responds by rendering diagrams and posting a short text reply. + +Single source of truth for the package version is ``__version__`` below. +""" + +from .config import Settings +from .models import InboxEvent, InboxResponse, RenderResult, ReplyResult, SuggestItem + +__version__ = "0.1.0" + +__all__ = [ + "Settings", + "InboxEvent", + "InboxResponse", + "RenderResult", + "ReplyResult", + "SuggestItem", + "__version__", +] diff --git a/integrations/adk-agent/src/termchart_adk/agent.py b/integrations/adk-agent/src/termchart_adk/agent.py new file mode 100644 index 0000000..e9eaf9c --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/agent.py @@ -0,0 +1,72 @@ +"""Google-ADK agent construction + per-scope session + one-turn execution. + +Credentials are resolved by the Google SDK from the environment (Vertex when +GOOGLE_GENAI_USE_VERTEXAI=1 + project/location; otherwise GEMINI_API_KEY). This module never +touches credentials — it only passes the configured model id. +""" + +from __future__ import annotations + +import logging +from typing import Any + +from google.adk.agents import LlmAgent +from google.adk.apps import App +from google.adk.runners import InMemoryRunner +from google.genai import errors as genai_errors +from google.genai import types + +from .config import Settings +from .prompts import SYSTEM_INSTRUCTION + +logger = logging.getLogger("termchart_adk") + +# ADK requires the App name to be a valid identifier (letters, digits, underscores — no hyphen). +APP_NAME = "termchart_adk" +USER_ID = "human" + + +def build_agent(settings: Settings, tools: list[Any]) -> LlmAgent: + """Construct the LLM agent for the configured model with the termchart tools attached.""" + return LlmAgent( + name="termchart_driver", + model=settings.model, + instruction=SYSTEM_INSTRUCTION, + tools=tools, + ) + + +def build_runner(agent: LlmAgent) -> InMemoryRunner: + """Wrap the agent in an in-memory runner (in-memory session + artifact services).""" + return InMemoryRunner(app=App(name=APP_NAME, root_agent=agent)) + + +async def ensure_session(runner: InMemoryRunner, *, session_id: str) -> None: + """Create the per-scope session once (idempotent) so the agent keeps memory across messages.""" + existing = await runner.session_service.get_session( + app_name=runner.app_name, user_id=USER_ID, session_id=session_id + ) + if existing is None: + await runner.session_service.create_session( + app_name=runner.app_name, user_id=USER_ID, session_id=session_id + ) + + +async def run_turn(runner: InMemoryRunner, *, session_id: str, text: str) -> str: + """Run one turn: feed the human's message in, let the agent call tools, return its final text.""" + message = types.Content(role="user", parts=[types.Part(text=text)]) + chunks: list[str] = [] + try: + async for event in runner.run_async( + user_id=USER_ID, session_id=session_id, new_message=message + ): + if event.is_final_response() and event.content and event.content.parts: + for part in event.content.parts: + if part.text: + chunks.append(part.text) + except genai_errors.APIError: + # A model/quota error on a single turn must not kill the long-running listener; log it + # deeply and reply with a graceful note the human sees in the console. + logger.exception("model error during turn") + return "⚠️ I hit a model error and couldn't finish that — please try again." + return "".join(chunks).strip() diff --git a/integrations/adk-agent/src/termchart_adk/client.py b/integrations/adk-agent/src/termchart_adk/client.py new file mode 100644 index 0000000..9cde371 --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/client.py @@ -0,0 +1,232 @@ +"""Async HTTP client over the termchart viewer endpoints — the only module that talks to the viewer. + +Mirrors the CLI's transport semantics (see packages/cli/src/viewer-detect.ts): transport failures +and 5xx are transient (the loop retries with backoff); 4xx is permanent (the loop exits). Token-gated +writes (push/patch/suggest/status/focus) send a Bearer header; inbox read + inbox post are URL-gated. +""" + +from __future__ import annotations + +import json +from collections.abc import Mapping + +import httpx +from pydantic import ValidationError + +from .models import InboxResponse, RenderResult, ReplyResult, SuggestItem + +# Diagram types whose `content` must be a JSON string (the others are freeform text). +JSON_DIAGRAM_TYPES = frozenset({"flow", "component", "vegalite", "panes"}) + +# Server-side validity caps (packages/viewer/src/server.ts). +MAX_SUGGEST_ITEMS = 12 +MAX_SUGGEST_LABEL = 120 + + +class ViewerError(Exception): + """Base class for viewer call failures.""" + + +class ViewerTransientError(ViewerError): + """A retryable failure: a transport error, a 5xx, or an unparseable body.""" + + +class ViewerPermanentError(ViewerError): + """A non-retryable failure: a 4xx (bad request / config) that won't fix on retry.""" + + +class TermchartClient: + """A thin, typed wrapper around one viewer workspace + scope.""" + + def __init__( + self, + *, + base_url: str, + token: str | None, + project: str, + agent: str, + http: httpx.AsyncClient, + reply_max_chars: int = 2000, + ) -> None: + self._base = base_url.rstrip("/") + self._token = token + self._project = project + self._agent = agent + self._http = http + self._reply_max = reply_max_chars + + @property + def has_token(self) -> bool: + return bool(self._token) + + def _auth_headers(self) -> dict[str, str]: + return {"authorization": f"Bearer {self._token}"} if self._token else {} + + # --- reads ---------------------------------------------------------------- + async def read_inbox(self, *, since: int, wait_ms: int, peek: bool = False) -> InboxResponse: + """Long-poll the console inbox for this scope. + + A real read (``peek=False``) advances the server's ack and broadcasts ``inbox-read``, + flipping the human's "Read by the agent ✓" receipt — that is intentional. + """ + params = { + "project": self._project, + "agent": self._agent, + "since": str(since), + "wait": str(wait_ms), + } + if peek: + params["peek"] = "1" + try: + r = await self._http.get( + f"{self._base}/inbox", params=params, headers=self._auth_headers() + ) + except httpx.TransportError as e: + raise ViewerTransientError(f"inbox request failed: {e}") from e + if r.status_code >= 500: + raise ViewerTransientError(f"inbox HTTP {r.status_code}") + if r.status_code >= 400: + raise ViewerPermanentError(f"inbox HTTP {r.status_code}: {r.text[:200].strip()}") + try: + return InboxResponse.model_validate(r.json()) + except (json.JSONDecodeError, ValidationError) as e: + raise ViewerTransientError(f"inbox returned an unexpected response: {e}") from e + + # --- the text reply (tokenless POST /inbox) ------------------------------- + async def post_message(self, *, text: str) -> ReplyResult: + """Post a text reply into the console log. Returns the created event (its ``seq`` is the + loop-back guard). Tokenless, like the browser console's send.""" + body = { + "project": self._project, + "agent": self._agent, + "kind": "message", + "text": text[: self._reply_max], + } + try: + r = await self._http.post(f"{self._base}/inbox", json=body) + except httpx.TransportError as e: + raise ViewerTransientError(f"post reply failed: {e}") from e + if r.status_code >= 500: + raise ViewerTransientError(f"post reply HTTP {r.status_code}") + if r.status_code >= 400: + raise ViewerPermanentError(f"post reply HTTP {r.status_code}: {r.text[:200].strip()}") + try: + return ReplyResult.model_validate(r.json()) + except (json.JSONDecodeError, ValidationError) as e: + raise ViewerTransientError(f"post reply returned an unexpected response: {e}") from e + + # --- token-gated writes --------------------------------------------------- + async def push( + self, *, diagram_type: str, content: str, description: str, focus: bool + ) -> RenderResult: + """Render a typed diagram (POST /push). Switches the viewer to this scope when ``focus``.""" + if not self._token: + return _no_token("render diagrams") + if diagram_type in JSON_DIAGRAM_TYPES: + try: + json.loads(content) + except json.JSONDecodeError as e: + return RenderResult(ok=False, error=f"content is not valid JSON: {e}") + body = { + "project": self._project, + "agent": self._agent, + "type": diagram_type, + "content": content, + "description": description, + } + res = await self._write(f"{self._base}/push", body) + if res.ok and focus: + await self._focus() + return res + + async def patch(self, *, ops: list[object]) -> RenderResult: + """Apply incremental ops to the current view in place (POST /patch).""" + if not self._token: + return _no_token("patch diagrams") + body = {"project": self._project, "agent": self._agent, "ops": ops} + res = await self._write(f"{self._base}/patch", body) + if not res.ok and res.error and res.error.startswith("HTTP 404"): + return RenderResult( + ok=False, error="no diagram to patch yet — render one first with render_diagram." + ) + return res + + async def suggest(self, *, items: list[SuggestItem]) -> RenderResult: + """Push clickable suggestion chips (POST /suggest).""" + if not self._token: + return _no_token("offer chips") + if len(items) > MAX_SUGGEST_ITEMS: + return RenderResult(ok=False, error=f"at most {MAX_SUGGEST_ITEMS} chips allowed.") + body = { + "project": self._project, + "agent": self._agent, + "items": [item.model_dump() for item in items], + } + return await self._write(f"{self._base}/suggest", body) + + async def set_status( + self, *, message: str, level: str = "info", done: bool = False + ) -> RenderResult: + """Show/clear a small status overlay scoped to this view (POST /status).""" + if not self._token: + return _no_token("set status") + body: dict[str, object] = {"project": self._project, "agent": self._agent} + if done: + body["done"] = True + else: + body["message"] = message + body["level"] = level + return await self._write(f"{self._base}/status", body) + + async def _focus(self) -> None: + body = {"project": self._project, "agent": self._agent} + try: + await self._http.post( + f"{self._base}/focus", json=body, headers=self._auth_headers() + ) + except httpx.TransportError: + # Focus is best-effort cosmetics; the render already succeeded, so a transport blip + # here must not turn a successful render into a failure. + return + + async def _write(self, url: str, body: Mapping[str, object]) -> RenderResult: + try: + r = await self._http.post(url, json=body, headers=self._auth_headers()) + except httpx.TransportError as e: + return RenderResult(ok=False, error=f"viewer unreachable: {e}") + if r.status_code == 204: + return RenderResult(ok=True) + if r.status_code == 200: + return RenderResult(ok=True, warnings=_warnings(r)) + detail = r.text[:300].strip() + return RenderResult(ok=False, error=f"HTTP {r.status_code}: {detail}" if detail else f"HTTP {r.status_code}") + + +def _warnings(r: httpx.Response) -> list[str]: + """Parse a 200 body's geometry-lint warnings; an empty/odd body just means no warnings.""" + if not r.content: + return [] + try: + data = r.json() + except json.JSONDecodeError: + return [] + warnings = data.get("warnings") if isinstance(data, dict) else None + return [str(w) for w in warnings] if isinstance(warnings, list) else [] + + +def _no_token(action: str) -> RenderResult: + return RenderResult( + ok=False, + error=f"no TERMCHART_VIEWER_TOKEN configured — cannot {action}; the agent can only post text replies.", + ) + + +def is_transient(exc: BaseException) -> bool: + """Classify an exception as retryable (transport / 5xx) vs permanent.""" + if isinstance(exc, ViewerTransientError): + return True + if isinstance(exc, ViewerPermanentError): + return False + if isinstance(exc, httpx.HTTPStatusError): + return exc.response.status_code >= 500 + return isinstance(exc, httpx.TransportError) diff --git a/integrations/adk-agent/src/termchart_adk/config.py b/integrations/adk-agent/src/termchart_adk/config.py new file mode 100644 index 0000000..1445c81 --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/config.py @@ -0,0 +1,89 @@ +"""Environment-driven configuration. The GCP project is read from the environment only — +it is never hardcoded or defaulted to a real value.""" + +from __future__ import annotations + +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + """All runtime configuration, sourced from environment variables (or a local ``.env``). + + The ``GOOGLE_*`` fields are mirrored only for validation and the redacted startup summary; + the Google SDK remains the single resolver of credentials from the same environment. + """ + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + populate_by_name=True, + case_sensitive=False, + extra="ignore", + ) + + # --- termchart viewer ----------------------------------------------------- + viewer_url: str = Field("", alias="TERMCHART_VIEWER_URL") + viewer_token: str | None = Field(None, alias="TERMCHART_VIEWER_TOKEN") + project: str = Field("", alias="TERMCHART_PROJECT") + agent: str = Field("", alias="TERMCHART_AGENT") + + # --- model backend (Vertex AI via ADC is the documented default) ---------- + use_vertexai: bool = Field(True, alias="GOOGLE_GENAI_USE_VERTEXAI") + google_cloud_project: str | None = Field(None, alias="GOOGLE_CLOUD_PROJECT") + google_cloud_location: str | None = Field(None, alias="GOOGLE_CLOUD_LOCATION") + gemini_api_key: str | None = Field(None, alias="GEMINI_API_KEY") + model: str = Field("gemini-2.5-flash", alias="TERMCHART_ADK_MODEL") + + # --- loop knobs ----------------------------------------------------------- + poll_wait_ms: int = Field(25000, alias="TERMCHART_ADK_POLL_WAIT_MS") + request_timeout_ms: int = Field(30000, alias="TERMCHART_ADK_REQUEST_TIMEOUT_MS") + backoff_start_ms: int = Field(1000, alias="TERMCHART_ADK_BACKOFF_START_MS") + backoff_max_ms: int = Field(15000, alias="TERMCHART_ADK_BACKOFF_MAX_MS") + reply_max_chars: int = Field(2000, alias="TERMCHART_ADK_REPLY_MAX_CHARS") + + def scope(self) -> str: + """The ``project/agent`` scope label this agent listens to and responds in.""" + return f"{self.project}/{self.agent}" + + def validate_for_run(self) -> list[str]: + """Return human-readable, fatal configuration problems (empty list == good to run).""" + problems: list[str] = [] + if not self.viewer_url: + problems.append("TERMCHART_VIEWER_URL is required (the viewer base URL incl. /w/).") + if not self.project: + problems.append("TERMCHART_PROJECT is required (the scope's project).") + if not self.agent: + problems.append("TERMCHART_AGENT is required (the scope's agent).") + if self.use_vertexai: + if not self.google_cloud_project: + problems.append( + "Vertex AI mode needs GOOGLE_CLOUD_PROJECT (read from your env via ADC; " + "never hardcode it)." + ) + if not self.google_cloud_location: + problems.append("Vertex AI mode needs GOOGLE_CLOUD_LOCATION (e.g. us-central1).") + elif not self.gemini_api_key: + problems.append( + "Non-Vertex mode needs GEMINI_API_KEY (or set GOOGLE_GENAI_USE_VERTEXAI=1 for Vertex)." + ) + return problems + + def soft_warnings(self) -> list[str]: + """Non-fatal configuration notes worth printing at startup.""" + notes: list[str] = [] + if not self.viewer_token: + notes.append( + "TERMCHART_VIEWER_TOKEN is not set — the agent can post text replies but cannot " + "render diagrams." + ) + return notes + + def auth_summary(self) -> str: + """A redacted one-line description of the model backend (never logs token or api key).""" + if self.use_vertexai: + return ( + f"Vertex AI · project={self.google_cloud_project} " + f"· location={self.google_cloud_location} · model={self.model}" + ) + return f"Gemini API key · model={self.model}" diff --git a/integrations/adk-agent/src/termchart_adk/loop.py b/integrations/adk-agent/src/termchart_adk/loop.py new file mode 100644 index 0000000..493ee9b --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/loop.py @@ -0,0 +1,109 @@ +"""The listen -> respond loop: long-poll the inbox, run the agent, render + post a text reply. + +Resilience mirrors the CLI's `inbox --follow` (packages/cli/src/inbox.ts): long-poll from the +cursor, back off 1s -> 2s -> ... -> 15s on transient errors (never exit), treat 4xx as permanent +(exit), advance the cursor each healthy round, and a forward-progress guard against a hot loop. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable + +from .client import TermchartClient, ViewerError, ViewerPermanentError, ViewerTransientError +from .config import Settings +from .models import InboxEvent + +logger = logging.getLogger("termchart_adk") + +# (human text) -> the agent's reply text. Injected so the loop is testable without the real LLM. +TurnFn = Callable[[str], Awaitable[str]] +SleepFn = Callable[[float], Awaitable[None]] + +# Cap on remembered self-posted seqs (the loop-back guard) so the set can't grow unbounded. +_SELF_SEQS_CAP = 512 +_SELF_SEQS_KEEP = 256 +_DRIVING_KINDS = ("message", "action") + + +class InboxListener: + """Owns the cursor, the loop-back guard, and the backoff state for one scope.""" + + def __init__( + self, + *, + client: TermchartClient, + settings: Settings, + run_turn: TurnFn, + sleep: SleepFn = asyncio.sleep, + ) -> None: + self._client = client + self._settings = settings + self._run_turn = run_turn + self._sleep = sleep + self._self_seqs: set[int] = set() + self._cursor = 0 + self._backoff_ms = 0 + self._stop = False + + def stop(self) -> None: + """Request a clean exit before the next poll (wired to SIGINT/SIGTERM).""" + self._stop = True + + async def run(self, *, max_polls: int | None = None) -> int: + """Run the loop until stopped (or ``max_polls`` rounds, a test hook). Returns an exit code.""" + polls = 0 + logger.info("following %s (Ctrl-C to stop)…", self._settings.scope()) + while not self._stop and (max_polls is None or polls < max_polls): + polls += 1 + try: + resp = await self._client.read_inbox( + since=self._cursor, wait_ms=self._settings.poll_wait_ms, peek=False + ) + except ViewerPermanentError as e: + logger.error("inbox failed (permanent): %s", e) + return 1 + except ViewerTransientError as e: + await self._sleep_backoff(f"connection issue: {e}") + continue + self._backoff_ms = 0 # healthy round + + advanced = resp.cursor > self._cursor + self._cursor = max(self._cursor, resp.cursor) + for event in resp.events: + if event.seq in self._self_seqs or event.kind not in _DRIVING_KINDS: + continue # skip our own replies (loop-back guard) and non-driving events + await self._handle(event) + + if resp.events and not advanced: + await self._sleep(1.0) # forward-progress guard against a misbehaving server + self._prune_self_seqs() + return 0 + + async def _handle(self, event: InboxEvent) -> None: + """Run one turn for a human event and post the agent's reply back into the console.""" + if event.kind == "message": + text = event.text or "" + else: + text = f'(the human clicked the suggestion chip "{event.ref}")' + if not text: + return + reply = (await self._run_turn(text)).strip()[: self._settings.reply_max_chars] + if not reply: + return + try: + created = await self._client.post_message(text=reply) + self._self_seqs.add(created.seq) + except ViewerError as e: + logger.warning("failed to post reply: %s", e) + + async def _sleep_backoff(self, why: str) -> None: + start = self._settings.backoff_start_ms + self._backoff_ms = min(self._backoff_ms * 2 if self._backoff_ms else start, self._settings.backoff_max_ms) + logger.warning("%s; reconnecting in %ds…", why, round(self._backoff_ms / 1000)) + await self._sleep(self._backoff_ms / 1000) + + def _prune_self_seqs(self) -> None: + if len(self._self_seqs) > _SELF_SEQS_CAP: + self._self_seqs = set(sorted(self._self_seqs)[-_SELF_SEQS_KEEP:]) diff --git a/integrations/adk-agent/src/termchart_adk/main.py b/integrations/adk-agent/src/termchart_adk/main.py new file mode 100644 index 0000000..127f100 --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/main.py @@ -0,0 +1,109 @@ +"""Entrypoint: load config, build the agent + client, and run the listen -> respond loop.""" + +from __future__ import annotations + +import argparse +import asyncio +import logging +import signal +import sys + +import httpx + +from .agent import build_agent, build_runner, ensure_session, run_turn +from .client import TermchartClient +from .config import Settings +from .loop import InboxListener +from .tools import make_tools + +logger = logging.getLogger("termchart_adk") + +EXIT_OK = 0 +EXIT_RUNTIME = 1 +EXIT_BAD_CONFIG = 3 +EXIT_NO_VIEWER = 4 # mirror the CLI (packages/cli/src/viewer-detect.ts) + + +def _parse_args(argv: list[str]) -> argparse.Namespace: + p = argparse.ArgumentParser( + prog="termchart-adk", + description="Listen to a termchart viewer console scope and respond by rendering + replying.", + ) + p.add_argument("--project", help="Override TERMCHART_PROJECT.") + p.add_argument("--agent", help="Override TERMCHART_AGENT.") + p.add_argument("--model", help="Override TERMCHART_ADK_MODEL.") + p.add_argument( + "--once", action="store_true", help="Run a single poll round and exit (for testing)." + ) + return p.parse_args(argv) + + +def _settings_from(args: argparse.Namespace) -> Settings: + settings = Settings() + if args.project: + settings.project = args.project + if args.agent: + settings.agent = args.agent + if args.model: + settings.model = args.model + return settings + + +async def _run(settings: Settings, *, once: bool) -> int: + timeout = httpx.Timeout(settings.request_timeout_ms / 1000) + async with httpx.AsyncClient(timeout=timeout) as http: + client = TermchartClient( + base_url=settings.viewer_url, + token=settings.viewer_token, + project=settings.project, + agent=settings.agent, + http=http, + reply_max_chars=settings.reply_max_chars, + ) + runner = build_runner(build_agent(settings, make_tools(client))) + await ensure_session(runner, session_id=settings.scope()) + + async def turn(text: str) -> str: + return await run_turn(runner, session_id=settings.scope(), text=text) + + listener = InboxListener(client=client, settings=settings, run_turn=turn) + _install_signal_handlers(listener) + return await listener.run(max_polls=1 if once else None) + + +def _install_signal_handlers(listener: InboxListener) -> None: + loop = asyncio.get_running_loop() + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, listener.stop) + except NotImplementedError: + # Signal handlers aren't available on some platforms (e.g. Windows); Ctrl-C still raises. + pass + + +def main(argv: list[str] | None = None) -> int: + """Console-script entrypoint. Returns a process exit code.""" + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + args = _parse_args(sys.argv[1:] if argv is None else argv) + settings = _settings_from(args) + + problems = settings.validate_for_run() + if problems: + for problem in problems: + sys.stderr.write(f"config: {problem}\n") + return EXIT_NO_VIEWER if not settings.viewer_url else EXIT_BAD_CONFIG + for note in settings.soft_warnings(): + logger.warning(note) + + logger.info("termchart-adk · scope=%s · %s", settings.scope(), settings.auth_summary()) + try: + return asyncio.run(_run(settings, once=args.once)) + except KeyboardInterrupt: + return EXIT_OK + except Exception: # noqa: BLE001 — top-level crash reporter: log the full traceback, then exit. + logger.exception("termchart-adk crashed") + return EXIT_RUNTIME + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/integrations/adk-agent/src/termchart_adk/models.py b/integrations/adk-agent/src/termchart_adk/models.py new file mode 100644 index 0000000..dc348a3 --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/models.py @@ -0,0 +1,52 @@ +"""Typed wire models for the termchart HTTP contract (no raw dicts for structured data).""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class InboxEvent(BaseModel): + """One console event. ``message`` carries human free text; ``action`` a clicked chip id.""" + + seq: int + ts: int + kind: Literal["message", "action"] + text: str | None = None + ref: str | None = None + + +class InboxResponse(BaseModel): + """Response of ``GET /inbox``: events after ``since``, the latest ``cursor``, the read ``acked``.""" + + events: list[InboxEvent] = Field(default_factory=list) + cursor: int = 0 + acked: int = 0 + + +class RenderResult(BaseModel): + """Outcome of a render/patch/suggest/status call, shaped for the LLM to act on. + + ``warnings`` are non-fatal readability lints (the diagram rendered, but should be fixed); + ``error`` is the server's rejection reason (the LLM should fix the spec and retry). + """ + + ok: bool + warnings: list[str] = Field(default_factory=list) + error: str | None = None + + +class ReplyResult(BaseModel): + """The event the server created for a posted text reply — its ``seq`` drives the loop-back guard.""" + + seq: int + ts: int + kind: Literal["message"] = "message" + + +class SuggestItem(BaseModel): + """A clickable suggestion chip pushed to the human.""" + + id: str + label: str diff --git a/integrations/adk-agent/src/termchart_adk/prompts.py b/integrations/adk-agent/src/termchart_adk/prompts.py new file mode 100644 index 0000000..7b78ede --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/prompts.py @@ -0,0 +1,66 @@ +"""The agent's system instruction and a compact, self-contained diagram cheat-sheet. + +This is standalone knowledge: it must NOT read the plugin/ skill directory at runtime. Keep it +small — enough for the model to pick a type and emit a valid `content` string; the viewer is the +authoritative validator and returns precise errors the model can self-correct against. +""" + +from __future__ import annotations + +DIAGRAM_CHEATSHEET = """\ +termchart diagram types — pick the richest fit (do NOT default to mermaid): + +- flow — graphs, pipelines, sequences, state machines, PR/dependency/call graphs. content is a + JSON string. Default direction "TB" (top-down). Color status nodes + add a legend for impact. + Skeleton: + {"direction":"TB", + "nodes":[{"id":"a","data":{"label":"A","status":"ok"}},{"id":"b","data":{"label":"B"}}], + "edges":[{"id":"e1","source":"a","target":"b","label":"then"}]} + node data.status ∈ ok|added|removed|modified|error|warn → colors the node. + +- component — dashboards, cards, tables, kanban, comparisons, checklists (Mantine UI). content is + a JSON string: a node tree. Each node has a string "type", optional "props", optional "children". + Skeleton: + {"type":"Stack","props":{"gap":"md"}, + "children":[{"type":"Title","props":{"order":3,"children":"Status"}}, + {"type":"Text","props":{"children":"All systems go."}}]} + +- vegalite — charts/metrics/distributions. content is a JSON string Vega-Lite spec. + Skeleton: + {"title":"Latency","mark":"line", + "encoding":{"x":{"field":"t","type":"quantitative"},"y":{"field":"ms","type":"quantitative"}}, + "data":{"values":[{"t":1,"ms":20},{"t":2,"ms":35}]}} + +- panes — several views at once. content is a JSON string; each pane's "content" is ITSELF a JSON + string of that pane's spec. + Skeleton: + {"layout":"row", + "panes":[{"title":"Graph","type":"flow","content":"{\\"nodes\\":[],\\"edges\\":[]}"}]} + +- markdown — prose / notes / summaries. content is raw markdown text (not JSON). +- mermaid — only a quick simple flowchart when nothing richer fits. content is mermaid text. + +Hard rules: every render needs a non-empty description. For flow/component/vegalite/panes the +content MUST be valid JSON (as a string). If render_diagram returns warnings, fix them and +re-render; if it returns an error, fix the spec and call again. +""" + +SYSTEM_INSTRUCTION = f"""\ +You are the termchart canvas driver for ONE scope. A human watches a live viewer (often a second +screen / iPad) and types messages into a console. For each message you: + +1. Render or update the diagram that best answers it, using your tools: + - render_diagram for a new or changed view (replaces the current view). + - patch_diagram for a small tweak to the CURRENT view (e.g. flip a node to error, update a + stat) — cheaper and avoids a full re-layout. + - suggest_chips to offer up to 12 clickable next steps. + - set_status for a brief "working…"/progress note when a turn does real work. +2. Then reply with a SHORT (1-3 sentence) natural-language message. Your final text response IS the + reply the human sees in the console log — keep it under 2000 characters, friendly and concrete + (say what you drew and suggest a next step). + +Scope discipline: only ever act on this one scope; never invent or address other scopes. +If a tool reports it cannot render (no token), still reply helpfully in text. + +{DIAGRAM_CHEATSHEET} +""" diff --git a/integrations/adk-agent/src/termchart_adk/py.typed b/integrations/adk-agent/src/termchart_adk/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/integrations/adk-agent/src/termchart_adk/tools.py b/integrations/adk-agent/src/termchart_adk/tools.py new file mode 100644 index 0000000..14b6d49 --- /dev/null +++ b/integrations/adk-agent/src/termchart_adk/tools.py @@ -0,0 +1,109 @@ +"""The tools the LLM can call, each closing over an injected TermchartClient. + +Tool functions take only JSON-friendly scalar args (the ADK builds the tool schema from the +signature + docstring) and return a human-readable string the model can act on — crucially, +surfacing the viewer's warnings/errors so the model self-corrects within the turn. +""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from typing import Any + +from .client import TermchartClient +from .models import RenderResult, SuggestItem + + +def make_tools(client: TermchartClient) -> list[Callable[..., Any]]: + """Build the tool callables bound to one client/scope.""" + + async def render_diagram( + diagram_type: str, content: str, description: str, focus: bool = True + ) -> str: + """Render a diagram on the human's termchart canvas, replacing the current view. + + Args: + diagram_type: One of "flow", "component", "vegalite", "panes", "markdown", "mermaid". + content: The diagram spec. For flow/component/vegalite/panes this is a JSON STRING; for + markdown/mermaid it is raw text. See the diagram cheat-sheet in your instructions. + description: A short human-readable label for this view (required, non-empty). + focus: If true, switch the viewer's screen to this scope. + + Returns: + "Rendered the diagram." on success; a warnings note to fix and re-render; or the + server's rejection reason to fix the spec and call again. + """ + res = await client.push( + diagram_type=diagram_type, content=content, description=description, focus=focus + ) + return _message(res, ok="Rendered the diagram.") + + async def patch_diagram(ops_json: str) -> str: + """Apply small in-place edits to the CURRENT diagram without re-sending the whole spec. + + Args: + ops_json: A JSON array string of patch ops, e.g. + '[{"op":"setNodeData","id":"n1","data":{"status":"error"}}]' for flow, or + '[{"op":"setProps","id":"stat1","props":{"value":"42"}}]' for component. + + Returns: + A status string: patched, a warnings note, or why it failed. + """ + try: + ops = json.loads(ops_json) + except json.JSONDecodeError as e: + return f"ops_json is not valid JSON: {e}" + if not isinstance(ops, list): + return "ops_json must be a JSON array of patch ops." + res = await client.patch(ops=ops) + return _message(res, ok="Patched the diagram in place.") + + async def suggest_chips(items_json: str) -> str: + """Offer the human up to 12 clickable suggestion chips (quick next steps). + + Args: + items_json: A JSON array string like '[{"id":"zoom","label":"Zoom into auth"}]' + (max 12; each label <= 120 chars). Clicking a chip arrives back as a new message. + + Returns: + A status string. + """ + try: + raw = json.loads(items_json) + except json.JSONDecodeError as e: + return f"items_json is not valid JSON: {e}" + if not isinstance(raw, list): + return "items_json must be a JSON array of {id,label} objects." + items: list[SuggestItem] = [] + for entry in raw: + if not isinstance(entry, dict) or "id" not in entry or "label" not in entry: + return 'each chip needs an "id" and a "label".' + items.append(SuggestItem(id=str(entry["id"]), label=str(entry["label"]))) + res = await client.suggest(items=items) + return _message(res, ok=f"Offered {len(items)} chip(s).") + + async def set_status(message: str, level: str = "info", done: bool = False) -> str: + """Show or clear a small status overlay on the canvas (e.g. a "working…" note). + + Args: + message: The status text to show. + level: One of "info", "success", "warn", "error". + done: If true, clear the status instead of showing it. + + Returns: + A status string. + """ + res = await client.set_status(message=message, level=level, done=done) + return _message(res, ok="Status shown.") + + return [render_diagram, patch_diagram, suggest_chips, set_status] + + +def _message(res: RenderResult, *, ok: str) -> str: + """Render a RenderResult into the string the model sees.""" + if res.ok: + if res.warnings: + return f"{ok} Fix these readability warnings and re-render: " + "; ".join(res.warnings) + return ok + return f"Failed: {res.error}" diff --git a/integrations/adk-agent/tests/__init__.py b/integrations/adk-agent/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/integrations/adk-agent/tests/conftest.py b/integrations/adk-agent/tests/conftest.py new file mode 100644 index 0000000..38e3071 --- /dev/null +++ b/integrations/adk-agent/tests/conftest.py @@ -0,0 +1,40 @@ +"""Shared fixtures. An autouse fixture clears termchart/Google env so Settings tests are hermetic.""" + +from __future__ import annotations + +import os + +import pytest + +from termchart_adk.config import Settings + +from .fakes import FakeViewer + +_ENV_PREFIXES = ("TERMCHART_", "GOOGLE_", "GEMINI_") + + +@pytest.fixture(autouse=True) +def clean_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Remove any termchart/Google env vars so a developer's shell can't leak into Settings tests.""" + for key in list(os.environ): + if key.startswith(_ENV_PREFIXES): + monkeypatch.delenv(key, raising=False) + + +@pytest.fixture +def viewer() -> FakeViewer: + return FakeViewer() + + +@pytest.fixture +def settings() -> Settings: + """A complete, valid configuration for loop/integration tests.""" + return Settings( + _env_file=None, + viewer_url="http://viewer/w/me", + viewer_token="dev", + project="p", + agent="a", + google_cloud_project="proj", + google_cloud_location="us-central1", + ) diff --git a/integrations/adk-agent/tests/fakes.py b/integrations/adk-agent/tests/fakes.py new file mode 100644 index 0000000..762f179 --- /dev/null +++ b/integrations/adk-agent/tests/fakes.py @@ -0,0 +1,191 @@ +"""Deterministic test doubles: an in-memory viewer (httpx.MockTransport) and a scripted ADK runner. + +No real network, no real LLM, no sleeping — the test controls exactly what each call returns. +""" + +from __future__ import annotations + +import json +from collections.abc import AsyncIterator + +import httpx + +from termchart_adk.client import TermchartClient + +_TOKEN_ROUTES = frozenset({"push", "patch", "suggest", "status", "focus"}) + + +class FakeViewer: + """An in-memory stand-in for the termchart viewer's HTTP surface.""" + + def __init__(self, *, token: str = "dev") -> None: + self.token = token + self.events: list[dict[str, object]] = [] + self.acked = 0 + self._seq = 0 + # recorded writes + self.pushed: list[dict[str, object]] = [] + self.patched: list[dict[str, object]] = [] + self.suggested: list[dict[str, object]] = [] + self.statuses: list[dict[str, object]] = [] + self.focused = 0 + # configurable responses + self.push_status = 204 + self.push_warnings: list[str] = [] + self.push_error_text = "invalid payload" + self.patch_status = 204 + # inbox fault injection + self.inbox_fail_remaining = 0 + self.inbox_status: int | None = None + self.cursor_override: int | None = None + + # --- helpers for tests ---------------------------------------------------- + def append(self, kind: str, *, text: str | None = None, ref: str | None = None) -> dict[str, object]: + self._seq += 1 + event: dict[str, object] = {"seq": self._seq, "ts": 1000 + self._seq, "kind": kind} + if text is not None: + event["text"] = text + if ref is not None: + event["ref"] = ref + self.events.append(event) + return event + + def human_message(self, text: str) -> dict[str, object]: + return self.append("message", text=text) + + def human_action(self, ref: str) -> dict[str, object]: + return self.append("action", ref=ref) + + def client(self, base_url: str = "http://viewer/w/me") -> httpx.AsyncClient: + return httpx.AsyncClient(transport=httpx.MockTransport(self._handle), base_url=base_url) + + # --- the mock transport handler ------------------------------------------ + def _handle(self, request: httpx.Request) -> httpx.Response: + action = request.url.path.rsplit("/", 1)[-1] + method = request.method + if method == "GET" and action == "inbox": + return self._get_inbox(request) + if method == "POST" and action == "inbox": + return self._post_inbox(request) + if method == "POST" and action in _TOKEN_ROUTES: + return self._post_write(action, request) + return httpx.Response(404, text="not found") + + def _get_inbox(self, request: httpx.Request) -> httpx.Response: + if self.inbox_fail_remaining > 0: + self.inbox_fail_remaining -= 1 + raise httpx.ConnectError("simulated connection drop") + if self.inbox_status is not None: + return httpx.Response(self.inbox_status, text=f"inbox error {self.inbox_status}") + since = int(request.url.params.get("since", "0")) + peek = request.url.params.get("peek") == "1" + out = [e for e in self.events if int(e["seq"]) > since] # type: ignore[arg-type] + latest = int(self.events[-1]["seq"]) if self.events else 0 # type: ignore[arg-type] + cursor = self.cursor_override if self.cursor_override is not None else latest + if not peek and out: + self.acked = max(self.acked, int(out[-1]["seq"])) # type: ignore[arg-type] + return httpx.Response(200, json={"events": out, "cursor": cursor, "acked": self.acked}) + + def _post_inbox(self, request: httpx.Request) -> httpx.Response: + body = json.loads(request.content) + event = self.append(body["kind"], text=body.get("text"), ref=body.get("ref")) + return httpx.Response(200, json=event) + + def _post_write(self, action: str, request: httpx.Request) -> httpx.Response: + if request.headers.get("authorization") != f"Bearer {self.token}": + return httpx.Response(401, text="unauthorized") + body = json.loads(request.content) if request.content else {} + if action == "push": + self.pushed.append(body) + return self._render_response(self.push_status) + if action == "patch": + self.patched.append(body) + return self._render_response(self.patch_status) + if action == "suggest": + self.suggested.append(body) + return httpx.Response(204) + if action == "status": + self.statuses.append(body) + return httpx.Response(204) + self.focused += 1 + return httpx.Response(204) + + def _render_response(self, status: int) -> httpx.Response: + if status == 200: + return httpx.Response(200, json={"warnings": self.push_warnings}) + if status >= 400: + return httpx.Response(status, text=self.push_error_text) + return httpx.Response(204) + + +# --- a scripted ADK runner for run_turn tests -------------------------------- +class _FakePart: + def __init__(self, text: str | None) -> None: + self.text = text + + +class _FakeContent: + def __init__(self, parts: list[_FakePart]) -> None: + self.parts = parts + + +class FakeEvent: + """Minimal stand-in for an ADK event.""" + + def __init__(self, *, final: bool, text: str | None = None) -> None: + self._final = final + self.content = _FakeContent([_FakePart(text)] if text is not None else []) + + def is_final_response(self) -> bool: + return self._final + + +class FakeRunner: + """Yields a fixed list of events from run_async; records the messages it was given.""" + + def __init__(self, events: list[FakeEvent]) -> None: + self._events = events + self.calls: list[object] = [] + + async def run_async( + self, *, user_id: str, session_id: str, new_message: object + ) -> AsyncIterator[FakeEvent]: + self.calls.append(new_message) + for event in self._events: + yield event + + +class RecordingTurn: + """A run_turn double: records each text it receives and returns a fixed reply.""" + + def __init__(self, reply: str = "Done.") -> None: + self.reply = reply + self.texts: list[str] = [] + + async def __call__(self, text: str) -> str: + self.texts.append(text) + return self.reply + + +class FakeSleep: + """An async sleep double: records requested delays (seconds) and returns instantly.""" + + def __init__(self) -> None: + self.calls: list[float] = [] + + async def __call__(self, seconds: float) -> None: + self.calls.append(seconds) + + +def build_client( + viewer: FakeViewer, *, token: str | None = "dev", reply_max_chars: int = 2000 +) -> TermchartClient: + """A TermchartClient wired to a FakeViewer over httpx.MockTransport.""" + return TermchartClient( + base_url="http://viewer/w/me", + token=token, + project="p", + agent="a", + http=viewer.client(), + reply_max_chars=reply_max_chars, + ) diff --git a/integrations/adk-agent/tests/test_agent_turn.py b/integrations/adk-agent/tests/test_agent_turn.py new file mode 100644 index 0000000..c5fb538 --- /dev/null +++ b/integrations/adk-agent/tests/test_agent_turn.py @@ -0,0 +1,55 @@ +"""run_turn text collection, and the render+reply path end-to-end (loop + client, no real LLM).""" + +from __future__ import annotations + +from termchart_adk.agent import run_turn +from termchart_adk.client import TermchartClient +from termchart_adk.config import Settings +from termchart_adk.loop import InboxListener + +from .fakes import FakeEvent, FakeRunner, FakeSleep, FakeViewer, build_client + + +async def test_run_turn_collects_final_text() -> None: + runner = FakeRunner( + [FakeEvent(final=False, text="thinking…"), FakeEvent(final=True, text="Here is your flow.")] + ) + out = await run_turn(runner, session_id="p/a", text="draw a flow") # type: ignore[arg-type] + assert out == "Here is your flow." + assert len(runner.calls) == 1 + + +async def test_run_turn_empty_without_final() -> None: + runner = FakeRunner([FakeEvent(final=False, text="partial")]) + out = await run_turn(runner, session_id="p/a", text="hi") # type: ignore[arg-type] + assert out == "" + + +async def test_render_and_reply_end_to_end(viewer: FakeViewer, settings: Settings) -> None: + viewer.human_message("draw the auth flow") + client = build_client(viewer) + + async def turn_that_renders(text: str) -> str: + # Simulates the LLM choosing to render, then replying — without a real model call. + await _render_flow(client) + return "Here's the auth flow — want me to zoom into the token exchange?" + + listener = InboxListener( + client=client, settings=settings, run_turn=turn_that_renders, sleep=FakeSleep() + ) + await listener.run(max_polls=1) + + assert len(viewer.pushed) == 1 # tool hit: the diagram was rendered + assert viewer.focused == 1 # and the viewer was focused on this scope + assert viewer.events[-1]["kind"] == "message" # the text reply landed in the log + assert viewer.events[-1]["text"].startswith("Here's the auth flow") # type: ignore[union-attr] + assert viewer.acked == 1 # the human's message was read (receipt flips) + + +async def _render_flow(client: TermchartClient) -> None: + await client.push( + diagram_type="flow", + content='{"direction":"TB","nodes":[{"id":"a","data":{"label":"Login"}}],"edges":[]}', + description="auth flow", + focus=True, + ) diff --git a/integrations/adk-agent/tests/test_client.py b/integrations/adk-agent/tests/test_client.py new file mode 100644 index 0000000..b91abed --- /dev/null +++ b/integrations/adk-agent/tests/test_client.py @@ -0,0 +1,123 @@ +"""TermchartClient: per-endpoint shape, auth, status handling, reply truncation.""" + +from __future__ import annotations + +import pytest + +from termchart_adk.client import ViewerPermanentError, ViewerTransientError +from termchart_adk.models import SuggestItem + +from .fakes import FakeViewer, build_client + +_FLOW = '{"nodes":[],"edges":[]}' + + +async def test_read_inbox_parses_and_acks(viewer: FakeViewer) -> None: + viewer.human_message("hi there") + client = build_client(viewer) + resp = await client.read_inbox(since=0, wait_ms=25000) + assert [e.text for e in resp.events] == ["hi there"] + assert resp.cursor == 1 + assert resp.acked == 1 # a real read advances the ack + assert viewer.acked == 1 + + +async def test_read_inbox_peek_does_not_ack(viewer: FakeViewer) -> None: + viewer.human_message("hi") + client = build_client(viewer) + await client.read_inbox(since=0, wait_ms=0, peek=True) + assert viewer.acked == 0 + + +async def test_read_inbox_4xx_is_permanent(viewer: FakeViewer) -> None: + viewer.inbox_status = 403 + client = build_client(viewer) + with pytest.raises(ViewerPermanentError): + await client.read_inbox(since=0, wait_ms=0) + + +async def test_read_inbox_5xx_is_transient(viewer: FakeViewer) -> None: + viewer.inbox_status = 502 + client = build_client(viewer) + with pytest.raises(ViewerTransientError): + await client.read_inbox(since=0, wait_ms=0) + + +async def test_read_inbox_transport_error_is_transient(viewer: FakeViewer) -> None: + viewer.inbox_fail_remaining = 1 + client = build_client(viewer) + with pytest.raises(ViewerTransientError): + await client.read_inbox(since=0, wait_ms=0) + + +async def test_post_message_returns_seq_and_truncates(viewer: FakeViewer) -> None: + client = build_client(viewer, reply_max_chars=5) + res = await client.post_message(text="hello world") + assert res.seq == 1 + assert res.kind == "message" + assert viewer.events[-1]["text"] == "hello" # truncated to reply_max_chars + + +async def test_push_ok_records_payload(viewer: FakeViewer) -> None: + client = build_client(viewer) + res = await client.push(diagram_type="flow", content=_FLOW, description="d", focus=False) + assert res.ok + assert res.warnings == [] + assert len(viewer.pushed) == 1 + assert viewer.pushed[0]["description"] == "d" + + +async def test_push_focus_calls_focus(viewer: FakeViewer) -> None: + client = build_client(viewer) + await client.push(diagram_type="flow", content=_FLOW, description="d", focus=True) + assert viewer.focused == 1 + + +async def test_push_warnings_surface(viewer: FakeViewer) -> None: + viewer.push_status = 200 + viewer.push_warnings = ["edge overlaps a node"] + client = build_client(viewer) + res = await client.push(diagram_type="flow", content=_FLOW, description="d", focus=False) + assert res.ok + assert res.warnings == ["edge overlaps a node"] + + +async def test_push_400_returns_error(viewer: FakeViewer) -> None: + viewer.push_status = 400 + viewer.push_error_text = "bad spec" + client = build_client(viewer) + res = await client.push(diagram_type="flow", content=_FLOW, description="d", focus=False) + assert not res.ok + assert "bad spec" in (res.error or "") + + +async def test_push_invalid_json_fast_fails(viewer: FakeViewer) -> None: + client = build_client(viewer) + res = await client.push(diagram_type="flow", content="{not json", description="d", focus=False) + assert not res.ok + assert "not valid JSON" in (res.error or "") + assert viewer.pushed == [] # never hit the server + + +async def test_push_without_token_degrades(viewer: FakeViewer) -> None: + client = build_client(viewer, token=None) + res = await client.push(diagram_type="flow", content=_FLOW, description="d", focus=False) + assert not res.ok + assert "TERMCHART_VIEWER_TOKEN" in (res.error or "") + assert viewer.pushed == [] + + +async def test_patch_404_is_friendly(viewer: FakeViewer) -> None: + viewer.patch_status = 404 + client = build_client(viewer) + res = await client.patch(ops=[{"op": "noop"}]) + assert not res.ok + assert "render one first" in (res.error or "") + + +async def test_suggest_too_many(viewer: FakeViewer) -> None: + client = build_client(viewer) + items = [SuggestItem(id=str(i), label=f"c{i}") for i in range(13)] + res = await client.suggest(items=items) + assert not res.ok + assert "at most 12" in (res.error or "") diff --git a/integrations/adk-agent/tests/test_config.py b/integrations/adk-agent/tests/test_config.py new file mode 100644 index 0000000..b9e5eac --- /dev/null +++ b/integrations/adk-agent/tests/test_config.py @@ -0,0 +1,79 @@ +"""Config: required vars, Vertex-vs-key resolution, no hardcoded project, redaction.""" + +from __future__ import annotations + +from termchart_adk.config import Settings + + +def _settings(**overrides: object) -> Settings: + base: dict[str, object] = {"_env_file": None} + base.update(overrides) + return Settings(**base) # type: ignore[arg-type] + + +def test_missing_required_reports_each() -> None: + problems = _settings().validate_for_run() + joined = " ".join(problems) + assert "TERMCHART_VIEWER_URL" in joined + assert "TERMCHART_PROJECT" in joined + assert "TERMCHART_AGENT" in joined + + +def test_vertex_without_project_is_reported() -> None: + problems = _settings( + viewer_url="http://v/w/me", project="p", agent="a", google_cloud_location="us-central1" + ).validate_for_run() + assert any("GOOGLE_CLOUD_PROJECT" in p for p in problems) + assert not any("TERMCHART_VIEWER_URL" in p for p in problems) + + +def test_vertex_complete_has_no_problems() -> None: + problems = _settings( + viewer_url="http://v/w/me", + project="p", + agent="a", + google_cloud_project="proj", + google_cloud_location="us-central1", + ).validate_for_run() + assert problems == [] + + +def test_api_key_mode() -> None: + without = _settings(viewer_url="http://v/w/me", project="p", agent="a", use_vertexai=False) + assert any("GEMINI_API_KEY" in p for p in without.validate_for_run()) + with_key = _settings( + viewer_url="http://v/w/me", project="p", agent="a", use_vertexai=False, gemini_api_key="k" + ) + assert with_key.validate_for_run() == [] + + +def test_project_is_never_hardcoded() -> None: + assert _settings().google_cloud_project is None + + +def test_token_missing_is_soft_warning() -> None: + notes = _settings( + viewer_url="http://v/w/me", + project="p", + agent="a", + google_cloud_project="proj", + google_cloud_location="us-central1", + ).soft_warnings() + assert any("TERMCHART_VIEWER_TOKEN" in n for n in notes) + + +def test_auth_summary_redacts_secrets() -> None: + vertex = _settings( + viewer_token="tok-secret", + google_cloud_project="proj", + google_cloud_location="us-central1", + ) + assert "tok-secret" not in vertex.auth_summary() + api = _settings(use_vertexai=False, gemini_api_key="key-secret") + summary = api.auth_summary() + assert "key-secret" not in summary + assert summary.startswith("Gemini API key") + + +def test_scope() -> None: + assert _settings(project="repo", agent="bot").scope() == "repo/bot" diff --git a/integrations/adk-agent/tests/test_loop_dedupe.py b/integrations/adk-agent/tests/test_loop_dedupe.py new file mode 100644 index 0000000..ea9ffc2 --- /dev/null +++ b/integrations/adk-agent/tests/test_loop_dedupe.py @@ -0,0 +1,49 @@ +"""Loop: the self-post loop-back guard, event filtering, and seq-order handling.""" + +from __future__ import annotations + +from termchart_adk.config import Settings +from termchart_adk.loop import InboxListener + +from .fakes import FakeSleep, FakeViewer, RecordingTurn, build_client + + +def _listener(viewer: FakeViewer, settings: Settings, turn: RecordingTurn) -> InboxListener: + return InboxListener( + client=build_client(viewer), settings=settings, run_turn=turn, sleep=FakeSleep() + ) + + +async def test_self_reply_is_not_reprocessed(viewer: FakeViewer, settings: Settings) -> None: + viewer.human_message("draw the flow") + turn = RecordingTurn(reply="here you go") + listener = _listener(viewer, settings, turn) + + await listener.run(max_polls=2) + + # The human message drove exactly one turn; the agent's own reply (seen on the 2nd poll) did not. + assert turn.texts == ["draw the flow"] + assert viewer.events[-1]["kind"] == "message" + assert viewer.events[-1]["text"] == "here you go" + + +async def test_message_then_action_in_seq_order(viewer: FakeViewer, settings: Settings) -> None: + viewer.human_message("first") + viewer.human_action("zoom-auth") + turn = RecordingTurn() + listener = _listener(viewer, settings, turn) + + await listener.run(max_polls=1) + + assert turn.texts == ["first", '(the human clicked the suggestion chip "zoom-auth")'] + + +async def test_no_events_no_turn(viewer: FakeViewer, settings: Settings) -> None: + turn = RecordingTurn() + listener = _listener(viewer, settings, turn) + + rc = await listener.run(max_polls=1) + + assert rc == 0 + assert turn.texts == [] + assert viewer.pushed == [] diff --git a/integrations/adk-agent/tests/test_loop_resilience.py b/integrations/adk-agent/tests/test_loop_resilience.py new file mode 100644 index 0000000..a8597dc --- /dev/null +++ b/integrations/adk-agent/tests/test_loop_resilience.py @@ -0,0 +1,77 @@ +"""Loop resilience: backoff sequence, 4xx exit, forward-progress guard, clean stop. + +All timing is via an injected FakeSleep — assertions are on the *requested* delays, never wall-clock. +""" + +from __future__ import annotations + +from termchart_adk.config import Settings +from termchart_adk.loop import InboxListener + +from .fakes import FakeSleep, FakeViewer, RecordingTurn, build_client + + +def _listener( + viewer: FakeViewer, settings: Settings, sleep: FakeSleep, turn: RecordingTurn +) -> InboxListener: + return InboxListener(client=build_client(viewer), settings=settings, run_turn=turn, sleep=sleep) + + +async def test_backoff_doubles(viewer: FakeViewer, settings: Settings) -> None: + viewer.inbox_fail_remaining = 4 # 4 transient failures, then a healthy (empty) round + sleep = FakeSleep() + listener = _listener(viewer, settings, sleep, RecordingTurn()) + + rc = await listener.run(max_polls=5) + + assert rc == 0 + assert sleep.calls == [1.0, 2.0, 4.0, 8.0] + + +async def test_backoff_caps_at_max(viewer: FakeViewer, settings: Settings) -> None: + viewer.inbox_fail_remaining = 6 + sleep = FakeSleep() + listener = _listener(viewer, settings, sleep, RecordingTurn()) + + await listener.run(max_polls=6) + + assert sleep.calls == [1.0, 2.0, 4.0, 8.0, 15.0, 15.0] + + +async def test_4xx_exits_without_backoff(viewer: FakeViewer, settings: Settings) -> None: + viewer.inbox_status = 400 + sleep = FakeSleep() + turn = RecordingTurn() + listener = _listener(viewer, settings, sleep, turn) + + rc = await listener.run(max_polls=3) + + assert rc == 1 + assert sleep.calls == [] + assert turn.texts == [] + + +async def test_forward_progress_guard(viewer: FakeViewer, settings: Settings) -> None: + viewer.human_message("hi") + viewer.cursor_override = 0 # server returns an event but does NOT advance the cursor + sleep = FakeSleep() + turn = RecordingTurn() + listener = _listener(viewer, settings, sleep, turn) + + await listener.run(max_polls=1) + + assert sleep.calls == [1.0] # the one-second forward-progress pause + assert turn.texts == ["hi"] + + +async def test_clean_stop_returns_zero(viewer: FakeViewer, settings: Settings) -> None: + sleep = FakeSleep() + turn = RecordingTurn() + listener = _listener(viewer, settings, sleep, turn) + listener.stop() + + rc = await listener.run() + + assert rc == 0 + assert turn.texts == [] + assert sleep.calls == [] diff --git a/integrations/adk-agent/tests/test_tools.py b/integrations/adk-agent/tests/test_tools.py new file mode 100644 index 0000000..c0fe7cd --- /dev/null +++ b/integrations/adk-agent/tests/test_tools.py @@ -0,0 +1,88 @@ +"""Tools: each surfaces the server's success/warnings/errors as the string the model acts on.""" + +from __future__ import annotations + +import json + +from termchart_adk.tools import make_tools + +from .fakes import FakeViewer, build_client + +_FLOW = '{"nodes":[],"edges":[]}' + + +def _tools(viewer: FakeViewer, *, token: str | None = "dev"): + render, patch, suggest, status = make_tools(build_client(viewer, token=token)) + return render, patch, suggest, status + + +async def test_render_ok(viewer: FakeViewer) -> None: + render, _, _, _ = _tools(viewer) + out = await render("flow", _FLOW, "a flow") + assert out == "Rendered the diagram." + assert len(viewer.pushed) == 1 + + +async def test_render_surfaces_warnings(viewer: FakeViewer) -> None: + viewer.push_status = 200 + viewer.push_warnings = ["edge overlaps node n2"] + render, _, _, _ = _tools(viewer) + out = await render("flow", _FLOW, "a flow") + assert "Rendered the diagram." in out + assert "edge overlaps node n2" in out + + +async def test_render_surfaces_error(viewer: FakeViewer) -> None: + viewer.push_status = 400 + viewer.push_error_text = "content failed validation" + render, _, _, _ = _tools(viewer) + out = await render("flow", _FLOW, "a flow") + assert out.startswith("Failed:") + assert "content failed validation" in out + + +async def test_render_invalid_json(viewer: FakeViewer) -> None: + render, _, _, _ = _tools(viewer) + out = await render("flow", "{nope", "a flow") + assert "not valid JSON" in out + assert viewer.pushed == [] + + +async def test_render_without_token(viewer: FakeViewer) -> None: + render, _, _, _ = _tools(viewer, token=None) + out = await render("flow", _FLOW, "a flow") + assert "TERMCHART_VIEWER_TOKEN" in out + + +async def test_patch_404_message(viewer: FakeViewer) -> None: + viewer.patch_status = 404 + _, patch, _, _ = _tools(viewer) + out = await patch('[{"op":"setNodeData","id":"n1","data":{"status":"error"}}]') + assert "render one first" in out + + +async def test_patch_bad_json(viewer: FakeViewer) -> None: + _, patch, _, _ = _tools(viewer) + out = await patch("{not an array") + assert "not valid JSON" in out + + +async def test_suggest_rejects_over_twelve(viewer: FakeViewer) -> None: + _, _, suggest, _ = _tools(viewer) + items = json.dumps([{"id": str(i), "label": f"c{i}"} for i in range(13)]) + out = await suggest(items) + assert "at most 12" in out + + +async def test_suggest_ok(viewer: FakeViewer) -> None: + _, _, suggest, _ = _tools(viewer) + out = await suggest('[{"id":"zoom","label":"Zoom into auth"}]') + assert "Offered 1 chip" in out + assert len(viewer.suggested) == 1 + + +async def test_set_status_ok(viewer: FakeViewer) -> None: + _, _, _, status = _tools(viewer) + out = await status("working on it") + assert out == "Status shown." + assert len(viewer.statuses) == 1