Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 178 additions & 13 deletions tests/conformance/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,144 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
)


# Shared skip reason for every embedding-observability fixture listed in
# _DEFERRED_FIXTURES (one string so the wording cannot drift per fixture).
_EMBEDDING_DEFER = (
    "embedding capability (proposal 0059) unimplemented until v0.16.0; "
    "no embedding event/provider to record from"
)


# Pinned observability fixtures NOT run by this YAML harness, each with an
# explicit reason. The coverage guard (test_observability_fixture_coverage_
# is_complete) fails on any pinned fixture absent from _SUPPORTED_FIXTURES +
# the three mappings below, so a future unwired spec fixture cannot silently
# pytest.skip past CI.
#
# _DEFERRED_FIXTURES — not run because the capability is unimplemented.
# Maps fixture id (the YAML file stem) -> human-readable skip reason. Each
# fixture id appears exactly once: a repeated key in a dict display is not an
# error, the last value silently wins, so stale entries must be removed
# rather than left next to their replacement.
_DEFERRED_FIXTURES: dict[str, str] = {
    # Proposal 0045 IS implemented (v0.11.0), but the nested-case Langfuse
    # fixture stays deferred: it needs runtime-state item-list lookup for
    # nested fan-outs plus an augment_metadata_from_outer_item directive
    # the harness doesn't model yet.
    "039-nested-lineage-augmentation": (
        "nested-case Langfuse harness wiring not yet implemented (proposal 0045 nested fan-out)"
    ),
    # Embedding observability (proposals 0059 / 0067 §11). The embedding
    # capability is unshipped until v0.16.0, so there is no embedding event
    # or provider to record from; the LLM-path equivalents run. Fixture 089
    # (embedding metrics) is deferred for the same reason and shares the
    # same skip text.
    **{
        fixture_id: _EMBEDDING_DEFER
        for fixture_id in (
            "074-embedding-event-dispatch",
            "075-embedding-failure-event-dispatch-on-provider-unavailable",
            "076-embedding-event-mutual-exclusion",
            "077-embedding-event-call-id-distinct",
            "078-embedding-event-input-strings-populated",
            "079-embedding-event-request-params-populated",
            "080-embedding-event-input-count-and-dimensions-populated",
            "081-embedding-event-active-prompt-populated",
            "082-otel-embedding-span-attributes",
            "083-langfuse-embedding-observation",
            "089-embedding-metrics-token-and-duration",
        )
    },
}


# _UNIT_TESTED_FIXTURES — fixtures whose behavior is implemented but is
# exercised by a dedicated unit-test module instead of this YAML harness.
# Maps fixture id -> reason; every reason names the proposal and the unit
# test file that covers it. The table below is keyed by reason (each reason
# is unique) and flattened so that every fixture id in a group shares it.
_UNIT_TESTED_FIXTURES: dict[str, str] = {
    covered_id: why
    for why, covered_ids in {
        "proposal 0031 Langfuse mapping; covered by test_observability_langfuse.py": (
            "022-langfuse-basic-trace",
            "023-langfuse-generation-rendering",
            "024-langfuse-prompt-linkage",
        ),
        "proposal 0035/0061 Langfuse span hierarchy; covered by test_observability_langfuse.py": (
            "031-langfuse-subgraph-span-hierarchy",
            "032-langfuse-fan-out-per-instance-spans",
            "033-langfuse-detached-trace-mode",
        ),
        "proposal 0034/0040 caller metadata; covered by test_observability_langfuse.py": (
            "027-langfuse-caller-supplied-metadata",
            "029-caller-metadata-fan-out-per-instance",
            "034-caller-metadata-open-span-update-serial",
        ),
        "proposal 0040 per-branch caller metadata; covered by test_observability_otel.py": (
            "030-caller-metadata-parallel-branches-per-branch",
        ),
        "proposal 0039 invocation_id derivation; covered by test_observability_langfuse_adapter.py": (
            "035-caller-invocation-id-uuid",
            "036-caller-invocation-id-non-uuid",
        ),
        "proposal 0043 trace input/output; covered by test_observability_langfuse.py": (
            "037-langfuse-trace-input-output",
        ),
        "proposal 0048 get_invocation_metadata; covered by test_observability_metadata.py": (
            "043-get-invocation-metadata-roundtrip",
            "044-get-invocation-metadata-fan-out-scoping",
            "045-get-invocation-metadata-retry-scoping",
            "046-get-invocation-metadata-outside-invocation",
        ),
        "proposal 0052 implementation attribution; covered by test_observability_langfuse.py": (
            "059-implementation-attribution-langfuse",
        ),
        "proposal 0057 LlmCompletionEvent fields; covered by test_llm_provider.py": (
            "060-llm-completion-event-input-messages-populated",
            "061-llm-completion-event-output-content-populated",
            "062-llm-completion-event-request-params-populated",
            "063-llm-completion-event-request-extras-populated",
            "064-llm-completion-event-active-prompt-populated",
            "066-llm-completion-event-active-prompt-group-populated",
            "067-llm-completion-event-call-id-always-present-and-distinct",
            "068-llm-completion-event-response-model-distinct-from-request",
        ),
        "proposal 0057 active_prompt null case; covered by test_observability_otel.py": (
            "065-llm-completion-event-active-prompt-null",
        ),
        "proposal 0058 LlmFailedEvent; covered by test_llm_provider.py": (
            "069-llm-failure-event-dispatch-on-provider-unavailable",
            "070-llm-failure-event-dispatch-on-provider-invalid-request",
            "071-llm-failure-event-call-id-distinct-from-completion-event",
            "072-llm-failure-event-mutual-exclusion-with-completion-event",
            "073-llm-failure-event-error-type-vendor-specific",
        ),
    }.items()
    for covered_id in covered_ids
}


