From f05ebf1ea7890a45df3fce6e16b2908583ce002c Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 22 Jun 2026 15:54:21 -0700 Subject: [PATCH 1/3] Add tool-execution observability The model requests tools in its completion (0076); the caller runs them in node-body code, which was invisible to observers. Add the with_tool_call instrumentation scope -- a context manager (like with_active_prompt) the caller wraps a tool execution in -- plus the typed ToolCallEvent / ToolCallFailedEvent it dispatches at outcome time (re-raising on failure; the failure event carries error_type / error_message and deliberately no error_category). The OTel observer renders an openarmature.tool.call span (OA-namespace attributes, error.type on failure); the Langfuse observer renders a dedicated Tool observation (asType "tool"), which adds a tool() method to the client Protocol, the in-memory recorder, and the SDK adapter. Arguments and result are payload, gated by disable_provider_payload. Also harden both observers' payload serialization with default=str so an opaque tool result that JSON can't encode is rendered via str() instead of crashing the observer, and back-date the Langfuse Tool observation (generalize the back-dating helper to wrap LangfuseTool). Implements proposal 0063 (graph-engine 6, observability 5.5 / 8.4). 
--- src/openarmature/graph/events.py | 112 +++++++++++ src/openarmature/graph/observer.py | 4 + src/openarmature/observability/__init__.py | 7 + src/openarmature/observability/correlation.py | 12 ++ .../observability/langfuse/adapter.py | 75 ++++++-- .../observability/langfuse/client.py | 51 ++++- .../observability/langfuse/observer.py | 96 +++++++++- .../observability/otel/observer.py | 130 ++++++++++++- src/openarmature/observability/tool_call.py | 173 +++++++++++++++++ tests/unit/test_observability_langfuse.py | 178 ++++++++++++++++++ .../test_observability_langfuse_adapter.py | 46 +++++ tests/unit/test_observability_otel.py | 141 ++++++++++++++ tests/unit/test_tool_call.py | 124 ++++++++++++ 13 files changed, 1127 insertions(+), 22 deletions(-) create mode 100644 src/openarmature/observability/tool_call.py create mode 100644 tests/unit/test_tool_call.py diff --git a/src/openarmature/graph/events.py b/src/openarmature/graph/events.py index efb2df6..79bb4ec 100644 --- a/src/openarmature/graph/events.py +++ b/src/openarmature/graph/events.py @@ -794,6 +794,116 @@ class FailureIsolatedEvent: caught_exception: CaughtException +# Spec: realizes graph-engine §6 tool-execution observer events +# (proposal 0063, spec v0.69.0), the success+failure pair the +# tool-call instrumentation scope (``with_tool_call``) dispatches around +# a caller's tool execution. OA observes the execution; it does NOT run, +# select, loop, or feed back tools (llm-provider §1). Mirrors the +# 0049/0058 LLM completion/failure pairing and the §6 dispatch treatment +# (no ``phase`` discriminator, not subject to the ``phases`` filter; +# observers filter by type discrimination). ``arguments`` / ``result`` +# are payload-bearing: populated unconditionally here, observer-side +# gating applies at rendering per observability §5.5.4 +# (``disable_provider_payload``). +@dataclass(frozen=True) +class ToolCallEvent: + """A successful tool execution delivered to observers. 
+ + Dispatched by the tool-call instrumentation scope when a caller's + tool execution returns a result. Observer code filters by type + discrimination (``isinstance(event, ToolCallEvent)``). + + Field set: + + - ``invocation_id`` / ``correlation_id`` / ``node_name`` / + ``namespace`` / ``attempt_index`` / ``fan_out_index`` / + ``branch_name``: the scope-entry identity of the node that ran + the tool (captured when the scope was entered). + - ``call_id``: per-execution disambiguator minted when the scope is + entered. Always present; distinct from ``tool_call_id`` (this is + OA's own correlation token for the execution). + - ``tool_name``: the name of the tool / function executed. + - ``tool_call_id``: the ``ToolCall.id`` of the + ``LlmCompletionEvent.output_tool_calls`` entry this execution + satisfies (the linkage back to the requesting LLM call); ``None`` + when the instrumented function did not originate from an LLM tool + request. + - ``arguments``: the arguments the tool was invoked with; ``None`` + when the tool takes no arguments. Payload-bearing. + - ``result``: the tool's return value as the tool produced it + (pre-serialization, language-idiomatic; opaque to OA). + Payload-bearing. + - ``latency_ms``: wall-clock latency measured at the scope boundary, + in milliseconds; ``None`` when not measured. + - ``caller_invocation_metadata``: optional snapshot of caller- + supplied invocation metadata, same opt-in semantics as on + :class:`LlmCompletionEvent`. + """ + + invocation_id: str + correlation_id: str | None + node_name: str + namespace: tuple[str, ...] + attempt_index: int + fan_out_index: int | None + branch_name: str | None + call_id: str + tool_name: str + tool_call_id: str | None + arguments: Mapping[str, Any] | None + result: Any + latency_ms: float | None + caller_invocation_metadata: Mapping[str, AttributeValue] | None = None + + +# Spec: the failure variant (proposal 0063). 
Mirrors ToolCallEvent's +# identity / scoping / request-side fields with ``result`` absent and +# two failure fields. DELIBERATELY carries NO ``error_category``: tool +# execution is arbitrary user / third-party code with no closed +# llm-provider §7 failure taxonomy (the departure from LlmFailedEvent / +# EmbeddingFailedEvent). Mutually exclusive with ToolCallEvent per +# execution; dispatched ALONGSIDE the re-raised exception, not in place +# of it. +@dataclass(frozen=True) +class ToolCallFailedEvent: + """A failed tool execution delivered to observers. + + Dispatched by the tool-call instrumentation scope when a caller's + tool execution raises (the exception then re-raises out of the + scope; the event is dispatched alongside it, not in place of it). + Observer code filters by type discrimination. + + Field set: the identity / scoping / request-side fields of + :class:`ToolCallEvent` (``tool_name`` / ``tool_call_id`` / + ``arguments`` / ``latency_ms`` / ``call_id``), the success-only + ``result`` absent, plus: + + - ``error_type``: the exception class name (e.g. ``"TimeoutError"``) + or a tool-defined error code; ``None`` when no type is available. + - ``error_message``: the message from the raised exception; always + present (empty string when the exception carried no message). + + There is no ``error_category`` (the deliberate departure from the + provider failure events). + """ + + invocation_id: str + correlation_id: str | None + node_name: str + namespace: tuple[str, ...] 
+ attempt_index: int + fan_out_index: int | None + branch_name: str | None + call_id: str + tool_name: str + tool_call_id: str | None + arguments: Mapping[str, Any] | None + latency_ms: float | None + error_type: str | None + error_message: str + caller_invocation_metadata: Mapping[str, AttributeValue] | None = None + + __all__ = [ "FailureIsolatedEvent", "FanOutEventConfig", @@ -805,4 +915,6 @@ class FailureIsolatedEvent: "MetadataAugmentationEvent", "NodeEvent", "ParallelBranchesEventConfig", + "ToolCallEvent", + "ToolCallFailedEvent", ] diff --git a/src/openarmature/graph/observer.py b/src/openarmature/graph/observer.py index da6b3e7..383bc9f 100644 --- a/src/openarmature/graph/observer.py +++ b/src/openarmature/graph/observer.py @@ -43,6 +43,8 @@ LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, + ToolCallEvent, + ToolCallFailedEvent, ) from .state import State @@ -71,6 +73,8 @@ | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ) diff --git a/src/openarmature/observability/__init__.py b/src/openarmature/observability/__init__.py index cba414f..ff7dfef 100644 --- a/src/openarmature/observability/__init__.py +++ b/src/openarmature/observability/__init__.py @@ -56,9 +56,15 @@ set_invocation_metadata, ) +# v0.15.0 (proposal 0063): the tool-call instrumentation scope users +# wrap a tool execution in from inside a node body, so OA emits the +# ToolCallEvent / ToolCallFailedEvent around it. 
+from .tool_call import ToolCallScope, with_tool_call + __all__ = [ "LLM_NAMESPACE", "LlmEventPayload", + "ToolCallScope", "current_active_observers", "current_attempt_index", "current_correlation_id", @@ -69,4 +75,5 @@ "current_namespace_prefix", "get_invocation_metadata", "set_invocation_metadata", + "with_tool_call", ] diff --git a/src/openarmature/observability/correlation.py b/src/openarmature/observability/correlation.py index ba83e00..442ff4c 100644 --- a/src/openarmature/observability/correlation.py +++ b/src/openarmature/observability/correlation.py @@ -45,6 +45,8 @@ LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, + ToolCallEvent, + ToolCallFailedEvent, ) from openarmature.graph.observer import SubscribedObserver @@ -230,6 +232,8 @@ def _reset_active_observers(token: Token[tuple[SubscribedObserver, ...]]) -> Non | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ], None, ] @@ -248,6 +252,8 @@ def current_dispatch() -> ( | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ], None, ] @@ -277,6 +283,8 @@ def _set_active_dispatch( | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ], None, ], @@ -291,6 +299,8 @@ def _set_active_dispatch( | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ], None, ] @@ -313,6 +323,8 @@ def _reset_active_dispatch( | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ], None, ] diff --git a/src/openarmature/observability/langfuse/adapter.py b/src/openarmature/observability/langfuse/adapter.py index ed7bc93..6f926f3 100644 --- a/src/openarmature/observability/langfuse/adapter.py +++ b/src/openarmature/observability/langfuse/adapter.py @@ -387,7 +387,10 @@ def generation( # This is the only path to a back-dated Generation in # v4.7; the live-account 
integration test catches a future # SDK break. - obs = self._start_back_dated_generation( + from langfuse._client.span import LangfuseGeneration + + obs = self._start_back_dated_observation( + LangfuseGeneration, trace_id=trace_id, name=name, metadata=metadata, @@ -410,8 +413,56 @@ def generation( ) return _SpanHandle(obs) - def _start_back_dated_generation( + def tool( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + input: Any = None, + output: Any = None, + start_time: datetime | None = None, + ) -> LangfuseSpanHandle: + # v4 unifies observations under start_observation(as_type=); a + # Tool observation routes through as_type="tool" (proposal 0063). + # When start_time is supplied, back-date via the private OTel + # tracer (the public API can't), exactly as generation() does — + # so the Tool observation's duration reflects the tool latency. 
+ extra_kwargs: dict[str, Any] = {"input": input, "output": output} + present_extra = {k: v for k, v in extra_kwargs.items() if v is not None} + if start_time is not None: + from langfuse._client.span import LangfuseTool + + obs = self._start_back_dated_observation( + LangfuseTool, + trace_id=trace_id, + name=name, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + start_time=start_time, + **present_extra, + ) + else: + obs = self._start_observation( + as_type="tool", + trace_id=trace_id, + name=name, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + **present_extra, + ) + return _SpanHandle(obs) + + def _start_back_dated_observation( self, + observation_cls: type[Any], *, trace_id: str, name: str | None, @@ -422,11 +473,11 @@ def _start_back_dated_generation( start_time: datetime, **extra: Any, ) -> Any: - """Open a LangfuseGeneration at a back-dated timestamp by going - through the private OTel tracer rather than the public - ``start_observation`` API (which doesn't accept ``start_time`` - in v4.7). Mirrors the SDK's ``create_event`` precedent.""" - from langfuse._client.span import LangfuseGeneration + """Open a back-dated observation of ``observation_cls`` (e.g. + ``LangfuseGeneration`` / ``LangfuseTool``) by going through the + private OTel tracer rather than the public ``start_observation`` + API (which doesn't accept ``start_time`` in v4.7). 
Mirrors the + SDK's ``create_event`` precedent.""" from opentelemetry import trace as otel_trace_api trace_entry = self._trace_info.get(trace_id) @@ -459,17 +510,17 @@ def _start_back_dated_generation( name=name or "observation", start_time=start_time_ns, ) - generation_kwargs: dict[str, Any] = { + obs_kwargs: dict[str, Any] = { "otel_span": otel_span, "langfuse_client": self._client, "metadata": metadata, } if level != "DEFAULT": - generation_kwargs["level"] = level + obs_kwargs["level"] = level if status_message is not None: - generation_kwargs["status_message"] = status_message - generation_kwargs.update(extra) - return LangfuseGeneration(**generation_kwargs) + obs_kwargs["status_message"] = status_message + obs_kwargs.update(extra) + return observation_cls(**obs_kwargs) def force_flush(self, timeout_ms: int = 30_000) -> bool: """Best-effort flush of the underlying Langfuse client. diff --git a/src/openarmature/observability/langfuse/client.py b/src/openarmature/observability/langfuse/client.py index 4a9d6ed..ba73a03 100644 --- a/src/openarmature/observability/langfuse/client.py +++ b/src/openarmature/observability/langfuse/client.py @@ -33,7 +33,7 @@ from datetime import datetime from typing import Any, Literal, Protocol, runtime_checkable -ObservationType = Literal["span", "generation", "event"] +ObservationType = Literal["span", "generation", "event", "tool"] # Langfuse-supported `level` values per spec §8.4.2 (statusMessage pair). ObservationLevel = Literal["DEFAULT", "DEBUG", "INFO", "WARNING", "ERROR"] @@ -248,6 +248,26 @@ def generation( start_time: datetime | None = None, ) -> LangfuseGenerationHandle: ... 
+ def tool( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + input: Any = None, + output: Any = None, + start_time: datetime | None = None, + ) -> LangfuseSpanHandle: + """Open a dedicated Tool observation (proposal 0063, Langfuse + ``asType="tool"``). Like :meth:`generation` minus the model / + usage / prompt surface: a Tool carries ``input`` (arguments) / + ``output`` (result) / metadata / level. Returns a minimal + handle the caller ``.end()``s at outcome time.""" + ... + def force_flush(self, timeout_ms: int = 30_000) -> bool: """Flush any pending outbound buffer in the underlying sink. @@ -480,6 +500,35 @@ def generation( trace.observations.append(observation) return _InMemoryGenerationHandle(observation=observation) + def tool( + self, + *, + trace_id: str, + name: str | None = None, + metadata: dict[str, Any] | None = None, + parent_observation_id: str | None = None, + level: ObservationLevel = "DEFAULT", + status_message: str | None = None, + input: Any = None, + output: Any = None, + start_time: datetime | None = None, + ) -> LangfuseSpanHandle: + trace = self._get_trace(trace_id) + observation = LangfuseObservation( + id=self._mint_observation_id(), + type="tool", + name=name, + metadata=dict(metadata) if metadata is not None else {}, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + input=input, + output=output, + start_time=start_time, + ) + trace.observations.append(observation) + return _InMemorySpanHandle(observation=observation) + def force_flush(self, timeout_ms: int = 30_000) -> bool: # In-memory recorder has no outbound buffer; every observation # is captured synchronously on its create call. 
The ``timeout_ms`` diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 4523901..025574e 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -38,6 +38,8 @@ LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, + ToolCallEvent, + ToolCallFailedEvent, ) from openarmature.observability.lineage import is_strict_prefix @@ -46,6 +48,7 @@ LangfuseGenerationHandle, LangfuseSpanHandle, LangfuseUsage, + ObservationLevel, ) # §5.5.5 / §8.7 truncation: when the serialized payload exceeds the @@ -381,6 +384,8 @@ async def __call__( | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ), ) -> None: if isinstance(event, InvocationStartedEvent): @@ -409,6 +414,12 @@ async def __call__( if not self.disable_llm_spans: self._handle_typed_llm_failed(event) return + # Proposal 0063 tool-execution observability: render the dedicated + # Langfuse Tool observation (asType "tool") under the calling + # node's Span observation. + if isinstance(event, ToolCallEvent | ToolCallFailedEvent): + self._handle_tool_call(event) + return # Proposal 0050 §6.3 framework-emitted failure-isolation event. if isinstance(event, FailureIsolatedEvent): self._handle_failure_isolated(event) @@ -1585,6 +1596,75 @@ def _handle_typed_llm_failed(self, event: LlmFailedEvent) -> None: status_message=event.error_category, ) + def _handle_tool_call(self, event: ToolCallEvent | ToolCallFailedEvent) -> None: + """Open + close a dedicated Tool observation (Langfuse + ``asType="tool"``, proposal 0063) under the calling node's Span + observation. DEFAULT level on a ToolCallEvent; ERROR (with + ``error_type`` / ``error_message`` in metadata and as the status + message) on a ToolCallFailedEvent. ``input`` (arguments) / + ``output`` (result) are payload-gated per ``disable_provider_payload``. 
+ """ + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + invocation_id = current_invocation_id() + if invocation_id is None: + return + correlation_id = current_correlation_id() + if invocation_id not in self._inv_states: + self._open_trace_for_typed_event(invocation_id, correlation_id, event) + inv_state = self._inv_states[invocation_id] + end_time = datetime.now(UTC) + if event.latency_ms is not None: + start_time = end_time - timedelta(milliseconds=event.latency_ms) + else: + start_time = end_time + parent_observation_id = self._resolve_llm_parent_observation_id( + inv_state, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, + ) + # §8.4.6 metadata: tool name always, tool_call_id when present. + metadata: dict[str, Any] = {"openarmature_tool_name": event.tool_name} + if event.tool_call_id is not None: + metadata["openarmature_tool_call_id"] = event.tool_call_id + input_value: Any = None + output_value: Any = None + if not self.disable_provider_payload: + if event.arguments is not None: + input_value = self._maybe_truncate_for_input(event.arguments) + if isinstance(event, ToolCallEvent): + # The tool result is a structured value (not a plain + # string like output_content), so reuse the structured + # truncator: native value when it fits, marker string + # otherwise. 
+ output_value = self._maybe_truncate_for_input(event.result) + level: ObservationLevel = "DEFAULT" + status_message: str | None = None + if isinstance(event, ToolCallFailedEvent): + level = "ERROR" + if event.error_type is not None: + metadata["error_type"] = event.error_type + metadata["error_message"] = event.error_message + status_message = event.error_message + target_trace_id = self._trace_id_for(inv_state, event.namespace, event.fan_out_index) + handle = self.client.tool( + trace_id=target_trace_id, + name="openarmature.tool.call", + input=input_value, + output=output_value, + metadata=metadata, + parent_observation_id=parent_observation_id, + level=level, + status_message=status_message, + start_time=start_time, + ) + handle.end(end_time=end_time) + def _resolve_llm_parent_observation_id( self, inv_state: _InvState, @@ -1696,12 +1776,15 @@ def _resolve_prompt_link_from_typed_event(self, event: LlmCompletionEvent | LlmF return cast("dict[str, Any]", entities).get("langfuse_prompt") def _open_trace_for_typed_event( - self, invocation_id: str, correlation_id: str | None, event: LlmCompletionEvent | LlmFailedEvent + self, + invocation_id: str, + correlation_id: str | None, + event: LlmCompletionEvent | LlmFailedEvent | ToolCallEvent | ToolCallFailedEvent, ) -> None: - """Trace open path for a typed LLM event (LlmCompletionEvent or - LlmFailedEvent) arriving before any node-started event reached + """Trace open path for a typed event (LLM completion / failure or + tool execution) arriving before any node-started event reached this observer. 
Synthesizes the minimal trace shape from the - typed event's scoping fields.""" + typed event's scoping fields (all read generically).""" if event.namespace: entry_node = event.namespace[0] else: @@ -1757,7 +1840,10 @@ def _maybe_truncate_for_extras(self, value: dict[str, Any]) -> Any: def _serialize_payload_value(value: Any) -> str: # Mirrors observability/otel/observer.py's _serialize_for_attribute # so both observers see the same string under the same cap. - return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + # ``default=str`` is the same safety net: an opaque tool result + # JSON can't natively encode renders via str() rather than + # raising inside the observer (no-op for the encodable payloads). + return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False, default=str) def _truncate(serialized: str, cap_bytes: int) -> str | None: diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py index 94a3ee6..bb6854c 100644 --- a/src/openarmature/observability/otel/observer.py +++ b/src/openarmature/observability/otel/observer.py @@ -108,6 +108,8 @@ LlmRetryAttemptEvent, MetadataAugmentationEvent, NodeEvent, + ToolCallEvent, + ToolCallFailedEvent, ) from openarmature.observability.lineage import is_strict_prefix from openarmature.observability.llm_event import serialize_tool_calls @@ -303,8 +305,15 @@ def _span_chain_on_path( # v0.17.0 — conformance fixtures use parse-shape assertions, not # bytewise equality. def _serialize_for_attribute(value: Any) -> str: - """JSON-encode ``value`` for emission as an OTel string attribute.""" - return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + """JSON-encode ``value`` for emission as an OTel string attribute. + + ``default=str`` is a safety net so a value JSON can't natively encode + renders via its ``str()`` rather than raising inside the observer + (which would lose the whole span). 
It is a no-op for the + already-encodable payloads (messages, params); it matters for the + proposal 0063 tool ``result``, which is an opaque, language-idiomatic + value the tool produced (a model, dataclass, datetime, ...).""" + return json.dumps(value, sort_keys=True, separators=(",", ":"), ensure_ascii=False, default=str) # §5.5.5 truncation algorithm: @@ -532,7 +541,19 @@ class OTelObserver: # close sites (subgraph dispatch, detached root, fan-out instance, # invocation span, shutdown drain). attribute_enrichers: Sequence[ - Callable[[Span, NodeEvent | LlmCompletionEvent | LlmFailedEvent | LlmRetryAttemptEvent | None], None] + Callable[ + [ + Span, + NodeEvent + | LlmCompletionEvent + | LlmFailedEvent + | LlmRetryAttemptEvent + | ToolCallEvent + | ToolCallFailedEvent + | None, + ], + None, + ] ] = () # Read from the package's ``__spec_version__`` (one of the three # places the spec version is pinned per CLAUDE.md). Bumping the @@ -630,7 +651,15 @@ def __post_init__(self) -> None: # ``event`` is None on synthetic close sites (subgraph dispatch, # detached root, fan-out instance, invocation span, orphan drain). def _run_enrichers( - self, span: Span, event: NodeEvent | LlmCompletionEvent | LlmFailedEvent | LlmRetryAttemptEvent | None + self, + span: Span, + event: NodeEvent + | LlmCompletionEvent + | LlmFailedEvent + | LlmRetryAttemptEvent + | ToolCallEvent + | ToolCallFailedEvent + | None, ) -> None: """Invoke configured enrichers against ``span`` before ``span.end()`` is called.""" @@ -678,6 +707,8 @@ async def __call__( | LlmFailedEvent | LlmRetryAttemptEvent | FailureIsolatedEvent + | ToolCallEvent + | ToolCallFailedEvent ), ) -> None: # Proposal 0043 invocation-boundary events: OTel has no @@ -705,6 +736,13 @@ async def __call__( # consumers, so the OTel observer ignores them here. 
if isinstance(event, LlmCompletionEvent | LlmFailedEvent): return + # Proposal 0063 tool-execution observability: emit the + # openarmature.tool.call span from the typed tool events. + # Independent of disable_llm_spans (that flag is scoped to LLM + # completion spans per §5.5.4). + if isinstance(event, ToolCallEvent | ToolCallFailedEvent): + self._handle_tool_call(event) + return # Proposal 0050 §6.3 framework-emitted failure-isolation event. if isinstance(event, FailureIsolatedEvent): self._handle_failure_isolated(event) @@ -1524,6 +1562,90 @@ def _handle_typed_llm_retry_attempt(self, event: LlmRetryAttemptEvent) -> None: self._run_enrichers(span, event) span.end(end_time=end_time_ns) + def _handle_tool_call(self, event: ToolCallEvent | ToolCallFailedEvent) -> None: + """Emit an ``openarmature.tool.call`` span for a tool execution + (proposal 0063), parented under the calling node. + + A ``ToolCallEvent`` renders OK with the result attribute; a + ``ToolCallFailedEvent`` renders ERROR with the standard OTel + ``error.type`` attribute + an exception event carrying the + message. ``arguments`` / ``result`` are payload, gated by + ``disable_provider_payload`` (§5.5.4) + the §5.5.5 truncation + contract. The OA-namespace ``openarmature.tool.*`` attributes + mirror the Development ``gen_ai.tool.*`` surface, which is NOT + emitted in v1. ``disable_llm_spans`` does not gate this span. + """ + from openarmature.observability.correlation import ( + current_correlation_id, + current_invocation_id, + ) + + invocation_id = current_invocation_id() + if invocation_id is None: + return + inv_state = self._inv_state_for(invocation_id) + # Back-date start_time by latency_ms so the span duration reflects + # the scope-boundary measurement, mirroring the LLM span. 
+ end_time_ns = time.time_ns() + if event.latency_ms is not None: + start_time_ns = end_time_ns - int(event.latency_ms * 1_000_000) + else: + start_time_ns = end_time_ns + parent_ctx = self._resolve_llm_parent( + inv_state, + invocation_id, + calling_namespace_prefix=event.namespace, + calling_attempt_index=event.attempt_index, + calling_fan_out_index=event.fan_out_index, + calling_branch_name=event.branch_name, + ) + attrs: dict[str, Any] = {"openarmature.tool.name": event.tool_name} + if event.tool_call_id is not None: + attrs["openarmature.tool.call.id"] = event.tool_call_id + cid = current_correlation_id() + if cid is not None: + attrs["openarmature.correlation_id"] = cid + if event.caller_invocation_metadata is not None: + _apply_caller_metadata(attrs, event.caller_invocation_metadata) + # §5.5.4 payload gating: arguments (both variants) + result + # (success only) are payload. No gen_ai.tool.* (mirrored, + # Development); disable_genai_semconv is not applicable. + if not self.disable_provider_payload: + if event.arguments is not None: + serialized_args = _serialize_for_attribute(event.arguments) + attrs["openarmature.tool.call.arguments"] = _truncate_for_attribute( + serialized_args, self.payload_max_bytes + ) + if isinstance(event, ToolCallEvent): + serialized_result = _serialize_for_attribute(event.result) + attrs["openarmature.tool.call.result"] = _truncate_for_attribute( + serialized_result, self.payload_max_bytes + ) + span = self._tracer.start_span( + name="openarmature.tool.call", + context=cast("Any", parent_ctx), + kind=SpanKind.INTERNAL, + attributes=attrs, + start_time=start_time_ns, + ) + if isinstance(event, ToolCallFailedEvent): + # §4.2 ERROR mapping via the standard OTel error.type + + # an exception event carrying the message. 
+ if event.error_type is not None: + span.set_attribute("error.type", event.error_type) + span.add_event( + "exception", + attributes={ + "exception.type": event.error_type or "", + "exception.message": event.error_message, + }, + ) + span.set_status(Status(StatusCode.ERROR, description=event.error_message or event.error_type)) + else: + span.set_status(Status(StatusCode.OK)) + self._run_enrichers(span, event) + span.end(end_time=end_time_ns) + def _handle_failure_isolated(self, event: FailureIsolatedEvent) -> None: """Emit a zero-duration ``openarmature.failure_isolated`` span for a FailureIsolationMiddleware catch. diff --git a/src/openarmature/observability/tool_call.py b/src/openarmature/observability/tool_call.py new file mode 100644 index 0000000..b570953 --- /dev/null +++ b/src/openarmature/observability/tool_call.py @@ -0,0 +1,173 @@ +# Spec: realizes the graph-engine §6 tool-call instrumentation scope +# (proposal 0063, spec v0.69.0). A node-body primitive the caller wraps +# a tool execution in; OA observes the execution and dispatches a typed +# ToolCallEvent (success) or ToolCallFailedEvent (failure) at outcome +# time. OA does NOT run, select, loop, retry, or feed back tools +# (llm-provider §1) -- the caller runs the tool inside the scope. +# +# Shape follows the existing node-body context-manager precedent +# (prompts.context.with_active_prompt / with_active_prompt_group): a sync +# @contextmanager. A sync ``with`` brackets the awaited tool call in its +# body fine, and everything the scope does (capture identity, mint +# call_id, time, dispatch) is synchronous. Identity is captured at scope +# ENTRY (the §6 scope-entry-identity rule; the inline case is the trivial +# instance where entry and outcome share one context). 
# Dispatch goes through ``current_dispatch()``, the same path
# set_invocation_metadata / FailureIsolationMiddleware use; it is None
# outside an invocation (no observers), in which case the body still runs
# and only the event is skipped.
#
# v1 ships this inline bracketing form only; the deferred start/complete
# split (result lands in a later turn) is a spec MAY, not yet needed.

"""Tool-call instrumentation scope: ``with_tool_call``."""

from __future__ import annotations

import time
import uuid
from collections.abc import Iterator, Mapping
from contextlib import contextmanager
from typing import Any


class _Unset:
    """Marker type for "no result was reported".

    Kept distinct from ``None`` so a tool that legitimately returned
    ``None`` is not confused with a caller that never called
    ``set_result()``; both end up as a null result on the event.
    """


_UNSET = _Unset()


class ToolCallScope:
    """Handle yielded by :func:`with_tool_call`.

    ``call_id`` is the per-execution correlation token minted when the
    scope is entered. The caller hands the tool's return value to
    :meth:`set_result` so the success event can carry it.
    """

    __slots__ = ("call_id", "_result")

    def __init__(self, call_id: str) -> None:
        self.call_id = call_id
        # Starts as the sentinel until the caller reports a value.
        self._result: Any = _UNSET

    def set_result(self, value: Any) -> None:
        """Record ``value`` as the tool's return value."""
        self._result = value


@contextmanager
def with_tool_call(
    tool_name: str,
    arguments: Mapping[str, Any] | None = None,
    *,
    tool_call_id: str | None = None,
) -> Iterator[ToolCallScope]:
    """Observe one tool execution performed inside a node body.

    Usage::

        with with_tool_call("get_weather", {"city": "Paris"}, tool_call_id="call_abc") as scope:
            result = await get_weather(city="Paris")
            scope.set_result(result)

    When the body finishes normally a
    :class:`~openarmature.graph.events.ToolCallEvent` is dispatched with
    the value passed to :meth:`ToolCallScope.set_result` (``None`` when
    nothing was reported). When the body raises an ``Exception`` a
    :class:`~openarmature.graph.events.ToolCallFailedEvent` carrying the
    exception's type name and message is dispatched and the exception is
    **re-raised**: the scope observes, it never swallows. If no dispatch
    callback is active, the body still runs and no event is emitted.

    Args:
        tool_name: Name of the tool being executed.
        arguments: Observability representation of the call inputs; it is
            passed through to the event unchanged.
        tool_call_id: Identifier of the LLM tool request this execution
            satisfies, or ``None`` for a standalone instrumented function.

    Yields:
        The :class:`ToolCallScope` handle for reporting the result.
    """
    from openarmature.graph.events import ToolCallEvent, ToolCallFailedEvent

    from .correlation import (
        current_attempt_index,
        current_branch_name,
        current_correlation_id,
        current_dispatch,
        current_fan_out_index,
        current_invocation_id,
        current_namespace_prefix,
    )
    from .metadata import current_invocation_metadata

    # Identity is snapshotted once, at scope entry, and shared by whichever
    # of the two events ends up being dispatched.
    namespace = current_namespace_prefix()
    shared: dict[str, Any] = {
        "invocation_id": current_invocation_id() or "",
        "correlation_id": current_correlation_id(),
        "node_name": namespace[-1] if namespace else "",
        "namespace": namespace,
        "attempt_index": current_attempt_index(),
        "fan_out_index": current_fan_out_index(),
        "branch_name": current_branch_name(),
        "caller_invocation_metadata": dict(current_invocation_metadata()),
        "call_id": uuid.uuid4().hex,
        "tool_name": tool_name,
        "tool_call_id": tool_call_id,
        "arguments": arguments,
    }
    emit = current_dispatch()

    handle = ToolCallScope(shared["call_id"])
    began = time.perf_counter()

    def _elapsed_ms() -> float:
        return (time.perf_counter() - began) * 1000.0

    try:
        yield handle
    except Exception as exc:
        latency_ms = _elapsed_ms()
        if emit is not None:
            emit(
                ToolCallFailedEvent(
                    **shared,
                    latency_ms=latency_ms,
                    error_type=type(exc).__name__,
                    error_message=str(exc),
                )
            )
        # Observe, don't swallow: the exception propagates to the caller.
        raise
    else:
        latency_ms = _elapsed_ms()
        if emit is None:
            return
        reported = handle._result
        emit(
            ToolCallEvent(
                **shared,
                result=None if isinstance(reported, _Unset) else reported,
                latency_ms=latency_ms,
            )
        )


__all__ = ["ToolCallScope", "with_tool_call"]
async def _langfuse_trace_for_tool_event(event, **observer_kwargs):
    """Run ``event`` through a fresh LangfuseObserver and return its trace.

    The invocation id ContextVar is set to the event's own
    ``invocation_id`` for the duration of the observer call and reset
    afterwards, success or not.
    """
    from openarmature.observability.correlation import (
        _reset_invocation_id,
        _set_invocation_id,
    )

    client = InMemoryLangfuseClient()
    observer = LangfuseObserver(client=client, **observer_kwargs)
    token = _set_invocation_id(event.invocation_id)
    try:
        await observer(event)
    finally:
        _reset_invocation_id(token)
    return client.traces[event.invocation_id]


def _weather_tool_call_event(invocation_id, call_id, **overrides):
    """Build the successful ``get_weather`` ToolCallEvent these tests share."""
    from openarmature.graph.events import ToolCallEvent

    fields = {
        "invocation_id": invocation_id,
        "correlation_id": None,
        "node_name": "run_tool",
        "namespace": ("run_tool",),
        "attempt_index": 0,
        "fan_out_index": None,
        "branch_name": None,
        "call_id": call_id,
        "tool_name": "get_weather",
        "tool_call_id": "call_abc123",
        "arguments": {"city": "Paris"},
        "result": {"temperature_c": 20},
        "latency_ms": 5.0,
    }
    fields.update(overrides)
    return ToolCallEvent(**fields)


async def test_tool_call_event_renders_dedicated_tool_observation() -> None:
    # Success path, payload on: exactly one observation of type "tool"
    # (and no generation), DEFAULT level, arguments / result as
    # input / output, tool identity in metadata.
    trace = await _langfuse_trace_for_tool_event(
        _weather_tool_call_event("inv-tool-1", "cc-1"),
        disable_provider_payload=False,
    )

    tool_observations = [o for o in trace.observations if o.type == "tool"]
    assert len(tool_observations) == 1
    assert not any(o.type == "generation" for o in trace.observations)

    rendered = tool_observations[0]
    assert rendered.name == "openarmature.tool.call"
    assert rendered.level == "DEFAULT"
    assert rendered.input == {"city": "Paris"}
    assert rendered.output == {"temperature_c": 20}
    assert rendered.metadata.get("openarmature_tool_name") == "get_weather"
    assert rendered.metadata.get("openarmature_tool_call_id") == "call_abc123"
    assert rendered.ended is True


async def test_tool_call_failed_event_renders_error_level() -> None:
    # Failure path: the Tool observation is ERROR level, with
    # error_type / error_message in metadata and the message as the
    # status message.
    from openarmature.graph.events import ToolCallFailedEvent

    failure = ToolCallFailedEvent(
        invocation_id="inv-tool-2",
        correlation_id=None,
        node_name="run_tool",
        namespace=("run_tool",),
        attempt_index=0,
        fan_out_index=None,
        branch_name=None,
        call_id="cc-2",
        tool_name="get_weather",
        tool_call_id="call_def456",
        arguments={"city": "Paris"},
        latency_ms=3.0,
        error_type="TimeoutError",
        error_message="tool timed out",
    )
    trace = await _langfuse_trace_for_tool_event(failure, disable_provider_payload=False)

    rendered = next(o for o in trace.observations if o.type == "tool")
    assert rendered.level == "ERROR"
    assert rendered.status_message == "tool timed out"
    assert rendered.metadata.get("error_type") == "TimeoutError"
    assert rendered.metadata.get("error_message") == "tool timed out"
    assert rendered.metadata.get("openarmature_tool_name") == "get_weather"


async def test_tool_call_payload_gated_off_by_default() -> None:
    # The observer is built without disable_provider_payload, so its
    # default applies: input / output are suppressed while the identity
    # metadata is still present.
    trace = await _langfuse_trace_for_tool_event(_weather_tool_call_event("inv-tool-3", "cc-3"))

    rendered = next(o for o in trace.observations if o.type == "tool")
    assert rendered.input is None
    assert rendered.output is None
    assert rendered.metadata.get("openarmature_tool_name") == "get_weather"


async def test_tool_call_non_json_result_does_not_crash_observer() -> None:
    # Proposal 0063: the tool result is opaque. A value json.dumps can't
    # natively encode MUST NOT crash the observer's serialization (which
    # would lose the Tool observation); the observation is still emitted.
    class _Opaque:
        def __str__(self) -> str:
            return "OPAQUE-RESULT"

    trace = await _langfuse_trace_for_tool_event(
        _weather_tool_call_event("inv-tool-opaque", "cc-4", result=_Opaque()),
        disable_provider_payload=False,
    )

    assert len([o for o in trace.observations if o.type == "tool"]) == 1
def test_adapter_tool_routes_back_dated_calls_via_otel_tracer(monkeypatch: pytest.MonkeyPatch) -> None:
    # Proposal 0063: a Tool observation given a start_time must be routed
    # through the private _otel_tracer (the public start_observation
    # rejects start_time=), the same way generation() back-dates. Also
    # exercises LangfuseTool construction on that path.
    from datetime import UTC, datetime
    from unittest.mock import MagicMock

    sdk_client = _dummy_client()
    seen_kwargs: dict[str, Any] = {}

    def _record_start_span(**kwargs: Any) -> MagicMock:
        # Remember what the adapter asked the tracer for and hand back a
        # span whose context exposes fixed trace / span ids.
        seen_kwargs.update(kwargs)
        fake_span = MagicMock()
        fake_span.get_span_context.return_value = MagicMock(
            trace_id=int("a" * 32, 16),
            span_id=int("b" * 16, 16),
        )
        return fake_span

    def _forbid_start_observation(**_kwargs: Any) -> None:
        raise AssertionError(
            "start_observation MUST NOT be called on the back-dated tool path; "
            "v4 SDK rejects start_time= and the adapter should route via _otel_tracer"
        )

    monkeypatch.setattr(sdk_client._otel_tracer, "start_span", _record_start_span)  # noqa: SLF001
    monkeypatch.setattr(sdk_client, "start_observation", _forbid_start_observation)

    adapter = LangfuseSDKAdapter(sdk_client)
    adapter.trace(id="trace-tool-ts", name="t")

    back_dated_start = datetime(2026, 6, 8, 12, 0, 0, tzinfo=UTC)
    adapter.tool(
        trace_id="trace-tool-ts",
        name="openarmature.tool.call",
        input={"city": "Paris"},
        output={"temperature_c": 20},
        start_time=back_dated_start,
    )

    # The tracer receives the start time as integer nanoseconds.
    assert seen_kwargs.get("start_time") == int(back_dated_start.timestamp() * 1_000_000_000)
    assert seen_kwargs.get("name") == "openarmature.tool.call"
# ---------------------------------------------------------------------------
# Proposal 0063 — tool-execution span (openarmature.tool.call)
# ---------------------------------------------------------------------------


async def _drive_tool_span(event: Any, *, disable_provider_payload: bool = True) -> Any:
    """Push one tool event through a fresh OTelObserver.

    Returns the single finished ``openarmature.tool.call`` span; asserts
    that exactly one such span was exported.
    """
    from openarmature.observability.correlation import (
        _reset_invocation_id,
        _set_invocation_id,
    )

    sink = InMemorySpanExporter()
    otel_observer = OTelObserver(
        span_processor=SimpleSpanProcessor(sink),
        disable_provider_payload=disable_provider_payload,
    )
    invocation_token = _set_invocation_id("inv-tool")
    try:
        await otel_observer(event)
    finally:
        _reset_invocation_id(invocation_token)
    otel_observer.shutdown()

    matching = [finished for finished in sink.get_finished_spans() if finished.name == "openarmature.tool.call"]
    assert len(matching) == 1
    return matching[0]


def _tool_call_event(**overrides: Any) -> Any:
    """Build a successful ``get_weather`` ToolCallEvent; keyword overrides win."""
    from openarmature.graph.events import ToolCallEvent

    defaults: dict[str, Any] = {
        "invocation_id": "inv-tool",
        "correlation_id": None,
        "node_name": "run_tool",
        "namespace": ("run_tool",),
        "attempt_index": 0,
        "fan_out_index": None,
        "branch_name": None,
        "call_id": "cc-1",
        "tool_name": "get_weather",
        "tool_call_id": "call_abc",
        "arguments": {"city": "Paris"},
        "result": {"temperature_c": 20},
        "latency_ms": 5.0,
    }
    return ToolCallEvent(**{**defaults, **overrides})


async def test_tool_span_emits_oa_namespace_attributes_not_gen_ai() -> None:
    # Proposal 0063 §5.5 (mirrors fixture 097): with payload on, the span
    # carries OA-namespace openarmature.tool.* attributes and none of the
    # Development gen_ai.tool.* surface.
    import json

    span = await _drive_tool_span(_tool_call_event(), disable_provider_payload=False)
    attributes = dict(span.attributes or {})

    assert attributes["openarmature.tool.name"] == "get_weather"
    assert attributes["openarmature.tool.call.id"] == "call_abc"
    assert json.loads(attributes["openarmature.tool.call.arguments"]) == {"city": "Paris"}
    assert json.loads(attributes["openarmature.tool.call.result"]) == {"temperature_c": 20}

    must_be_absent = (
        "gen_ai.tool.name",
        "gen_ai.tool.call.id",
        "gen_ai.tool.call.arguments",
        "gen_ai.tool.call.result",
        "gen_ai.operation.name",
    )
    leaked = [key for key in must_be_absent if key in attributes]
    assert leaked == []


async def test_tool_span_payload_gated_off_by_default() -> None:
    # Proposal 0063 §5.5.4 (mirrors fixture 096): with the helper's
    # default disable_provider_payload=True, arguments + result are
    # suppressed while the identity attributes still render.
    span = await _drive_tool_span(_tool_call_event())
    attributes = dict(span.attributes or {})

    assert attributes["openarmature.tool.name"] == "get_weather"
    assert attributes["openarmature.tool.call.id"] == "call_abc"
    assert "openarmature.tool.call.arguments" not in attributes
    assert "openarmature.tool.call.result" not in attributes


async def test_tool_span_omits_call_id_for_standalone() -> None:
    # A standalone instrumented function has tool_call_id None, and the
    # openarmature.tool.call.id attribute is then left off entirely.
    standalone = _tool_call_event(tool_call_id=None)
    span = await _drive_tool_span(standalone, disable_provider_payload=False)
    attributes = dict(span.attributes or {})

    assert "openarmature.tool.call.id" not in attributes
    assert attributes["openarmature.tool.name"] == "get_weather"


async def test_tool_failed_span_renders_error_status() -> None:
    # Proposal 0063: a ToolCallFailedEvent renders ERROR status, the
    # standard OTel error.type, one exception event, and no result
    # attribute.
    from opentelemetry.trace import StatusCode

    from openarmature.graph.events import ToolCallFailedEvent

    failure = ToolCallFailedEvent(
        invocation_id="inv-tool",
        correlation_id=None,
        node_name="run_tool",
        namespace=("run_tool",),
        attempt_index=0,
        fan_out_index=None,
        branch_name=None,
        call_id="cc-1",
        tool_name="get_weather",
        tool_call_id="call_def",
        arguments={"city": "Paris"},
        latency_ms=3.0,
        error_type="TimeoutError",
        error_message="tool timed out",
    )
    span = await _drive_tool_span(failure, disable_provider_payload=False)
    attributes = dict(span.attributes or {})

    assert span.status.status_code == StatusCode.ERROR
    assert attributes["error.type"] == "TimeoutError"
    assert "openarmature.tool.call.result" not in attributes

    recorded = [span_event for span_event in span.events if span_event.name == "exception"]
    assert len(recorded) == 1
    assert dict(recorded[0].attributes or {})["exception.message"] == "tool timed out"


async def test_tool_span_serializes_non_json_result_via_str_fallback() -> None:
    # Proposal 0063: the tool result is opaque (any language-idiomatic
    # value). A value json.dumps can't natively encode MUST NOT crash the
    # observer (which would lose the whole span); it renders via str().
    class _Opaque:
        def __str__(self) -> str:
            return "OPAQUE-RESULT"

    span = await _drive_tool_span(_tool_call_event(result=_Opaque()), disable_provider_payload=False)
    attributes = dict(span.attributes or {})

    assert "openarmature.tool.call.result" in attributes
    assert "OPAQUE-RESULT" in attributes["openarmature.tool.call.result"]
"""Unit tests for the tool-call instrumentation scope (proposal 0063).

Exercise ``with_tool_call`` directly by installing a collecting dispatch
callback + a node-scope identity into the correlation ContextVars, the
same mechanism the engine sets up per invocation.
"""

from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any

import pytest

from openarmature.graph.events import ToolCallEvent, ToolCallFailedEvent
from openarmature.observability import with_tool_call
from openarmature.observability.correlation import (
    _reset_active_dispatch,
    _reset_invocation_id,
    _reset_namespace_prefix,
    _set_active_dispatch,
    _set_invocation_id,
    _set_namespace_prefix,
)


@contextmanager
def _scope(namespace: tuple[str, ...] = ("run_tool",)) -> Iterator[list[Any]]:
    """Set up a fake invocation context and yield the list of dispatched events.

    Installs a collecting dispatch callback, invocation id ``"inv-1"`` and
    the given namespace prefix; on exit the three are reset in the
    reverse order they were set.
    """
    captured: list[Any] = []

    def _collect(event: Any) -> None:
        captured.append(event)

    dispatch_token = _set_active_dispatch(_collect)
    invocation_token = _set_invocation_id("inv-1")
    namespace_token = _set_namespace_prefix(namespace)
    try:
        yield captured
    finally:
        _reset_namespace_prefix(namespace_token)
        _reset_invocation_id(invocation_token)
        _reset_active_dispatch(dispatch_token)


async def test_with_tool_call_dispatches_tool_call_event_on_success() -> None:
    async def _tool() -> dict[str, int]:
        return {"temperature_c": 20}

    with (
        _scope() as events,
        with_tool_call("get_weather", {"city": "Paris"}, tool_call_id="call_abc") as scope,
    ):
        scope.set_result(await _tool())

    successes = [e for e in events if isinstance(e, ToolCallEvent)]
    assert len(successes) == 1
    event = successes[0]
    # What the caller passed in.
    assert event.tool_name == "get_weather"
    assert event.tool_call_id == "call_abc"
    assert event.arguments == {"city": "Paris"}
    assert event.result == {"temperature_c": 20}
    # Identity taken from the installed context.
    assert event.node_name == "run_tool"
    assert event.namespace == ("run_tool",)
    assert event.attempt_index == 0
    assert event.fan_out_index is None
    assert event.branch_name is None
    assert event.invocation_id == "inv-1"
    assert event.latency_ms is not None
    assert event.call_id  # minted per execution
    assert not any(isinstance(e, ToolCallFailedEvent) for e in events)


async def test_with_tool_call_dispatches_failed_event_and_reraises() -> None:
    with _scope() as events:
        # The scope observes; it does NOT swallow — the exception
        # propagates out of the `with` to the caller.
        with (
            pytest.raises(TimeoutError),
            with_tool_call("get_weather", {"city": "Paris"}, tool_call_id="call_def"),
        ):
            raise TimeoutError("tool timed out")

    failures = [e for e in events if isinstance(e, ToolCallFailedEvent)]
    assert len(failures) == 1
    event = failures[0]
    assert event.error_type == "TimeoutError"
    assert event.error_message == "tool timed out"
    assert event.tool_name == "get_weather"
    assert event.tool_call_id == "call_def"
    assert event.arguments == {"city": "Paris"}
    assert event.latency_ms is not None
    # The deliberate departure: no error_category on a tool failure.
    assert not hasattr(event, "error_category")
    assert not any(isinstance(e, ToolCallEvent) for e in events)


async def test_with_tool_call_tool_call_id_null_for_standalone() -> None:
    # A node-body utility the caller instruments without an originating
    # LLM tool request carries tool_call_id = None.
    with _scope() as events, with_tool_call("compute_total", {"items": 3}) as scope:
        scope.set_result(42)

    event = next(e for e in events if isinstance(e, ToolCallEvent))
    assert event.tool_call_id is None
    assert event.result == 42


async def test_with_tool_call_forgotten_result_is_null_not_a_crash() -> None:
    # If the caller never reports a result, the event carries None
    # (distinct from a tool that returned None — both render null, but
    # neither raises).
    with _scope() as events, with_tool_call("noop", None):
        pass

    event = next(e for e in events if isinstance(e, ToolCallEvent))
    assert event.result is None
    assert event.arguments is None


async def test_with_tool_call_runs_body_without_observers() -> None:
    # Outside any invocation (no dispatch installed) the scope still runs
    # the body and does not raise; it simply emits no event.
    body_ran = False
    with with_tool_call("t", {}) as scope:
        body_ran = True
        scope.set_result(1)
    assert body_ran
--- CHANGELOG.md | 3 +- conformance.toml | 10 +- docs/concepts/observability.md | 48 +++- openarmature-spec | 2 +- pyproject.toml | 2 +- src/openarmature/AGENTS.md | 4 +- src/openarmature/__init__.py | 2 +- tests/conformance/adapter.py | 6 + tests/conformance/harness/directives.py | 27 +++ tests/conformance/test_fixture_parsing.py | 18 ++ tests/conformance/test_observability.py | 274 ++++++++++++++++++++++ tests/test_smoke.py | 2 +- 12 files changed, 388 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02f881f..2703ee3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,11 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The - **Inline-callable parallel branches and conditional `when`** (proposal 0075, pipeline-utilities §11, spec v0.66.0). `ParallelBranchesNode` gains two additive branch forms. A branch may now give its work as `call`, an inline async function over the parent state returning a parent-shaped partial update, instead of a compiled `subgraph` with its own state schema and `inputs` / `outputs` projection; the returned partial is the branch's contribution directly, merged via the parent reducer with no projection. This makes the primitive adoptable for the "M heterogeneous lightweight parallel calls over shared state, each independently failure-isolated" shape (hybrid recall, paired reads) that previously dropped to a hand-rolled gather, while reusing the existing concurrency, fail-fast cancellation, per-branch failure isolation, and reducer fan-in. A branch gives its work as exactly one of `subgraph` / `call`, and a callable branch declares no `inputs` / `outputs`, else a new compile-time `ParallelBranchesInvalidBranchSpec`; a node may mix the two forms freely. 
A branch (either form) may also carry an optional `when` predicate over the parent state, evaluated once at dispatch: a `False` result skips the branch entirely (no dispatch, contribution, observer events, or span), and an all-skipped node is a valid no-op distinct from the compile-time `ParallelBranchesNoBranches`. A callable branch is the unit of work, so it emits one `started` / `completed` observer pair keyed by `branch_name` (rendered as a single branch span); a skipped branch emits nothing. `ParallelBranchesInvalidBranchSpec` is exported from `openarmature.graph`. Conformance fixtures 073 (two callable branches merge to disjoint fields), 074 (conditional `when` skips / dispatches), and 075 (callable branch failure-isolation degrade) run in `test_pipeline_utilities`. - **Tool-call request observability on LLM spans** (proposal 0076, observability §5.5.1 / §5.5.10 / §5.5.5, spec v0.67.0). The tool calls a model requests in its completion now have an output-side home on the `openarmature.llm.complete` span, closing the gap where they surfaced only incidentally on the next turn's input history. *Which* tools were requested renders by default as three ungated identity projections (the class of `openarmature.llm.model`): `openarmature.llm.output.tool_calls.count`, `.names`, and `.ids`, with `.names` and `.ids` index-aligned in request order and `.count` equal to their length. The full request, arguments included, renders as the payload-gated `openarmature.llm.output.tool_calls`, a JSON `[{id, name, arguments}]` array reusing the input tool-call encoding, surfaced only with `disable_provider_payload=False`. The whole family is emitted only on a tool-calling completion; a completion that requests no tools emits none of it (absence, not `count = 0`). 
The typed `LlmCompletionEvent` gains an additive `output_tool_calls` field carrying the `ToolCall` records, the source the span attributes render from (in python the OTel span renders from the per-attempt `LlmRetryAttemptEvent`, which carries the field too). This is the request side; the tool-execution complement (a separate `openarmature.tool.call` span) is a later proposal, joined to this one by the `ToolCall.id`. A Langfuse request-side mapping is out of scope. Conformance fixtures 085 (two requested calls surface count / names / ids), 086 (no calls, family absent), and 087 (payload gating: identity survives payload-off while the full serialization is suppressed) run in `test_observability`. - **OTel GenAI metrics** (proposal 0067, observability §11, spec v0.68.0). The OTel observer can now emit the OpenTelemetry metrics signal alongside its spans: two histogram instruments over provider calls, opt in with `enable_metrics=True` (default off, independent of span emission). `openarmature.gen_ai.client.token.usage` records an LLM completion's input and output token counts (one observation each, tagged `openarmature.gen_ai.token.type`); `openarmature.gen_ai.client.operation.duration` records the call's wall-clock duration, once per attempt under call-level retry, including a failed attempt (which carries `error.type`). Both carry `openarmature.gen_ai.operation` (`"chat"`), `gen_ai.request.model`, and `gen_ai.system`, and use the spec's explicit bucket advisories. The `Meter` comes from the configured `MeterProvider` (injectable via `meter_provider=...`; the OTel global is the no-op fallback when none is set). The instrument names are OA-namespaced, mirroring the upstream `gen_ai.client.*` instruments (at Development status) so a future cutover is a mechanical prefix-strip; metrics target OTel only (no Langfuse mapping). They are a projection of the per-attempt event stream, so they record with spans disabled. 
`conformance.toml` records proposal 0067 `partial`: the LLM-call metrics (fixtures 088 / 090 / 091) are implemented, and the embedding-call metrics (fixture 089) are deferred until the embedding capability (proposal 0059) lands. The LLM fixtures run in `test_observability` via an in-memory `MetricReader` capture (the conformance-adapter §6.9 primitive). +- **Tool-execution observability** (proposal 0063, graph-engine §6 + observability §5.5 / §8.4, spec v0.69.0). A model requests tools in its completion (the request side, proposal 0076); the caller executes them in node-body code, and that execution is now observable. `with_tool_call(tool_name, arguments, tool_call_id=...)` is a node-body instrumentation scope (a context manager, like `with_active_prompt`, exported from `openarmature.observability`): you run the tool inside it and report the outcome with `scope.set_result(...)`. OpenArmature observes the execution and emits a typed `ToolCallEvent` on success or a `ToolCallFailedEvent` (carrying `error_type` / `error_message`, deliberately with no `error_category`) on a raise, then re-raises (it observes, it does not run, select, loop, or swallow). Both events carry the identity / scoping baseline plus `tool_name`, `tool_call_id` (the link back to the requesting `LlmCompletionEvent.output_tool_calls` entry, or `None` for a standalone instrumented function), `arguments`, `latency_ms`, and `call_id`; `ToolCallEvent` adds `result`. The OTel observer renders an `openarmature.tool.call` span parented under the calling node, with OA-namespace `openarmature.tool.{name,call.id,call.arguments,call.result}` attributes and the standard `error.type` on failure; the Development `gen_ai.tool.*` / `execute_tool` surface is mirrored, not emitted in v1. 
The Langfuse observer renders a dedicated `Tool` observation (`asType="tool"`, not a `Generation`) under the node's Span observation, with the arguments / result as input / output and the tool name / call id in metadata, ERROR level on failure. Arguments and result are payload, gated by `disable_provider_payload` (no new flag); `disable_llm_spans` does not gate the tool span. Conformance fixtures 092-098 run in `test_observability`. ### Changed -- **Pinned spec advances v0.60.0 → v0.68.0** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above), v0.62.0 (proposal 0064, the Langfuse session/user population above), v0.63.0 (proposal 0072, the prompt cache control above), the v0.63.1 patch (pipeline-utilities coverage fixtures 070/071 for the already-implemented 0069 / 0070 behavior, no new proposal), and v0.64.0 (proposal 0073, GenAI semconv adoption reconciliation: OA retains `gen_ai.system` despite the upstream rename to `gen_ai.provider.name`; textual-only, with no emitted-attribute or fixture change, so the existing `gen_ai.*` fixtures stand as the retention regression), v0.65.0 (proposal 0074, the failure-isolation `catch` gate above), v0.66.0 (proposal 0075, the inline-callable parallel branches and conditional `when` above), the v0.66.1 patch (an observability §8 call-level-retry Langfuse-mapping clarification reconciling §8 with the per-attempt §5.5 spans: one terminal Generation per `complete()` call, not one per attempt, which the Langfuse observer already renders by driving the Generation from the terminal `LlmCompletionEvent` / `LlmFailedEvent` and skipping the per-attempt `LlmRetryAttemptEvent`; no behavior or fixture change), v0.67.0 (proposal 0076, the tool-call request observability above), and v0.68.0 (proposal 0067, the OTel GenAI metrics above). 
`conformance.toml` records 0061 / 0072 / 0074 / 0075 / 0076 `implemented`, 0064 `partial` (its `sessionId` half is dormant pending the sessions capability) and 0067 `partial` (its embedding-call metrics await the embedding capability), and 0073 `textual-only`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. +- **Pinned spec advances v0.60.0 → v0.69.0** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above), v0.62.0 (proposal 0064, the Langfuse session/user population above), v0.63.0 (proposal 0072, the prompt cache control above), the v0.63.1 patch (pipeline-utilities coverage fixtures 070/071 for the already-implemented 0069 / 0070 behavior, no new proposal), v0.64.0 (proposal 0073, GenAI semconv adoption reconciliation: OA retains `gen_ai.system` despite the upstream rename to `gen_ai.provider.name`; textual-only, with no emitted-attribute or fixture change, so the existing `gen_ai.*` fixtures stand as the retention regression), v0.65.0 (proposal 0074, the failure-isolation `catch` gate above), v0.66.0 (proposal 0075, the inline-callable parallel branches and conditional `when` above), the v0.66.1 patch (an observability §8 call-level-retry Langfuse-mapping clarification reconciling §8 with the per-attempt §5.5 spans: one terminal Generation per `complete()` call, not one per attempt, which the Langfuse observer already renders by driving the Generation from the terminal `LlmCompletionEvent` / `LlmFailedEvent` and skipping the per-attempt `LlmRetryAttemptEvent`; no behavior or fixture change), v0.67.0 (proposal 0076, the tool-call request observability above), v0.68.0 (proposal 0067, the OTel GenAI metrics above), and v0.69.0 (proposal 0063, the tool-execution observability above). 
`conformance.toml` records 0061 / 0072 / 0074 / 0075 / 0076 / 0063 `implemented`, 0064 `partial` (its `sessionId` half is dormant pending the sessions capability) and 0067 `partial` (its embedding-call metrics await the embedding capability), and 0073 `textual-only`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above. ## [0.14.0] — 2026-06-17 diff --git a/conformance.toml b/conformance.toml index 38cc8f2..4cd6bac 100644 --- a/conformance.toml +++ b/conformance.toml @@ -32,7 +32,7 @@ [manifest] implementation = "openarmature-python" -spec_pin = "v0.68.0" +spec_pin = "v0.69.0" # Status values: # implemented — shipped behavior matches the proposal's contract @@ -706,6 +706,14 @@ status = "implemented" since = "0.15.0" note = "The OTel observer synthesizes an openarmature.invocation span at the root of each detached trace (a detached subgraph + each detached fan-out instance), carrying the parent's SHARED invocation_id (detached mode is observer-side trace rendering, not a new run) and the detached unit's own entry_node; the detached subgraph / instance span nests under it. A raising detached subgraph surfaces ERROR + the category + an OTel exception event on BOTH the parent dispatch span and the detached invocation span. Observer-side only -- no graph-engine change; the Langfuse observer is unchanged (its Trace entity already plays the invocation-level-container role). Fixtures 008 (rewritten) and 058 (newly wired) run in test_observability." +# Spec v0.69.0 (proposal 0063). Tool-execution observability (graph-engine +# §6 instrumentation scope + two typed events; observability §5.5.11 OTel tool +# span + §8.4.6 Langfuse Tool observation). 
+[proposals."0063"] +status = "implemented" +since = "0.15.0" +note = "A node-body tool-call instrumentation scope (with_tool_call, a sync context manager modelled on with_active_prompt) the caller wraps a tool execution in; OA observes (does NOT run / select / loop / feed back). On result it dispatches ToolCallEvent; on raise it dispatches ToolCallFailedEvent and RE-RAISES (observe, don't swallow). The two typed §6 events carry identity/scoping + tool_name / tool_call_id (links back to LlmCompletionEvent.output_tool_calls, null for a standalone instrumented function) / arguments / latency_ms / call_id; ToolCallEvent adds result, ToolCallFailedEvent adds error_type + error_message and deliberately NO error_category (tool code has no closed §7 taxonomy). OTel: an openarmature.tool.call span (note .call, not .complete) parented under the calling node, OA-namespace openarmature.tool.{name,call.id,call.arguments,call.result} attrs + standard error.type on failure (ERROR status + exception event); the Development gen_ai.tool.* / execute_tool surface is mirrored, NOT emitted in v1. Langfuse: the dedicated Tool observation (asType=tool) -- python's first non-Span/Generation observation type -- input=arguments, output=result, tool_name/tool_call_id in metadata, ERROR level + error fields on failure. arguments/result are payload, gated by disable_provider_payload (no new flag); disable_llm_spans does not gate the tool span. v1 ships the inline bracketing form; the deferred start/complete split is a spec MAY, not yet needed. Fixtures 092-098 run in test_observability (092-095 typed-event-collector, 096/097 OTel span_tree, 098 Langfuse Tool observation)." + # Spec v0.62.0 (proposal 0064). Langfuse trace.sessionId / trace.userId # population (observability §8.4.1 / §8.10). 
[proposals."0064"] diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index b5521eb..58ebfd2 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -871,6 +871,49 @@ The instrument names are OA-namespaced, mirroring the upstream cutover is a mechanical prefix-strip. Metrics target OTel only; there is no Langfuse mapping. +### Tool-execution observability (`with_tool_call`) + +A model requests tools in its completion (the `output_tool_calls` above); +the *caller* executes them in node-body code. OpenArmature does not run, +choose, loop, or feed back tools (that orchestration stays in your graph), +but it can observe a tool execution you wrap in the `with_tool_call` +instrumentation scope: + +```python +from openarmature.observability import with_tool_call + +async def run_tools(state: AgentState) -> dict: + with with_tool_call("get_weather", {"city": "Paris"}, tool_call_id="call_abc") as scope: + result = await get_weather(city="Paris") + scope.set_result(result) + return {"weather": result} +``` + +`with_tool_call` is a context manager (like `with_active_prompt`): you run +the tool inside it and report the outcome with `scope.set_result(...)`. On a +clean exit it dispatches a `ToolCallEvent`; if the tool raises, it dispatches +a `ToolCallFailedEvent` and re-raises (it observes, it does not swallow, so +your node body still sees the exception). `tool_call_id` links the execution +back to the `output_tool_calls` entry that requested it, or is omitted for a +standalone instrumented function. + +The events render on both backends: + +- OTel: an `openarmature.tool.call` span parented under the calling node, + carrying `openarmature.tool.name`, `openarmature.tool.call.id`, and (when + payload is on) `openarmature.tool.call.arguments` / `.result`. A failure + sets ERROR status with the standard `error.type` attribute. 
+- Langfuse: a dedicated `Tool` observation (not a Generation) under the + node's Span observation, with the arguments / result as input / output and + the tool name / call id in metadata; a failure renders at ERROR level. + +The arguments and result are payload, gated by `disable_provider_payload` +exactly like the LLM payload attributes (default off keeps tool inputs and +outputs out of traces). `disable_llm_spans` does not affect tool spans. The +`openarmature.tool.*` attribute names mirror the upstream Development +`gen_ai.tool.*` surface, which OpenArmature does not emit in v1, so a future +cutover is a prefix swap. + ### Identifying the service: `Resource` Pass an `opentelemetry.sdk.resources.Resource` to set @@ -1044,7 +1087,8 @@ appear dropped. Two workarounds: A second sibling observer maps the same `NodeEvent` stream onto Langfuse's native Trace + Observation data model: Traces at the top, Span observations for graph nodes, Generation observations for -LLM calls. Use it instead of (or alongside) the OTel observer when +LLM calls, and Tool observations for instrumented tool executions. +Use it instead of (or alongside) the OTel observer when your trace UI is Langfuse and you want first-class Generation rendering without going through Langfuse's OTLP ingest. @@ -1106,7 +1150,7 @@ for a runnable demo. Earlier SDK versions (v2.x, v3.x) are NOT supported. Projects on those versions either upgrade to v4 or supply their own adapter - matching the `LangfuseClient` Protocol's four methods. + matching the `LangfuseClient` Protocol. 
A runtime `isinstance(adapter, LangfuseClient)` check ships in the unit suite, so if a future v4 patch breaks the Protocol's diff --git a/openarmature-spec b/openarmature-spec index b5804d6..6fdabcc 160000 --- a/openarmature-spec +++ b/openarmature-spec @@ -1 +1 @@ -Subproject commit b5804d6cdf36f9ba42e920de34e1347e0075bcd5 +Subproject commit 6fdabcc618772322ec57edc07d56388f53c6bab0 diff --git a/pyproject.toml b/pyproject.toml index a264151..aca0e3d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec" openarmature = "openarmature.cli:main" [tool.openarmature] -spec_version = "0.68.0" +spec_version = "0.69.0" [dependency-groups] dev = [ diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md index c14ed17..071f950 100644 --- a/src/openarmature/AGENTS.md +++ b/src/openarmature/AGENTS.md @@ -1,6 +1,6 @@ # OpenArmature — Agent documentation -*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.68.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* +*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.69.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.* ## TL;DR @@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents: ## Capability contracts -_Sourced from openarmature-spec v0.68.0. 
Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ +_Sourced from openarmature-spec v0.69.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._ ### Capability: `graph-engine` diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py index 7188d32..c185f18 100644 --- a/src/openarmature/__init__.py +++ b/src/openarmature/__init__.py @@ -25,7 +25,7 @@ """ __version__ = "0.14.0" -__spec_version__ = "0.68.0" +__spec_version__ = "0.69.0" # Proposal 0052 (spec observability §5.1 / §8.4.1): canonical # package-registry name for this implementation. Surfaces on every # OTel invocation span as ``openarmature.implementation.name`` and on diff --git a/tests/conformance/adapter.py b/tests/conformance/adapter.py index ee35fa2..c98c7c3 100644 --- a/tests/conformance/adapter.py +++ b/tests/conformance/adapter.py @@ -69,6 +69,12 @@ def _parse_type(s: str) -> Any: # collect_field gets a null entry, so the element type must permit None. if s == "any": return Any + # proposal-0063 tool fixtures (092-098): ``record`` is the state slot + # a tool result is stored in — an opaque, language-idiomatic value + # (often a mapping) with a ``null`` default. 
``Any`` admits both the + # null default and whatever shape the tool produced. + if s == "record": + return Any # Unparameterized container types — parallel-branches fixtures # 034/035/037 use ``dict`` and ``list`` as state-field types # for accumulator slots (branch_errors, merged_dict, collected_labels) diff --git a/tests/conformance/harness/directives.py b/tests/conformance/harness/directives.py index 1d7d82b..49180bc 100644 --- a/tests/conformance/harness/directives.py +++ b/tests/conformance/harness/directives.py @@ -375,6 +375,31 @@ class CallsLlmSpec(_AllowExtras): config: RuntimeConfigSpec | None = None +class MockToolSpec(_AllowExtras): + """The mock tool a ``calls_tool`` node runs inside the + instrumentation scope: ``returns`` (a result -> ToolCallEvent) XOR + ``raises`` (``{error_type, message}`` -> ToolCallFailedEvent + + re-raise).""" + + returns: Any = None + raises: dict[str, Any] | None = None + + +class CallsToolSpec(_AllowExtras): + """Tool-using node (proposal 0063): the node body enters the + ``with_tool_call`` instrumentation scope, runs the ``mock_tool``, and + OA emits the ToolCallEvent / ToolCallFailedEvent. ``tool_call_id`` is + optional (null = a standalone instrumented function); + ``stores_result_in`` is optional (absent on the failure fixtures + where the node lets the exception propagate).""" + + tool_name: str + mock_tool: MockToolSpec + arguments: dict[str, Any] | None = None + tool_call_id: str | None = None + stores_result_in: str | None = None + + class CallsEmbedSpec(_AllowExtras): """Embedding-using node: sends ``input`` to a mock embedding provider and stores the result in ``stores_response_in``. Used by the @@ -447,6 +472,7 @@ class NodeSpec(_ForbidExtras): flaky_resume_aware: FlakyResumeAwareSpec | None = None calls_llm: CallsLlmSpec | None = None calls_embed: CallsEmbedSpec | None = None + calls_tool: CallsToolSpec | None = None # Companions — additive. 
inputs: dict[str, str] | None = None @@ -480,6 +506,7 @@ class NodeSpec(_ForbidExtras): "flaky_resume_aware", "calls_llm", "calls_embed", + "calls_tool", ) @model_validator(mode="after") diff --git a/tests/conformance/test_fixture_parsing.py b/tests/conformance/test_fixture_parsing.py index a4a964c..6f6382e 100644 --- a/tests/conformance/test_fixture_parsing.py +++ b/tests/conformance/test_fixture_parsing.py @@ -334,6 +334,24 @@ def _id(case: tuple[str, Path]) -> str: "observability/056-llm-completion-event-strict-serial-ordering": ( "Proposal 0049 typed LLM completion event; queued for v0.13.0" ), + # Proposal 0063 (tool-execution observability, v0.69.0) — the + # typed-collector fixtures share the ``expected.observers`` shape of + # 050-056 (which the harness schema models loosely, hence the same + # parser-deferral); they still RUN via test_observability. The + # OTel-span (096/097) and Langfuse (098) tool fixtures parse fine and + # are not deferred. + "observability/092-tool-call-event-dispatch": ( + "Proposal 0063 typed-event-collector shape; runs in test_observability" + ), + "observability/093-tool-call-failed-event-dispatch": ( + "Proposal 0063 typed-event-collector shape; runs in test_observability" + ), + "observability/094-tool-call-event-mutual-exclusion": ( + "Proposal 0063 typed-event-collector shape; runs in test_observability" + ), + "observability/095-tool-call-id-links-to-llm-request": ( + "Proposal 0063 typed-event-collector shape; runs in test_observability" + ), # Proposal 0057 (LlmCompletionEvent field-set extension, v0.51.0) # — fixtures 060-068 share the same ``typed_observers`` directive # shape as 050-056 and inherit the same parser-deferral status diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index a454241..2dfbc94 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -174,6 +174,17 @@ def _reset_otel_global_tracer_provider(restore_to: 
object) -> None: "088-llm-metrics-token-and-duration", "090-metrics-error-type-on-duration", "091-metrics-disabled-no-measurements", + # v0.69.0 — proposal 0063 (tool-execution observability). A + # calls_tool node enters the with_tool_call scope; the typed + # ToolCallEvent / ToolCallFailedEvent drive the OTel tool span + + # the Langfuse Tool observation. + "092-tool-call-event-dispatch", + "093-tool-call-failed-event-dispatch", + "094-tool-call-event-mutual-exclusion", + "095-tool-call-id-links-to-llm-request", + "096-tool-call-payload-gating", + "097-otel-tool-span-attributes", + "098-langfuse-tool-observation", } ) @@ -302,6 +313,16 @@ async def test_observability_fixture(fixture_path: Path) -> None: "091-metrics-disabled-no-measurements", }: await _run_metrics_fixture(spec) + elif fixture_id in { + "092-tool-call-event-dispatch", + "093-tool-call-failed-event-dispatch", + "094-tool-call-event-mutual-exclusion", + "095-tool-call-id-links-to-llm-request", + "096-tool-call-payload-gating", + "097-otel-tool-span-attributes", + "098-langfuse-tool-observation", + }: + await _run_tool_fixture(spec) else: raise AssertionError(f"no driver for supported fixture {fixture_id!r}") @@ -3120,6 +3141,248 @@ def _assert_metric_points( ) +# --------------------------------------------------------------------------- +# Proposal 0063 — tool-execution observability fixtures (092-098) +# --------------------------------------------------------------------------- +# +# A calls_tool node enters the with_tool_call instrumentation scope around a +# mock tool (returns -> ToolCallEvent; raises -> ToolCallFailedEvent + +# re-raise). One graph builder serves all three assertion shapes: +# typed-event-collector (092-095), OTel span_tree (096/097), and the Langfuse +# Tool observation tree (098). Dispatch is by which expected.* key appears. 
+ + +def _make_tool_node_body(spec: Mapping[str, Any]) -> Any: + from openarmature.observability import with_tool_call + + tool_name = cast("str", spec["tool_name"]) + arguments = cast("dict[str, Any] | None", spec.get("arguments")) + tool_call_id = cast("str | None", spec.get("tool_call_id")) + stores_in = cast("str | None", spec.get("stores_result_in")) + mock = cast("dict[str, Any]", spec["mock_tool"]) + + async def body(_state: Any) -> Mapping[str, Any]: + with with_tool_call(tool_name, arguments, tool_call_id=tool_call_id) as scope: + raises = cast("dict[str, Any] | None", mock.get("raises")) + if raises is not None: + # Synthesize an exception whose class name == error_type + # so the event captures the fixture's error_type verbatim. + exc_type = cast("str", raises.get("error_type", "ToolError")) + exc_cls = type(exc_type, (Exception,), {}) + raise exc_cls(cast("str", raises.get("message", ""))) + result = mock.get("returns") + scope.set_result(result) + return {stores_in: result} if stores_in else {} + + return body + + +def _make_tool_fixture_llm_body(spec: Mapping[str, Any], provider: Any) -> Any: + from openarmature.llm import UserMessage + + stores_in = cast("str", spec.get("stores_response_in", "msg")) + messages = [ + UserMessage(content=cast("str", m["content"])) + for m in cast("list[dict[str, Any]]", spec.get("messages", [])) + if m.get("role") == "user" + ] + + async def body(_state: Any) -> Mapping[str, str]: + response = await provider.complete(messages) + return {stores_in: response.message.content or ""} + + return body + + +def _build_tool_graph(case: Mapping[str, Any]) -> tuple[Any, type[Any], list[Any]]: + """Build a graph whose nodes are calls_tool / calls_llm / update. 
+ Returns (compiled_graph, state_cls, providers-to-close).""" + import json + + import httpx + + from openarmature.graph import END, GraphBuilder + from openarmature.llm import OpenAIProvider + + from .adapter import build_state_cls + + state_cls = build_state_cls( + "ToolFixtureState", cast("dict[str, dict[str, Any]]", case["state"]["fields"]) + ) + builder = GraphBuilder(state_cls) + providers: list[Any] = [] + mock_responses = list(cast("list[dict[str, Any]]", case.get("mock_llm") or [])) + + def _handler(_request: httpx.Request) -> httpx.Response: + if not mock_responses: + raise AssertionError("mock_llm queue exhausted") + spec_resp = mock_responses.pop(0) + body = cast("dict[str, Any]", spec_resp.get("body") or {}) + return httpx.Response( + int(spec_resp.get("status", 200)), + content=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + + nodes = cast("dict[str, Any]", case["nodes"]) + for node_name, node_spec in nodes.items(): + nd = cast("dict[str, Any]", node_spec) + if "calls_tool" in nd: + builder.add_node(node_name, _make_tool_node_body(cast("dict[str, Any]", nd["calls_tool"]))) + elif "calls_llm" in nd: + provider = OpenAIProvider( + base_url="http://mock-llm.test", + model="test-model", + api_key="test", + transport=httpx.MockTransport(_handler), + ) + providers.append(provider) + builder.add_node( + node_name, _make_tool_fixture_llm_body(cast("dict[str, Any]", nd["calls_llm"]), provider) + ) + elif "update" in nd: + update_block = cast("dict[str, Any]", nd["update"]) + + async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict[str, Any]: + return dict(_payload) + + builder.add_node(node_name, _update_body) + else: + raise AssertionError(f"tool fixture node {node_name!r} has no recognized directive") + + for edge in cast("list[dict[str, str]]", case["edges"]): + target = END if edge["to"] == "END" else edge["to"] + builder.add_edge(edge["from"], target) + builder.set_entry(cast("str", 
case["entry"])) + return builder.compile(), state_cls, providers + + +def _assert_langfuse_observation_tree( + trace: Any, expected: list[dict[str, Any]], parent_id: str | None = None +) -> None: + """Recursively match expected observations against the trace's flat + observation list (linked by parent_observation_id). type + name are + matched exactly; level / input / output exactly when present; + metadata is subset-matched.""" + actual_children = trace.children_of(parent_id) + for exp in expected: + exp_type = cast("str", exp["type"]) + exp_name = cast("str | None", exp.get("name")) + match = next( + (o for o in actual_children if o.type == exp_type and (exp_name is None or o.name == exp_name)), + None, + ) + assert match is not None, ( + f"no {exp_type!r} observation named {exp_name!r} under parent {parent_id!r}; " + f"got {[(o.type, o.name) for o in actual_children]}" + ) + if "level" in exp: + assert match.level == exp["level"], f"{exp_name!r}: level {match.level!r} != {exp['level']!r}" + if "input" in exp: + assert match.input == exp["input"], f"{exp_name!r}: input {match.input!r} != {exp['input']!r}" + if "output" in exp: + assert match.output == exp["output"], ( + f"{exp_name!r}: output {match.output!r} != {exp['output']!r}" + ) + for key, val in cast("dict[str, Any]", exp.get("metadata") or {}).items(): + assert match.metadata.get(key) == val, ( + f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}" + ) + children = cast("list[dict[str, Any]] | None", exp.get("children")) + if children: + _assert_langfuse_observation_tree(trace, children, parent_id=match.id) + + +async def _run_tool_fixture(spec: Mapping[str, Any]) -> None: + cases = cast("list[dict[str, Any]]", spec["cases"]) + for case in cases: + try: + await _run_tool_case(case) + except AssertionError as e: + raise AssertionError(f"case {case.get('name')!r}: {e}") from e + + +async def _run_tool_case(case: Mapping[str, Any]) -> None: + from opentelemetry.sdk.trace.export import 
SimpleSpanProcessor + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, + ) + + from openarmature.graph import NodeException + from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver + + from .harness.llm_attribute_assertions import ( + assert_attribute_does_not_contain, + assert_attribute_parses_as_messages, + assert_attribute_parses_as_object, + assert_attribute_truncation, + assert_attributes_absent, + ) + + expected = cast("dict[str, Any]", case["expected"]) + expected_error = cast("dict[str, Any] | None", case.get("expected_error")) + graph, state_cls, providers = _build_tool_graph(case) + state = _make_state_instance(case, state_cls) + + collectors: dict[str, _TypedEventCollector] = {} + exporter: Any = None + otel_observer: Any = None + langfuse_client: Any = None + if "observers" in expected: + collectors, _ = _parse_typed_observers(case) + for collector in collectors.values(): + graph.attach_observer(collector) + if "span_tree" in expected: + exporter = InMemorySpanExporter() + otel_kwargs: dict[str, Any] = {"span_processor": SimpleSpanProcessor(exporter)} + if "disable_provider_payload" in case: + otel_kwargs["disable_provider_payload"] = bool(case["disable_provider_payload"]) + otel_observer = OTelObserver(**otel_kwargs) + graph.attach_observer(otel_observer) + if "langfuse_trace" in expected: + langfuse_client = InMemoryLangfuseClient() + lf_kwargs: dict[str, Any] = {"client": langfuse_client} + if "disable_provider_payload" in case: + lf_kwargs["disable_provider_payload"] = bool(case["disable_provider_payload"]) + graph.attach_observer(LangfuseObserver(**lf_kwargs)) + + try: + if expected_error is not None: + with pytest.raises(NodeException): + await graph.invoke(state) + else: + await graph.invoke(state) + await graph.drain() + finally: + for provider in providers: + await provider.aclose() + if otel_observer is not None: + otel_observer.shutdown() + + if "observers" in 
expected: + for obs_name, obs_spec in cast("dict[str, Any]", expected["observers"]).items(): + _assert_observer_expectations(obs_name, collectors[obs_name], cast("dict[str, Any]", obs_spec)) + if "span_tree" in expected and exporter is not None: + _check_payload_span_tree( + exporter.get_finished_spans(), + cast("list[dict[str, Any]]", expected["span_tree"]), + full_input_serialization=None, + assert_attributes_absent=assert_attributes_absent, + assert_attribute_parses_as_messages=assert_attribute_parses_as_messages, + assert_attribute_parses_as_object=assert_attribute_parses_as_object, + assert_attribute_does_not_contain=assert_attribute_does_not_contain, + assert_attribute_truncation=assert_attribute_truncation, + ) + if "langfuse_trace" in expected and langfuse_client is not None: + assert len(langfuse_client.traces) == 1, ( + f"expected 1 Langfuse trace; got {sorted(langfuse_client.traces)}" + ) + trace = next(iter(langfuse_client.traces.values())) + _assert_langfuse_observation_tree( + trace, cast("list[dict[str, Any]]", expected["langfuse_trace"].get("observations") or []) + ) + + # --------------------------------------------------------------------------- # Proposal 0049 — typed LlmCompletionEvent fixtures (050-056) # --------------------------------------------------------------------------- @@ -3348,6 +3611,9 @@ async def _invoke_typed_fixture( "contains_exactly_one_event_of_type", "contains_event_of_type", "contains_exactly_n_events_of_type", + # proposal 0063 (092-094) spelling for an exact-count assertion, + # same shape as contains_exactly_n_events_of_type. 
+ "event_count", "does_not_contain_event_of_type", "captured_event_field_values_cover", "every_captured_event_has", @@ -3397,6 +3663,14 @@ def _assert_observer_expectations( assert len(matching) == expected_count, ( f"observer {name!r}: expected exactly {expected_count} {type_name} events; got {len(matching)}" ) + if "event_count" in spec: + sub = cast("dict[str, Any]", spec["event_count"]) + type_name = cast("str", sub["event_type"]) + expected_count = int(cast("int", sub["count"])) + matching = [e for e in events if type(e).__name__ == type_name] + assert len(matching) == expected_count, ( + f"observer {name!r}: expected exactly {expected_count} {type_name} events; got {len(matching)}" + ) if "does_not_contain_event_of_type" in spec: type_name = cast("str", spec["does_not_contain_event_of_type"]) matching = [e for e in events if type(e).__name__ == type_name] diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 211ce50..389013e 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -9,7 +9,7 @@ def test_package_versions() -> None: assert openarmature.__version__ == "0.14.0" - assert openarmature.__spec_version__ == "0.68.0" + assert openarmature.__spec_version__ == "0.69.0" def test_spec_version_matches_pyproject() -> None: From f3dba0798b1988f36737ac9c0fdf188834b8d8f3 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Mon, 22 Jun 2026 16:43:48 -0700 Subject: [PATCH 3/3] Address review on tool-execution observability - with_tool_call: drop the result sentinel. The scope result defaults to None and the event carries it directly; a forgotten set_result and a tool that returns None both emit a null result, which is correct, so the sentinel produced no distinguishable output. - observability docs: name tool_call_id explicitly in the Langfuse Tool metadata sentence (the feature has both tool_call_id and call_id). 
- conformance: _assert_langfuse_observation_tree consumes each matched observation, so two same-shape expected siblings can't both bind to one actual observation. --- docs/concepts/observability.md | 3 ++- src/openarmature/observability/tool_call.py | 18 +++++------------- tests/conformance/test_observability.py | 9 ++++++--- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/docs/concepts/observability.md b/docs/concepts/observability.md index 58ebfd2..3065cba 100644 --- a/docs/concepts/observability.md +++ b/docs/concepts/observability.md @@ -905,7 +905,8 @@ The events render on both backends: sets ERROR status with the standard `error.type` attribute. - Langfuse: a dedicated `Tool` observation (not a Generation) under the node's Span observation, with the arguments / result as input / output and - the tool name / call id in metadata; a failure renders at ERROR level. + the tool name and `tool_call_id` in metadata; a failure renders at ERROR + level. The arguments and result are payload, gated by `disable_provider_payload` exactly like the LLM payload attributes (default off keeps tool inputs and diff --git a/src/openarmature/observability/tool_call.py b/src/openarmature/observability/tool_call.py index b570953..d3d8f69 100644 --- a/src/openarmature/observability/tool_call.py +++ b/src/openarmature/observability/tool_call.py @@ -31,16 +31,6 @@ from typing import Any -# Sentinel distinguishing "the caller never reported a result" from "the -# tool returned None" -- a forgotten set_result() resolves to a null -# result rather than masquerading as a real one. -class _Unset: - pass - - -_UNSET = _Unset() - - class ToolCallScope: """Handle yielded by :func:`with_tool_call`. 
@@ -54,7 +44,10 @@ class ToolCallScope: def __init__(self, call_id: str) -> None: self.call_id = call_id - self._result: Any = _UNSET + # Defaults to None: a forgotten set_result() and a tool that + # genuinely returns None both emit a null result, which is + # correct (a tool may legitimately return None). + self._result: Any = None def set_result(self, value: Any) -> None: """Report the tool's return value to the scope.""" @@ -149,7 +142,6 @@ def with_tool_call( raise latency_ms = (time.perf_counter() - start) * 1000.0 if dispatch is not None: - result = None if isinstance(scope._result, _Unset) else scope._result dispatch( ToolCallEvent( invocation_id=invocation_id, @@ -163,7 +155,7 @@ def with_tool_call( tool_name=tool_name, tool_call_id=tool_call_id, arguments=arguments, - result=result, + result=scope._result, latency_ms=latency_ms, caller_invocation_metadata=caller_metadata, ) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 2dfbc94..24aba6d 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -3264,18 +3264,21 @@ def _assert_langfuse_observation_tree( observation list (linked by parent_observation_id). type + name are matched exactly; level / input / output exactly when present; metadata is subset-matched.""" - actual_children = trace.children_of(parent_id) + # Mutable copy: each matched observation is consumed so two + # same-shape expected siblings can't both bind to one actual. 
+ remaining = list(trace.children_of(parent_id)) for exp in expected: exp_type = cast("str", exp["type"]) exp_name = cast("str | None", exp.get("name")) match = next( - (o for o in actual_children if o.type == exp_type and (exp_name is None or o.name == exp_name)), + (o for o in remaining if o.type == exp_type and (exp_name is None or o.name == exp_name)), None, ) assert match is not None, ( f"no {exp_type!r} observation named {exp_name!r} under parent {parent_id!r}; " - f"got {[(o.type, o.name) for o in actual_children]}" + f"got {[(o.type, o.name) for o in remaining]}" ) + remaining.remove(match) if "level" in exp: assert match.level == exp["level"], f"{exp_name!r}: level {match.level!r} != {exp['level']!r}" if "input" in exp: