diff --git a/CHANGELOG.md b/CHANGELOG.md index 2690200..02f881f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,10 +15,11 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The - **Failure-isolation `catch` gate + cause-chain classification primitive** (proposal 0074, pipeline-utilities §6.3 / §6.4, spec v0.65.0). `FailureIsolationMiddleware` gains an optional `catch`: a set of error categories. An exception is caught only if the *derived category* of its cause chain (the outermost non-carrier link's category, resolved through the engine's `node_exception` carriers, the same value reported as `caught_exception.category`) is in the set. This closes a degrade-into-crash footgun: at a wrapping placement (subgraph, fan-out instance, branch) the engine wraps the originating failure in a carrier, so a `predicate` inspecting the surface exception sees only the carrier and misses it, whereas `catch` classifies through the carrier. `catch` composes with `predicate` as a conjunction; both default permissive (both unset stays catch-all), and a null derived category never matches a non-empty set. The carrier-skipping walk behind `catch` and `caught_exception` is promoted to a public primitive, `classify_cause_chain(exc) -> CaughtException` (the ordered `chain`, the derived `category`, and its `message` — the same record the event carries), exported from `openarmature.graph` for use in a custom `predicate`, a router, a metric, or a full-chain retry classifier. The default retry classifier stays deliberately single-level (it classifies at re-attempt granularity); this is now documented, with no behavior change. Conformance fixture 072 (catch matches through an instance-placement carrier and degrades; a non-matching catch propagates with no event). The optional native-exception-type `catch` form (spec MAY) is not shipped. - **Inline-callable parallel branches and conditional `when`** (proposal 0075, pipeline-utilities §11, spec v0.66.0). 
`ParallelBranchesNode` gains two additive branch forms. A branch may now give its work as `call`, an inline async function over the parent state returning a parent-shaped partial update, instead of a compiled `subgraph` with its own state schema and `inputs` / `outputs` projection; the returned partial is the branch's contribution directly, merged via the parent reducer with no projection. This makes the primitive adoptable for the "M heterogeneous lightweight parallel calls over shared state, each independently failure-isolated" shape (hybrid recall, paired reads) that previously dropped to a hand-rolled gather, while reusing the existing concurrency, fail-fast cancellation, per-branch failure isolation, and reducer fan-in. A branch gives its work as exactly one of `subgraph` / `call`, and a callable branch declares no `inputs` / `outputs`, else a new compile-time `ParallelBranchesInvalidBranchSpec`; a node may mix the two forms freely. A branch (either form) may also carry an optional `when` predicate over the parent state, evaluated once at dispatch: a `False` result skips the branch entirely (no dispatch, contribution, observer events, or span), and an all-skipped node is a valid no-op distinct from the compile-time `ParallelBranchesNoBranches`. A callable branch is the unit of work, so it emits one `started` / `completed` observer pair keyed by `branch_name` (rendered as a single branch span); a skipped branch emits nothing. `ParallelBranchesInvalidBranchSpec` is exported from `openarmature.graph`. Conformance fixtures 073 (two callable branches merge to disjoint fields), 074 (conditional `when` skips / dispatches), and 075 (callable branch failure-isolation degrade) run in `test_pipeline_utilities`. - **Tool-call request observability on LLM spans** (proposal 0076, observability §5.5.1 / §5.5.10 / §5.5.5, spec v0.67.0). 
The tool calls a model requests in its completion now have an output-side home on the `openarmature.llm.complete` span, closing the gap where they surfaced only incidentally on the next turn's input history. *Which* tools were requested renders by default as three ungated identity projections (the class of `openarmature.llm.model`): `openarmature.llm.output.tool_calls.count`, `.names`, and `.ids`, with `.names` and `.ids` index-aligned in request order and `.count` equal to their length. The full request, arguments included, renders as the payload-gated `openarmature.llm.output.tool_calls`, a JSON `[{id, name, arguments}]` array reusing the input tool-call encoding, surfaced only with `disable_provider_payload=False`. The whole family is emitted only on a tool-calling completion; a completion that requests no tools emits none of it (absence, not `count = 0`). The typed `LlmCompletionEvent` gains an additive `output_tool_calls` field carrying the `ToolCall` records, the source the span attributes render from (in python the OTel span renders from the per-attempt `LlmRetryAttemptEvent`, which carries the field too). This is the request side; the tool-execution complement (a separate `openarmature.tool.call` span) is a later proposal, joined to this one by the `ToolCall.id`. A Langfuse request-side mapping is out of scope. Conformance fixtures 085 (two requested calls surface count / names / ids), 086 (no calls, family absent), and 087 (payload gating: identity survives payload-off while the full serialization is suppressed) run in `test_observability`. +- **OTel GenAI metrics** (proposal 0067, observability §11, spec v0.68.0). The OTel observer can now emit the OpenTelemetry metrics signal alongside its spans: two histogram instruments over provider calls. Opt in with `enable_metrics=True` (default off, independent of span emission). 
`openarmature.gen_ai.client.token.usage` records an LLM completion's input and output token counts (one observation each, tagged `openarmature.gen_ai.token.type`); `openarmature.gen_ai.client.operation.duration` records the call's wall-clock duration, once per attempt under call-level retry, including a failed attempt (which carries `error.type`). Both carry `openarmature.gen_ai.operation` (`"chat"`), `gen_ai.request.model`, and `gen_ai.system`, and use the spec's explicit bucket advisories. The `Meter` comes from the configured `MeterProvider` (injectable via `meter_provider=...`; the OTel global is the no-op fallback when none is set). The instrument names are OA-namespaced, mirroring the upstream `gen_ai.client.*` instruments (at Development status) so a future cutover is a mechanical prefix-strip; metrics target OTel only (no Langfuse mapping). They are a projection of the per-attempt event stream, so they record with spans disabled. `conformance.toml` records proposal 0067 `partial`: the LLM-call metrics (fixtures 088 / 090 / 091) are implemented, and the embedding-call metrics (fixture 089) are deferred until the embedding capability (proposal 0059) lands. The LLM fixtures run in `test_observability` via an in-memory `MetricReader` capture (the conformance-adapter §6.9 primitive). 
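
The per-attempt projection described in the entry above can be sketched in plain Python. This is a minimal illustration only, not the observer's code: `Usage`, `Attempt`, and `project_attempt` are hypothetical stand-ins introduced here, and the sketch returns plain tuples rather than recording on OpenTelemetry histograms. The instrument names and dimension keys are the ones listed in the entry.

```python
from dataclasses import dataclass
from typing import Optional

TOKEN_USAGE = "openarmature.gen_ai.client.token.usage"
DURATION = "openarmature.gen_ai.client.operation.duration"


@dataclass
class Usage:
    # Hypothetical stand-in for the response usage record.
    prompt_tokens: Optional[int] = None
    completion_tokens: Optional[int] = None


@dataclass
class Attempt:
    # Hypothetical stand-in for one per-attempt LLM event.
    model: str
    provider: str
    latency_ms: Optional[float] = None
    error_category: Optional[str] = None
    usage: Optional[Usage] = None


def project_attempt(attempt):
    """Return (instrument, value, dimensions) tuples for one attempt."""
    base = {
        "openarmature.gen_ai.operation": "chat",
        "gen_ai.request.model": attempt.model,
        "gen_ai.system": attempt.provider,
    }
    observations = []
    # Duration: one observation per attempt, in seconds; a failed
    # attempt additionally carries error.type.
    if attempt.latency_ms is not None:
        dims = dict(base)
        if attempt.error_category is not None:
            dims["error.type"] = attempt.error_category
        observations.append((DURATION, attempt.latency_ms / 1000.0, dims))
    # Token usage: only when the attempt returned a usage record,
    # one observation each for the input and output counts.
    if attempt.usage is not None:
        counts = (
            ("input", attempt.usage.prompt_tokens),
            ("output", attempt.usage.completion_tokens),
        )
        for token_type, count in counts:
            if count is not None:
                dims = {**base, "openarmature.gen_ai.token.type": token_type}
                observations.append((TOKEN_USAGE, count, dims))
    return observations
```

Under these assumptions a successful attempt with usage yields three observations (one duration, two token counts), while a failed attempt without usage yields a single duration observation tagged with `error.type`.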
### Changed -- **Pinned spec advances v0.60.0 → v0.67.0** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above), v0.62.0 (proposal 0064, the Langfuse session/user population above), v0.63.0 (proposal 0072, the prompt cache control above), the v0.63.1 patch (pipeline-utilities coverage fixtures 070/071 for the already-implemented 0069 / 0070 behavior, no new proposal), and v0.64.0 (proposal 0073, GenAI semconv adoption reconciliation: OA retains `gen_ai.system` despite the upstream rename to `gen_ai.provider.name`; textual-only, with no emitted-attribute or fixture change, so the existing `gen_ai.*` fixtures stand as the retention regression), v0.65.0 (proposal 0074, the failure-isolation `catch` gate above), v0.66.0 (proposal 0075, the inline-callable parallel branches and conditional `when` above), the v0.66.1 patch (an observability §8 call-level-retry Langfuse-mapping clarification reconciling §8 with the per-attempt §5.5 spans: one terminal Generation per `complete()` call, not one per attempt, which the Langfuse observer already renders by driving the Generation from the terminal `LlmCompletionEvent` / `LlmFailedEvent` and skipping the per-attempt `LlmRetryAttemptEvent`; no behavior or fixture change), and v0.67.0 (proposal 0076, the tool-call request observability above). `conformance.toml` records 0061 / 0072 / 0074 / 0075 / 0076 `implemented`, 0064 `partial` (its `sessionId` half is dormant pending the sessions capability), and 0073 `textual-only`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. 
+- **Pinned spec advances v0.60.0 → v0.68.0** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above), v0.62.0 (proposal 0064, the Langfuse session/user population above), v0.63.0 (proposal 0072, the prompt cache control above), the v0.63.1 patch (pipeline-utilities coverage fixtures 070/071 for the already-implemented 0069 / 0070 behavior, no new proposal), v0.64.0 (proposal 0073, GenAI semconv adoption reconciliation: OA retains `gen_ai.system` despite the upstream rename to `gen_ai.provider.name`; textual-only, with no emitted-attribute or fixture change, so the existing `gen_ai.*` fixtures stand as the retention regression), v0.65.0 (proposal 0074, the failure-isolation `catch` gate above), v0.66.0 (proposal 0075, the inline-callable parallel branches and conditional `when` above), the v0.66.1 patch (an observability §8 call-level-retry Langfuse-mapping clarification reconciling §8 with the per-attempt §5.5 spans: one terminal Generation per `complete()` call, not one per attempt, which the Langfuse observer already renders by driving the Generation from the terminal `LlmCompletionEvent` / `LlmFailedEvent` and skipping the per-attempt `LlmRetryAttemptEvent`; no behavior or fixture change), v0.67.0 (proposal 0076, the tool-call request observability above), and v0.68.0 (proposal 0067, the OTel GenAI metrics above). `conformance.toml` records 0061 / 0072 / 0074 / 0075 / 0076 `implemented`, 0064 `partial` (its `sessionId` half is dormant pending the sessions capability), 0067 `partial` (its embedding-call metrics await the embedding capability), and 0073 `textual-only`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. 
## [0.14.0] — 2026-06-17 diff --git a/conformance.toml b/conformance.toml index c536ba9..38cc8f2 100644 --- a/conformance.toml +++ b/conformance.toml @@ -32,7 +32,7 @@ [manifest] implementation = "openarmature-python" -spec_pin = "v0.67.0" +spec_pin = "v0.68.0" # Status values: # implemented — shipped behavior matches the proposal's contract @@ -640,6 +640,13 @@ since = "0.14.0" status = "implemented" since = "0.14.0" +# Spec v0.68.0 (proposal 0067). OTel GenAI metrics (observability §11 new +# Metrics section + conformance-adapter §6.9 metric-capture primitive). +[proposals."0067"] +status = "partial" +since = "0.15.0" +note = "OTel GenAI metrics (observability §11): an opt-in enable_metrics flag (default off, normative name) on the bundled OTelObserver, independent of span emission (§11.1). When on, two OA-namespaced histograms record per provider-call ATTEMPT from the python-internal LlmRetryAttemptEvent (the per-attempt LLM-span source since 0050): openarmature.gen_ai.client.token.usage ({token}; two observations -- input + output token counts from the response usage record, openarmature.gen_ai.token.type dim) and openarmature.gen_ai.client.operation.duration (s; once per attempt INCLUDING failed attempts, error.type dim on failure), both configured with the §11.2 explicit bucket advisories. Dimensions: openarmature.gen_ai.operation ('chat'), gen_ai.request.model + gen_ai.system (recognized-core, used directly), openarmature.gen_ai.token.type, error.type. The Meter comes from the configured MeterProvider (injectable; falls back to the OTel global, which is the no-op meter when none is set). PARTIAL: the embedding-call metrics (the §11 embedding path, fixture 089) are deferred -- the embedding capability (proposal 0059, observability §5.5.8 / §5.5.9) is unimplemented in python until v0.16.0, so there is no embedding event/provider to record from. 
The LLM path (fixtures 088 / 090 / 091) is implemented and wired via a private MeterProvider + InMemoryMetricReader (the §6.9 metric-capture primitive). No Langfuse change (metrics are OTel-only). Streaming / server / rerank metrics + the cutover to the upstream gen_ai.client.* instrument names are out of scope per the proposal." + # Spec v0.57.0 (proposal 0068). Failure-isolation event structured cause # chain (pipeline-utilities §6.3). ``caught_exception`` gains a ``chain`` of # cause links (``{category, message, carrier}``, outermost->innermost), with diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 0df2254..b5521eb 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -828,6 +828,49 @@ form, so custom observers consuming cannot accidentally leak raw bytes regardless of how they're written. +### GenAI metrics (`enable_metrics`) + +Spans answer "what happened on this one call"; metrics answer "what is +the token throughput and latency across all calls". The OTel observer +can emit two histogram instruments over provider calls. Opt in with +`enable_metrics=True` (default off): + +```python +observer = OTelObserver( + span_processor=SimpleSpanProcessor(exporter), + enable_metrics=True, +) +``` + +When enabled, the observer obtains a `Meter` from the configured +`MeterProvider`. Pass `meter_provider=...` to use a private one; +otherwise it falls back to the OTel global, and recording is a silent +no-op when no provider is configured. The two instruments: + +- `openarmature.gen_ai.client.token.usage` (unit `{token}`). Per LLM + completion it records two observations: the input-token count, tagged + `openarmature.gen_ai.token.type="input"`, and the output-token count, + tagged `"output"`, sourced from the response usage record. Recorded + only when the call returned usage. +- `openarmature.gen_ai.client.operation.duration` (unit `s`). The + provider-call wall-clock duration, one observation per attempt. 
A + failed attempt records too, carrying `error.type`. + +Both carry `openarmature.gen_ai.operation` (`"chat"`), +`gen_ai.request.model`, and `gen_ai.system`. Under call-level retry the +duration instrument records once per attempt; the token instrument +records only for attempts that returned usage. + +**Metrics are independent of spans.** `enable_metrics` is orthogonal to +the `disable_llm_spans` / `disable_provider_payload` flags: you can +record metrics with spans off, or emit spans with metrics off. Both draw +from the same event stream. + +The instrument names are OA-namespaced, mirroring the upstream +`gen_ai.client.*` instruments (still at Development status), so a future +cutover is a mechanical prefix-strip. Metrics target OTel only; there is +no Langfuse mapping. + ### Identifying the service: `Resource` Pass an `opentelemetry.sdk.resources.Resource` to set diff --git a/openarmature-spec b/openarmature-spec index f68c64a..b5804d6 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit f68c64a19b44461708b9310a00012771e70e279b +Subproject commit b5804d6cdf36f9ba42e920de34e1347e0075bcd5 diff --git a/pyproject.toml b/pyproject.toml index ec0a334..a264151 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.67.0" +spec_version = "0.68.0" [dependency-groups] dev = [ diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index 4621a0e..c14ed17 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.67.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). 
For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.68.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: ## Capability contracts -_Sourced from openarmature-spec v0.67.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.68.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) 
see the linked docs site._ ### Capability: `graph-engine` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index 276de75..7188d32 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,7 +25,7 @@ """ __version__ = "0.14.0" -__spec_version__ = "0.67.0" +__spec_version__ = "0.68.0" # Proposal 0052 (spec observability §5.1 / §8.4.1): canonical # package-registry name for this implementation. Surfaces on every # OTel invocation span as ``openarmature.implementation.name`` and on diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 7fd5e6b..94a3ee6 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -81,7 +81,9 @@ from typing import Any, cast from opentelemetry import context as otel_context +from opentelemetry import metrics as otel_metrics from opentelemetry import trace as otel_trace +from opentelemetry.metrics import Histogram, Meter, MeterProvider from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import SpanProcessor, TracerProvider from opentelemetry.sdk.trace.id_generator import RandomIdGenerator @@ -142,6 +144,44 @@ _PAYLOAD_DEFAULT_BYTES = 65536 +# §11.2 (proposal 0067) explicit bucket-boundary advisories for the two +# GenAI metric histograms, mirroring the upstream gen_ai.client.* bucket +# advisories so a future stable cutover is a mechanical prefix-strip. +# Token usage in {token}; operation duration in seconds. +_TOKEN_USAGE_BUCKETS: list[float] = [ + 1, + 4, + 16, + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, +] +_DURATION_BUCKETS: list[float] = [ + 0.01, + 0.02, + 0.04, + 0.08, + 0.16, + 0.32, + 0.64, + 1.28, + 2.56, + 5.12, + 10.24, + 20.48, + 40.96, + 81.92, +] + + def _read_spec_version() -> str: """Read the spec version pinned at package level. 
Lazy import avoids a circular at module-load time (the package's ``__init__`` @@ -469,6 +509,21 @@ class OTelObserver: # LLM-aware backends (Langfuse, Phoenix, Honeycomb's LLM lens) # render correctly out of the box, which keys off gen_ai.*. disable_genai_semconv: bool = False + # Proposal 0067 (observability §11): opt-in GenAI metrics. Default + # off; the flag name is normative. When True, __post_init__ creates + # the two histogram instruments and provider-call events record token + # + duration measurements. Independent of span emission (§11.1) — the + # disable_llm_spans / disable_provider_payload / disable_genai_semconv + # flags govern spans only. + enable_metrics: bool = False + # The MeterProvider the metric instruments are obtained from. None + # (default) falls back to the configured OTel global MeterProvider, + # which is the no-op meter when none is set up (§11.1: recording is a + # silent no-op, never raises). Injectable so tests (and callers who + # keep a private MeterProvider) can capture measurements without + # mutating the OTel global. Unlike the span side's private + # TracerProvider, metrics use the *configured* provider per §11.1. + meter_provider: MeterProvider | None = None # Per-attribute byte cap for the §5.5.1 payload attributes. Default # 64 KiB; minimum 256 bytes (§5.5.5), validated in __post_init__. payload_max_bytes: int = _PAYLOAD_DEFAULT_BYTES @@ -498,6 +553,11 @@ class OTelObserver: # Internal state, populated in __post_init__ and during invocation. _provider: TracerProvider = field(init=False, repr=False) _tracer: otel_trace.Tracer = field(init=False, repr=False) + # §11 metric instruments — created in __post_init__ only when + # enable_metrics is True; None otherwise (and recording is skipped). 
+ _meter: Meter | None = field(init=False, repr=False, default=None) + _token_histogram: Histogram | None = field(init=False, repr=False, default=None) + _duration_histogram: Histogram | None = field(init=False, repr=False, default=None) # Per-invocation_id span state — concurrent invocations on a # shared observer each get their own ``_InvState`` so internal # maps never collide. @@ -539,6 +599,26 @@ def __post_init__(self) -> None: for proc in processors: self._provider.add_span_processor(proc) self._tracer = self._provider.get_tracer("openarmature") + # §11.1 metrics: opt-in. When enabled, obtain a Meter from the + # configured MeterProvider (injected, else the OTel global, which + # is the no-op meter when none is set up) and create the two + # histograms with the §11.2 explicit bucket advisories. When + # disabled, no instrument is created and nothing records. + if self.enable_metrics: + meter_provider = ( + self.meter_provider if self.meter_provider is not None else otel_metrics.get_meter_provider() + ) + self._meter = meter_provider.get_meter("openarmature") + self._token_histogram = self._meter.create_histogram( + "openarmature.gen_ai.client.token.usage", + unit="{token}", + explicit_bucket_boundaries_advisory=_TOKEN_USAGE_BUCKETS, + ) + self._duration_histogram = self._meter.create_histogram( + "openarmature.gen_ai.client.operation.duration", + unit="s", + explicit_bucket_boundaries_advisory=_DURATION_BUCKETS, + ) # ------------------------------------------------------------------ # Enricher invocation (friction-roundup #7p2) @@ -612,6 +692,10 @@ async def __call__( # LlmRetryAttemptEvent — one span per attempt under call-level # retry (attempt_index 0..N-1), one for a no-retry call. if isinstance(event, LlmRetryAttemptEvent): + # §11 metrics record per attempt, independent of span emission + # (§11.1) — so this runs regardless of disable_llm_spans. 
+ if self.enable_metrics: + self._record_llm_metrics(event) if not self.disable_llm_spans: self._handle_typed_llm_retry_attempt(event) return @@ -1214,6 +1298,51 @@ def _emit_checkpoint_save_span(self, event: NodeEvent) -> None: # active_prompt_group snapshots taken at dispatch time — NOT the # ContextVar. The dispatch worker's task-local Context doesn't see # node-body ContextVar writes. + def _record_llm_metrics(self, event: LlmRetryAttemptEvent) -> None: + """Record the §11 GenAI metric observations for one LLM-call + attempt. + + Duration is recorded for every attempt (including a failed one, + carrying ``error.type``); token usage only when the attempt + returned a usage record (a failed attempt has none). Sourced from + the per-attempt ``LlmRetryAttemptEvent`` — one duration sample per + attempt under call-level retry, matching the per-attempt span + model. The attempt index is deliberately not a dimension (§11.2 + cardinality). + """ + if self._duration_histogram is None or self._token_histogram is None: + return + # §11.3 dimensions shared by both instruments. operation is "chat" + # for an LLM completion; gen_ai.request.model / gen_ai.system are + # the recognized-core de-facto-standard keys used directly. + base_dims: dict[str, str] = { + "openarmature.gen_ai.operation": "chat", + "gen_ai.request.model": event.model, + "gen_ai.system": event.provider, + } + # Duration (seconds): every attempt. error.type carries the §7 + # category on a failed attempt; absent on success. + if event.latency_ms is not None: + duration_dims = dict(base_dims) + if event.error_category is not None: + duration_dims["error.type"] = event.error_category + self._duration_histogram.record(event.latency_ms / 1000.0, duration_dims) + # Token usage: only when a usage record is present (a failed + # attempt has none). Two observations for an LLM completion — + # input + output — each only when its count was reported. 
+ usage = event.usage + if usage is not None: + if usage.prompt_tokens is not None: + self._token_histogram.record( + usage.prompt_tokens, + {**base_dims, "openarmature.gen_ai.token.type": "input"}, + ) + if usage.completion_tokens is not None: + self._token_histogram.record( + usage.completion_tokens, + {**base_dims, "openarmature.gen_ai.token.type": "output"}, + ) + def _handle_typed_llm_retry_attempt(self, event: LlmRetryAttemptEvent) -> None: """Open + close one ``openarmature.llm.complete`` span from a per-attempt LlmRetryAttemptEvent. diff --git a/tests/conformance/harness/directives.py b/tests/conformance/harness/directives.py index d504614..1d7d82b 100644 --- a/tests/conformance/harness/directives.py +++ b/tests/conformance/harness/directives.py @@ -375,6 +375,18 @@ class CallsLlmSpec(_AllowExtras): config: RuntimeConfigSpec | None = None +class CallsEmbedSpec(_AllowExtras): + """Embedding-using node: sends ``input`` to a mock embedding provider + and stores the result in ``stores_response_in``. Used by the + GenAI-metrics embedding fixture (089). The embedding capability + (proposal 0059) is unimplemented in python until v0.16.0, so 089 is + deferred at the runner; this directive is modelled here only so the + fixture parses + round-trips through the harness schema.""" + + input: list[str] + stores_response_in: str + + class EmitsLogSpec(_AllowExtras): """Additive companion: the node emits a log record alongside its state update. Verified by observability fixture 010 (Logs Bridge).""" @@ -408,6 +420,8 @@ class NodeSpec(_ForbidExtras): - ``flaky`` and the four ``flaky_*`` variants — harness mocks for retry/checkpoint behaviours. - ``calls_llm`` — see :class:`CallsLlmSpec`. + - ``calls_embed`` — see :class:`CallsEmbedSpec` (embedding fixture 089, + deferred at the runner; modelled so the fixture parses). 
Companion modifiers (additive, may combine with most primaries): @@ -432,6 +446,7 @@ class NodeSpec(_ForbidExtras): flaky_instance_only: FlakyInstanceOnlySpec | None = None flaky_resume_aware: FlakyResumeAwareSpec | None = None calls_llm: CallsLlmSpec | None = None + calls_embed: CallsEmbedSpec | None = None # Companions — additive. inputs: dict[str, str] | None = None @@ -464,6 +479,7 @@ class NodeSpec(_ForbidExtras): "flaky_instance_only", "flaky_resume_aware", "calls_llm", + "calls_embed", ) @model_validator(mode="after") diff --git a/tests/conformance/harness/expectations.py b/tests/conformance/harness/expectations.py index fb0cec4..3a5f040 100644 --- a/tests/conformance/harness/expectations.py +++ b/tests/conformance/harness/expectations.py @@ -202,6 +202,10 @@ class ObservabilityExpected(_ForbidExtras): # multi-invocation grouping case's per-trace list. langfuse_trace: dict[str, Any] | None = None langfuse_traces: list[dict[str, Any]] | None = None + # GenAI metrics (proposal 0067, fixtures 088-091): the expected metric + # observations, each ``{instrument, dimensions, value?}``. An empty + # list asserts that no measurements were recorded (fixture 091). + metrics: list[dict[str, Any]] | None = None # --------------------------------------------------------------------------- @@ -248,6 +252,8 @@ class ObservabilityExpected(_ForbidExtras): # proposal 0064 (fixture 084) Langfuse Trace-level expectations "langfuse_trace", "langfuse_traces", + # proposal 0067 (fixtures 088-091) GenAI metric observations. 
+ "metrics", } ) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index b4d8f01..a454241 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -166,6 +166,14 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "085-llm-tool-call-request-attributes", "086-llm-tool-call-request-absent", "087-llm-tool-call-request-survives-payload-gating", + # v0.68.0 — proposal 0067 (GenAI metrics, observability §11). The + # LLM-path fixtures: token + duration histograms (088), error.type + # on duration (090), and the enable_metrics-off no-op (091). + # Captured via a private MeterProvider + InMemoryMetricReader (the + # §6.9 metric-capture primitive). 089 (embeddings) is deferred. + "088-llm-metrics-token-and-duration", + "090-metrics-error-type-on-duration", + "091-metrics-disabled-no-measurements", } ) @@ -174,6 +182,14 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # Proposal 0045 (nested-lineage augmentation, v0.37.0) — engine # + observer work lands in PR 11. "039-nested-lineage-augmentation": ("Proposal 0045 not yet implemented (PR 11)"), + # Proposal 0067 (GenAI metrics, v0.68.0) — the embedding metrics + # fixture sources from an embedding call, but the embedding capability + # (proposal 0059, observability §5.5.8 / §5.5.9) is unimplemented in + # python until v0.16.0, so there is no embedding event or provider to + # record from. The LLM-path metric fixtures (088 / 090 / 091) run. 
+ "089-embedding-metrics-token-and-duration": ( + "Embedding capability (proposal 0059) unimplemented until v0.16.0" + ), } @@ -280,6 +296,12 @@ async def test_observability_fixture(fixture_path: Path) -> None: "087-llm-tool-call-request-survives-payload-gating", }: await _run_llm_payload_fixture(spec) + elif fixture_id in { + "088-llm-metrics-token-and-duration", + "090-metrics-error-type-on-duration", + "091-metrics-disabled-no-measurements", + }: + await _run_metrics_fixture(spec) else: raise AssertionError(f"no driver for supported fixture {fixture_id!r}") @@ -2990,6 +3012,114 @@ def _walk(expected_entries: list[dict[str, Any]]) -> None: _walk(expected_tree) +# --------------------------------------------------------------------------- +# Proposal 0067 — GenAI metrics fixtures (088 / 090 / 091) +# --------------------------------------------------------------------------- +# +# The §6.9 metric-capture primitive: a private MeterProvider with an +# InMemoryMetricReader, injected into the OTelObserver so recorded +# measurements can be asserted. The driver reuses ``_build_simple_llm_graph`` +# (mock transport + provider) and asserts ``expected.metrics`` against the +# recorded data points. Duration values + bucket assignment are not asserted +# (§11.4); token.usage values are (the fixed-usage mock). 
+ + +async def _run_metrics_fixture(spec: Mapping[str, Any]) -> None: + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + try: + await _run_metrics_case(case) + except AssertionError as e: + raise AssertionError(f"case {case.get('name')!r}: {e}") from e + + +async def _run_metrics_case(case: Mapping[str, Any]) -> None: + from opentelemetry.sdk.metrics import MeterProvider as SdkMeterProvider + from opentelemetry.sdk.metrics.export import InMemoryMetricReader + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + from openarmature.graph import NodeException + + graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False) + + reader = InMemoryMetricReader() + meter_provider = SdkMeterProvider(metric_readers=[reader]) + exporter = InMemorySpanExporter() + observer = OTelObserver( + span_processor=SimpleSpanProcessor(exporter), + enable_metrics=bool(case.get("enable_metrics", False)), + meter_provider=meter_provider, + ) + graph.attach_observer(observer) + + state = _make_state_instance(case, state_cls) + expected_error = cast("dict[str, Any] | None", case.get("expected_error")) + try: + if expected_error is not None: + # A failed provider call raises (LlmFailedEvent dispatched + # alongside per 0058); the failed-attempt event is still + # enqueued before the raise, so the duration metric records. 
+            with pytest.raises(NodeException):
+                await graph.invoke(state)
+        else:
+            await graph.invoke(state)
+        await graph.drain()
+    finally:
+        await provider.aclose()
+        observer.shutdown()
+
+    points = _collect_metric_points(reader)
+    expected_metrics = cast("list[dict[str, Any]]", case["expected"].get("metrics") or [])
+    _assert_metric_points(points, expected_metrics)
+
+
+def _collect_metric_points(reader: Any) -> list[tuple[str, float, int, dict[str, Any]]]:
+    """Flatten an InMemoryMetricReader's data into
+    ``(instrument_name, point_sum, point_count, point_attributes)``
+    tuples. Observations with identical attribute sets aggregate into one
+    histogram data point (sum + count)."""
+    data = reader.get_metrics_data()
+    points: list[tuple[str, float, int, dict[str, Any]]] = []
+    if data is None:
+        return points
+    for resource_metric in data.resource_metrics:
+        for scope_metric in resource_metric.scope_metrics:
+            for metric in scope_metric.metrics:
+                for pt in metric.data.data_points:
+                    points.append((metric.name, pt.sum, pt.count, dict(pt.attributes)))
+    return points
+
+
+def _assert_metric_points(
+    points: list[tuple[str, float, int, dict[str, Any]]],
+    expected_metrics: list[dict[str, Any]],
+) -> None:
+    """Match each ``expected.metrics`` entry (instrument + exact
+    dimensions, plus value for token.usage) against a recorded data
+    point. An empty expected list asserts no measurements (fixture
+    091)."""
+    if not expected_metrics:
+        assert points == [], f"expected no measurements; got {points}"
+        return
+    for expected in expected_metrics:
+        instrument = cast("str", expected["instrument"])
+        exp_dims = cast("dict[str, Any]", expected.get("dimensions") or {})
+        candidates = [p for p in points if p[0] == instrument and p[3] == exp_dims]
+        assert candidates, (
+            f"no {instrument!r} observation with dimensions {exp_dims}; "
+            f"recorded: {[(p[0], p[3]) for p in points]}"
+        )
+        # token.usage asserts the recorded value (from the fixed-usage
+        # mock); duration asserts presence + dimensions only (§11.4).
+        if "value" in expected:
+            assert candidates[0][1] == expected["value"], (
+                f"{instrument!r} {exp_dims} value: expected {expected['value']}, got sum {candidates[0][1]}"
+            )
+
+
 # ---------------------------------------------------------------------------
 # Proposal 0049 — typed LlmCompletionEvent fixtures (050-056)
 # ---------------------------------------------------------------------------
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index 7ee0fb9..211ce50 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -9,7 +9,7 @@ def test_package_versions() -> None:
 
 
     assert openarmature.__version__ == "0.14.0"
-    assert openarmature.__spec_version__ == "0.67.0"
+    assert openarmature.__spec_version__ == "0.68.0"
 
 
 def test_spec_version_matches_pyproject() -> None:
diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py
index 2fa80d6..cd4cd81 100644
--- a/tests/unit/test_observability_otel.py
+++ b/tests/unit/test_observability_otel.py
@@ -865,6 +865,192 @@ async def test_llm_span_output_tool_calls_payload_gating() -> None:
     ]
 
 
+# ---------------------------------------------------------------------------
+# Proposal 0067 — GenAI metrics (observability §11)
+# ---------------------------------------------------------------------------
+
+
+def _collect_metric_points(reader: Any) -> list[tuple[str, float, int, dict[str, Any]]]:
+    """Flatten an InMemoryMetricReader's collected data into
+    ``(instrument_name, point_sum, point_count, point_attributes)``
+    tuples. Histogram observations with identical attribute sets
+    aggregate into one data point (sum + count), so per-attempt tests
+    assert on ``count``, not point cardinality."""
+    data = reader.get_metrics_data()
+    points: list[tuple[str, float, int, dict[str, Any]]] = []
+    if data is None:
+        return points
+    for resource_metric in data.resource_metrics:
+        for scope_metric in resource_metric.scope_metrics:
+            for metric in scope_metric.metrics:
+                for pt in metric.data.data_points:
+                    points.append((metric.name, pt.sum, pt.count, dict(pt.attributes)))
+    return points
+
+
+async def _drive_metrics_events(
+    events: list[Any],
+    *,
+    enable_metrics: bool = True,
+    disable_llm_spans: bool = False,
+) -> tuple[list[tuple[str, float, int, dict[str, Any]]], list[Any]]:
+    """Feed LlmRetryAttemptEvents through an OTelObserver wired to a
+    private MeterProvider + InMemoryMetricReader; return the captured
+    ``(metric_points, llm_complete_spans)``."""
+    from opentelemetry.sdk.metrics import MeterProvider as SdkMeterProvider
+    from opentelemetry.sdk.metrics.export import InMemoryMetricReader
+
+    from openarmature.observability.correlation import (
+        _reset_invocation_id,
+        _set_invocation_id,
+    )
+
+    reader = InMemoryMetricReader()
+    meter_provider = SdkMeterProvider(metric_readers=[reader])
+    exporter = InMemorySpanExporter()
+    observer = OTelObserver(
+        span_processor=SimpleSpanProcessor(exporter),
+        enable_metrics=enable_metrics,
+        disable_llm_spans=disable_llm_spans,
+        meter_provider=meter_provider,
+    )
+    token = _set_invocation_id("inv-metrics")
+    try:
+        for event in events:
+            await observer(event)
+    finally:
+        _reset_invocation_id(token)
+    observer.shutdown()
+    llm_spans = [s for s in exporter.get_finished_spans() if s.name == "openarmature.llm.complete"]
+    return _collect_metric_points(reader), llm_spans
+
+
+async def test_metrics_records_token_and_duration() -> None:
+    # Proposal 0067 §11 (mirrors fixture 088): a successful LLM attempt
+    # with usage {input 5, output 1} records two token.usage observations
+    # (input + output) and one duration observation, with the §11.3
+    # dimensions. Duration value is not asserted (§11.4).
+    from openarmature.llm.response import Usage
+    from tests._helpers.typed_event import make_retry_attempt_event
+
+    event = make_retry_attempt_event(
+        model="test-model",
+        provider="openai",
+        latency_ms=12.0,
+        usage=Usage(prompt_tokens=5, completion_tokens=1, total_tokens=6),
+    )
+    points, _ = await _drive_metrics_events([event])
+    token_points = [p for p in points if p[0] == "openarmature.gen_ai.client.token.usage"]
+    duration_points = [p for p in points if p[0] == "openarmature.gen_ai.client.operation.duration"]
+    by_type = {p[3]["openarmature.gen_ai.token.type"]: p for p in token_points}
+    assert by_type["input"][1] == 5
+    assert by_type["output"][1] == 1
+    for ttype in ("input", "output"):
+        dims = by_type[ttype][3]
+        assert dims["openarmature.gen_ai.operation"] == "chat"
+        assert dims["gen_ai.request.model"] == "test-model"
+        assert dims["gen_ai.system"] == "openai"
+    assert len(duration_points) == 1
+    ddims = duration_points[0][3]
+    assert ddims["openarmature.gen_ai.operation"] == "chat"
+    assert ddims["gen_ai.request.model"] == "test-model"
+    assert ddims["gen_ai.system"] == "openai"
+    assert "error.type" not in ddims
+
+
+async def test_metrics_records_duration_with_error_type_on_failure() -> None:
+    # Proposal 0067 §11.2 / §11.3 (mirrors fixture 090): a failed attempt
+    # records a duration observation carrying error.type, and NO
+    # token.usage observation (a failed attempt returned no usage).
+    from tests._helpers.typed_event import make_retry_attempt_event
+
+    event = make_retry_attempt_event(
+        model="test-model",
+        provider="openai",
+        latency_ms=8.0,
+        finish_reason=None,
+        usage=None,
+        error_category="provider_unavailable",
+        error_type="ProviderUnavailable",
+        error_message="down",
+    )
+    points, _ = await _drive_metrics_events([event])
+    token_points = [p for p in points if p[0] == "openarmature.gen_ai.client.token.usage"]
+    duration_points = [p for p in points if p[0] == "openarmature.gen_ai.client.operation.duration"]
+    assert token_points == []
+    assert len(duration_points) == 1
+    assert duration_points[0][3]["error.type"] == "provider_unavailable"
+    assert duration_points[0][3]["openarmature.gen_ai.operation"] == "chat"
+
+
+async def test_metrics_disabled_records_nothing() -> None:
+    # Proposal 0067 §11.1 (mirrors fixture 091): enable_metrics off (the
+    # default) creates no instrument and records nothing.
+    from openarmature.llm.response import Usage
+    from tests._helpers.typed_event import make_retry_attempt_event
+
+    event = make_retry_attempt_event(usage=Usage(prompt_tokens=5, completion_tokens=1, total_tokens=6))
+    points, _ = await _drive_metrics_events([event], enable_metrics=False)
+    assert points == []
+
+
+async def test_metrics_independent_of_disable_llm_spans() -> None:
+    # Proposal 0067 §11.1: metrics record even with spans disabled — the
+    # disable_llm_spans flag governs span emission only.
+    from openarmature.llm.response import Usage
+    from tests._helpers.typed_event import make_retry_attempt_event
+
+    event = make_retry_attempt_event(usage=Usage(prompt_tokens=5, completion_tokens=1, total_tokens=6))
+    points, llm_spans = await _drive_metrics_events([event], disable_llm_spans=True)
+    assert llm_spans == []
+    assert any(p[0] == "openarmature.gen_ai.client.operation.duration" for p in points)
+    assert any(p[0] == "openarmature.gen_ai.client.token.usage" for p in points)
+
+
+async def test_metrics_record_once_per_attempt_under_retry() -> None:
+    # Proposal 0067 §11.2 "Call-level retry": the duration histogram
+    # records once per attempt (failed attempts carry error.type), and
+    # token.usage only for an attempt that returned usage. Two failed
+    # attempts + one success -> 3 duration observations (2 with
+    # error.type), 2 token.usage observations (the success's input +
+    # output). Observations with identical dimensions aggregate into one
+    # data point, so this asserts on histogram counts.
+    from openarmature.llm.response import Usage
+    from tests._helpers.typed_event import make_retry_attempt_event
+
+    failed = [
+        make_retry_attempt_event(
+            llm_attempt_index=i,
+            latency_ms=5.0,
+            finish_reason=None,
+            usage=None,
+            error_category="provider_unavailable",
+            error_type="ProviderUnavailable",
+            error_message="down",
+        )
+        for i in range(2)
+    ]
+    success = make_retry_attempt_event(
+        llm_attempt_index=2,
+        latency_ms=7.0,
+        usage=Usage(prompt_tokens=5, completion_tokens=1, total_tokens=6),
+    )
+    points, _ = await _drive_metrics_events([*failed, success])
+    duration_points = [p for p in points if p[0] == "openarmature.gen_ai.client.operation.duration"]
+    token_points = [p for p in points if p[0] == "openarmature.gen_ai.client.token.usage"]
+    # 3 duration observations total: 2 share the error dims (one
+    # aggregated point, count 2), the success is a separate point.
+    assert sum(p[2] for p in duration_points) == 3
+    error_duration = [p for p in duration_points if p[3].get("error.type") == "provider_unavailable"]
+    assert sum(p[2] for p in error_duration) == 2
+    success_duration = [p for p in duration_points if "error.type" not in p[3]]
+    assert sum(p[2] for p in success_duration) == 1
+    # token.usage only from the success attempt: one input, one output.
+    by_type = {p[3]["openarmature.gen_ai.token.type"]: p for p in token_points}
+    assert by_type["input"][1] == 5 and by_type["input"][2] == 1
+    assert by_type["output"][1] == 1 and by_type["output"][2] == 1
+
+
 async def test_llm_span_zero_duration_when_latency_missing() -> None:
     # When the typed event omits latency_ms (None), the handler falls
     # back to a zero-duration span at end_time rather than guessing