# _CONVENTION_ONLY_FIXTURES — the proposal 0048 §9 queryable-observer pattern
# is a convention (no new abstract surface on Observer) that is satisfied via
# docs/concepts/observability.md, so there is no library API to assert.
# Every listed fixture id carries the same reason string.
_CONVENTION_ONLY_FIXTURES: dict[str, str] = dict.fromkeys(
    (
        "047-queryable-observer-pattern",
        "048-queryable-observer-async-safety",
        "049-queryable-observer-lifecycle-drop",
    ),
    "proposal 0048 §9 queryable-observer pattern is convention-only "
    "(no library surface); satisfied by docs/concepts/observability.md",
)


Expand All @@ -224,6 +350,37 @@ def _load(path: Path) -> dict[str, Any]:
return cast("dict[str, Any]", yaml.safe_load(f))


def test_observability_fixture_coverage_is_complete() -> None:
    # Fail-on-unknown guard over the pinned observability conformance
    # fixtures. Each fixture on disk MUST be either wired into the harness
    # (_SUPPORTED_FIXTURES) or explicitly documented as not run, in one of:
    #   _DEFERRED_FIXTURES        — future capability
    #   _UNIT_TESTED_FIXTURES     — covered by the unit suite instead
    #   _CONVENTION_ONLY_FIXTURES — doc-satisfied, no library surface
    # A new spec fixture in none of these fails HERE instead of silently
    # pytest.skip-ping past CI.
    pinned = {path.stem for path in _fixture_paths()}
    wired = set(_SUPPORTED_FIXTURES)
    documented_not_run = (
        _DEFERRED_FIXTURES.keys() | _UNIT_TESTED_FIXTURES.keys() | _CONVENTION_ONLY_FIXTURES.keys()
    )
    known = wired | documented_not_run

    # 1. Nothing on disk may be missing from the accounting.
    missing = sorted(pinned - known)
    assert not missing, (
        "unaccounted observability conformance fixtures: wire each into "
        "_SUPPORTED_FIXTURES once it runs, or document it in _DEFERRED_FIXTURES "
        "(future capability) / _UNIT_TESTED_FIXTURES (covered by the unit suite) "
        f"/ _CONVENTION_ONLY_FIXTURES (doc-satisfied): {missing}"
    )

    # 2. Nothing in the accounting may point at a fixture file that is gone
    #    (e.g. renamed at a pin bump); such entries should be removed.
    orphaned = sorted(known - pinned)
    assert not orphaned, f"accounting entries with no fixture file (remove): {orphaned}"

    # 3. A fixture is either run or documented as not run, never both.
    doubled = sorted(wired & documented_not_run)
    assert not doubled, f"fixtures both run and documented-as-not-run (pick one): {doubled}"


