Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/adcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@
AdagentsEntryError,
AdagentsValidationReport,
AdAgentsValidationResult,
AgentDirectoryLookup,
AgentPublisherEntry,
AuthorizationContext,
DiscoveryMethod,
DivergenceReport,
EntryErrorKind,
PublisherDivergence,
detect_publisher_properties_divergence,
domain_matches,
fetch_adagents,
fetch_agent_authorizations,
fetch_agent_authorizations_from_directory,
get_all_properties,
get_all_tags,
get_properties_by_agent,
Expand Down Expand Up @@ -815,11 +821,17 @@ def get_adcp_version() -> str:
"AdAgentsValidationResult",
"AdagentsEntryError",
"AdagentsValidationReport",
"AgentDirectoryLookup",
"AgentPublisherEntry",
"AuthorizationContext",
"detect_publisher_properties_divergence",
"DiscoveryMethod",
"DivergenceReport",
"EntryErrorKind",
"fetch_adagents",
"fetch_agent_authorizations",
"fetch_agent_authorizations_from_directory",
"PublisherDivergence",
"validate_adagents_domain",
"validate_adagents_structure",
"verify_agent_authorization",
Expand Down
341 changes: 341 additions & 0 deletions src/adcp/adagents.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import ipaddress
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Literal
from urllib.parse import urlparse

