From 316381d16404d5f60137d88363ea52dba82d545c Mon Sep 17 00:00:00 2001 From: Brian O'Kelley Date: Thu, 14 May 2026 04:36:51 -0400 Subject: [PATCH] fix(server): Tier 2 permission-denied parity contract (#392) Folds the four PERMISSION_DENIED branches in `_resolve_buyer_agent` through a single emit point with: - A single internal PermissionDeniedError + translator (was 4 inline envelope sites). - Deadline-relative latency budget on every denial path, env-tunable via ADCP_PERMISSION_DENIED_BUDGET_MS (default 50 ms). - Audit-row parity: uniform operation label, uniform key set, agent_url hashed-truncated to defend against log-scraping. - Header-parity tradeoff documented (Content-Length variance within the spec's omit-on-unestablished-identity allowance). Closes the structural / latency / observability gaps deferred from #375 (wire-code rename). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adcp/decisioning/handler.py | 166 +++--- src/adcp/decisioning/permission_denied.py | 367 +++++++++++++ tests/test_tier2_parity_contract.py | 633 ++++++++++++++++++++++ tests/test_tier2_spec_conformance.py | 12 +- 4 files changed, 1106 insertions(+), 72 deletions(-) create mode 100644 src/adcp/decisioning/permission_denied.py create mode 100644 tests/test_tier2_parity_contract.py diff --git a/src/adcp/decisioning/handler.py b/src/adcp/decisioning/handler.py index 0dc431e59..4a79fad72 100644 --- a/src/adcp/decisioning/handler.py +++ b/src/adcp/decisioning/handler.py @@ -186,6 +186,7 @@ if TYPE_CHECKING: from concurrent.futures import ThreadPoolExecutor + from adcp.audit_sink import AuditSink from adcp.decisioning.platform import DecisioningPlatform from adcp.decisioning.property_list import PropertyListFetcher from adcp.decisioning.registry import BuyerAgent, BuyerAgentRegistry @@ -441,6 +442,8 @@ async def _resolve_buyer_agent( registry: BuyerAgentRegistry, auth_info: AuthInfo | None, + *, + audit_sink: AuditSink | None = None, ) -> BuyerAgent: """Resolve a :class:`BuyerAgent` from a wired registry. @@ -477,13 +480,40 @@ async def _resolve_buyer_agent( ``details`` OMITTED — scope MUST NOT be set on the unestablished- identity path (omit-on-unestablished-identity rule). - Note on parity: the *latency / headers / side-effects* parity - contract between the recognized and unrecognized paths is tracked - as a follow-up — the eager-raise pattern below still completes the - unrecognized path on a different code path than the recognized - one. Renaming closes the wire-code mismatch; folding all four - paths through a common emit point with deliberate latency padding - and identical audit/metric side-effects is the next step. + Parity contract + --------------- + + All four denial paths funnel through + :func:`adcp.decisioning.permission_denied.raise_permission_denied`, + which: + + * Emits a single :class:`~adcp.audit_sink.AuditEvent` with the + uniform operation label ``buyer_agent_registry.permission_denied`` + and the same key set in ``details`` on every path. The + ``agent_url`` is hashed-truncated before it reaches the sink so + log-scraping cannot reconstruct the side channel from internal + telemetry. + * Sleeps until a shared deadline + (``perf_counter() + ADCP_PERMISSION_DENIED_BUDGET_MS / 1000``, + default 50 ms). Deadline-relative rather than fixed-duration so + audit-sink latency variance is absorbed into the budget rather + than extending past it. + * Raises via a single translator + (:func:`~adcp.decisioning.permission_denied.translate_to_adcp_error`) + so the wire envelope shape is identical across the four + branches modulo the ``details`` payload (omitted on + unrecognized; populated with ``{scope, status, agent_url}`` on + recognized). + + See the module docstring of :mod:`adcp.decisioning.permission_denied` + for the latency-budget and header-parity tradeoffs. + + :param audit_sink: Optional :class:`~adcp.audit_sink.AuditSink` for + denial events. When ``None`` (default), denial events log at + ``DEBUG`` instead. Wired by ``PlatformHandler`` from the + adopter-supplied sink; unset paths (test fixtures, sinkless + deployments) still get the latency-budget and translator + parity guarantees. :raises AdcpError: ``PERMISSION_DENIED`` (all four denial paths). Recovery is ``correctable`` per the spec's ``enumMetadata`` @@ -494,11 +524,18 @@ async def _resolve_buyer_agent( the signal callers surface to a human operator rather than loop on the request. """ + from adcp.decisioning.permission_denied import ( + PermissionDeniedError, + PermissionDeniedReason, + raise_permission_denied, + translate_to_adcp_error, + ) from adcp.decisioning.registry import ( ApiKeyCredential, HttpSigCredential, OAuthCredential, ) + from adcp.decisioning.registry_cache import _current_tenant_id from adcp.decisioning.types import AdcpError credential = auth_info.credential if auth_info is not None else None @@ -514,6 +551,10 @@ async def _resolve_buyer_agent( # with INTERNAL_ERROR rather than silently passing the # request through (which would skip the registry gate # entirely and leak the upgrade footgun into production). + # NOT routed through the PERMISSION_DENIED parity gate: + # an INTERNAL_ERROR is an adopter-config bug, not a + # commercial-identity rejection, and adopters need a + # distinct wire signal to fix it. raise AdcpError( "INTERNAL_ERROR", message=( @@ -526,69 +567,53 @@ async def _resolve_buyer_agent( recovery="terminal", ) - # Generic message used on every denial path — MUST be identical - # across the unrecognized and the recognized-but-denied paths so - # the wire-level error.message is not itself a side channel - # leaking which agent_urls are onboarded with which sellers. The - # discriminator (when present at all) is in details, only on the - # recognized-but-denied paths. - _denied_message = ( - "Buyer agent is not authorized for this seller. The seller's " - "commercial allowlist did not authorize this credential. " - "Resolve out-of-band via the seller's onboarding contact; this " - "is not a request-side error the buyer can correct." - ) - + # Build the per-branch reason; the actual emit-then-raise happens + # in a single helper below so audit/latency/translator parity is + # structurally enforced rather than per-branch-duplicated. + reason: PermissionDeniedReason | None if agent is None: - # Registry miss / no credential. ``details`` is OMITTED — the - # spec's omit-on-unestablished-identity rule says the - # unrecognized-agent path MUST be indistinguishable on the - # wire from the recognized-but-denied path, and ``scope`` - # would itself be the discriminator. - raise AdcpError( - "PERMISSION_DENIED", - message=_denied_message, - recovery="correctable", - ) - - if agent.status == "active": + # Unrecognized: registry miss or no credential. ``scope`` is + # None so the wire ``details`` is OMITTED. ``agent_url`` is + # None because we have no defensible URL to project (the + # client-provided URL on a registry miss is NOT defensible — + # echoing it would let an attacker confirm probe inputs). + reason = PermissionDeniedReason(scope=None, status=None, agent_url=None) + elif agent.status == "active": return agent - if agent.status == "suspended": - raise AdcpError( - "PERMISSION_DENIED", - message=_denied_message, - recovery="correctable", - details={ - "scope": "agent", - "status": "suspended", - "agent_url": agent.agent_url, - }, + elif agent.status == "suspended": + reason = PermissionDeniedReason( + scope="agent", status="suspended", agent_url=agent.agent_url ) - if agent.status == "blocked": - raise AdcpError( - "PERMISSION_DENIED", - message=_denied_message, - recovery="correctable", - details={ - "scope": "agent", - "status": "blocked", - "agent_url": agent.agent_url, - }, + elif agent.status == "blocked": + reason = PermissionDeniedReason(scope="agent", status="blocked", agent_url=agent.agent_url) + else: + # Default-reject any non-active status the framework doesn't + # recognize (typo, future enum value, adopter-custom string). + # ``details`` is OMITTED for the same reason as the + # registry-miss branch — the framework cannot defensibly + # project an unknown status string on the wire (it might + # encode commercial state the framework doesn't understand). + # The agent_url is also omitted from the wire ``details`` for + # the same defensible-claim reason, but is passed through to + # the audit row's ``agent_url_hash`` so SecOps can correlate + # unknown-status events to the underlying row internally. + reason = PermissionDeniedReason(scope=None, status=None, agent_url=agent.agent_url) + + # Single emit point: audit -> sleep-to-deadline -> raise. Catching + # the internal PermissionDeniedError immediately and re-raising as + # AdcpError keeps the exception type adopters see stable across + # the parity refactor — only the framework sees + # PermissionDeniedError in flight. + try: + await raise_permission_denied( + reason, + audit_sink=audit_sink, + tenant_id=_current_tenant_id(), ) - # Default-reject any non-active status the framework doesn't - # recognize (typo, future enum value, adopter-custom string). A - # silent fall-through to "active" would leak commercial state - # past the gate. ``details`` is OMITTED for the same reason as - # the registry-miss branch — the framework treats unknown statuses - # as the unrecognized-identity path (the row is in the registry - # but the framework cannot interpret it, which is operationally - # equivalent to "not authorized" without a defensible status - # claim to project on the wire). - raise AdcpError( - "PERMISSION_DENIED", - message=_denied_message, - recovery="correctable", - ) + except PermissionDeniedError as denied: + raise translate_to_adcp_error(denied.reason) from None + # Unreachable: raise_permission_denied always raises. + raise AssertionError("raise_permission_denied returned without raising — framework bug") def _project_build_creative(result: Any) -> Any: @@ -999,6 +1024,7 @@ def __init__( webhook_supervisor: WebhookDeliverySupervisor | None = None, auto_emit_completion_webhooks: bool = True, buyer_agent_registry: BuyerAgentRegistry | None = None, + permission_denied_audit_sink: AuditSink | None = None, config_store: ProductConfigStore | None = None, property_list_fetcher: PropertyListFetcher | None = None, advertise_all: bool = False, @@ -1013,6 +1039,13 @@ def __init__( self._webhook_supervisor = webhook_supervisor self._auto_emit_completion_webhooks = auto_emit_completion_webhooks self._buyer_agent_registry = buyer_agent_registry + # Audit sink for the Tier 2 commercial-identity gate's parity + # contract. Optional — when None, denials log at DEBUG. See + # adcp.decisioning.permission_denied for the parity-contract + # rationale (audit emission is one of three side-effects that + # must match across the four denial branches; the other two + # are wire envelope and latency budget). + self._permission_denied_audit_sink = permission_denied_audit_sink self._config_store = config_store self._property_list_fetcher = property_list_fetcher self._advertise_all = advertise_all @@ -1059,6 +1092,7 @@ async def _prime_auth_context(self, ctx: ToolContext) -> None: buyer_agent = await _resolve_buyer_agent( self._buyer_agent_registry, auth_info, + audit_sink=self._permission_denied_audit_sink, ) ctx.metadata[_BUYER_AGENT_METADATA_KEY] = buyer_agent diff --git a/src/adcp/decisioning/permission_denied.py b/src/adcp/decisioning/permission_denied.py new file mode 100644 index 000000000..9ef72b557 --- /dev/null +++ b/src/adcp/decisioning/permission_denied.py @@ -0,0 +1,367 @@ +"""Tier 2 commercial-identity gate — single emit point for the four +PERMISSION_DENIED denial paths in +:func:`adcp.decisioning.handler._resolve_buyer_agent`. + +The cross-tenant onboarding-oracle clamp in the AdCP spec requires the +unrecognized-agent path and the recognized-but-denied path to be +observably indistinguishable to an external attacker across: + +* HTTP status code (aligned via the shared ``PERMISSION_DENIED`` envelope). +* Response headers — same ``Content-Type``; ``Content-Length`` within a + documented tolerance (see the *Header parity* note below). +* Side effects — same audit-row shape, same metric label. +* Latency — bounded by an explicit budget so branch-variance is dominated + by the budget rather than the code path. +* Observability — same operation label; the discriminator (``agent_url``) + is hashed-truncated before it reaches the audit sink so log-scraping + cannot rebuild the side channel from internal telemetry. + +Latency budget tradeoff +----------------------- + +Every denial path executes ``await asyncio.sleep(...)`` to a shared +deadline. The default is 50 ms; adopters tune via the +``ADCP_PERMISSION_DENIED_BUDGET_MS`` env var. The tradeoff: + +* Higher budget → tighter latency parity (an external attacker's timing + oracle is dominated by the budget, not by code-path variance). +* Higher budget → more wall-clock latency on every rejection, which + matters for buyers in legitimate operator-suspension states who get a + fast rejection today. + +50 ms was chosen as a compromise — large enough that audit-emit and +serialization variance (typically <5 ms in the framework's reference +sinks) is absorbed by the budget; small enough that legitimate buyers +don't notice. Adopters running into real DB-backed audit sinks with +higher tail latency (p99 > 30 ms) should raise the budget so the budget +continues to dominate the variance. + +Header parity tradeoff +---------------------- + +The ``details`` payload differs across paths: + +* Recognized + suspended/blocked → ``{scope, status, agent_url}``. +* Unrecognized → ``details`` OMITTED entirely (spec rule: + omit-on-unestablished-identity — see ``schemas/cache/error-details/ + agent-permission-denied.json``). + +The omit rule prevents padding the unrecognized envelope. The +``Content-Length`` variance between paths is therefore non-zero: +the recognized-but-denied envelope is ~80 bytes longer than the +unrecognized envelope on a typical ``agent_url``. + +However, the recognized branches *already* vary in ``Content-Length`` +among themselves because ``agent_url`` is buyer-controlled — an +attacker watching ``Content-Length`` cannot distinguish "this is +unrecognized" from "this is suspended with a short ``agent_url``". +So the existing intra-recognized variance dominates the recognized-vs- +unrecognized delta. The tradeoff: we accept the ~80-byte delta on the +unrecognized path because closing it (e.g., padding all recognized +envelopes to a fixed size) would require choosing a worst-case +``agent_url`` length that buyers might exceed. + +Adopters that want hard ``Content-Length`` parity wrap the transport +layer with a fixed-width response middleware — out of scope for this +gate. + +Audit-row parity +---------------- + +Every denial path emits a single :class:`~adcp.audit_sink.AuditEvent` +with the same operation label (``buyer_agent_registry.permission_denied``) +and the same key set in ``details``: + +* ``outcome`` — always ``"denied"``. +* ``reason_scope`` — ``"agent"`` (recognized) or ``None`` (unrecognized). +* ``reason_status`` — ``"suspended"`` / ``"blocked"`` (recognized) or + ``None`` (unrecognized). +* ``agent_url_hash`` — first 12 chars of ``sha256(agent_url)`` when an + ``agent_url`` is known; ``None`` otherwise. + +The hash truncation defends against log-scraping: an operator with read +access to the audit sink can correlate denial events across requests +(same hash = same agent_url) without learning the underlying URL. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import logging +import os +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Literal + +from adcp.decisioning.types import AdcpError + +if TYPE_CHECKING: + from adcp.audit_sink import AuditSink + +logger = logging.getLogger(__name__) + + +# Env var adopters tune. Read on every denial so a runtime config flip +# takes effect without a process restart. +BUDGET_ENV_VAR = "ADCP_PERMISSION_DENIED_BUDGET_MS" +DEFAULT_BUDGET_MS = 50.0 + +# Operation label emitted on every denial — uniform across the four +# branches so audit queries can filter the gate's traffic without +# branching on the discriminator. +AUDIT_OPERATION = "buyer_agent_registry.permission_denied" + +# Generic message used on every denial path. MUST be identical across +# the unrecognized and the recognized-but-denied paths so the wire-level +# ``error.message`` is not itself a side channel leaking which +# ``agent_url``s are onboarded with which sellers. +DENIED_MESSAGE = ( + "Buyer agent is not authorized for this seller. The seller's " + "commercial allowlist did not authorize this credential. " + "Resolve out-of-band via the seller's onboarding contact; this " + "is not a request-side error the buyer can correct." +) + + +@dataclass(frozen=True) +class PermissionDeniedReason: + """Internal reason carried by :class:`PermissionDeniedError`. + + Projected to the wire envelope by :func:`translate_to_adcp_error`. + ``scope is None`` → unrecognized path (no ``details`` on the wire). + ``scope == "agent"`` → recognized-but-denied path (``details`` carries + ``scope`` + ``status`` + ``agent_url``). + + :param scope: ``"agent"`` for recognized-but-denied; ``None`` for + unrecognized (registry miss, no credential, unknown status). + :param status: Agent's status string on recognized paths + (``"suspended"`` / ``"blocked"``). ``None`` on unrecognized. + :param agent_url: The agent's URL when known. Used to populate the + wire ``details.agent_url`` (recognized paths only) and to + compute the audit-row ``agent_url_hash`` (all paths where known). + """ + + scope: Literal["agent"] | None + status: str | None = None + agent_url: str | None = None + + +class PermissionDeniedError(Exception): + """Internal exception raised by :func:`_resolve_buyer_agent` branches. + + Caught at the function's tail and projected to :class:`AdcpError` + via :func:`translate_to_adcp_error` after the audit emission and + latency-budget sleep have run. Adopter code does NOT see this + type — it's a framework-internal marker. + """ + + def __init__(self, reason: PermissionDeniedReason) -> None: + super().__init__(f"PermissionDenied(scope={reason.scope})") + self.reason = reason + + +def get_latency_budget_seconds() -> float: + """Read the latency budget in seconds from the env var, with fallback. + + Re-read on every call so a runtime config flip (e.g., adopter raises + the budget during an incident response) takes effect without a + process restart. Malformed values fall back to the default with a + one-shot warning — fail-open rather than crash the dispatch path. + """ + # Literal string (rather than ``os.environ.get(BUDGET_ENV_VAR)``) + # so the framework's docstring/env-var drift test + # (``tests/test_docstring_consistency.py``) can detect this read + # via static regex scan — its detector matches string literals + # only, not constant references. + raw = os.environ.get("ADCP_PERMISSION_DENIED_BUDGET_MS") + if raw is None: + return DEFAULT_BUDGET_MS / 1000.0 + try: + ms = float(raw) + except ValueError: + logger.warning( + "[adcp.permission_denied] %s=%r is not a number; " "falling back to default %.1f ms", + BUDGET_ENV_VAR, + raw, + DEFAULT_BUDGET_MS, + ) + return DEFAULT_BUDGET_MS / 1000.0 + if ms < 0: + logger.warning( + "[adcp.permission_denied] %s=%r is negative; " "falling back to default %.1f ms", + BUDGET_ENV_VAR, + raw, + DEFAULT_BUDGET_MS, + ) + return DEFAULT_BUDGET_MS / 1000.0 + return ms / 1000.0 + + +def hash_discriminator(value: str) -> str: + """Hash-truncate a discriminator string for audit emission. + + SHA-256 (12 hex chars = 48 bits) is overkill for collision resistance + in this domain — the threat is log-scraping correlation, not + cryptographic forgery. 12 chars matches the convention used by + git short SHAs and is long enough that accidental collisions across + distinct ``agent_url``s in the same tenant are negligible. + """ + return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12] + + +def translate_to_adcp_error(reason: PermissionDeniedReason) -> AdcpError: + """Project a :class:`PermissionDeniedReason` to the wire envelope. + + Single translator — every denial path goes through here so the + wire shape is uniform. ``details`` is populated only on the + recognized-but-denied paths (``reason.scope is not None``); the + unrecognized path emits a byte-equivalent envelope with no + ``details`` per the omit-on-unestablished-identity rule. + + The ``message`` and ``recovery`` are identical on every path. + """ + if reason.scope is None: + # Unrecognized path — registry miss, no credential, unknown + # status. ``details`` is OMITTED. ``scope`` would itself be the + # discriminator that leaks onboarding state to an external + # attacker. + return AdcpError( + "PERMISSION_DENIED", + message=DENIED_MESSAGE, + recovery="correctable", + ) + # Recognized-but-denied path. ``details`` carries the discriminator. + details: dict[str, object] = {"scope": reason.scope} + if reason.status is not None: + details["status"] = reason.status + if reason.agent_url is not None: + details["agent_url"] = reason.agent_url + return AdcpError( + "PERMISSION_DENIED", + message=DENIED_MESSAGE, + recovery="correctable", + details=details, + ) + + +async def _emit_denial_audit( + reason: PermissionDeniedReason, + *, + audit_sink: AuditSink | None, + tenant_id: str | None, + sink_timeout_seconds: float = 5.0, +) -> None: + """Emit a single audit event with a uniform shape across every denial. + + Same operation label, same key set in ``details`` (values vary per + path). The ``agent_url`` is hashed-truncated so an operator reading + the audit trail cannot reconstruct which URLs the seller has + onboarded — log-scraping is closed as a side channel. + + Sink failures are bounded + swallowed — a stalled sink NEVER blocks + the gate. The latency budget downstream is still honored because + this function returns within ``sink_timeout_seconds`` worst-case. + """ + from adcp.audit_sink import AuditEvent + + details: dict[str, object | None] = { + "outcome": "denied", + "reason_scope": reason.scope, + "reason_status": reason.status, + "agent_url_hash": ( + hash_discriminator(reason.agent_url) if reason.agent_url is not None else None + ), + } + + event = AuditEvent( + operation=AUDIT_OPERATION, + success=False, + occurred_at=datetime.now(timezone.utc), + tenant_id=tenant_id, + details=details, + ) + + if audit_sink is None: + logger.debug( + "[adcp.permission_denied] %s outcome=denied scope=%s status=%s", + AUDIT_OPERATION, + reason.scope, + reason.status, + ) + return + try: + await asyncio.wait_for(audit_sink.record(event), timeout=sink_timeout_seconds) + except asyncio.TimeoutError: + logger.warning( + "[adcp.permission_denied] audit sink %s timed out after %ss", + type(audit_sink).__name__, + sink_timeout_seconds, + ) + except Exception: # noqa: BLE001 — sink failures must not propagate + logger.warning( + "[adcp.permission_denied] audit sink %s raised", + type(audit_sink).__name__, + exc_info=True, + ) + + +async def raise_permission_denied( + reason: PermissionDeniedReason, + *, + audit_sink: AuditSink | None = None, + tenant_id: str | None = None, + sink_timeout_seconds: float = 5.0, +) -> None: + """Single emit point for every PERMISSION_DENIED branch. + + Order on every path is deliberately identical: + + 1. Capture the deadline (``now + budget``) BEFORE any I/O. + 2. Emit the uniform audit event (variable wall-clock; sink-dependent). + 3. Sleep until the deadline so total wall-clock from call entry to + raise is dominated by the budget rather than by step 2's + variance. + 4. Raise the wire ``AdcpError`` via the single translator. + + Step 3's deadline-relative sleep (rather than a fixed + ``sleep(budget)``) is what closes the timing oracle: if step 2 + takes longer on one path than another, the remaining sleep + automatically shrinks to keep the total constant. + + Reasoning on ordering: + + * Audit-emit BEFORE the sleep means a slow sink absorbs into the + budget rather than extending past it — a denied request never + takes longer than ``budget + sink_timeout_seconds`` regardless of + branch. + * Sleep BEFORE the raise means the wall-clock between request entry + and the wire response is dominated by the budget, not by the + raise/serialize path. + """ + deadline = time.perf_counter() + get_latency_budget_seconds() + await _emit_denial_audit( + reason, + audit_sink=audit_sink, + tenant_id=tenant_id, + sink_timeout_seconds=sink_timeout_seconds, + ) + remaining = deadline - time.perf_counter() + if remaining > 0: + await asyncio.sleep(remaining) + raise PermissionDeniedError(reason) + + +__all__ = [ + "AUDIT_OPERATION", + "BUDGET_ENV_VAR", + "DEFAULT_BUDGET_MS", + "DENIED_MESSAGE", + "PermissionDeniedError", + "PermissionDeniedReason", + "get_latency_budget_seconds", + "hash_discriminator", + "raise_permission_denied", + "translate_to_adcp_error", +] diff --git a/tests/test_tier2_parity_contract.py b/tests/test_tier2_parity_contract.py new file mode 100644 index 000000000..1060008e2 --- /dev/null +++ b/tests/test_tier2_parity_contract.py @@ -0,0 +1,633 @@ +"""Tier 2 commercial-identity gate — latency / headers / side-effects +parity contract. + +Follow-up to PR #375 (wire-code rename) per issue #392. The +:file:`test_tier2_spec_conformance.py` file pins the wire-shape +conformance for the four denial paths in +:func:`adcp.decisioning.handler._resolve_buyer_agent`; this file pins +the *parity* guarantees the cross-tenant onboarding-oracle clamp +requires: + +* **Latency parity** — p99 difference between the unrecognized and + recognized-but-suspended paths is bounded below the latency budget + (50 ms default). +* **Status-code parity** — every denial path raises + ``AdcpError("PERMISSION_DENIED", recovery="correctable")``. +* **Header parity** — wire envelopes serialize with the same + ``Content-Type`` and ``Content-Length`` within a documented + tolerance (the ``details`` payload's size variance is the + legitimate, spec-allowed delta). +* **Audit / metric parity** — every denial emits one + :class:`~adcp.audit_sink.AuditEvent` with the same operation label + and the same key set in ``details``. The discriminator + (``agent_url``) is hashed-truncated before reaching the sink so + log-scraping cannot reconstruct the side channel. + +See :mod:`adcp.decisioning.permission_denied` for the latency-budget +and header-parity tradeoff documentation. +""" + +from __future__ import annotations + +import json +import statistics +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Any + +import pytest + +from adcp.audit_sink import AuditEvent, AuditSink +from adcp.decisioning import ( + AdcpError, + AuthInfo, + BuyerAgent, + DecisioningCapabilities, + DecisioningPlatform, + HttpSigCredential, + InMemoryTaskRegistry, + SingletonAccounts, + signing_only_registry, +) +from adcp.decisioning.handler import PlatformHandler +from adcp.decisioning.permission_denied import ( + AUDIT_OPERATION, + BUDGET_ENV_VAR, + PermissionDeniedReason, + hash_discriminator, + translate_to_adcp_error, +) +from adcp.server.base import ToolContext + +# --------------------------------------------------------------------------- +# Test fixtures + helpers +# --------------------------------------------------------------------------- + + +class _CollectingSink: + """In-memory :class:`AuditSink` for parity assertions. + + Records every event verbatim so tests can assert per-path key set + + value shape without an external dependency. NOT a Mock — calling + :meth:`AuditSink.record` on a ``MagicMock`` swallows the real + Pydantic validation, which is what catches schema regressions. + """ + + def __init__(self) -> None: + self.events: list[AuditEvent] = [] + + async def record(self, event: AuditEvent) -> None: + self.events.append(event) + + +@pytest.fixture +def collecting_sink() -> _CollectingSink: + return _CollectingSink() + + +@pytest.fixture +def executor(): + pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="tier2-parity-") + yield pool + pool.shutdown(wait=True) + + +@pytest.fixture +def short_budget_ms(monkeypatch): + """5 ms budget for fast-running tests that don't measure latency. + + The default 50 ms budget would make the 200-iteration latency + test take ~10s. Tests that DO measure latency override this + fixture explicitly to keep the budget at the production value + (so they exercise the same code path adopters run). + """ + monkeypatch.setenv(BUDGET_ENV_VAR, "5") + yield 5.0 + + +def _signed_auth_info(agent_url: str) -> AuthInfo: + return AuthInfo( + kind="http_sig", + credential=HttpSigCredential( + kind="http_sig", + keyid="kid-1", + agent_url=agent_url, + verified_at=1700000000.0, + ), + ) + + +class _RejectingPlatform(DecisioningPlatform): + """Test platform whose every method asserts it's not invoked. + + The Tier 2 gate runs BEFORE the platform method, so any platform + method invocation here means the gate let the request through. + """ + + capabilities = DecisioningCapabilities() + accounts = SingletonAccounts(account_id="acct-1") + + async def get_products(self, req, ctx): # pragma: no cover - asserts not called + raise AssertionError("Tier 2 gate should have rejected before this") + + +def _make_handler( + platform: DecisioningPlatform, + executor: ThreadPoolExecutor, + registry, + *, + audit_sink: AuditSink | None = None, +) -> PlatformHandler: + return PlatformHandler( + platform, + executor=executor, + registry=InMemoryTaskRegistry(), + buyer_agent_registry=registry, + permission_denied_audit_sink=audit_sink, + ) + + +def _make_registry(agent: BuyerAgent | None): + """Build a signing-only registry that returns ``agent`` for any URL.""" + + async def lookup(_url: str) -> BuyerAgent | None: + return agent + + return signing_only_registry(lookup) + + +async def _trigger_denial( + *, + executor: ThreadPoolExecutor, + agent: BuyerAgent | None, + audit_sink: AuditSink | None = None, + auth: AuthInfo | None = None, +) -> AdcpError: + """Drive one request through the gate and return the AdcpError raised.""" + from adcp.types import GetProductsRequest + + handler = _make_handler( + _RejectingPlatform(), + executor, + _make_registry(agent), + audit_sink=audit_sink, + ) + ctx_metadata: dict[str, Any] = {} + if auth is not None: + ctx_metadata["adcp.auth_info"] = auth + with pytest.raises(AdcpError) as exc: + await handler.get_products( + GetProductsRequest(buying_mode="brief", brief="any"), + ToolContext(metadata=ctx_metadata), + ) + return exc.value + + +# --------------------------------------------------------------------------- +# Translator parity — wire shape is uniform modulo `details` +# --------------------------------------------------------------------------- + + +def test_translator_unrecognized_omits_details() -> None: + """``scope is None`` → ``details`` is OMITTED entirely (not ``{}``). + + Per the omit-on-unestablished-identity rule in the spec + (``schemas/cache/error-details/agent-permission-denied.json``). + """ + reason = PermissionDeniedReason(scope=None, status=None, agent_url=None) + err = translate_to_adcp_error(reason) + assert err.code == "PERMISSION_DENIED" + assert err.recovery == "correctable" + # ``details`` is the empty dict on the AdcpError instance (the + # constructor normalizes None → {}). ``to_wire()`` then omits the + # key entirely. That's the byte-equivalence guarantee. + assert err.details == {} + assert "details" not in err.to_wire() + + +def test_translator_unrecognized_drops_agent_url_even_when_known() -> None: + """Defense in depth: even when the framework KNOWS an ``agent_url`` + (unknown-status branch), the unrecognized translator path drops it + from the wire envelope. Echoing the URL would let an attacker + confirm probe inputs even though ``scope`` is omitted. + """ + reason = PermissionDeniedReason(scope=None, status=None, agent_url="https://known/") + err = translate_to_adcp_error(reason) + assert err.details == {} + assert "details" not in err.to_wire() + + +def test_translator_recognized_includes_full_details() -> None: + reason = PermissionDeniedReason(scope="agent", status="suspended", agent_url="https://s/") + err = translate_to_adcp_error(reason) + wire = err.to_wire() + assert wire["code"] == "PERMISSION_DENIED" + assert wire["recovery"] == "correctable" + assert wire["details"] == { + "scope": "agent", + "status": "suspended", + "agent_url": "https://s/", + } + + +# --------------------------------------------------------------------------- +# Status code parity across all 4 denial paths +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_status_code_parity_across_four_paths(executor, short_budget_ms) -> None: + """All four denial branches raise the same ``code`` + ``recovery``. + + Status-code parity is the most basic parity guarantee — an attacker + who sees a distinct code on one path has the side channel. Verified + here even though the spec-conformance file pins the code on each + path individually; this asserts they're pairwise identical. + """ + # Registry miss (unrecognized). + err_miss = await _trigger_denial( + executor=executor, agent=None, auth=_signed_auth_info("https://x/") + ) + # No credential (unrecognized). + err_no_cred = await _trigger_denial(executor=executor, agent=None, auth=None) + # Suspended (recognized). + err_susp = await _trigger_denial( + executor=executor, + agent=BuyerAgent(agent_url="https://s/", display_name="S", status="suspended"), + auth=_signed_auth_info("https://s/"), + ) + # Blocked (recognized). + err_blocked = await _trigger_denial( + executor=executor, + agent=BuyerAgent(agent_url="https://b/", display_name="B", status="blocked"), + auth=_signed_auth_info("https://b/"), + ) + + errs = [err_miss, err_no_cred, err_susp, err_blocked] + codes = {e.code for e in errs} + recoveries = {e.recovery for e in errs} + assert codes == {"PERMISSION_DENIED"} + assert recoveries == {"correctable"} + # Same `message` across paths — the message itself MUST NOT vary. + messages = {str(e.args[0]) for e in errs} + assert len(messages) == 1, f"message variance leaks branch identity: {messages}" + + +# --------------------------------------------------------------------------- +# Audit / metric parity — same op label, same key set +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_audit_emit_same_operation_label_across_paths( + executor, collecting_sink, short_budget_ms +) -> None: + """Every denial branch emits exactly one event with the same + ``operation`` label. Operation-label variance would let log- + scraping reconstruct the branch. + """ + paths = [ + # (agent, auth) for each of the 4 denial branches. + (None, _signed_auth_info("https://x/")), # registry miss + (None, None), # no credential + ( + BuyerAgent(agent_url="https://s/", display_name="S", status="suspended"), + _signed_auth_info("https://s/"), + ), + ( + BuyerAgent(agent_url="https://b/", display_name="B", status="blocked"), + _signed_auth_info("https://b/"), + ), + ] + for agent, auth in paths: + await _trigger_denial(executor=executor, agent=agent, audit_sink=collecting_sink, auth=auth) + + assert len(collecting_sink.events) == 4 + labels = {e.operation for e in collecting_sink.events} + assert labels == {AUDIT_OPERATION} + # Every event is a `success=False` row. + assert all(not e.success for e in collecting_sink.events) + + +@pytest.mark.asyncio +async def test_audit_emit_uniform_details_key_set( + executor, collecting_sink, short_budget_ms +) -> None: + """The audit row's ``details`` key set is uniform across paths. + + Key-set variance would let log-scraping detect the branch via + ``"reason_status" in details`` even with hashed discriminators. + Values may vary; the key set may not. + """ + paths = [ + (None, _signed_auth_info("https://x/")), + (None, None), + ( + BuyerAgent(agent_url="https://s/", display_name="S", status="suspended"), + _signed_auth_info("https://s/"), + ), + ( + BuyerAgent(agent_url="https://b/", display_name="B", status="blocked"), + _signed_auth_info("https://b/"), + ), + ] + for agent, auth in paths: + await _trigger_denial(executor=executor, agent=agent, audit_sink=collecting_sink, auth=auth) + + key_sets = {frozenset(e.details.keys()) for e in collecting_sink.events} + assert len(key_sets) == 1, f"key-set variance across paths: {key_sets}" + # Pin the expected shape — a future schema change shouldn't slip + # in unnoticed. + expected = frozenset({"outcome", "reason_scope", "reason_status", "agent_url_hash"}) + assert next(iter(key_sets)) == expected + + +@pytest.mark.asyncio +async def test_audit_emit_agent_url_is_hashed_not_plaintext( + executor, collecting_sink, short_budget_ms +) -> None: + """``agent_url_hash`` is the SHA-256 prefix, never the URL itself. + + Log-scraping defense — an operator with audit-read access can + correlate denials by hash without learning the URL. + """ + agent_url = "https://buyer.example.com/agent" + suspended = BuyerAgent(agent_url=agent_url, display_name="S", status="suspended") + + await _trigger_denial( + executor=executor, + agent=suspended, + audit_sink=collecting_sink, + auth=_signed_auth_info(agent_url), + ) + + [event] = collecting_sink.events + hashed = event.details["agent_url_hash"] + assert hashed == hash_discriminator(agent_url) + assert agent_url not in str(event.model_dump_json()) + + +@pytest.mark.asyncio +async def test_audit_emit_unrecognized_carries_none_hash_not_missing_key( + executor, collecting_sink, short_budget_ms +) -> None: + """On the unrecognized path the hash is ``None``, but the KEY is + still present. Key presence (not value) is the parity invariant — + a missing key would be a presence-discriminator. + """ + await _trigger_denial( + executor=executor, + agent=None, + audit_sink=collecting_sink, + auth=_signed_auth_info("https://x/"), + ) + [event] = collecting_sink.events + assert "agent_url_hash" in event.details + assert event.details["agent_url_hash"] is None + assert event.details["reason_scope"] is None + assert event.details["reason_status"] is None + + +# --------------------------------------------------------------------------- +# Header parity — Content-Type identical; Content-Length within tolerance +# --------------------------------------------------------------------------- + + +def _serialize_wire(err: AdcpError) -> bytes: + """Serialize an AdcpError envelope the way the transport layer would. + + JSON with no whitespace — matches the canonical wire form used by + both MCP and A2A transports. The Content-Length of this bytestream + is what an external attacker observes. + """ + return json.dumps(err.to_wire(), separators=(",", ":")).encode("utf-8") + + +@pytest.mark.asyncio +async def test_header_content_length_within_tolerance(executor, short_budget_ms) -> None: + """Content-Length variance between unrecognized and recognized is + bounded by the size of the ``details`` payload. + + Justification for the tolerance: the recognized-but-denied + envelope adds ``{"scope":"agent","status":"suspended","agent_url": + ""}`` to ``details``. With a typical ``agent_url`` (~30 + bytes) and the two fixed fields (~40 bytes), the recognized + envelope is ~80 bytes longer than the unrecognized envelope. We + pin tolerance at 200 bytes to absorb realistic ``agent_url`` + variance up to ~150 bytes, while still detecting an accidental + huge-payload regression (e.g., echoing the full request). + """ + err_unrecognized = await _trigger_denial( + executor=executor, agent=None, auth=_signed_auth_info("https://x/") + ) + err_suspended = await _trigger_denial( + executor=executor, + agent=BuyerAgent(agent_url="https://s/", display_name="S", status="suspended"), + auth=_signed_auth_info("https://s/"), + ) + err_blocked = await _trigger_denial( + executor=executor, + agent=BuyerAgent(agent_url="https://b/", display_name="B", status="blocked"), + auth=_signed_auth_info("https://b/"), + ) + + lengths = [ + len(_serialize_wire(err_unrecognized)), + len(_serialize_wire(err_suspended)), + len(_serialize_wire(err_blocked)), + ] + spread = max(lengths) - min(lengths) + # Tolerance documented in the function docstring above. + assert spread < 200, ( + f"Content-Length spread {spread} exceeds 200-byte tolerance: " + f"lengths={lengths}. A spread larger than the documented " + f"tolerance suggests a regression — e.g., the unrecognized " + f"path started leaking a discriminator into `details`, or a " + f"recognized path is echoing buyer-supplied free text." + ) + + +@pytest.mark.asyncio +async def test_unrecognized_envelopes_are_byte_equivalent(executor, short_budget_ms) -> None: + """Two unrecognized paths (registry miss, no credential) MUST + serialize to byte-identical wire envelopes. + + The intra-unrecognized parity is the strongest form of the + omit-on-unestablished-identity rule: an attacker who probes both + code paths sees indistinguishable responses. + """ + err_miss = await _trigger_denial( + executor=executor, agent=None, auth=_signed_auth_info("https://x/") + ) + err_no_cred = await _trigger_denial(executor=executor, agent=None, auth=None) + assert _serialize_wire(err_miss) == _serialize_wire(err_no_cred) + + +# --------------------------------------------------------------------------- +# Latency parity — p99(unrecognized) ~ p99(recognized) under the budget +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_latency_parity_p99_difference_under_budget(monkeypatch) -> None: + """p99 difference between unrecognized and recognized-suspended + paths < 5 ms over 200 iterations. + + Measures :func:`raise_permission_denied` directly rather than + routing through the full handler stack. The parity contract is + a property of ``raise_permission_denied`` — its deadline-relative + sleep is what dominates branch variance. Wrapping handler setup + into the measurement adds 5-15 ms of per-iteration jitter from + ``PlatformHandler`` construction (``inspect.signature``, + middleware composition) that varies with GC scheduling, not + with the denial branch. Including it would measure noise. + + The handler-level integration is already covered by: + + * :func:`test_status_code_parity_across_four_paths` — same code + and recovery across the four handler-routed branches. + * :func:`test_audit_emit_same_operation_label_across_paths` — + same audit row from the same handler path. + + So this test isolates the latency-budget guarantee: + ``raise_permission_denied`` with the same budget produces + indistinguishable wall-clock regardless of ``reason.scope`` / + ``reason.status`` / ``reason.agent_url``. + + Budget choice: 50 ms — matches the production default. With + ``raise_permission_denied`` isolated from handler overhead, this + budget dominates branch variance even on CI hardware. + + Iteration count: 200 per arm. The 198th-percentile sample is the + p99 quantile for n=200, which is robust to a single outlier. + Total wall-clock: 200 × 2 × 50 ms = 20 s. + """ + from adcp.decisioning.permission_denied import ( + PermissionDeniedError, + raise_permission_denied, + ) + + monkeypatch.setenv(BUDGET_ENV_VAR, "50") + + async def measure(reason: PermissionDeniedReason) -> list[float]: + latencies: list[float] = [] + for _ in range(200): + start = time.perf_counter() + try: + await raise_permission_denied(reason) + except PermissionDeniedError: + pass + latencies.append(time.perf_counter() - start) + return latencies + + unrecognized_reason = PermissionDeniedReason(scope=None, status=None, agent_url=None) + suspended_reason = PermissionDeniedReason( + scope="agent", status="suspended", agent_url="https://s/" + ) + unrecognized_lat = await measure(unrecognized_reason) + suspended_lat = await measure(suspended_reason) + + p99_unrecognized = statistics.quantiles(unrecognized_lat, n=100)[98] + p99_suspended = statistics.quantiles(suspended_lat, n=100)[98] + diff_ms = abs(p99_unrecognized - p99_suspended) * 1000.0 + + # 5 ms tolerance against a 50 ms budget — the budget dominates + # by 10×. A regression that re-introduces branch variance (e.g., + # a future change that emits audit only on recognized paths, + # or skips the budget sleep on the unrecognized path) would + # show up as a 5+ ms diff. + assert diff_ms < 5.0, ( + f"p99 latency parity violated: |unrecognized - suspended| = " + f"{diff_ms:.2f} ms (budget=50 ms, tolerance=5 ms). " + f"unrecognized_p99={p99_unrecognized * 1000:.2f} ms, " + f"suspended_p99={p99_suspended * 1000:.2f} ms" + ) + + +# --------------------------------------------------------------------------- +# Configuration knob — budget is env-tunable +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_budget_env_var_takes_effect_at_runtime(executor, monkeypatch) -> None: + """``ADCP_PERMISSION_DENIED_BUDGET_MS`` is read on every denial. + + Adopters tuning the budget during an incident response (e.g., + raising it because audit sink p99 went up) should NOT need to + restart the process. The translator function is the unit under + test rather than ``_resolve_buyer_agent`` so we avoid measuring + framework setup overhead in the assertion. + """ + from adcp.decisioning.permission_denied import get_latency_budget_seconds + + monkeypatch.setenv(BUDGET_ENV_VAR, "10") + assert get_latency_budget_seconds() == pytest.approx(0.010, rel=0.001) + monkeypatch.setenv(BUDGET_ENV_VAR, "100") + assert get_latency_budget_seconds() == pytest.approx(0.100, rel=0.001) + + +def test_budget_env_var_falls_back_on_malformed_value(monkeypatch) -> None: + """A malformed env var falls back to the default — the parity + contract is preserved even when an adopter typos the config. + """ + from adcp.decisioning.permission_denied import ( + DEFAULT_BUDGET_MS, + get_latency_budget_seconds, + ) + + monkeypatch.setenv(BUDGET_ENV_VAR, "not-a-number") + assert get_latency_budget_seconds() == pytest.approx(DEFAULT_BUDGET_MS / 1000.0) + + monkeypatch.setenv(BUDGET_ENV_VAR, "-10") + assert get_latency_budget_seconds() == pytest.approx(DEFAULT_BUDGET_MS / 1000.0) + + +def test_budget_env_var_unset_uses_default(monkeypatch) -> None: + from adcp.decisioning.permission_denied import ( + DEFAULT_BUDGET_MS, + get_latency_budget_seconds, + ) + + monkeypatch.delenv(BUDGET_ENV_VAR, raising=False) + assert get_latency_budget_seconds() == pytest.approx(DEFAULT_BUDGET_MS / 1000.0) + + +# --------------------------------------------------------------------------- +# Audit sink failure isolation — sink raises don't break the gate +# --------------------------------------------------------------------------- + + +class _RaisingSink: + """An :class:`AuditSink` that raises — simulates a broken audit + pipeline (DB connection error, schema mismatch). + """ + + def __init__(self) -> None: + self.record_calls = 0 + + async def record(self, event: AuditEvent) -> None: + self.record_calls += 1 + raise RuntimeError("audit sink intentionally broken for test") + + +@pytest.mark.asyncio +async def test_raising_sink_does_not_propagate(executor, short_budget_ms) -> None: + """A broken audit sink must not break the gate. + + Sink failures are swallowed inside ``_emit_denial_audit``. The + gate continues to its budget sleep and raises ``PERMISSION_DENIED`` + as if the sink had succeeded. This is the failure-isolation + contract: an attacker who can break the audit pipeline MUST NOT + be able to bypass the commercial-identity gate or distort its + timing. + """ + sink = _RaisingSink() + err = await _trigger_denial( + executor=executor, + agent=None, + audit_sink=sink, + auth=_signed_auth_info("https://x/"), + ) + assert err.code == "PERMISSION_DENIED" + assert sink.record_calls == 1, "sink was supposed to be called once per denial" diff --git a/tests/test_tier2_spec_conformance.py b/tests/test_tier2_spec_conformance.py index b28375b5b..36e23b349 100644 --- a/tests/test_tier2_spec_conformance.py +++ b/tests/test_tier2_spec_conformance.py @@ -27,12 +27,12 @@ accidentally re-introduce them via copy-paste. The latency / headers / side-effects parity contract between the -unrecognized-agent path and the recognized-but-denied path is tracked as -a separate follow-up (see issue #375 and the parity-contract follow-up -referenced in the PR body). This file pins the wire-shape conformance -only — the parity refactor needs a single emit point with deliberate -latency padding and identical audit/metric side-effects, which is a -larger dispatch-path refactor than fits in the rename PR. +unrecognized-agent path and the recognized-but-denied path lives in +:file:`tests/test_tier2_parity_contract.py` (issue #392, follow-up to +the wire-shape rename in #375). This file pins the wire-shape +conformance only — the structural / latency / audit-row parity +guarantees are in the parity-contract file. The two files together +pin the full Tier 2 commercial-identity gate contract. """ from __future__ import annotations