From 4230adde822dd823b687e5411fe24ae652d2a5d6 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 14:36:30 +0000 Subject: [PATCH 1/3] feat(adagents): directory inverse-lookup wrapper + divergence detector (Parts 2+3 of #749) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Part 2 — fetch_agent_authorizations_from_directory: New async function wrapping GET /api/v1/agents/{agent_url}/publishers on the AAO directory. Returns AgentDirectoryLookup (paged) with one AgentPublisherEntry per authorized publisher. Provenance fields (discovery_method, manager_domain) align with AdAgentsValidationResult so callers can route directory-sourced and per-domain results through the same code path. Mutable status default avoided (uses None → ["authorized"]). Part 3 — detect_publisher_properties_divergence: New async verifier that compares the directory's property counts (inline resolution) against direct per-child adagents.json fetches (federated resolution). Returns a DivergenceReport (list of PublisherDivergence dataclasses) — empty means no divergence. Per adcp#4827, federated wins on disagreement; this function surfaces the signal for operator monitoring. sample_size caps sweeps over large managed networks (cafemedia = ~6,800 child fetches without a cap). 
Refs #749 https://claude.ai/code/session_01RDYrywLhVbd4crrAHkTnsH --- src/adcp/adagents.py | 296 +++++++++++++++++++++++++++++++++++++++++ tests/test_adagents.py | 273 +++++++++++++++++++++++++++++++++++++ 2 files changed, 569 insertions(+) diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index f97ac5e0..16522d00 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -11,6 +11,7 @@ import ipaddress import re from dataclasses import dataclass, field +from datetime import datetime from typing import Any, Literal from urllib.parse import urlparse @@ -1374,3 +1375,298 @@ async def fetch_authorization_for_domain( # Build result dictionary, filtering out None values return {domain: ctx for domain, ctx in results if ctx is not None} + + +# --------------------------------------------------------------------------- +# Part 2 — Directory inverse-lookup wrapper (adcp#4823 / spec PR adcp#4828) +# --------------------------------------------------------------------------- + + +@dataclass +class AgentPublisherEntry: + """A single publisher row from the AAO directory's agent-publishers endpoint. + + Fields mirror the response envelope schema (agent-publishers.json). + ``discovery_method`` and ``manager_domain`` align with the provenance + vocabulary on :class:`AdAgentsValidationResult` so callers can route + directory-sourced and per-domain results through the same code path. + """ + + publisher_domain: str + discovery_method: str + manager_domain: str | None + properties_authorized: int + properties_total: int + signing_keys_pinned: bool + status: str + last_verified_at: str | None + + +@dataclass +class AgentDirectoryLookup: + """Envelope returned by :func:`fetch_agent_authorizations_from_directory`. + + ``publishers`` is the list of publisher entries for this page. + ``cursor`` is ``None`` when there are no further pages. + ``total`` is set when the server returns a total count. 
+ """ + + agent_url: str + publishers: list[AgentPublisherEntry] + cursor: str | None = None + total: int | None = None + + +async def fetch_agent_authorizations_from_directory( + agent_url: str, + directory_url: str = "https://agenticadvertising.org", + *, + since: datetime | None = None, + status: list[str] | None = None, + cursor: str | None = None, + limit: int = 200, + timeout: float = 30.0, + client: httpx.AsyncClient | None = None, +) -> AgentDirectoryLookup: + """Return publishers that authorize ``agent_url`` from the AAO directory. + + Inverse of :func:`fetch_agent_authorizations` — instead of pulling from + individual publisher adagents.json files, this queries the directory's + ``GET /api/v1/agents/{agent_url}/publishers`` index. + + Each returned :class:`AgentPublisherEntry` carries the same + ``discovery_method`` / ``manager_domain`` provenance fields as + :class:`AdAgentsValidationResult`, so consumers can route directory-sourced + and per-domain results through the same code path. + + Args: + agent_url: The sales agent URL to look up (``%``-encoded in the path). + directory_url: Base URL of the AAO directory + (default ``"https://agenticadvertising.org"``). + since: Only return publishers whose authorization was last verified + after this timestamp. ``None`` returns all. + status: Filter by authorization status (default ``["authorized"]``). + Pass ``["authorized", "revoked"]`` to include revoked entries. + cursor: Pagination cursor from a previous call's ``cursor`` field. + limit: Maximum entries per page (server-side cap may be lower). + timeout: Per-request timeout in seconds. + client: Optional ``httpx.AsyncClient`` for connection pooling. + The client is **not** closed by this function. + + Returns: + :class:`AgentDirectoryLookup` with ``publishers`` for this page and + a ``cursor`` for the next page (``None`` when exhausted). 
+ + Example:: + + lookup = await fetch_agent_authorizations_from_directory( + "https://interchange.io", + ) + print(f"{len(lookup.publishers)} publishers on first page") + while lookup.cursor: + lookup = await fetch_agent_authorizations_from_directory( + "https://interchange.io", + cursor=lookup.cursor, + ) + """ + from urllib.parse import quote + + if status is None: + status = ["authorized"] + + encoded_agent = quote(agent_url, safe="") + url = f"{directory_url.rstrip('/')}/api/v1/agents/{encoded_agent}/publishers" + + params: dict[str, str | int] = { + "limit": limit, + "status": ",".join(status), + } + if cursor: + params["cursor"] = cursor + if since is not None: + params["since"] = since.isoformat() + + own_client = client is None + http = client or httpx.AsyncClient() + try: + response = await http.get(url, params=params, timeout=timeout) + response.raise_for_status() + data = response.json() + finally: + if own_client: + await http.aclose() + + publishers: list[AgentPublisherEntry] = [] + for row in data.get("publishers", data.get("results", [])): + publishers.append( + AgentPublisherEntry( + publisher_domain=row["publisher_domain"], + discovery_method=row.get("discovery_method", "adagents_authoritative"), + manager_domain=row.get("manager_domain"), + properties_authorized=row.get("properties_authorized", 0), + properties_total=row.get("properties_total", 0), + signing_keys_pinned=bool(row.get("signing_keys_pinned", False)), + status=row.get("status", "authorized"), + last_verified_at=row.get("last_verified_at"), + ) + ) + + return AgentDirectoryLookup( + agent_url=agent_url, + publishers=publishers, + cursor=data.get("cursor") or data.get("next_cursor"), + total=data.get("total"), + ) + + +# --------------------------------------------------------------------------- +# Part 3 — Divergence detector (adcp#4827 §Resolution-paths) +# --------------------------------------------------------------------------- + + +@dataclass +class PublisherDivergence: + 
"""Divergence record for a single publisher domain. + + ``missing_in_inline`` contains property IDs that the federated fetch + found in the publisher's own adagents.json but the directory's inline + resolution did not surface (publisher has properties the directory + doesn't know about yet). + + ``missing_in_federated`` contains property IDs the directory claims + the agent is authorized for but the publisher's own adagents.json + does not include (stale directory entry or publisher revocation). + + ``child_fetch_error`` is non-``None`` when the publisher's adagents.json + could not be fetched or parsed; the other fields are empty in that case. + """ + + publisher_domain: str + directory_properties_authorized: int + federated_properties_found: int + missing_in_inline: list[str] + missing_in_federated: list[str] + child_fetch_error: str | None + + +DivergenceReport = list[PublisherDivergence] + + +async def detect_publisher_properties_divergence( + agent_url: str, + directory_url: str = "https://agenticadvertising.org", + *, + sample_size: int | None = None, + timeout: float = 30.0, + client: httpx.AsyncClient | None = None, +) -> DivergenceReport: + """Compare inline resolution (directory) against federated resolution (per-child fetch). + + For each publisher the directory lists under ``agent_url``: + + 1. Read the directory's inline result (``properties_authorized`` count). + 2. Fetch the publisher's own adagents.json directly. + 3. Apply the same agent-URL filter via :func:`get_properties_by_agent`. + 4. Diff the ``(publisher_domain, property_id)`` sets. + + Per adcp#4827 §Resolution-paths, when the two paths disagree the + federated result is authoritative; this function surfaces the signal + so operators can detect data-integrity issues before they affect buyers. + + **Cost warning — ``sample_size`` is mandatory for large networks.** + Running a full sweep against cafemedia's ~6,800 child publishers requires + ~6,800 sequential (or parallel) HTTP fetches. 
With a 30 s timeout each + that is hours of wall-clock time. Pass ``sample_size=N`` to cap the + sweep; the sample is taken from the first page of directory results. + + Args: + agent_url: The agent URL to check authorizations for. + directory_url: AAO directory base URL. + sample_size: Maximum number of publisher domains to probe. ``None`` + sweeps all pages (full network — may be very slow). + timeout: Per-request timeout for both directory and child fetches. + client: Optional shared ``httpx.AsyncClient``. + + Returns: + :class:`DivergenceReport` — empty list means no divergence detected. + Only publishers where the two paths disagree (or where the child + fetch failed) appear in the report. + + Example:: + + report = await detect_publisher_properties_divergence( + "https://interchange.io", + sample_size=100, + ) + for entry in report: + print(f"{entry.publisher_domain}: " + f"+{len(entry.missing_in_inline)} inline-only, " + f"+{len(entry.missing_in_federated)} federated-only") + """ + import asyncio + + own_client = client is None + http = client or httpx.AsyncClient() + + try: + # Collect the publisher list from the directory (paged, sample capped). 
+ all_entries: list[AgentPublisherEntry] = [] + page_cursor: str | None = None + while True: + page = await fetch_agent_authorizations_from_directory( + agent_url, + directory_url=directory_url, + cursor=page_cursor, + timeout=timeout, + client=http, + ) + all_entries.extend(page.publishers) + if sample_size is not None and len(all_entries) >= sample_size: + all_entries = all_entries[:sample_size] + break + page_cursor = page.cursor + if not page_cursor: + break + + async def _probe(entry: AgentPublisherEntry) -> PublisherDivergence | None: + try: + data = await fetch_adagents( + entry.publisher_domain, timeout=timeout, client=http + ) + federated_props = get_properties_by_agent(data, agent_url) + federated_ids = { + p.get("property_id") + for p in federated_props + if p.get("property_id") + } + except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError) as exc: + return PublisherDivergence( + publisher_domain=entry.publisher_domain, + directory_properties_authorized=entry.properties_authorized, + federated_properties_found=0, + missing_in_inline=[], + missing_in_federated=[], + child_fetch_error=str(exc), + ) + + fed_count = len(federated_ids) + # Without explicit property IDs from the directory we can only + # compare counts; return None (no divergence record) when counts match. 
+ if fed_count == entry.properties_authorized: + return None + + return PublisherDivergence( + publisher_domain=entry.publisher_domain, + directory_properties_authorized=entry.properties_authorized, + federated_properties_found=fed_count, + missing_in_inline=[], + missing_in_federated=[], + child_fetch_error=None, + ) + + probes = await asyncio.gather(*[_probe(e) for e in all_entries]) + finally: + if own_client: + await http.aclose() + + return [p for p in probes if p is not None] diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 44195fd1..61cd6079 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -9,11 +9,16 @@ import pytest from adcp.adagents import ( + AgentDirectoryLookup, + AgentPublisherEntry, AuthorizationContext, + PublisherDivergence, _normalize_domain, _validate_publisher_domain, + detect_publisher_properties_divergence, domain_matches, fetch_agent_authorizations, + fetch_agent_authorizations_from_directory, get_all_properties, get_all_tags, get_properties_by_agent, @@ -2675,3 +2680,271 @@ def test_report_dataclass_is_immutable(self): err = AdagentsEntryError(index=0, kind="missing_url", message="x") with pytest.raises(dataclasses.FrozenInstanceError): err.index = 1 # type: ignore[misc] + + +class TestFetchAgentAuthorizationsFromDirectory: + """Tests for fetch_agent_authorizations_from_directory (Part 2 of #749).""" + + def _make_response(self, publishers: list[dict], cursor: str | None = None) -> MagicMock: + resp = MagicMock() + resp.json.return_value = { + "publishers": publishers, + "cursor": cursor, + "total": len(publishers), + } + resp.raise_for_status = MagicMock() + return resp + + async def test_returns_publisher_entries(self): + """Should deserialize publisher rows into AgentPublisherEntry dataclasses.""" + raw_publishers = [ + { + "publisher_domain": "cafemedia.com", + "discovery_method": "adagents_authoritative", + "manager_domain": "cafemedia.com", + "properties_authorized": 6843, + "properties_total": 
6843, + "signing_keys_pinned": True, + "status": "authorized", + "last_verified_at": "2026-05-20T00:00:00Z", + }, + { + "publisher_domain": "site0001.raptive.com", + "discovery_method": "adagents_authoritative", + "manager_domain": "cafemedia.com", + "properties_authorized": 1, + "properties_total": 1, + "signing_keys_pinned": False, + "status": "authorized", + "last_verified_at": "2026-05-20T00:00:00Z", + }, + ] + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_response(raw_publishers)) + + result = await fetch_agent_authorizations_from_directory( + "https://interchange.io", + client=mock_client, + ) + + assert isinstance(result, AgentDirectoryLookup) + assert result.agent_url == "https://interchange.io" + assert len(result.publishers) == 2 + assert result.cursor is None + assert result.total == 2 + + first = result.publishers[0] + assert isinstance(first, AgentPublisherEntry) + assert first.publisher_domain == "cafemedia.com" + assert first.discovery_method == "adagents_authoritative" + assert first.manager_domain == "cafemedia.com" + assert first.properties_authorized == 6843 + assert first.signing_keys_pinned is True + assert first.status == "authorized" + + async def test_url_encodes_agent_url(self): + """agent_url with slashes and colons must be %-encoded in the path.""" + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_response([])) + + await fetch_agent_authorizations_from_directory( + "https://interchange.io", + client=mock_client, + ) + + call_url = mock_client.get.call_args[0][0] + assert "https%3A%2F%2Finterchange.io" in call_url + assert "/api/v1/agents/" in call_url + + async def test_pagination_cursor_returned(self): + """When server returns a cursor, it is exposed on AgentDirectoryLookup.""" + mock_client = MagicMock() + mock_client.get = AsyncMock( + return_value=self._make_response( + [{"publisher_domain": "example.com", "discovery_method": "adagents_authoritative", + 
"manager_domain": None, "properties_authorized": 1, "properties_total": 1, + "signing_keys_pinned": False, "status": "authorized", "last_verified_at": None}], + cursor="next-page-token", + ) + ) + + result = await fetch_agent_authorizations_from_directory( + "https://interchange.io", + client=mock_client, + ) + assert result.cursor == "next-page-token" + + async def test_default_status_is_authorized(self): + """Default status filter should be 'authorized' (not a mutable default arg).""" + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_response([])) + + await fetch_agent_authorizations_from_directory( + "https://interchange.io", + client=mock_client, + ) + + _, kwargs = mock_client.get.call_args + params = kwargs.get("params", {}) + assert params.get("status") == "authorized" + + async def test_custom_directory_url(self): + """directory_url prefix should be respected.""" + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_response([])) + + await fetch_agent_authorizations_from_directory( + "https://interchange.io", + directory_url="https://custom-dir.example.com", + client=mock_client, + ) + + call_url = mock_client.get.call_args[0][0] + assert call_url.startswith("https://custom-dir.example.com") + + +class TestDetectPublisherPropertiesDivergence: + """Tests for detect_publisher_properties_divergence (Part 3 of #749).""" + + def _make_dir_response(self, publishers: list[dict]) -> MagicMock: + resp = MagicMock() + resp.json.return_value = { + "publishers": publishers, + "cursor": None, + "total": len(publishers), + } + resp.raise_for_status = MagicMock() + return resp + + async def test_no_divergence_returns_empty(self): + """When directory count matches federated count, report should be empty.""" + dir_publishers = [ + {"publisher_domain": "match.com", "discovery_method": "adagents_authoritative", + "manager_domain": None, "properties_authorized": 2, "properties_total": 2, + "signing_keys_pinned": 
False, "status": "authorized", "last_verified_at": None}, + ] + child_adagents = { + "authorized_agents": [ + { + "url": "https://interchange.io", + "authorization_type": "inline_properties", + "authorized_for": "Test", + "properties": [ + {"property_id": "p-001", "name": "Prop 1"}, + {"property_id": "p-002", "name": "Prop 2"}, + ], + } + ] + } + + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_dir_response(dir_publishers)) + + with unittest.mock.patch( + "adcp.adagents.fetch_adagents", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = child_adagents + report = await detect_publisher_properties_divergence( + "https://interchange.io", + client=mock_client, + ) + + assert report == [] + + async def test_count_divergence_reported(self): + """When directory and federated counts differ, a divergence entry is returned.""" + dir_publishers = [ + {"publisher_domain": "drift.com", "discovery_method": "adagents_authoritative", + "manager_domain": None, "properties_authorized": 5, "properties_total": 5, + "signing_keys_pinned": False, "status": "authorized", "last_verified_at": None}, + ] + # Federated fetch returns only 3 properties — diverges from directory's 5 + child_adagents = { + "authorized_agents": [ + { + "url": "https://interchange.io", + "authorization_type": "inline_properties", + "authorized_for": "Test", + "properties": [ + {"property_id": f"p-{i}", "name": f"Prop {i}"} for i in range(3) + ], + } + ] + } + + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_dir_response(dir_publishers)) + + with unittest.mock.patch( + "adcp.adagents.fetch_adagents", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = child_adagents + report = await detect_publisher_properties_divergence( + "https://interchange.io", + client=mock_client, + ) + + assert len(report) == 1 + assert isinstance(report[0], PublisherDivergence) + assert report[0].publisher_domain == "drift.com" + assert 
report[0].directory_properties_authorized == 5 + assert report[0].federated_properties_found == 3 + assert report[0].child_fetch_error is None + + async def test_child_fetch_error_recorded(self): + """When fetching the child adagents.json fails, error is recorded in report.""" + from adcp.exceptions import AdagentsNotFoundError + + dir_publishers = [ + {"publisher_domain": "gone.com", "discovery_method": "adagents_authoritative", + "manager_domain": None, "properties_authorized": 1, "properties_total": 1, + "signing_keys_pinned": False, "status": "authorized", "last_verified_at": None}, + ] + + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_dir_response(dir_publishers)) + + with unittest.mock.patch( + "adcp.adagents.fetch_adagents", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.side_effect = AdagentsNotFoundError("gone.com") + report = await detect_publisher_properties_divergence( + "https://interchange.io", + client=mock_client, + ) + + assert len(report) == 1 + assert report[0].publisher_domain == "gone.com" + assert report[0].child_fetch_error is not None + assert report[0].federated_properties_found == 0 + + async def test_sample_size_caps_probes(self): + """sample_size should limit the number of publisher domains probed.""" + dir_publishers = [ + {"publisher_domain": f"site{i}.com", "discovery_method": "adagents_authoritative", + "manager_domain": None, "properties_authorized": 1, "properties_total": 1, + "signing_keys_pinned": False, "status": "authorized", "last_verified_at": None} + for i in range(10) + ] + child_adagents = { + "authorized_agents": [ + {"url": "https://interchange.io", "authorization_type": "inline_properties", + "authorized_for": "Test", + "properties": [{"property_id": "p-001", "name": "Prop 1"}]} + ] + } + + mock_client = MagicMock() + mock_client.get = AsyncMock(return_value=self._make_dir_response(dir_publishers)) + + with unittest.mock.patch( + "adcp.adagents.fetch_adagents", 
new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = child_adagents + await detect_publisher_properties_divergence( + "https://interchange.io", + sample_size=3, + client=mock_client, + ) + + assert mock_fetch.call_count == 3 From 88006702136d8bd3fafe0f1d527b5c00aaec0f86 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 14:48:39 +0000 Subject: [PATCH 2/3] fix(adagents): address expert-review blockers on divergence detector (Parts 2+3 of #749) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BLOCKER 1 — missing_in_inline/missing_in_federated were always []; changed to list[str] | None where None signals count-only mode (directory doesn't return property IDs). Callers can now branch on None vs [] correctly. Docstring updated to call out the count-only limitation explicitly. BLOCKER 2 — Count-equality was silently treated as "no divergence" even though set divergence (same-count property replacement) is undetectable without IDs. Docstring now documents this false-negative path; fields are None not [] to prevent callers from treating empty lists as "diff computed, no gaps". NIT — status query param now uses repeated keys (?status=a&status=b) via a list-of-tuples param_list, not comma-joined (?status=a%2Cb). Tests updated to assert the repeated-key form. 
Refs #749 https://claude.ai/code/session_01RDYrywLhVbd4crrAHkTnsH --- src/adcp/adagents.py | 94 +++++++++++++++++++++++++++--------------- tests/test_adagents.py | 9 +++- 2 files changed, 67 insertions(+), 36 deletions(-) diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index 16522d00..1c72d40b 100644 --- a/src/adcp/adagents.py +++ b/src/adcp/adagents.py @@ -1477,19 +1477,21 @@ async def fetch_agent_authorizations_from_directory( encoded_agent = quote(agent_url, safe="") url = f"{directory_url.rstrip('/')}/api/v1/agents/{encoded_agent}/publishers" - params: dict[str, str | int] = { - "limit": limit, - "status": ",".join(status), - } + # Build params as a list of (key, value) tuples so multi-value status + # produces repeated keys (?status=authorized&status=revoked), not a + # comma-joined string that httpx would percent-encode as %2C. + param_list: list[tuple[str, str | int | float | bool | None]] = [("limit", limit)] + for s in status: + param_list.append(("status", s)) if cursor: - params["cursor"] = cursor + param_list.append(("cursor", cursor)) if since is not None: - params["since"] = since.isoformat() + param_list.append(("since", since.isoformat())) own_client = client is None http = client or httpx.AsyncClient() try: - response = await http.get(url, params=params, timeout=timeout) + response = await http.get(url, params=param_list, timeout=timeout) response.raise_for_status() data = response.json() finally: @@ -1528,24 +1530,32 @@ async def fetch_agent_authorizations_from_directory( class PublisherDivergence: """Divergence record for a single publisher domain. - ``missing_in_inline`` contains property IDs that the federated fetch - found in the publisher's own adagents.json but the directory's inline - resolution did not surface (publisher has properties the directory - doesn't know about yet). 
+ ``missing_in_inline`` contains property IDs the federated fetch found + in the publisher's own adagents.json that the directory did not surface + (publisher has properties the directory doesn't know about yet). - ``missing_in_federated`` contains property IDs the directory claims - the agent is authorized for but the publisher's own adagents.json - does not include (stale directory entry or publisher revocation). + ``missing_in_federated`` contains property IDs the directory claims the + agent is authorized for but the publisher's own adagents.json does not + include (stale directory entry or publisher revocation). + + Both fields are ``None`` when the directory does not return per-publisher + property IDs (count-only mode). In count-only mode the comparison is + limited to ``directory_properties_authorized != federated_properties_found``. + **Count-equality does NOT guarantee set equality** — if the publisher + replaced three properties with three different ones, count-only mode + produces a false-negative. Use ``?include=properties`` on the directory + endpoint (when supported) to get full set-diff precision. ``child_fetch_error`` is non-``None`` when the publisher's adagents.json - could not be fetched or parsed; the other fields are empty in that case. + could not be fetched or parsed; the count and list fields carry no + meaning in that case. """ publisher_domain: str directory_properties_authorized: int federated_properties_found: int - missing_in_inline: list[str] - missing_in_federated: list[str] + missing_in_inline: list[str] | None + missing_in_federated: list[str] | None child_fetch_error: str | None @@ -1560,24 +1570,35 @@ async def detect_publisher_properties_divergence( timeout: float = 30.0, client: httpx.AsyncClient | None = None, ) -> DivergenceReport: - """Compare inline resolution (directory) against federated resolution (per-child fetch). + """Compare directory inline resolution against per-child federated resolution. 
For each publisher the directory lists under ``agent_url``: - 1. Read the directory's inline result (``properties_authorized`` count). - 2. Fetch the publisher's own adagents.json directly. + 1. Read the directory's ``properties_authorized`` count (inline result). + 2. Fetch the publisher's own adagents.json directly (federated result). 3. Apply the same agent-URL filter via :func:`get_properties_by_agent`. - 4. Diff the ``(publisher_domain, property_id)`` sets. + 4. Compare counts. When they differ, emit a :class:`PublisherDivergence`. - Per adcp#4827 §Resolution-paths, when the two paths disagree the - federated result is authoritative; this function surfaces the signal + Per adcp#4827 §Resolution-paths, the federated result is authoritative + when the two paths disagree. This function surfaces count-level divergence so operators can detect data-integrity issues before they affect buyers. + **Known limitation — count-only comparison.** The AAO directory + endpoint currently returns ``properties_authorized`` counts, not + property-ID lists. Count-equality does NOT guarantee set equality: + if a publisher replaced three old properties with three new ones, this + function reports no divergence. ``PublisherDivergence.missing_in_inline`` + and ``.missing_in_federated`` are ``None`` (not ``[]``) to signal + count-only mode. A future call to ``?include=properties`` on the + directory endpoint will enable full set-diff once that parameter is + deployed. + **Cost warning — ``sample_size`` is mandatory for large networks.** - Running a full sweep against cafemedia's ~6,800 child publishers requires - ~6,800 sequential (or parallel) HTTP fetches. With a 30 s timeout each - that is hours of wall-clock time. Pass ``sample_size=N`` to cap the - sweep; the sample is taken from the first page of directory results. + Running a full sweep against cafemedia's ~6,800 child publishers launches + ~6,800 concurrent HTTP fetches. 
With a 30 s timeout each, total wall-clock + is bounded by the slowest fetch, but server-side rate limits may apply. + Pass ``sample_size=N`` to cap the sweep; the sample is the first N + entries in directory order, paging as needed. Args: agent_url: The agent URL to check authorizations for. @@ -1644,23 +1665,28 @@ async def _probe(entry: AgentPublisherEntry) -> PublisherDivergence | None: publisher_domain=entry.publisher_domain, directory_properties_authorized=entry.properties_authorized, federated_properties_found=0, - missing_in_inline=[], - missing_in_federated=[], + # None = count-only mode; IDs unavailable from directory + missing_in_inline=None, + missing_in_federated=None, child_fetch_error=str(exc), ) fed_count = len(federated_ids) - # Without explicit property IDs from the directory we can only - # compare counts; return None (no divergence record) when counts match. + # Count-only comparison: directory does not currently return + # per-publisher property IDs, so we cannot do a full set diff. + # Count-equality is a necessary but NOT sufficient condition for + # set-equality (three replaced properties are undetectable at + # this level). missing_in_inline/federated are None to signal + # "count-only mode" — callers must not treat [] as "no diff". 
if fed_count == entry.properties_authorized: - return None + return None # counts agree; set divergence undetectable here return PublisherDivergence( publisher_domain=entry.publisher_domain, directory_properties_authorized=entry.properties_authorized, federated_properties_found=fed_count, - missing_in_inline=[], - missing_in_federated=[], + missing_in_inline=None, + missing_in_federated=None, child_fetch_error=None, ) diff --git a/tests/test_adagents.py b/tests/test_adagents.py index 61cd6079..f41fbbd0 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -2785,8 +2785,10 @@ async def test_default_status_is_authorized(self): ) _, kwargs = mock_client.get.call_args - params = kwargs.get("params", {}) - assert params.get("status") == "authorized" + # Status uses repeated query-param keys, not comma-joining + params = kwargs.get("params", []) + status_values = [v for k, v in params if k == "status"] + assert status_values == ["authorized"] async def test_custom_directory_url(self): """directory_url prefix should be respected.""" @@ -2890,6 +2892,9 @@ async def test_count_divergence_reported(self): assert report[0].directory_properties_authorized == 5 assert report[0].federated_properties_found == 3 assert report[0].child_fetch_error is None + # Count-only mode: missing_in_* are None (not []) to signal no set-diff available + assert report[0].missing_in_inline is None + assert report[0].missing_in_federated is None async def test_child_fetch_error_recorded(self): """When fetching the child adagents.json fails, error is recorded in report.""" From 62cf427b3c5d25a925991d1163124d0e7cabc23f Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 20 May 2026 14:53:24 +0000 Subject: [PATCH 3/3] fix(adagents): address code-reviewer findings on Parts 2+3 - Docstring Example in detect_publisher_properties_divergence branches on missing_in_inline is None (count-only mode) so copy-paste doesn't raise TypeError on None. 
- Export AgentDirectoryLookup, AgentPublisherEntry, PublisherDivergence, DivergenceReport, fetch_agent_authorizations_from_directory, and detect_publisher_properties_divergence from adcp.__init__. - Add isinstance guard on directory response so a non-dict body raises AdagentsValidationError instead of bare AttributeError. - Use row.get() + skip-on-empty for publisher_domain instead of bare KeyError-raising access; use 'or' chain to avoid None fallback. - Assert caller-provided client is not closed in two tests. Refs #749 https://claude.ai/code/session_01RDYrywLhVbd4crrAHkTnsH --- src/adcp/__init__.py | 12 ++++++++++++ src/adcp/adagents.py | 29 ++++++++++++++++++++++++----- tests/test_adagents.py | 3 +++ 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/src/adcp/__init__.py b/src/adcp/__init__.py index 0c8272ed..a30a82ab 100644 --- a/src/adcp/__init__.py +++ b/src/adcp/__init__.py @@ -14,12 +14,18 @@ AdagentsEntryError, AdagentsValidationReport, AdAgentsValidationResult, + AgentDirectoryLookup, + AgentPublisherEntry, AuthorizationContext, DiscoveryMethod, + DivergenceReport, EntryErrorKind, + PublisherDivergence, + detect_publisher_properties_divergence, domain_matches, fetch_adagents, fetch_agent_authorizations, + fetch_agent_authorizations_from_directory, get_all_properties, get_all_tags, get_properties_by_agent, @@ -815,11 +821,17 @@ def get_adcp_version() -> str: "AdAgentsValidationResult", "AdagentsEntryError", "AdagentsValidationReport", + "AgentDirectoryLookup", + "AgentPublisherEntry", "AuthorizationContext", + "detect_publisher_properties_divergence", "DiscoveryMethod", + "DivergenceReport", "EntryErrorKind", "fetch_adagents", "fetch_agent_authorizations", + "fetch_agent_authorizations_from_directory", + "PublisherDivergence", "validate_adagents_domain", "validate_adagents_structure", "verify_agent_authorization", diff --git a/src/adcp/adagents.py b/src/adcp/adagents.py index 1c72d40b..f23a2328 100644 --- a/src/adcp/adagents.py +++ 
b/src/adcp/adagents.py @@ -1498,11 +1498,21 @@ async def fetch_agent_authorizations_from_directory( if own_client: await http.aclose() + if not isinstance(data, dict): + raise AdagentsValidationError( + f"Directory returned unexpected JSON type {type(data).__name__!r} " + f"for /api/v1/agents/{{agent_url}}/publishers" + ) + publishers: list[AgentPublisherEntry] = [] - for row in data.get("publishers", data.get("results", [])): + raw_rows = data.get("publishers") or data.get("results") or [] + for row in raw_rows: + domain = row.get("publisher_domain", "") + if not domain: + continue # skip malformed rows missing the required field publishers.append( AgentPublisherEntry( - publisher_domain=row["publisher_domain"], + publisher_domain=domain, discovery_method=row.get("discovery_method", "adagents_authoritative"), manager_domain=row.get("manager_domain"), properties_authorized=row.get("properties_authorized", 0), @@ -1620,9 +1630,18 @@ async def detect_publisher_properties_divergence( sample_size=100, ) for entry in report: - print(f"{entry.publisher_domain}: " - f"+{len(entry.missing_in_inline)} inline-only, " - f"+{len(entry.missing_in_federated)} federated-only") + if entry.child_fetch_error: + print(f"{entry.publisher_domain}: fetch error — {entry.child_fetch_error}") + elif entry.missing_in_inline is not None: + # Full set-diff available (future: when directory returns IDs) + print(f"{entry.publisher_domain}: " + f"+{len(entry.missing_in_inline)} inline-only, " + f"+{len(entry.missing_in_federated or [])} federated-only") + else: + # Count-only mode: missing_in_* are None + print(f"{entry.publisher_domain}: count mismatch " + f"(dir={entry.directory_properties_authorized}, " + f"federated={entry.federated_properties_found})") """ import asyncio diff --git a/tests/test_adagents.py b/tests/test_adagents.py index f41fbbd0..a88532fe 100644 --- a/tests/test_adagents.py +++ b/tests/test_adagents.py @@ -2741,6 +2741,8 @@ async def test_returns_publisher_entries(self): 
assert first.properties_authorized == 6843 assert first.signing_keys_pinned is True assert first.status == "authorized" + # A caller-provided client must NOT be closed by the function + mock_client.aclose.assert_not_called() async def test_url_encodes_agent_url(self): """agent_url with slashes and colons must be %-encoded in the path.""" @@ -2852,6 +2854,7 @@ async def test_no_divergence_returns_empty(self): ) assert report == [] + mock_client.aclose.assert_not_called() async def test_count_divergence_reported(self): """When directory and federated counts differ, a divergence entry is returned."""