Expand Down Expand Up @@ -1374,3 +1375,343 @@ async def fetch_authorization_for_domain(

# Build result dictionary, filtering out None values
return {domain: ctx for domain, ctx in results if ctx is not None}


# ---------------------------------------------------------------------------
# Part 2 — Directory inverse-lookup wrapper (adcp#4823 / spec PR adcp#4828)
# ---------------------------------------------------------------------------


@dataclass
class AgentPublisherEntry:
    """One publisher record from the AAO directory's agent-publishers endpoint.

    Attribute names follow the response envelope schema
    (agent-publishers.json).  ``discovery_method`` and ``manager_domain``
    use the same provenance vocabulary as :class:`AdAgentsValidationResult`,
    which lets callers push directory-sourced rows and per-domain results
    through a single code path.
    """

    # Domain of the publisher this row describes.
    publisher_domain: str
    # Provenance label describing how the authorization was discovered.
    discovery_method: str
    # Managing domain for the publisher, or None when the row carries none.
    manager_domain: str | None
    # Number of the publisher's properties the agent is authorized for.
    properties_authorized: int
    # Total number of properties reported for the publisher.
    properties_total: int
    # Whether the row reports pinned signing keys.
    signing_keys_pinned: bool
    # Authorization status string for this row (for example "authorized").
    status: str
    # Last-verification timestamp as an unparsed string, or None.
    last_verified_at: str | None


@dataclass
class AgentDirectoryLookup:
    """A single page of results from :func:`fetch_agent_authorizations_from_directory`.

    * ``agent_url`` -- the agent the lookup was made for.
    * ``publishers`` -- the publisher entries contained in this page.
    * ``cursor`` -- token for requesting the next page; ``None`` once there
      are no further pages.
    * ``total`` -- overall count, populated only when the server reports one.
    """

    agent_url: str
    publishers: list[AgentPublisherEntry]
    # Both pagination fields are optional and default to "not provided".
    cursor: str | None = None
    total: int | None = None


def _coerce_directory_count(value: Any) -> int:
    """Return ``value`` as an ``int``, or ``0`` when it is missing or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return 0


async def fetch_agent_authorizations_from_directory(
    agent_url: str,
    directory_url: str = "https://agenticadvertising.org",
    *,
    since: datetime | None = None,
    status: list[str] | None = None,
    cursor: str | None = None,
    limit: int = 200,
    timeout: float = 30.0,
    client: httpx.AsyncClient | None = None,
) -> AgentDirectoryLookup:
    """Return publishers that authorize ``agent_url`` from the AAO directory.

    Inverse of :func:`fetch_agent_authorizations` -- instead of pulling from
    individual publisher adagents.json files, this queries the directory's
    ``GET /api/v1/agents/{agent_url}/publishers`` index.

    Each returned :class:`AgentPublisherEntry` carries the same
    ``discovery_method`` / ``manager_domain`` provenance fields as
    :class:`AdAgentsValidationResult`, so consumers can route directory-sourced
    and per-domain results through the same code path.

    Response rows are parsed defensively: rows that are not JSON objects or
    that lack a non-empty string ``publisher_domain`` are skipped, and
    ``null`` / non-numeric values for optional fields fall back to the same
    defaults used for absent fields.

    Args:
        agent_url: The sales agent URL to look up. It is percent-encoded in
            full (no characters left unescaped) before being placed in the
            request path.
        directory_url: Base URL of the AAO directory
            (default ``"https://agenticadvertising.org"``). A trailing ``/``
            is ignored.
        since: Only return publishers whose authorization was last verified
            after this timestamp. ``None`` returns all. The value is sent as
            ``since.isoformat()``; pass a timezone-aware datetime so the
            string carries a UTC offset.
        status: Filter by authorization status (default ``["authorized"]``).
            Pass ``["authorized", "revoked"]`` to include revoked entries.
            A single bare string is treated as a one-element list.
        cursor: Pagination cursor from a previous call's ``cursor`` field.
        limit: Maximum entries per page (server-side cap may be lower).
        timeout: Per-request timeout in seconds.
        client: Optional ``httpx.AsyncClient`` for connection pooling.
            The client is **not** closed by this function.

    Returns:
        :class:`AgentDirectoryLookup` with ``publishers`` for this page and
        a ``cursor`` for the next page (``None`` when exhausted).

    Raises:
        AdagentsValidationError: The response body is not a JSON object, or
            its publisher collection is not a JSON array.

        Errors raised by the HTTP request itself, by
        ``response.raise_for_status()`` for error status codes, or by JSON
        decoding propagate unchanged.

    Example::

        lookup = await fetch_agent_authorizations_from_directory(
            "https://interchange.io",
        )
        print(f"{len(lookup.publishers)} publishers on first page")
        while lookup.cursor:
            lookup = await fetch_agent_authorizations_from_directory(
                "https://interchange.io",
                cursor=lookup.cursor,
            )
    """
    from urllib.parse import quote

    if status is None:
        statuses = ["authorized"]
    elif isinstance(status, str):
        # Iterating a bare string would send one ``status`` key per character.
        statuses = [status]
    else:
        statuses = list(status)

    encoded_agent = quote(agent_url, safe="")
    url = f"{directory_url.rstrip('/')}/api/v1/agents/{encoded_agent}/publishers"

    # Build params as a list of (key, value) tuples so multi-value status
    # produces repeated keys (?status=authorized&status=revoked), not a
    # comma-joined string that httpx would percent-encode as %2C.
    param_list: list[tuple[str, str | int | float | bool | None]] = [("limit", limit)]
    param_list.extend(("status", s) for s in statuses)
    if cursor:
        param_list.append(("cursor", cursor))
    if since is not None:
        param_list.append(("since", since.isoformat()))

    # Ownership is decided by identity, not truthiness, so it always agrees
    # with which client object is actually used.
    own_client = client is None
    http = httpx.AsyncClient() if client is None else client
    try:
        response = await http.get(url, params=param_list, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    finally:
        if own_client:
            await http.aclose()

    if not isinstance(data, dict):
        raise AdagentsValidationError(
            f"Directory returned unexpected JSON type {type(data).__name__!r} "
            f"for /api/v1/agents/{{agent_url}}/publishers"
        )

    # The publisher rows are read from "publishers", falling back to "results".
    raw_rows = data.get("publishers") or data.get("results") or []
    if not isinstance(raw_rows, list):
        raise AdagentsValidationError(
            f"Directory returned unexpected JSON type {type(raw_rows).__name__!r} "
            f"for the publishers list of /api/v1/agents/{{agent_url}}/publishers"
        )

    publishers: list[AgentPublisherEntry] = []
    for row in raw_rows:
        if not isinstance(row, dict):
            continue  # skip malformed rows that are not JSON objects
        domain = row.get("publisher_domain", "")
        if not isinstance(domain, str) or not domain:
            continue  # skip malformed rows missing the required field
        publishers.append(
            AgentPublisherEntry(
                publisher_domain=domain,
                # ``or`` (rather than a .get default) also covers explicit nulls.
                discovery_method=row.get("discovery_method") or "adagents_authoritative",
                manager_domain=row.get("manager_domain"),
                properties_authorized=_coerce_directory_count(row.get("properties_authorized")),
                properties_total=_coerce_directory_count(row.get("properties_total")),
                signing_keys_pinned=bool(row.get("signing_keys_pinned", False)),
                status=row.get("status") or "authorized",
                last_verified_at=row.get("last_verified_at"),
            )
        )

    return AgentDirectoryLookup(
        agent_url=agent_url,
        publishers=publishers,
        # Normalise an empty cursor to None so "no further pages" has one spelling.
        cursor=data.get("cursor") or data.get("next_cursor") or None,
        total=data.get("total"),
    )


# ---------------------------------------------------------------------------
# Part 3 — Divergence detector (adcp#4827 §Resolution-paths)
# ---------------------------------------------------------------------------


@dataclass
class PublisherDivergence:
    """Record of a disagreement (or fetch failure) for one publisher domain.

    Set-diff fields:

    * ``missing_in_inline`` -- property IDs present in the publisher's own
      adagents.json (federated fetch) that the directory did not surface,
      i.e. properties the directory does not know about yet.
    * ``missing_in_federated`` -- property IDs the directory claims the agent
      is authorized for that the publisher's own adagents.json lacks (stale
      directory entry or publisher revocation).

    Both set-diff fields are ``None`` when the directory does not return
    per-publisher property IDs (count-only mode).  In that mode the only
    check is ``directory_properties_authorized != federated_properties_found``.
    **Equal counts do NOT imply equal sets** -- a publisher that swapped three
    properties for three different ones yields a false negative.  Use
    ``?include=properties`` on the directory endpoint (when supported) to get
    full set-diff precision.

    When ``child_fetch_error`` is not ``None`` the publisher's adagents.json
    could not be fetched or parsed, and the count and list fields carry no
    meaning.
    """

    # Publisher the record is about.
    publisher_domain: str
    # Count reported by the directory (inline resolution).
    directory_properties_authorized: int
    # Count derived from the publisher's own adagents.json (federated resolution).
    federated_properties_found: int
    # Set-diff results; None signals count-only mode.
    missing_in_inline: list[str] | None
    missing_in_federated: list[str] | None
    # Error text when the child fetch failed, otherwise None.
    child_fetch_error: str | None


# Alias for a report: a plain list holding one PublisherDivergence per flagged publisher.
DivergenceReport = list[PublisherDivergence]


async def detect_publisher_properties_divergence(
    agent_url: str,
    directory_url: str = "https://agenticadvertising.org",
    *,
    sample_size: int | None = None,
    timeout: float = 30.0,
    client: httpx.AsyncClient | None = None,
    max_concurrency: int | None = 50,
) -> DivergenceReport:
    """Compare directory inline resolution against per-child federated resolution.

    For each publisher the directory lists under ``agent_url``:

    1. Read the directory's ``properties_authorized`` count (inline result).
    2. Fetch the publisher's own adagents.json directly (federated result).
    3. Apply the same agent-URL filter via :func:`get_properties_by_agent`.
    4. Compare counts. When they differ, emit a :class:`PublisherDivergence`.

    Per adcp#4827 §Resolution-paths, the federated result is authoritative
    when the two paths disagree. This function surfaces count-level divergence
    so operators can detect data-integrity issues before they affect buyers.

    **Known limitation -- count-only comparison.** The AAO directory
    endpoint currently returns ``properties_authorized`` counts, not
    property-ID lists. Count-equality does NOT guarantee set equality:
    if a publisher replaced three old properties with three new ones, this
    function reports no divergence. ``PublisherDivergence.missing_in_inline``
    and ``.missing_in_federated`` are ``None`` (not ``[]``) to signal
    count-only mode. A future call to ``?include=properties`` on the
    directory endpoint will enable full set-diff once that parameter is
    deployed.

    **Cost warning -- use ``sample_size`` for large networks.** A full sweep
    issues one child fetch per listed publisher (for example ~6,800 for
    cafemedia). At most ``max_concurrency`` of those fetches are in flight at
    any moment, but server-side rate limits may still apply and the sweep can
    take a long time. Pass ``sample_size=N`` to cap the sweep at the first
    ``N`` publishers the directory returns; directory pages are requested
    until that many entries are collected or the listing is exhausted.

    Args:
        agent_url: The agent URL to check authorizations for.
        directory_url: AAO directory base URL.
        sample_size: Maximum number of publisher domains to probe. ``None``
            sweeps all pages (full network -- may be very slow). ``0``
            returns an empty report without making any request.
        timeout: Per-request timeout for both directory and child fetches.
        client: Optional shared ``httpx.AsyncClient``. It is not closed by
            this function; a client created internally is.
        max_concurrency: Upper bound on simultaneous child fetches.
            ``None`` removes the bound and starts every fetch at once.

    Returns:
        :class:`DivergenceReport` -- empty list means no divergence detected.
        Only publishers where the two paths disagree (or where the child
        fetch failed) appear in the report, in directory order.

    Raises:
        ValueError: ``sample_size`` is negative or ``max_concurrency`` is
            less than 1.

        Errors from the directory lookup propagate unchanged. Child fetches
        that fail with ``AdagentsNotFoundError``, ``AdagentsValidationError``
        or ``AdagentsTimeoutError`` are reported via ``child_fetch_error``
        instead of being raised.

    Example::

        report = await detect_publisher_properties_divergence(
            "https://interchange.io",
            sample_size=100,
        )
        for entry in report:
            if entry.child_fetch_error:
                print(f"{entry.publisher_domain}: fetch error — {entry.child_fetch_error}")
            elif entry.missing_in_inline is not None:
                # Full set-diff available (future: when directory returns IDs)
                print(f"{entry.publisher_domain}: "
                      f"+{len(entry.missing_in_inline)} inline-only, "
                      f"+{len(entry.missing_in_federated or [])} federated-only")
            else:
                # Count-only mode: missing_in_* are None
                print(f"{entry.publisher_domain}: count mismatch "
                      f"(dir={entry.directory_properties_authorized}, "
                      f"federated={entry.federated_properties_found})")
    """
    import asyncio

    # Validate arguments before any client is created or request is made.
    if sample_size is not None and sample_size < 0:
        raise ValueError(f"sample_size must be >= 0 or None, got {sample_size}")
    if max_concurrency is not None and max_concurrency < 1:
        raise ValueError(f"max_concurrency must be >= 1 or None, got {max_concurrency}")
    if sample_size == 0:
        return []  # nothing to probe; skip the directory round-trip entirely

    # Mirrors the default page size of fetch_agent_authorizations_from_directory.
    default_page_limit = 200

    own_client = client is None
    http = httpx.AsyncClient() if client is None else client
    gate = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None

    try:
        # Collect the publisher list from the directory (paged, sample capped).
        all_entries: list[AgentPublisherEntry] = []
        page_cursor: str | None = None
        while True:
            if sample_size is None:
                page_limit = default_page_limit
            else:
                # Never ask for more rows than are still needed for the sample.
                page_limit = min(default_page_limit, sample_size - len(all_entries))
            page = await fetch_agent_authorizations_from_directory(
                agent_url,
                directory_url=directory_url,
                cursor=page_cursor,
                limit=page_limit,
                timeout=timeout,
                client=http,
            )
            all_entries.extend(page.publishers)
            if sample_size is not None and len(all_entries) >= sample_size:
                del all_entries[sample_size:]
                break
            next_cursor = page.cursor
            # Stop when exhausted, and also when the cursor fails to advance,
            # which would otherwise re-request the same page forever.
            if not next_cursor or next_cursor == page_cursor:
                break
            page_cursor = next_cursor

        async def _fetch_child(domain: str) -> Any:
            """Fetch one publisher's adagents.json, honouring the concurrency cap."""
            if gate is None:
                return await fetch_adagents(domain, timeout=timeout, client=http)
            async with gate:
                return await fetch_adagents(domain, timeout=timeout, client=http)

        async def _probe(entry: AgentPublisherEntry) -> PublisherDivergence | None:
            """Return a divergence record for ``entry``, or None when counts agree."""
            try:
                data = await _fetch_child(entry.publisher_domain)
                federated_props = get_properties_by_agent(data, agent_url)
                # NOTE(review): properties without a ``property_id`` are not
                # counted and duplicate IDs collapse -- confirm the directory's
                # ``properties_authorized`` is computed the same way.
                federated_ids = {
                    p.get("property_id")
                    for p in federated_props
                    if p.get("property_id")
                }
            except (AdagentsNotFoundError, AdagentsValidationError, AdagentsTimeoutError) as exc:
                return PublisherDivergence(
                    publisher_domain=entry.publisher_domain,
                    directory_properties_authorized=entry.properties_authorized,
                    federated_properties_found=0,
                    # None = count-only mode; IDs unavailable from directory
                    missing_in_inline=None,
                    missing_in_federated=None,
                    child_fetch_error=str(exc),
                )

            fed_count = len(federated_ids)
            # Count-only comparison: directory does not currently return
            # per-publisher property IDs, so we cannot do a full set diff.
            # Count-equality is a necessary but NOT sufficient condition for
            # set-equality (three replaced properties are undetectable at
            # this level). missing_in_inline/federated are None to signal
            # "count-only mode" -- callers must not treat [] as "no diff".
            if fed_count == entry.properties_authorized:
                return None  # counts agree; set divergence undetectable here

            return PublisherDivergence(
                publisher_domain=entry.publisher_domain,
                directory_properties_authorized=entry.properties_authorized,
                federated_properties_found=fed_count,
                missing_in_inline=None,
                missing_in_federated=None,
                child_fetch_error=None,
            )

        # gather() keeps results in the same order as ``all_entries``.
        probes = await asyncio.gather(*(_probe(e) for e in all_entries))
    finally:
        if own_client:
            await http.aclose()

    return [p for p in probes if p is not None]
Loading
Loading