# ---------------------------------------------------------------------------
# Per-fixture dispatcher
# ---------------------------------------------------------------------------
Expand All @@ -232,10 +389,18 @@ def _load(path: Path) -> dict[str, Any]:
@pytest.mark.parametrize("fixture_path", _fixture_paths(), ids=_fixture_id)
async def test_observability_fixture(fixture_path: Path) -> None:
fixture_id = fixture_path.stem
if fixture_id in _DEFERRED_FIXTURES:
pytest.skip(f"{fixture_id}: {_DEFERRED_FIXTURES[fixture_id]}")
skip_reason = (
_DEFERRED_FIXTURES.get(fixture_id)
or _UNIT_TESTED_FIXTURES.get(fixture_id)
or _CONVENTION_ONLY_FIXTURES.get(fixture_id)
)
if skip_reason is not None:
pytest.skip(f"{fixture_id}: {skip_reason}")
if fixture_id not in _SUPPORTED_FIXTURES:
pytest.skip(f"{fixture_id}: harness wiring not yet implemented")
# Unaccounted: neither wired nor documented. The coverage guard
# (test_observability_fixture_coverage_is_complete) fails loudly
# listing every such fixture; the individual case skips here.
pytest.skip(f"{fixture_id}: unaccounted -- see the coverage guard")

spec = _load(fixture_path)
if fixture_id == "001-otel-basic-trace":
Expand Down
80 changes: 80 additions & 0 deletions tests/unit/test_llm_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1782,6 +1782,86 @@ def _handler(_req: httpx.Request) -> httpx.Response:
assert typed.finish_reason == "tool_calls"


async def test_llm_completion_event_active_prompt_populated_from_context() -> None:
    # Proposal 0057 active_prompt, conformance fixture 064: when complete()
    # runs inside a with_active_prompt block, the active PromptResult must be
    # stamped onto the typed LlmCompletionEvent itself (the provider reads
    # current_prompt_result()) -- this checks the EVENT record, not merely
    # how an observer renders an injected field on a span.
    from datetime import UTC, datetime

    from openarmature.prompts import PromptResult, with_active_prompt

    stamp = datetime.now(UTC)
    active = PromptResult(
        name="greeting",
        version="1",
        label="production",
        template_hash="sha256:tmpl",
        rendered_hash="sha256:rendered",
        messages=[UserMessage(content="hi")],
        variables={"user": "Alice"},
        fetched_at=stamp,
        rendered_at=stamp,
    )
    usage = {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}

    # Start collecting dispatched events before the provider exists so the
    # completion event cannot be missed.
    captured, dispatch_token = _collecting_dispatch()
    provider = OpenAIProvider(
        base_url="http://test",
        model="m",
        api_key="k",
        transport=_make_openai_response_with_usage(usage),
    )
    try:
        with with_active_prompt(active):
            await provider.complete([UserMessage(content="hi")])
    finally:
        # Always close the provider and release the dispatch hook, even if
        # the call above raised.
        await provider.aclose()
        _release_dispatch(dispatch_token)

    completion = next(evt for evt in captured if isinstance(evt, LlmCompletionEvent))
    assert completion.active_prompt == active


async def test_llm_completion_event_active_prompt_group_populated_from_context() -> None:
    # Proposal 0057 active_prompt_group, conformance fixture 066: when
    # complete() runs inside a with_active_prompt_group block, the active
    # PromptGroup must be stamped onto the typed LlmCompletionEvent (the
    # provider reads current_prompt_group()).
    from datetime import UTC, datetime

    from openarmature.prompts import PromptGroup, PromptResult, with_active_prompt_group

    stamp = datetime.now(UTC)

    def _member(prompt_name: str) -> PromptResult:
        # Group members differ only by name; every other field is shared.
        return PromptResult(
            name=prompt_name,
            version="1",
            label="production",
            template_hash="sha256:tmpl",
            rendered_hash="sha256:rendered",
            messages=[UserMessage(content="hi")],
            variables={"user": "Alice"},
            fetched_at=stamp,
            rendered_at=stamp,
        )

    # PromptGroup requires N>=2 members.
    active_group = PromptGroup(
        group_name="greetings",
        members=[_member(prompt_name) for prompt_name in ("greeting", "farewell")],
    )
    usage = {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2}

    # Start collecting dispatched events before the provider exists so the
    # completion event cannot be missed.
    captured, dispatch_token = _collecting_dispatch()
    provider = OpenAIProvider(
        base_url="http://test",
        model="m",
        api_key="k",
        transport=_make_openai_response_with_usage(usage),
    )
    try:
        with with_active_prompt_group(active_group):
            await provider.complete([UserMessage(content="hi")])
    finally:
        # Always close the provider and release the dispatch hook, even if
        # the call above raised.
        await provider.aclose()
        _release_dispatch(dispatch_token)

    completion = next(evt for evt in captured if isinstance(evt, LlmCompletionEvent))
    assert completion.active_prompt_group == active_group


async def test_llm_completion_event_request_params_only_carries_supplied_keys() -> None:
# Proposal 0057 request_params shape: absence-is-meaningful. Only
# caller-supplied gen_ai.request.* keys appear; unset RuntimeConfig
Expand Down