diff --git a/CHANGELOG.md b/CHANGELOG.md
index 37476ef..c7afba2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,10 +13,11 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
- **Langfuse `trace.userId` / `trace.sessionId` population** (proposal 0064, observability §8.4.1, spec v0.62.0). The Langfuse observer now promotes a recognized `userId` key in the caller-supplied invocation metadata to Langfuse's first-class `trace.userId` field (the Users dashboard), additively: the key also remains at `trace.metadata.userId`. Promotion is automatic and unconditional; an absent key leaves `trace.userId` unset. The `LangfuseClient.trace()` surface (the Protocol, the in-memory client, and the SDK adapter) gains `session_id` / `user_id`. `trace.sessionId` is sourced from `openarmature.session_id`, which the sessions capability (proposal 0020) establishes; that capability is not yet implemented in python, so the `sessionId` plumbing is in place but dormant (no source) and unset in the interim. `conformance.toml` records proposal 0064 `partial` on that basis: fixture 084 cases 2/3/4 (not session-bound, `userId` present additively, `userId` absent) run, and the session-bound cases 1/5 defer until 0020. Langfuse-only: the OTel side already carries `openarmature.session_id` and `openarmature.user.*` as span attributes, and OTel has no trace-level session/user field.
- **Per-fetch prompt cache control: `cache_ttl_seconds`** (proposal 0072, prompt-management §5 / §6, spec v0.63.0). `PromptBackend.fetch`, `PromptManager.fetch`, and `PromptManager.get` gain an optional `cache_ttl_seconds` read-side control: `None` preserves current behavior, `0` forces a fresh read past any client-side cache, and `N > 0` bounds a served entry's staleness to N seconds; a negative value is rejected at the manager. It governs only which cached entry may be served, not whether or how results are cached. The bundled filesystem backend is cacheless and ignores it; the bundled Langfuse backend forwards it to the Langfuse SDK's `get_prompt` cache. Conformance fixtures 033/034 run through a caching harness backend (conformance-adapter §6.8: `source_read_count` plus a controllable `advance_clock`).
- **Failure-isolation `catch` gate + cause-chain classification primitive** (proposal 0074, pipeline-utilities §6.3 / §6.4, spec v0.65.0). `FailureIsolationMiddleware` gains an optional `catch`: a set of error categories. An exception is caught only if the *derived category* of its cause chain (the outermost non-carrier link's category, resolved through the engine's `node_exception` carriers, the same value reported as `caught_exception.category`) is in the set. This closes a degrade-into-crash footgun: at a wrapping placement (subgraph, fan-out instance, branch) the engine wraps the originating failure in a carrier, so a `predicate` inspecting the surface exception sees only the carrier and misses it, whereas `catch` classifies through the carrier. `catch` composes with `predicate` as a conjunction; both default permissive (both unset stays catch-all), and a null derived category never matches a non-empty set. The carrier-skipping walk behind `catch` and `caught_exception` is promoted to a public primitive, `classify_cause_chain(exc) -> CaughtException` (the ordered `chain`, the derived `category`, and its `message` — the same record the event carries), exported from `openarmature.graph` for use in a custom `predicate`, a router, a metric, or a full-chain retry classifier. The default retry classifier stays deliberately single-level (it classifies at re-attempt granularity); this is now documented, with no behavior change. Conformance fixture 072 (catch matches through an instance-placement carrier and degrades; a non-matching catch propagates with no event). The optional native-exception-type `catch` form (spec MAY) is not shipped.
+- **Inline-callable parallel branches and conditional `when`** (proposal 0075, pipeline-utilities §11, spec v0.66.0). `ParallelBranchesNode` gains two additive branch forms. A branch may now give its work as `call`, an inline async function over the parent state returning a parent-shaped partial update, instead of a compiled `subgraph` with its own state schema and `inputs` / `outputs` projection; the returned partial is the branch's contribution directly, merged via the parent reducer with no projection. This makes the primitive adoptable for the "M heterogeneous lightweight parallel calls over shared state, each independently failure-isolated" shape (hybrid recall, paired reads) that previously dropped to a hand-rolled gather, while reusing the existing concurrency, fail-fast cancellation, per-branch failure isolation, and reducer fan-in. A branch gives its work as exactly one of `subgraph` / `call`, and a callable branch declares no `inputs` / `outputs`, else a new compile-time `ParallelBranchesInvalidBranchSpec`; a node may mix the two forms freely. A branch (either form) may also carry an optional `when` predicate over the parent state, evaluated once at dispatch: a `False` result skips the branch entirely (no dispatch, contribution, observer events, or span), and an all-skipped node is a valid no-op distinct from the compile-time `ParallelBranchesNoBranches`. A callable branch is the unit of work, so it emits one `started` / `completed` observer pair keyed by `branch_name` (rendered as a single branch span); a skipped branch emits nothing. `ParallelBranchesInvalidBranchSpec` is exported from `openarmature.graph`. Conformance fixtures 073 (two callable branches merge to disjoint fields), 074 (conditional `when` skips / dispatches), and 075 (callable branch failure-isolation degrade) run in `test_pipeline_utilities`.
### Changed
-- **Pinned spec advances v0.60.0 → v0.65.0** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above), v0.62.0 (proposal 0064, the Langfuse session/user population above), v0.63.0 (proposal 0072, the prompt cache control above), the v0.63.1 patch (pipeline-utilities coverage fixtures 070/071 for the already-implemented 0069 / 0070 behavior, no new proposal), and v0.64.0 (proposal 0073, GenAI semconv adoption reconciliation: OA retains `gen_ai.system` despite the upstream rename to `gen_ai.provider.name`; textual-only, with no emitted-attribute or fixture change, so the existing `gen_ai.*` fixtures stand as the retention regression), and v0.65.0 (proposal 0074, the failure-isolation `catch` gate above). `conformance.toml` records 0061 / 0072 / 0074 `implemented`, 0064 `partial` (its `sessionId` half is dormant pending the sessions capability), and 0073 `textual-only`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above.
+- **Pinned spec advances v0.60.0 → v0.66.1** across the v0.15.0 cycle: v0.61.0 (proposal 0061, the detached-trace invocation span above), v0.62.0 (proposal 0064, the Langfuse session/user population above), v0.63.0 (proposal 0072, the prompt cache control above), the v0.63.1 patch (pipeline-utilities coverage fixtures 070/071 for the already-implemented 0069 / 0070 behavior, no new proposal), and v0.64.0 (proposal 0073, GenAI semconv adoption reconciliation: OA retains `gen_ai.system` despite the upstream rename to `gen_ai.provider.name`; textual-only, with no emitted-attribute or fixture change, so the existing `gen_ai.*` fixtures stand as the retention regression), v0.65.0 (proposal 0074, the failure-isolation `catch` gate above), v0.66.0 (proposal 0075, the inline-callable parallel branches and conditional `when` above), and the v0.66.1 patch (an observability §8 call-level-retry Langfuse-mapping clarification reconciling §8 with the per-attempt §5.5 spans: one terminal Generation per `complete()` call, not one per attempt, which the Langfuse observer already renders by driving the Generation from the terminal `LlmCompletionEvent` / `LlmFailedEvent` and skipping the per-attempt `LlmRetryAttemptEvent`; no behavior or fixture change). `conformance.toml` records 0061 / 0072 / 0074 / 0075 `implemented`, 0064 `partial` (its `sessionId` half is dormant pending the sessions capability), and 0073 `textual-only`. Proposal 0050 needed no pin bump of its own (it was already within the pin from its v0.42.0 acceptance); its v0.14.0 `partial` entry flips to `implemented` with the per-attempt span surface above.
## [0.14.0] — 2026-06-17
diff --git a/conformance.toml b/conformance.toml
index 76a1802..59098a9 100644
--- a/conformance.toml
+++ b/conformance.toml
@@ -32,7 +32,7 @@
[manifest]
implementation = "openarmature-python"
-spec_pin = "v0.65.0"
+spec_pin = "v0.66.1"
# Status values:
# implemented — shipped behavior matches the proposal's contract
@@ -726,3 +726,8 @@ note = "Governance + observability §5.5 rationale change: reconciles the gen_ai
status = "implemented"
since = "0.15.0"
note = "FailureIsolationMiddleware gains an optional `catch` set of error categories (§6.3): an exception is caught only if the DERIVED category of its cause chain (the outermost non-carrier link, resolved THROUGH node_exception carriers -- the same value reported as caught_exception.category) is in the set, composing with `predicate` as a conjunction (both default permissive, both unset = catch-all; a null derived category never matches a non-empty set). This classifies a carrier-wrapped failure correctly at a wrapping placement where a surface check sees only the carrier. The §6.4 cause-chain classification walk is promoted to a public primitive classify_cause_chain(exc) -> CaughtException (the existing failure-isolation record: chain + derived category + message) in openarmature.graph, shared by the catch gate, the emitted event, and any consumer. §6.1: the default retry classifier's single-level depth is documented as deliberate (re-run granularity vs §6.3 full-chain degrade); no behavior change. Fixture 072 (catch matches through an instance-placement carrier and degrades; a non-matching catch propagates with no event). The optional native-exception-type catch sugar (spec MAY) is not shipped."
+
+[proposals."0075"]
+status = "implemented"
+since = "0.15.0"
+note = "ParallelBranchesNode gains two additive branch forms. (1) Inline-callable branches (§11.1.1): a BranchSpec may give its work as `call` (an async function over the parent state returning a parent-shaped partial update) instead of a compiled `subgraph` + inputs/outputs projection; the contribution is the returned partial directly, merged via the parent reducer with no projection (§11.4). Exactly one of subgraph/call per branch, and a callable branch declares no inputs/outputs, else parallel_branches_invalid_branch_spec (a new compile-time category); a node MAY mix subgraph and callable branches. Per-leg failure isolation on a callable branch is the existing §11.7 branch-middleware contract (wrap the callable in FailureIsolationMiddleware). (2) Conditional branches (§11.10): a BranchSpec may carry an optional `when` predicate (parent_state) -> bool, evaluated once at dispatch; false skips the branch entirely (no dispatch, contribution, observer events, or span). All-branches-skipped is a valid no-op, distinct from the compile-time parallel_branches_no_branches (empty declared mapping). graph-engine §6 / observability §5.7: a callable branch is the unit -- it emits one started/completed pair keyed by branch_name (rendered as a branch span via the existing §5.7 machinery), a skipped branch emits nothing. Fixtures 073 (two callable branches merge to disjoint fields), 074 (when false skips / true dispatches), 075 (callable branch + FailureIsolationMiddleware degrades, sibling completes, category resolves through the chain)."
diff --git a/docs/concepts/parallel-branches.md b/docs/concepts/parallel-branches.md
index d55e426..9becbba 100644
--- a/docs/concepts/parallel-branches.md
+++ b/docs/concepts/parallel-branches.md
@@ -76,6 +76,61 @@ order**: first the branch declared first in the `branches` dict,
then the next, and so on. This is deterministic regardless of which
branch's inner work finishes first.
+## Lightweight callable branches
+
+Not every branch needs a compiled subgraph. When a branch is really
+just *"call this one async function over the shared state"*, give its
+work as `call` instead of `subgraph`: an async function that reads the
+parent state and returns a parent-shaped partial update. No subgraph,
+no state schema, no `inputs` / `outputs` projection.
+
+```python
+async def vector_recall(state: SearchState) -> dict[str, object]:
+ hits = await vector_store.query(state.query)
+ return {"vector_hits": hits}
+
+async def keyword_recall(state: SearchState) -> dict[str, object]:
+ hits = await fts_index.search(state.query)
+ return {"keyword_hits": hits}
+
+builder.add_parallel_branches_node(
+ "recall",
+ branches={
+ "vector": BranchSpec(call=vector_recall),
+ "keyword": BranchSpec(call=keyword_recall),
+ },
+)
+```
+
+This is the right form for hybrid recall (vector plus full-text),
+paired reads, and other *"M heterogeneous lightweight calls over
+shared state, each independently failure-isolated"* shapes. Each
+branch returns parent fields directly, so the returned partial is the
+branch's contribution as-is: it merges via the parent's reducers in
+branch insertion order, exactly like a subgraph branch's projected
+outputs, with no projection step.
+
+A branch gives its work as **exactly one** of `subgraph` or `call`.
+Declaring both, neither, or `inputs` / `outputs` on a callable branch
+(which has no subgraph state to project against) is a compile-time
+`ParallelBranchesInvalidBranchSpec`. A single parallel-branches node
+may freely mix subgraph branches and callable branches.
+
+Per-leg failure isolation works the same on a callable branch: wrap it
+in a `FailureIsolationMiddleware` via the branch's `middleware` (see
+[Branch middleware](#branch-middleware) below). A failing callable
+that degrades to its configured update "succeeds" from the node's view,
+so it contributes the degraded update and does not trip `fail_fast`.
+
+Because a callable branch has no inner nodes, the branch itself is the
+unit of work: it emits one `started` / `completed` observer pair keyed
+by its `branch_name`, so per-branch observability comes from the branch
+as a whole. The bundled OTel observer renders it as a single per-branch
+dispatch span keyed by `branch_name`, with no inner-node spans beneath
+it (a subgraph branch, by contrast, spans its inner nodes under the
+dispatch span); the Langfuse observer renders it as a single observation
+under the node. A `when`-skipped branch produces no span at all.
+
## Error policy
- **`"fail_fast"`** (default): the first branch failure cancels
@@ -113,6 +168,44 @@ Branch middleware is independent across branches: branch A may
have `[retry, timing]`; branch B may have `[]`; branch C may have
some custom breaker. Each branch's chain composes in isolation.
+## Conditional branches
+
+Any branch (subgraph or callable) may carry an optional `when`
+predicate over the parent state. It is evaluated **once at dispatch**,
+against the state the parallel-branches node received:
+
+```python
+builder.add_parallel_branches_node(
+ "recall",
+ branches={
+ # Only run the vector leg when the query carries an embedding.
+ "vector": BranchSpec(call=vector_recall, when=lambda s: s.embedding is not None),
+ "keyword": BranchSpec(call=keyword_recall),
+ },
+)
+```
+
+- `when` absent (default): the branch always dispatches.
+- `when` returns `True`: the branch dispatches normally.
+- `when` returns `False`: the branch is **skipped** entirely. It is
+ not dispatched, runs no work, contributes nothing to parent state,
+ and emits no observer events or span. It simply does not appear in
+ the run.
+
+This expresses *"skip this leg when it does not apply"* directly,
+without an always-run self-no-op branch cluttering the trace. The
+`branches` mapping and its insertion order are unchanged; skipping is
+a runtime decision over the declared set, and the branches that do
+dispatch keep their insertion-order determinism. If **every** branch
+is skipped, the node completes as a valid no-op (it contributes
+nothing), distinct from the compile-time `ParallelBranchesNoBranches`
+that an empty declared mapping raises.
+
+Keep `when` a deterministic function of the dispatch-time parent
+state so repeated runs skip the same set; a `when` that consults
+nondeterministic sources carries the same caveat as a conditional
+middleware.
+
## Composition with other constructs
Parallel branches compose with the rest of the engine the way
@@ -144,8 +237,10 @@ Per-branch progress is not individually persisted in v1.
this subgraph for each item in a list," reach for
[fan-out](fan-out.md).
- **Not a router.** A router is a conditional-edge pattern that
- picks one branch based on state. Parallel branches runs *all*
- branches concurrently.
+ picks one path based on state. Parallel branches runs all the
+ branches that dispatch concurrently; a branch's `when` can skip an
+ individual branch, but it gates each branch independently rather
+ than selecting exactly one.
- **Not a coordinator.** Branches don't communicate with each other
during execution; if branch B's work depends on branch A's
output, you want a linear pipeline (A → B), not parallel branches.
diff --git a/docs/examples/parallel-branches.md b/docs/examples/parallel-branches.md
index e73ee83..02ca649 100644
--- a/docs/examples/parallel-branches.md
+++ b/docs/examples/parallel-branches.md
@@ -3,19 +3,21 @@
!!! info "Source"
[https://github.com/LunarCommand/openarmature-python/blob/main/examples/parallel-branches/main.py](https://github.com/LunarCommand/openarmature-python/blob/main/examples/parallel-branches/main.py){target="_blank" rel="noopener"}
-Enrich a lunar-mission news article with three independent
-analyses (one-sentence summary, sentiment label, topic tags)
-running concurrently as separate subgraphs.
+Enrich a lunar-mission news article with several independent analyses
+(one-sentence summary, sentiment label, topic tags, reading-time
+estimate, optional translation) running concurrently: some as
+heterogeneous subgraphs, one as a lightweight inline function, and one
+gated by a condition.
## Overview
Where fan-out (the fan-out-with-retry example) runs N copies of
*one* subgraph against different inputs, parallel-branches runs
-M *heterogeneous* subgraphs against the same input. Different state schemas,
+M *heterogeneous* branches against the same input. Different state schemas,
different middleware, different topologies per branch, one
dispatch.
-The article goes into three branches in parallel:
+The article goes into several branches in parallel:
- **summary**: bare subgraph, one node, writes `summary` back.
- **sentiment**: subgraph wrapped in `RetryMiddleware` (the
@@ -23,37 +25,54 @@ The article goes into three branches in parallel:
`label` back into the parent's `sentiment` field.
- **topics**: bare subgraph, writes a `tags` list back into the
parent's `topics` field.
+- **reading_time**: an inline `call` branch, not a subgraph. A plain
+ async function over the parent state estimates reading time from the
+ word count (no LLM, no projection) and returns the parent field
+ directly.
+- **translation**: a `call` branch gated by a `when` predicate. It runs
+ only when `target_language` is set on the state; with it unset (the
+ default) the branch is skipped entirely.
-The branches don't depend on each other, so they fire concurrently
-and the parent fans in once all three complete.
+The branches don't depend on each other, so they fire concurrently and
+the parent fans in once the dispatched branches complete.
## What it teaches
- [`add_parallel_branches_node`](../concepts/parallel-branches.md):
- M named `BranchSpec`s under one node. Each spec carries its own
- compiled subgraph plus per-branch input/output projection plus
- optional per-branch middleware.
+ M named `BranchSpec`s under one node. A branch gives its work as
+ either a compiled `subgraph` (with per-branch input/output
+ projection) or an inline `call`, plus optional per-branch middleware
+ and an optional `when` predicate.
+- Callable branches. `reading_time` is `BranchSpec(call=fn)`: an inline
+ async function over the parent state that returns parent fields
+ directly, with no subgraph, state schema, or projection. Reach for
+ `call` when a leg is really just "run this one function" rather than a
+ whole pipeline.
+- Conditional `when`. `translation` carries
+ `when=lambda s: bool(s.target_language)` and is skipped at dispatch
+ unless a target language is requested: no dispatch, no contribution,
+ no observer events. The other branches always run.
- Branches with *different* state schemas. The summary subgraph's
state has a `summary` field; the sentiment subgraph's has
`label`; the topics subgraph's has a `tags` list. The projection
mappings translate between the branch's vocabulary and the
- parent's.
+ parent's. Callable branches need no projection: they read and write
+ parent fields directly.
- Heterogeneous per-branch middleware. The sentiment branch wraps
- its subgraph in retry; the other two run bare. A production
- pipeline often wants different retry policies, timing windows, or
- custom middleware per branch.
+ its subgraph in retry; the others run bare. A production pipeline
+ often wants different retry policies, timing windows, or custom
+ middleware per branch.
- Branch insertion order = fan-in order. When two branches write to
the same parent field, the parent's reducer applies them in the
order they were declared in the `branches` mapping (not in
- completion order). The three branches here write disjoint parent
- fields, so the order doesn't affect the result, but the property
- holds.
-- A `branch_attribution_observer` reads
- `NodeEvent.branch_name` on inner-node events. `branch_name` is
- populated only for events *inside* a branch's subgraph;
- outer-graph nodes carry `branch_name=None`. This is the
- per-event attribution that lets observability backends route
- metrics and spans by branch.
+ completion order). The branches here write disjoint parent fields,
+ so the order doesn't affect the result, but the property holds.
+- A `branch_attribution_observer` reads `NodeEvent.branch_name`. It is
+ populated for every event from inside a branch: the inner nodes of a
+ subgraph branch and the single branch-unit event of a callable branch.
+ Outer-graph nodes carry `branch_name=None`, and a skipped `when`
+ branch emits no events at all. This is the per-event attribution that
+ lets observability backends route metrics and spans by branch.
## How to run
@@ -62,7 +81,8 @@ uv sync --group examples
LLM_API_KEY=sk-... uv run python examples/parallel-branches/main.py
```
-The article is baked into the example.
+The article is baked into the example. Set `target_language` on the
+input state to also run the translation branch.
## The graph
@@ -75,24 +95,27 @@ flowchart TD
subgraph enrich [enrich: parallel-branches]
direction TB
- summary[summary branch]
- sentiment[sentiment branch
retry middleware]
- topics[topics branch]
+ summary[summary branch
subgraph]
+ sentiment[sentiment branch
subgraph + retry]
+ topics[topics branch
subgraph]
+ reading_time[reading_time branch
callable]
+ translation[translation branch
callable + when]
end
start --> receive --> enrich --> present --> stop
```
-`enrich` is the parallel-branches node; the three branches inside
-the box dispatch concurrently against the same `article` field on
-parent state. The sentiment branch is the only one with middleware
-attached.
+`enrich` is the parallel-branches node; the branches inside the box
+dispatch concurrently against the same `article` field on parent state.
+The summary, sentiment, and topics branches are subgraphs; reading_time
+and translation are inline callables. The translation branch is gated by
+`when` and is skipped in the default run.
## Reading the output
```
========================================================================
-Lunar-mission article enrichment - three independent analyses in parallel
+Lunar-mission article enrichment; independent analyses in parallel
========================================================================
Article (642 chars):
@@ -101,37 +124,44 @@ NASA's Artemis II crew capsule Integrity splashed down in the Pacific
Ocean this evening, ending a ten-day flight that carried four astronauts
on a free-return trajectory around the Moon and back...
- [observer] (branch=summary) inner node 'write_summary' started
- [observer] (branch=sentiment) inner node 'classify_sentiment' started
- [observer] (branch=topics) inner node 'extract_topics' started
+ [observer] (branch=summary) node 'write_summary' started
+ [observer] (branch=sentiment) node 'classify_sentiment' started
+ [observer] (branch=topics) node 'extract_topics' started
+ [observer] (branch=reading_time) node 'reading_time' started
========================================================================
Enrichment results
========================================================================
- summary:
- sentiment: positive
- topics: ['Artemis II', 'splashdown', 'lunar program']
+ summary:
+ sentiment: positive
+ topics: ['Artemis II', 'splashdown', 'lunar program']
+ reading time: 36s
+ translation: (skipped by `when`; set target_language to enable)
wall-clock: 1142.6 ms
-The three branches ran in parallel - wall-clock is closer to the
-slowest single branch than to the sum of all three.
+The branches ran in parallel; wall-clock is closer to the slowest
+single branch than to the sum of them all...
```
-- **The three observer lines** fire close together (often within a
- few ms of each other), confirming the branches dispatched in
- parallel rather than serially.
-- **`branch_name` attribution** is what makes per-branch
- observability tractable. `write_summary` knows nothing about
- `branch_name`; it's the engine that tags the event for the
- observer.
-- **Wall-clock under 1500 ms** for three sequential LLM calls is
- the clearest indicator of parallelism. Three serial calls at
- roughly 1s each would land near 3 seconds; under parallel
- dispatch the wall-clock approaches the slowest branch's duration.
-- **Disjoint output fields** mean the reducer order at fan-in
- doesn't matter here. If two branches both wrote to `summary`, the
- declared branch order (`summary` before `sentiment` before
- `topics`) would determine which value won under the default
+- **The observer lines** fire close together (often within a few ms of
+ each other), confirming the branches dispatched in parallel rather
+ than serially. There is no `translation` line: its `when` predicate
+ was false, so the branch was skipped and emitted nothing.
+- **The reading_time line** comes from a callable branch. It carries
+ `branch_name` exactly like the subgraph branches' inner nodes, so
+ per-branch attribution is uniform across both branch forms.
+- **`branch_name` attribution** is what makes per-branch observability
+ tractable. `write_summary` knows nothing about `branch_name`; it's
+ the engine that tags the event for the observer.
+- **Wall-clock under 1500 ms** for several LLM branches is the clearest
+ indicator of parallelism. Run serially at roughly 1s each they would
+ sum to several seconds; under parallel dispatch the wall-clock
+ approaches the slowest branch's duration. The reading_time branch is
+ effectively free (no LLM).
+- **Disjoint output fields** mean the reducer order at fan-in doesn't
+ matter here. If two branches both wrote to `summary`, the declared
+ branch order would determine which value won under the default
`last_write_wins` reducer.
+```
diff --git a/examples/parallel-branches/main.py b/examples/parallel-branches/main.py
index 2aafcb2..be751c8 100644
--- a/examples/parallel-branches/main.py
+++ b/examples/parallel-branches/main.py
@@ -1,16 +1,17 @@
-"""openarmature demo: enrich a lunar-mission news article with three
+"""openarmature demo: enrich a lunar-mission news article with several
independent analyses running concurrently.
-**Use case:** Given a news article about a lunar mission, produce three
+**Use case:** Given a news article about a lunar mission, produce
side-by-side outputs: a one-sentence summary, an overall sentiment label,
-and a short list of topic tags. The three analyses don't depend on each
-other, so dispatch them in parallel. Each analysis is its own subgraph
-with its own state schema (the summary subgraph doesn't care about
-sentiment, the topic extractor doesn't care about either); which is
-exactly the shape parallel-branches is for.
+a short list of topic tags, an estimated reading time, and an optional
+translation. The analyses don't depend on each other, so dispatch them
+in parallel. Some are full subgraphs with their own state schema
+(summary, sentiment, topics); one is a lightweight inline function over
+the shared article (reading time); and one runs only when asked for
+(translation). That mix is exactly the shape parallel-branches is for.
Where fan-out (the fan-out-with-retry example) runs N copies of ONE subgraph against
-different inputs, parallel-branches runs M heterogeneous subgraphs
+different inputs, parallel-branches runs M heterogeneous branches
against the same input. Different schemas, different middleware,
different topologies per branch; one dispatch.
@@ -18,16 +19,29 @@
- ``GraphBuilder.add_parallel_branches_node`` registers M
``BranchSpec``s under named keys (``summary``, ``sentiment``,
- ``topics`` here). Each spec carries its own compiled subgraph,
- its own input/output projection, and optionally its own middleware.
+ ``topics``, ``reading_time``, ``translation`` here). A branch gives
+ its work as either a compiled ``subgraph`` (with input/output
+ projection) or an inline ``call``, and may carry its own middleware
+ and an optional ``when`` predicate.
+- A branch can be a whole subgraph OR a single function. The summary,
+ sentiment, and topics branches are ``subgraph=...`` branches, each
+ with its own state schema and a projection mapping the parent's
+ ``article`` into the branch's input field. The reading-time branch is
+ a ``call=...`` branch: an inline async function over the parent state
+ that returns parent fields directly, no subgraph or projection. Reach
+ for ``call`` when a leg is really just "run this one function."
+- The translation branch carries a ``when`` predicate and runs only
+ when ``target_language`` is set on the state. With it unset (the
+ default here) the branch is skipped entirely: no dispatch, no
+ contribution, no observer events. Flip ``target_language`` to run it.
- The branches have DIFFERENT state schemas. The summary subgraph's
state has a ``summary`` field; the sentiment subgraph's has a
``label`` field; the topics subgraph's has a ``tags`` list. Each is
scoped to its job. The projection mapping translates the parent's
``article`` into each branch's input field name.
- The sentiment branch wraps its subgraph in ``RetryMiddleware`` to
- show per-branch middleware composition. The other two branches run
- bare. Per-branch middleware is heterogeneous; branch A may have
+ show per-branch middleware composition. The other branches run bare.
+ Per-branch middleware is heterogeneous; branch A may have
retry + timing, branch B nothing, branch C something custom.
- Branch insertion order determines fan-in order: when two branches
contribute to the same parent field, the parent's reducer applies
@@ -128,12 +142,19 @@ async def _chat(system: str, user: str) -> str:
class ArticleState(State):
- """Outer: an article goes in, three enrichment fields come out."""
+ """Outer: an article goes in, the enrichment fields come out.
+
+ ``target_language`` is an input flag: set it to enable the optional
+ translation branch (a ``when`` predicate gates that branch on it).
+ """
article: str = ""
+ target_language: str = ""
summary: str = ""
sentiment: str = ""
topics: list[str] = Field(default_factory=list)
+ reading_time_seconds: int = 0
+ translation: str = ""
trace: Annotated[list[str], append] = Field(default_factory=list)
@@ -227,6 +248,31 @@ def build_topics_subgraph() -> CompiledGraph[TopicsState]:
)
+# ---------------------------------------------------------------------------
+# Callable branches; each is just a function over the parent state. No
+# subgraph, no state schema, no projection: the function reads the article
+# off the parent state and returns parent-shaped fields directly.
+# ---------------------------------------------------------------------------
+
+
+async def estimate_reading_time(s: ArticleState) -> Mapping[str, Any]:
+ """A lightweight branch that needs no LLM and no subgraph: estimate
+ reading time from the word count at ~200 words per minute. The kind
+ of leg that the subgraph form would be too heavy for."""
+ words = len(s.article.split())
+ return {"reading_time_seconds": round(words / 200 * 60)}
+
+
+async def translate_article(s: ArticleState) -> Mapping[str, Any]:
+ """An LLM branch that runs only when a target language is requested
+ (gated by a ``when`` predicate on its BranchSpec)."""
+ content = await _chat(
+ system=(f"Translate the article into {s.target_language}. Output only the translation, no preamble."),
+ user=s.article,
+ )
+ return {"translation": content}
+
+
# ---------------------------------------------------------------------------
# Outer graph
# ---------------------------------------------------------------------------
@@ -243,20 +289,22 @@ async def present(s: ArticleState) -> Mapping[str, Any]:
async def branch_attribution_observer(event: ObserverEvent) -> None:
- """Print which branch each inner-node event came from.
-
- NodeEvent carries ``branch_name`` on events from nodes that
- execute INSIDE a parallel-branches branch; it's the per-event
- attribution that says "this came from branch X." Outermost-graph
- nodes (receive, enrich, present) carry no branch_name. The
- observer skips events with no branch attribution and prints
- ``(branch=…) node_name`` for the rest.
+ """Print which branch each event came from.
+
+ NodeEvent carries ``branch_name`` on every event from inside a
+ parallel-branches branch: the inner nodes of a subgraph branch, and
+ the single branch-unit event of a callable branch. It's the per-event
+ attribution that says "this came from branch X." Outermost-graph nodes
+ (receive, enrich, present) carry no branch_name. A skipped ``when``
+ branch emits no events, so it never appears here. The observer skips
+ events with no branch attribution and prints ``(branch=…) node_name``
+ for the rest.
"""
if not isinstance(event, NodeEvent):
return
if event.branch_name is None or event.phase != "started":
return
- print(f" [observer] (branch={event.branch_name}) inner node {event.node_name!r} started")
+ print(f" [observer] (branch={event.branch_name}) node {event.node_name!r} started")
def build_graph() -> CompiledGraph[ArticleState]:
@@ -297,6 +345,16 @@ def build_graph() -> CompiledGraph[ArticleState]:
inputs={"text": "article"},
outputs={"topics": "tags"},
),
+ # A callable branch: an inline function over the parent
+ # state, no subgraph / projection. Its return value IS the
+ # contribution.
+ "reading_time": BranchSpec(call=estimate_reading_time),
+ # A conditional callable branch: skipped at dispatch unless
+ # a target language is requested.
+ "translation": BranchSpec(
+ call=translate_article,
+ when=lambda s: bool(s.target_language),
+ ),
},
)
.add_node("present", present)
@@ -318,7 +376,7 @@ async def main() -> None:
graph.attach_observer(branch_attribution_observer)
print("=" * 72)
- print("Lunar-mission article enrichment; three independent analyses in parallel")
+ print("Lunar-mission article enrichment; independent analyses in parallel")
print("=" * 72)
print()
print(f"Article ({len(ARTICLE)} chars):")
@@ -328,20 +386,29 @@ async def main() -> None:
wall_start = time.monotonic()
try:
+ # target_language is unset, so the translation branch's `when`
+ # predicate is false and that branch is skipped. Set it (e.g.
+ # ArticleState(article=ARTICLE, target_language="Spanish")) to run it.
final = await graph.invoke(ArticleState(article=ARTICLE))
wall_ms = (time.monotonic() - wall_start) * 1000.0
print("=" * 72)
print("Enrichment results")
print("=" * 72)
print()
- print(f" summary: {final.summary}")
- print(f" sentiment: {final.sentiment}")
- print(f" topics: {final.topics}")
+ print(f" summary: {final.summary}")
+ print(f" sentiment: {final.sentiment}")
+ print(f" topics: {final.topics}")
+ print(f" reading time: {final.reading_time_seconds}s")
+ translation = final.translation or "(skipped by `when`; set target_language to enable)"
+ print(f" translation: {translation}")
print()
print(f" wall-clock: {wall_ms:7.1f} ms")
print()
- print("The three branches ran in parallel; wall-clock is closer to the")
- print("slowest single branch than to the sum of all three.")
+ print("The branches ran in parallel; wall-clock is closer to the slowest")
+ print("single branch than to the sum of them all. The subgraph branches")
+ print("(summary, sentiment, topics) and the inline-callable reading-time")
+ print("branch all contributed; the translation branch was skipped by its")
+ print("`when` predicate, so it did no work and emitted no events.")
finally:
await graph.drain()
if _provider_instance is not None:
diff --git a/openarmature-spec b/openarmature-spec
index 7472cd7..451a579 160000
--- a/openarmature-spec
+++ b/openarmature-spec
@@ -1 +1 @@
-Subproject commit 7472cd7457a76c2cc1b160f4200eda6a11afb7ba
+Subproject commit 451a5799ad81b57f3f5479bc694917a66fa6eaa7
diff --git a/pyproject.toml b/pyproject.toml
index f35fb66..df6178b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -63,7 +63,7 @@ Specification = "https://github.com/LunarCommand/openarmature-spec"
openarmature = "openarmature.cli:main"
[tool.openarmature]
-spec_version = "0.65.0"
+spec_version = "0.66.1"
[dependency-groups]
dev = [
diff --git a/src/openarmature/AGENTS.md b/src/openarmature/AGENTS.md
index 302280e..c2aa44d 100644
--- a/src/openarmature/AGENTS.md
+++ b/src/openarmature/AGENTS.md
@@ -1,6 +1,6 @@
# OpenArmature — Agent documentation
-*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.65.0). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.*
+*This is the agent guide bundled with the openarmature Python package, version 0.14.0 (spec v0.66.1). For the full docs site see [openarmature.ai](https://openarmature.ai). For the canonical spec text see [openarmature.org/capabilities](https://openarmature.org/capabilities/). For project-specific conventions for the code you're editing, see the host project's `AGENTS.md` or `CLAUDE.md`.*
## TL;DR
@@ -10,7 +10,7 @@ OpenArmature is a workflow framework for LLM pipelines and tool-calling agents:
## Capability contracts
-_Sourced from openarmature-spec v0.65.0. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._
+_Sourced from openarmature-spec v0.66.1. Each entry below reproduces §1 (Purpose) and §2 (Concepts) of the capability's `spec.md` verbatim — including additions from accepted proposals that this Python implementation may not yet ship. For per-proposal implementation status (implemented / partial / textual-only / not-yet), see the `conformance.toml` manifest at the repo root. For the full spec text (execution model, error semantics, determinism, observer hooks, etc.) see the linked docs site._
### Capability: `graph-engine`
@@ -1523,7 +1523,7 @@ _Runnable example programs shipped in the source tree at `examples/`. The full c
- **`examples/multimodal-prompt/main.py`** — openarmature demo: two independent analyses of a lunar-mission photograph using versioned prompt templates, a fallback prompt backend, and a multimodal user message.
- **`examples/nested-subgraphs/main.py`** — openarmature demo: question answering against a tiny document corpus, with two levels of subgraph nesting.
- **`examples/observer-hooks/main.py`** — openarmature demo: observer hooks for structured logging, per-call metrics, and OTel spans.
-- **`examples/parallel-branches/main.py`** — openarmature demo: enrich a lunar-mission news article with three independent analyses running concurrently.
+- **`examples/parallel-branches/main.py`** — openarmature demo: enrich a lunar-mission news article with several independent analyses running concurrently.
- **`examples/production-observability/main.py`** — openarmature demo: production observability with dual OTel + Langfuse observers, caller hooks for trace.input/output, and the canonical TimingMiddleware.
- **`examples/routing-and-subgraphs/main.py`** — openarmature demo: conditional routing + subgraph with a custom projection.
- **`examples/tool-use/main.py`** — openarmature demo: a lunar-mission assistant that calls local Python functions as tools to answer fact and physics questions about Apollo / Artemis missions.
diff --git a/src/openarmature/__init__.py b/src/openarmature/__init__.py
index eab4587..e4302f3 100644
--- a/src/openarmature/__init__.py
+++ b/src/openarmature/__init__.py
@@ -25,7 +25,7 @@
"""
__version__ = "0.14.0"
-__spec_version__ = "0.65.0"
+__spec_version__ = "0.66.1"
# Proposal 0052 (spec observability §5.1 / §8.4.1): canonical
# package-registry name for this implementation. Surfaces on every
# OTel invocation span as ``openarmature.implementation.name`` and on
diff --git a/src/openarmature/graph/__init__.py b/src/openarmature/graph/__init__.py
index 3f82555..a65a6ee 100644
--- a/src/openarmature/graph/__init__.py
+++ b/src/openarmature/graph/__init__.py
@@ -30,6 +30,7 @@
NoDeclaredEntry,
NodeException,
ParallelBranchesBranchFailed,
+ ParallelBranchesInvalidBranchSpec,
ParallelBranchesNoBranches,
ReducerError,
RoutingError,
@@ -114,6 +115,7 @@
"Observer",
"ObserverEvent",
"ParallelBranchesBranchFailed",
+ "ParallelBranchesInvalidBranchSpec",
"ParallelBranchesNoBranches",
"ParallelBranchesNode",
"BranchSpec",
diff --git a/src/openarmature/graph/builder.py b/src/openarmature/graph/builder.py
index b71b663..5c454a8 100644
--- a/src/openarmature/graph/builder.py
+++ b/src/openarmature/graph/builder.py
@@ -29,6 +29,7 @@
MappingReferencesUndeclaredField,
MultipleOutgoingEdges,
NoDeclaredEntry,
+ ParallelBranchesInvalidBranchSpec,
ParallelBranchesNoBranches,
UnreachableNode,
)
@@ -316,8 +317,11 @@ def add_parallel_branches_node(
- ``branches`` non-empty (raises ``ParallelBranchesNoBranches``).
- Each branch name is a non-empty string (raises ``ValueError``).
- - Each branch's ``inputs`` / ``outputs`` refer only to declared
- fields on the (parent, branch-subgraph) state schemas
+ - Each branch gives its work as exactly one of ``subgraph`` /
+ ``call``, and a callable branch declares no ``inputs`` /
+ ``outputs`` (raises ``ParallelBranchesInvalidBranchSpec``).
+ - Each subgraph branch's ``inputs`` / ``outputs`` refer only to
+ declared fields on the (parent, branch-subgraph) state schemas
(raises ``MappingReferencesUndeclaredField``).
- ``errors_field`` (when set) is a declared parent-state field.
"""
@@ -337,6 +341,27 @@ def add_parallel_branches_node(
for branch_name, spec in branches.items():
if not branch_name:
raise ValueError(f"parallel-branches node {name!r}: branch_name MUST be non-empty")
+ # A branch's work is exactly one of subgraph / call (§11.1.1).
+ # Both or neither is parallel_branches_invalid_branch_spec.
+ if (spec.subgraph is None) == (spec.call is None):
+ raise ParallelBranchesInvalidBranchSpec(
+ name,
+ branch_name,
+ "must give its work as exactly one of subgraph / call",
+ )
+ # A callable branch reads parent state and returns parent-shaped
+ # fields directly, so inputs/outputs projection is meaningless.
+ if spec.call is not None:
+ if spec.inputs or spec.outputs:
+ raise ParallelBranchesInvalidBranchSpec(
+ name,
+ branch_name,
+ "is a callable branch and must not declare inputs / outputs",
+ )
+ continue
+ # Subgraph branch: validate its inputs/outputs against the
+ # (parent, subgraph) state schemas.
+ assert spec.subgraph is not None
sub_fields = spec.subgraph.state_cls.model_fields
for sub_field, parent_field in spec.inputs.items():
if sub_field not in sub_fields:
diff --git a/src/openarmature/graph/compiled.py b/src/openarmature/graph/compiled.py
index 9036b96..5cf4209 100644
--- a/src/openarmature/graph/compiled.py
+++ b/src/openarmature/graph/compiled.py
@@ -2097,28 +2097,28 @@ async def _step_parallel_branches_node(
attempt_counter: list[int] = [0]
deferred_info: list[tuple[int, StateT, StateT] | None] = [None]
- # Per proposal 0044 (observability §5.7, v0.36.0): the
- # resolved parallel-branches configuration is static at
- # compile time (no count / concurrency resolvers like fan-out
- # has), so build the event config once here and ship it on
- # both started + completed events. ``branch_names`` mirrors
- # the dispatch order ParallelBranchesNode uses internally
- # (insertion order of the ``branches`` dict per pipeline-
- # utilities §11.1).
- #
- # Python dicts preserve insertion order (PEP 468; guaranteed
- # since 3.7), and YAML / direct-dict-literal ``branches``
- # construction at the call site preserves the source order
- # through into the dict's keys(). Spec §11.1 ties branch
- # declaration order to dispatch order, so this tuple IS the
- # declaration order observers should see.
+ # Per proposal 0044 (observability §5.7) + proposal 0075:
+ # ``branch_names`` is the full DECLARED set in insertion order
+ # (PEP 468 dict ordering; §11.1 ties declaration order to dispatch
+ # order, so this tuple IS the declaration order observers see).
+ # ``branch_count`` is the number of branches that DISPATCH for the
+ # dispatch-time state — ``when``-skipped branches (§11.10) are
+ # excluded, so the count matches the per-branch dispatch spans
+ # rendered. The two answer different questions ("what was declared"
+ # vs "how many ran"). Build the config inside ``innermost`` (where
+ # the dispatch-time state is in hand) and stash it so the completed
+ # event reuses the same value; ``when`` is pure (§11.10), so the
+ # dispatch set here equals the one ``run_with_context`` computes.
branch_names: tuple[str, ...] = tuple(node.branches.keys())
- parallel_branches_event_config = ParallelBranchesEventConfig(
- branch_names=branch_names,
- branch_count=len(branch_names),
- error_policy=node.error_policy,
- parent_node_name=current,
- )
+ config_box: list[ParallelBranchesEventConfig] = []
+
+ def _build_pb_config(s: Any) -> ParallelBranchesEventConfig:
+ return ParallelBranchesEventConfig(
+ branch_names=branch_names,
+ branch_count=len(node.dispatched_branches(s)),
+ error_policy=node.error_policy,
+ parent_node_name=current,
+ )
async def innermost(s: Any) -> Mapping[str, Any]:
attempt_counter[0] += 1
@@ -2126,6 +2126,7 @@ async def innermost(s: Any) -> Mapping[str, Any]:
# ``innermost`` for the v0.16.1 propagation rule.
attempt_index = current_attempt_index()
+ config_box[:] = [_build_pb_config(s)]
self._dispatch_started(
context,
current,
@@ -2133,7 +2134,7 @@ async def innermost(s: Any) -> Mapping[str, Any]:
step,
s,
attempt_index=attempt_index,
- parallel_branches_config=parallel_branches_event_config,
+ parallel_branches_config=config_box[0],
)
otel_token = _attach_active_observer_span()
try:
@@ -2148,7 +2149,7 @@ async def innermost(s: Any) -> Mapping[str, Any]:
s,
error=e,
attempt_index=attempt_index,
- parallel_branches_config=parallel_branches_event_config,
+ parallel_branches_config=config_box[0],
)
raise
except Exception as e:
@@ -2161,7 +2162,7 @@ async def innermost(s: Any) -> Mapping[str, Any]:
s,
error=wrapped,
attempt_index=attempt_index,
- parallel_branches_config=parallel_branches_event_config,
+ parallel_branches_config=config_box[0],
)
raise wrapped from e
finally:
@@ -2179,7 +2180,7 @@ async def innermost(s: Any) -> Mapping[str, Any]:
s,
error=e,
attempt_index=attempt_index,
- parallel_branches_config=parallel_branches_event_config,
+ parallel_branches_config=config_box[0],
)
raise
@@ -2238,7 +2239,7 @@ def finalize_completed(edge_error: RuntimeGraphError | None) -> None:
final_pre_state,
post_state=final_merged,
attempt_index=final_attempt_index,
- parallel_branches_config=parallel_branches_event_config,
+ parallel_branches_config=config_box[0],
)
else:
self._dispatch_completed(
@@ -2249,7 +2250,7 @@ def finalize_completed(edge_error: RuntimeGraphError | None) -> None:
final_pre_state,
error=edge_error,
attempt_index=final_attempt_index,
- parallel_branches_config=parallel_branches_event_config,
+ parallel_branches_config=config_box[0],
)
return _StepResult(state=merged_outer, finalize_completed=finalize_completed)
diff --git a/src/openarmature/graph/errors.py b/src/openarmature/graph/errors.py
index 06c5ac0..fd51d3a 100644
--- a/src/openarmature/graph/errors.py
+++ b/src/openarmature/graph/errors.py
@@ -154,6 +154,25 @@ def __init__(self, node_name: str) -> None:
self.node_name = node_name
+class ParallelBranchesInvalidBranchSpec(CompileError):
+ """Raised at registration when a branch spec's work is not given by
+ exactly one of ``subgraph`` / ``call``.
+
+ A branch is either a compiled ``subgraph`` (with optional
+ ``inputs`` / ``outputs`` projection) or an inline ``call`` (an async
+ function over the parent state). Declaring both, neither, or
+ ``inputs`` / ``outputs`` on a callable branch (which reads parent
+ state and returns parent-shaped fields directly, so projection is
+ meaningless) is invalid. Non-transient."""
+
+ category = "parallel_branches_invalid_branch_spec"
+
+ def __init__(self, node_name: str, branch_name: str, reason: str) -> None:
+ super().__init__(f"parallel-branches node {node_name!r}: branch {branch_name!r} {reason}")
+ self.node_name = node_name
+ self.branch_name = branch_name
+
+
# ===== Runtime errors =====
diff --git a/src/openarmature/graph/parallel_branches.py b/src/openarmature/graph/parallel_branches.py
index 8f8aac7..7f0a4c3 100644
--- a/src/openarmature/graph/parallel_branches.py
+++ b/src/openarmature/graph/parallel_branches.py
@@ -1,17 +1,20 @@
# Spec: realizes pipeline-utilities §11 (parallel branches).
-"""Parallel branches: concurrent dispatch of M heterogeneous compiled subgraphs.
+"""Parallel branches: concurrent dispatch of M heterogeneous branches.
Counterpart to :mod:`.fan_out`. Fan-out is data-driven (N items,
one subgraph, instantiated N times); parallel branches is
-topology-driven (M heterogeneous compiled subgraphs declared
-statically, run concurrently within a single parent invocation).
-
-Each branch's :class:`BranchSpec` carries its own compiled
-subgraph (with potentially different state schema, middleware,
-topology), its own ``inputs`` / ``outputs`` projection mappings,
-and its own optional ``middleware`` wrapping the whole branch
-invocation as a unit.
+topology-driven (M heterogeneous branches declared statically, run
+concurrently within a single parent invocation).
+
+Each branch's :class:`BranchSpec` gives its work as exactly one of a
+compiled ``subgraph`` (with its own state schema, middleware, topology,
+and ``inputs`` / ``outputs`` projection) or an inline ``call`` (an async
+function over the parent state returning a parent-shaped partial update,
+no subgraph / projection — the lightweight form). A branch may also carry
+its own optional ``middleware`` wrapping the whole branch as a unit, and
+an optional ``when`` predicate that skips the branch at dispatch when it
+returns false.
Buffer-then-apply semantics: contributions are collected during
dispatch and merged deterministically once at node completion,
@@ -38,16 +41,17 @@
import asyncio
import contextvars
import logging
-from collections.abc import Mapping
+from collections.abc import Awaitable, Callable, Mapping
from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING, Any, Literal, cast
from openarmature.observability.correlation import (
_reset_branch_name,
_set_branch_name,
+ current_attempt_index,
)
-from .errors import ParallelBranchesBranchFailed
+from .errors import NodeException, ParallelBranchesBranchFailed
from .middleware import ChainCall, Middleware, compose_chain
from .state import State
@@ -58,24 +62,43 @@
_log = logging.getLogger(__name__)
+# A callable branch's work: an async function over the PARENT state
+# returning a parent-shaped partial update directly (no subgraph, no
+# state schema, no inputs/outputs projection).
+BranchCallable = Callable[[Any], Awaitable[Mapping[str, Any]]]
+
+
@dataclass(frozen=True)
class BranchSpec[ChildT: State]:
"""One entry in a :class:`ParallelBranchesNode`'s branch mapping.
- Branches are heterogeneous: each branch may reference a different
- compiled subgraph with a different state schema. ``inputs`` /
- ``outputs`` follow the same shape as subgraph projection
- mappings.
+ A branch's work is given by exactly one of:
+
+ - ``subgraph`` — a compiled subgraph (the heterogeneous-subgraph
+ form): each branch may reference a different compiled subgraph
+ with a different state schema, with ``inputs`` / ``outputs``
+ following the same shape as subgraph projection mappings.
+ - ``call`` — an inline async function over the parent state
+ returning a parent-shaped partial update (the lightweight form):
+ no subgraph, no state schema, no ``inputs`` / ``outputs``. The
+ function reads the parent state directly and returns parent fields.
+
+ An optional ``when`` predicate over the parent state, evaluated once
+ at dispatch, skips the branch entirely when it returns false.
Validation lives on the builder side
- (``GraphBuilder.add_parallel_branches_node``):
- ``mapping_references_undeclared_field`` for inputs/outputs
- referencing undeclared fields; ``parallel_branches_no_branches``
- for empty ``branches`` maps; ``ValueError`` for empty-string
- branch names.
+ (``GraphBuilder.add_parallel_branches_node``): exactly one of
+ ``subgraph`` / ``call`` and no ``inputs`` / ``outputs`` on a
+ callable branch (``parallel_branches_invalid_branch_spec``);
+ ``mapping_references_undeclared_field`` for subgraph-branch
+ inputs/outputs referencing undeclared fields;
+ ``parallel_branches_no_branches`` for empty ``branches`` maps;
+ ``ValueError`` for empty-string branch names.
"""
- subgraph: CompiledGraph[ChildT]
+ subgraph: CompiledGraph[ChildT] | None = None
+ call: BranchCallable | None = None
+ when: Callable[[Any], bool] | None = None
inputs: Mapping[str, str] = field(default_factory=dict[str, str])
outputs: Mapping[str, str] = field(default_factory=dict[str, str])
middleware: tuple[Middleware, ...] = ()
@@ -116,6 +139,21 @@ async def run(self, state: ParentT) -> Mapping[str, Any]:
"compiled.invoke)."
)
+ def dispatched_branches(self, state: Any) -> list[tuple[str, BranchSpec[Any]]]:
+ """Return the branches that dispatch for ``state``, in insertion
+ order: every declared branch whose ``when`` predicate is absent or
+ returns true (§11.10).
+
+ ``when`` MUST be a pure function of the dispatch-time parent state,
+ so this is deterministic and safe to evaluate both here (the
+ dispatch set) and at the NODE event's ``branch_count`` (the count of
+ branches that dispatch, which excludes ``when``-skipped branches)."""
+ return [
+ (branch_name, spec)
+ for branch_name, spec in self.branches.items()
+ if spec.when is None or spec.when(state)
+ ]
+
async def run_with_context(
self,
state: ParentT,
@@ -148,6 +186,17 @@ async def run_branch(branch_name: str, spec: BranchSpec[Any]) -> Mapping[str, An
# task body — not in the dispatcher loop.
token = _set_branch_name(branch_name)
try:
+ # A callable branch (§11.1.1) IS the unit of work: run the
+ # inline function (wrapped in its branch middleware) and
+ # return its parent-shaped partial directly — no subgraph,
+ # no inputs/outputs projection. The builder guarantees
+ # exactly one of ``call`` / ``subgraph`` per branch.
+ if spec.call is not None:
+ return await self._dispatch_callable_branch(
+ branch_name, spec.call, spec.middleware, state, context
+ )
+ assert spec.subgraph is not None # builder: exactly one of call/subgraph
+ subgraph = spec.subgraph
# Per §11.2 projection in: subgraph fields not in
# ``inputs`` use the subgraph's declared defaults;
# named subgraph fields are initialized from the
@@ -156,17 +205,17 @@ async def run_branch(branch_name: str, spec: BranchSpec[Any]) -> Mapping[str, An
init: dict[str, Any] = {}
for sub_field, parent_field in spec.inputs.items():
init[sub_field] = parent_dump[parent_field]
- initial = spec.subgraph.state_cls(**init)
+ initial = subgraph.state_cls(**init)
child_context = context.descend_into_parallel_branch(
parallel_branches_node_name=self.name,
parent_state=state,
- sub_attached=tuple(spec.subgraph._attached_observers), # noqa: SLF001
+ sub_attached=tuple(subgraph._attached_observers), # noqa: SLF001
branch_name=branch_name,
)
async def innermost(s: Any) -> Mapping[str, Any]:
- final_branch_state = await spec.subgraph._invoke(s, child_context) # noqa: SLF001
+ final_branch_state = await subgraph._invoke(s, child_context) # noqa: SLF001
# Branch middleware wraps the subgraph invocation
# (§11.7), so the chain operates in the branch
# subgraph's state space. Surface the ``outputs``
@@ -206,14 +255,29 @@ async def innermost(s: Any) -> Mapping[str, Any]:
finally:
_reset_branch_name(token)
- # Spawn one task per branch, in insertion order. Per §11.8
- # the dispatch order is the branches dict's insertion order;
- # ``started`` events from the inner subgraphs interleave
- # arbitrarily but the branch-level dispatch ordering is
- # deterministic.
+ # Conditional branches (§11.10): the branches that dispatch are
+ # those whose ``when`` admits the parent state the node received;
+ # a skipped branch runs no work, contributes nothing, and emits no
+ # observer events / span.
+ dispatching = self.dispatched_branches(state)
+
+ # All branches skipped (§11.10): a valid no-op — the node
+ # contributes nothing. Distinct from the compile-time
+ # ``parallel_branches_no_branches`` (an empty DECLARED mapping).
+ # Return early; ``asyncio.wait([])`` would raise.
+ if not dispatching:
+ if self.error_policy == "collect" and self.errors_field is not None:
+ return {self.errors_field: []}
+ return {}
+
+ # Spawn one task per dispatching branch, in insertion order. Per
+ # §11.8 the dispatch order is the branches dict's insertion order
+ # (over the branches that dispatch); ``started`` events from the
+ # inner subgraphs interleave arbitrarily but the branch-level
+ # dispatch ordering is deterministic.
ctx = contextvars.copy_context()
tasks: list[tuple[str, asyncio.Task[Mapping[str, Any]]]] = []
- for branch_name, spec in self.branches.items():
+ for branch_name, spec in dispatching:
task = asyncio.create_task(
run_branch(branch_name, spec),
context=ctx.copy(),
@@ -224,6 +288,91 @@ async def innermost(s: Any) -> Mapping[str, Any]:
return await self._fail_fast(state, tasks, contributions)
return await self._collect(state, tasks, contributions, errors)
+ async def _dispatch_callable_branch(
+ self,
+ branch_name: str,
+ call: BranchCallable,
+ middleware: tuple[Middleware, ...],
+ state: Any,
+ context: _InvocationContext,
+ ) -> Mapping[str, Any]:
+ """Run one inline-callable branch and return its contribution.
+
+ The callable reads the parent state and returns a parent-shaped
+ partial update directly — no subgraph, no ``inputs`` / ``outputs``
+ projection (§11.4). Branch ``middleware`` (e.g. a per-leg
+ ``FailureIsolationMiddleware``, §11.7) wraps the callable.
+
+ A callable branch has no inner nodes, so it IS the unit of work:
+ it emits one ``started`` / ``completed`` observer pair keyed by its
+ ``branch_name`` (graph-engine §6), which the observers render as the
+ branch's per-branch dispatch span with NO inner-node span beneath it
+ (observability §5.7). To render that way the pair is emitted at the
+ parallel-branches NODE's own namespace (not a descended branch
+ namespace), tagged with ``branch_name`` (set on the ContextVar in
+ ``run_branch``): an event at a pb-node's own namespace carrying a
+ ``branch_name`` and no ``parallel_branches_config`` is unambiguously
+ a callable branch, since a subgraph branch's inner-node events are
+ always one level deeper. A ``when``-skipped branch never reaches
+ here, so it emits nothing.
+ """
+ # Reuse the engine's NodeEvent construction (the static dispatch
+ # helpers) so a callable branch's §6 events carry the same lineage
+ # as the NODE's own events. Function-scope import keeps the textual
+ # cycle off the module graph (compiled.py imports parallel_branches
+ # at function scope too, so neither loads the other at import time).
+ from .compiled import CompiledGraph
+
+ node_namespace = context.namespace_prefix + (self.name,)
+ step = context.take_step()
+ # The pair carries the parallel-branches NODE's active attempt index
+ # (the same value the NODE's own event uses), so under node-level retry
+ # the §6 keying tuple and the Langfuse observation metadata report the
+ # right attempt. Read once at dispatch entry — before the branch chain,
+ # which may run its own retries — so the started/completed pair shares
+ # one value. Without retry this is 0 (the var's baseline).
+ attempt_index = current_attempt_index()
+ CompiledGraph._dispatch_started( # noqa: SLF001
+ context, branch_name, node_namespace, step, state, attempt_index=attempt_index
+ )
+ try:
+ # ``call`` is the chain's innermost. Its public type returns the
+ # broad ``Awaitable``; ``compose_chain`` wants the coroutine-
+ # returning ``ChainCall`` (the NextCall protocol shape). The two
+ # are await-compatible at runtime, so cast across the gap.
+ chain: ChainCall = compose_chain(middleware, cast("ChainCall", call))
+ partial = await chain(state)
+ except Exception as exc:
+ # The callable (or its middleware) raised unrecovered. Wrap as
+ # a NodeException (mirroring the subgraph form, where the inner
+ # ``_invoke`` wraps node raises) so the completed event carries
+ # a RuntimeGraphError and ``collect`` classifies it the same as
+ # a subgraph branch. The node's error policy then wraps this in
+ # ParallelBranchesBranchFailed (fail_fast) or records it
+ # (collect) — exactly as for a subgraph branch.
+ wrapped = NodeException(node_name=branch_name, cause=exc, recoverable_state=state)
+ CompiledGraph._dispatch_completed( # noqa: SLF001
+ context, branch_name, node_namespace, step, state, error=wrapped, attempt_index=attempt_index
+ )
+ raise wrapped from exc
+ # Success path — including a degraded update from a branch
+ # FailureIsolationMiddleware (§11.7), which "succeeds" from the
+ # node's view. The contribution is the returned partial directly
+ # (parent-shaped, no projection). ``post_state`` shows this
+ # branch's local effect on its input; the authoritative reducer
+ # merge across siblings is the NODE's completed event.
+ post_state = state.model_copy(update=dict(partial))
+ CompiledGraph._dispatch_completed( # noqa: SLF001
+ context,
+ branch_name,
+ node_namespace,
+ step,
+ state,
+ post_state=post_state,
+ attempt_index=attempt_index,
+ )
+ return partial
+
async def _fail_fast(
self,
parent_state: Any,
diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py
index be12b94..4523901 100644
--- a/src/openarmature/observability/langfuse/observer.py
+++ b/src/openarmature/observability/langfuse/observer.py
@@ -900,6 +900,19 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent)
for key, observation in inv_state.open_observations.items():
if key[0] == prefix:
return observation.handle.id
+ # Proposal 0075: a callable parallel-branch's event sits at the pb
+ # NODE's own namespace (branch_name set, no parallel_branches_config),
+ # so it IS the unit — render it as a single observation parented under
+ # the NODE observation. The strict-ancestor fallback above misses the
+ # same-namespace NODE, so resolve it explicitly here.
+ if (
+ event.branch_name is not None
+ and event.parallel_branches_config is None
+ and event.namespace in inv_state.parallel_branches_parent_node_name
+ ):
+ for key, observation in inv_state.open_observations.items():
+ if key[0] == event.namespace and key[3] is None:
+ return observation.handle.id
return None
def _trace_id_for(
@@ -990,6 +1003,19 @@ def _sync_subgraph_observations(
):
self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event)
continue
+ # A parallel-branches or fan-out NODE prefix already has its own
+ # leaf observation (from the NODE's own started event), unlike a
+ # transparent subgraph wrapper. Don't synthesize a duplicate
+ # subgraph wrapper observation over it; inner branch / instance
+ # events parent under the NODE observation via the
+ # _resolve_parent_observation_id leaf fallback. Mirrors the OTel
+ # observer's same guard (it skips the synthetic subgraph span at a
+ # pb / fan-out NODE depth for the same reason).
+ if (
+ prefix in inv_state.parallel_branches_parent_node_name
+ or prefix in inv_state.fan_out_parent_node_name
+ ):
+ continue
# Plain non-detached subgraph dispatch.
self._open_subgraph_observation(inv_state, correlation_id, prefix, event)
diff --git a/src/openarmature/observability/otel/observer.py b/src/openarmature/observability/otel/observer.py
index cb45ec9..364bee0 100644
--- a/src/openarmature/observability/otel/observer.py
+++ b/src/openarmature/observability/otel/observer.py
@@ -678,6 +678,16 @@ def prepare_sync(self, event: NodeEvent) -> None:
return
open_span = inv_state.open_spans.get(self._key_for(event))
if open_span is None:
+ # Proposal 0075: a callable parallel-branch has no leaf — its span
+ # is the per-branch dispatch span opened in ``_open_started_span``.
+ # Publish that so a provider call inside the callable nests under
+ # the branch span.
+ if event.branch_name is not None and event.parallel_branches_config is None:
+ dispatch = inv_state.parallel_branches_branch_spans.get(
+ event.namespace + (event.branch_name,)
+ )
+ if dispatch is not None:
+ _set_active_observer_span(dispatch.span)
return
# Publish the span to the engine via the ContextVar. Discard
# the Token — last-writer-wins is the documented contract
@@ -752,6 +762,28 @@ def _open_started_span(self, event: NodeEvent) -> None:
# Also closes subgraph spans we've left.
self._sync_subgraph_spans(inv_state, invocation_id, correlation_id, event)
+ # Proposal 0075 (observability §5.7): a callable parallel-branch emits
+ # its started/completed pair at the pb NODE's own namespace, tagged
+ # with branch_name and (unlike the NODE's own events) no
+ # parallel_branches_config. It IS the unit, so render it as the
+ # branch's per-branch dispatch span (keyed by branch_name, parented
+ # under the NODE span) with NO inner-node leaf. A subgraph branch's
+ # inner-node events are always one level deeper, so this never
+ # misfires for them. The dispatch span is closed when the NODE's own
+ # completed event fires (children-before-parents), like any branch
+ # dispatch span; this branch's completed event is then a no-op pop.
+ if (
+ event.branch_name is not None
+ and event.parallel_branches_config is None
+ and event.namespace in inv_state.parallel_branches_parent_node_name
+ ):
+ branch_key = event.namespace + (event.branch_name,)
+ if branch_key not in inv_state.parallel_branches_branch_spans:
+ self._open_parallel_branches_branch_dispatch_span(
+ inv_state, correlation_id, event.namespace, event
+ )
+ return
+
parent_ctx = self._resolve_parent_context(inv_state, invocation_id, event)
span = self._tracer.start_span(
name=event.node_name,
diff --git a/tests/conformance/adapter.py b/tests/conformance/adapter.py
index e5c9fe9..ee35fa2 100644
--- a/tests/conformance/adapter.py
+++ b/tests/conformance/adapter.py
@@ -1057,6 +1057,44 @@ def _add_fan_out_node(
)
+def _build_call_fn(
+ branch_name: str,
+ call_cfg: Mapping[str, Any],
+) -> Callable[[Any], Awaitable[Mapping[str, Any]]]:
+ """Translate a callable branch's ``call:`` directive (proposal 0075)
+ into an async function over the parent state.
+
+ Reuses the node-behavior factories (``update`` / ``flaky`` /
+ ``raises``) keyed by the same directive shape a plain node uses — a
+ callable branch IS just a function. The callable's own trace
+ recording is discarded into a throwaway sink: a parallel-branches
+ node records as a single dispatch step, so callable-branch bodies
+ must not appear in ``execution_order``.
+ """
+ sink: list[str] = []
+ if "update" in call_cfg:
+ return _make_update_fn(branch_name, call_cfg["update"], sink)
+ if "flaky" in call_cfg:
+ return _make_flaky_fn(branch_name, call_cfg["flaky"], sink)
+ if "raises" in call_cfg:
+ return _make_raising_fn(branch_name, call_cfg["raises"], sink)
+ raise ValueError(f"callable branch {branch_name!r}: unsupported call directive {dict(call_cfg)!r}")
+
+
+def _build_when_predicate(when_cfg: Mapping[str, Any]) -> Callable[[Any], bool]:
+ """Translate a branch ``when:`` directive (proposal 0075 §11.10) into
+ a parent-state predicate. Supports ``{field: }`` — a truthy
+ check on a parent-state field at dispatch time."""
+ if "field" in when_cfg:
+ field_name = cast("str", when_cfg["field"])
+
+ def predicate(state: Any) -> bool:
+ return bool(getattr(state, field_name))
+
+ return predicate
+ raise ValueError(f"unsupported when directive: {dict(when_cfg)!r}")
+
+
def _add_parallel_branches_node(
builder: GraphBuilder[Any],
node_name: str,
@@ -1069,20 +1107,31 @@ def _add_parallel_branches_node(
"""Translate a fixture's ``parallel_branches:`` block into a
``builder.add_parallel_branches_node`` call.
- Each branch's ``subgraph`` name resolves against the shared
- ``subgraphs`` registry (built from the fixture's top-level
- ``subgraphs:`` block). ``branch_middleware`` maps branch-name to a
- pre-translated middleware list; the test driver populates it from
- each branch's ``middleware:`` block.
+ A branch is either a ``subgraph`` (name resolved against the shared
+ ``subgraphs`` registry, with optional ``inputs`` / ``outputs``) or an
+ inline ``call`` (proposal 0075), and may carry a ``when`` predicate.
+ ``branch_middleware`` maps branch-name to a pre-translated middleware
+ list; the test driver populates it from each branch's ``middleware:``
+ block.
"""
branches_cfg = cast("dict[str, dict[str, Any]]", cfg["branches"])
branches: dict[str, BranchSpec[Any]] = {}
for branch_name, branch_cfg in branches_cfg.items():
+ when_cfg = branch_cfg.get("when")
+ when = _build_when_predicate(cast("Mapping[str, Any]", when_cfg)) if when_cfg is not None else None
+ if "call" in branch_cfg:
+ branches[branch_name] = BranchSpec(
+ call=_build_call_fn(branch_name, cast("Mapping[str, Any]", branch_cfg["call"])),
+ when=when,
+ middleware=tuple(branch_middleware.get(branch_name, ())),
+ )
+ continue
sub_compiled = subgraphs[branch_cfg["subgraph"]]
branches[branch_name] = BranchSpec(
subgraph=sub_compiled,
inputs=dict(branch_cfg.get("inputs") or {}),
outputs=dict(branch_cfg.get("outputs") or {}),
+ when=when,
middleware=tuple(branch_middleware.get(branch_name, ())),
)
diff --git a/tests/conformance/harness/directives.py b/tests/conformance/harness/directives.py
index cfe0696..d504614 100644
--- a/tests/conformance/harness/directives.py
+++ b/tests/conformance/harness/directives.py
@@ -308,11 +308,18 @@ def _normalize_instance_middleware(cls, data: Any) -> Any:
class ParallelBranchSpec(_AllowExtras):
"""One entry inside a ``parallel_branches.branches`` mapping.
- Permissive on extras because fixtures may carry extra knobs
- (e.g., per-branch annotations the harness ignores).
+ A branch's work is given by exactly one of ``subgraph`` (a compiled
+ subgraph referenced by name, with optional ``inputs`` / ``outputs``)
+ or ``call`` (an inline node-behavior directive — ``update`` / ``flaky``
+ / ``raises`` — run as a function over the parent state, proposal 0075).
+ An optional ``when`` directive (``{field: }``) skips the branch
+ at dispatch. Permissive on extras because fixtures may carry extra
+ knobs (e.g., per-branch annotations the harness ignores).
"""
- subgraph: str
+ subgraph: str | None = None
+ call: dict[str, Any] | None = None
+ when: dict[str, Any] | None = None
inputs: dict[str, str] | None = None
outputs: dict[str, str] | None = None
middleware: list[MiddlewareSpec] | None = None
diff --git a/tests/conformance/test_pipeline_utilities.py b/tests/conformance/test_pipeline_utilities.py
index e13fe65..88331cf 100644
--- a/tests/conformance/test_pipeline_utilities.py
+++ b/tests/conformance/test_pipeline_utilities.py
@@ -97,6 +97,13 @@ def _load(path: Path) -> dict[str, Any]:
# spec v0.63.1) is an FI-degrade fixture this runner drives.
_FAILURE_ISOLATION_FIXTURES = frozenset(range(58, 67)) | {68, 69, 71, 72}
+# Inline-callable parallel branches + conditional ``when`` (proposal 0075,
+# spec v0.66.0). These extend the parallel-branches harness (032-038) with
+# the ``call`` / ``when`` branch directives; 073 (two callable branches),
+# 074 (cases: when false skips / true dispatches), 075 (callable branch +
+# FailureIsolationMiddleware degrade).
+_CALLABLE_BRANCH_FIXTURES = frozenset({73, 74, 75})
+
def _fixture_paths() -> list[Path]:
paths = sorted(CONFORMANCE_DIR.glob("[0-9][0-9][0-9]-*.yaml"))
@@ -106,7 +113,11 @@ def _fixture_paths() -> list[Path]:
number = int(p.stem.split("-", 1)[0])
except ValueError:
continue
- if number <= _LAST_DRIVEN_FIXTURE or number in _FAILURE_ISOLATION_FIXTURES:
+ if (
+ number <= _LAST_DRIVEN_FIXTURE
+ or number in _FAILURE_ISOLATION_FIXTURES
+ or number in _CALLABLE_BRANCH_FIXTURES
+ ):
out.append(p)
return out
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index f847cc7..d85f3a3 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -9,7 +9,7 @@
def test_package_versions() -> None:
assert openarmature.__version__ == "0.14.0"
- assert openarmature.__spec_version__ == "0.65.0"
+ assert openarmature.__spec_version__ == "0.66.1"
def test_spec_version_matches_pyproject() -> None:
diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py
index 0372ca0..e9cdeb0 100644
--- a/tests/unit/test_observability_langfuse.py
+++ b/tests/unit/test_observability_langfuse.py
@@ -1152,6 +1152,49 @@ def output_hook(state: Any) -> dict[str, Any]:
assert captured_output_state[0].outer_a_done is True
+async def test_parallel_branches_node_renders_no_duplicate_observation() -> None:
+ # Regression: a parallel-branches NODE emits its own started/completed
+ # pair, so it already has a leaf observation. The observer MUST NOT also
+ # synthesize a duplicate subgraph-wrapper observation at the node's
+ # namespace (the bug the OTel observer already guards against, now
+ # mirrored here). Each callable branch (proposal 0075) renders as a
+ # single observation parented under the one NODE observation.
+ from openarmature.graph import BranchSpec
+
+ async def vector(_s: _S) -> Any:
+ return {"trail": ["vector"]}
+
+ async def keyword(_s: _S) -> Any:
+ return {"trail": ["keyword"]}
+
+ graph = (
+ GraphBuilder(_S)
+ .add_parallel_branches_node(
+ "recall",
+ branches={
+ "vector": BranchSpec(call=vector),
+ "keyword": BranchSpec(call=keyword),
+ },
+ )
+ .add_edge("recall", END)
+ .set_entry("recall")
+ .compile()
+ )
+ graph, client, _ = _attach(graph)
+ await graph.invoke(_S())
+ await graph.drain()
+
+ trace = next(iter(client.traces.values()))
+ recall_obs = [o for o in trace.observations if o.name == "recall"]
+ assert len(recall_obs) == 1, f"expected one 'recall' observation, got {len(recall_obs)}"
+ node_id = recall_obs[0].id
+ for branch in ("vector", "keyword"):
+ branch_obs = [o for o in trace.observations if o.name == branch]
+ assert len(branch_obs) == 1, f"branch {branch!r}: expected one observation, got {len(branch_obs)}"
+ assert branch_obs[0].parent_observation_id == node_id
+ assert (branch_obs[0].metadata or {}).get("branch_name") == branch
+
+
# Spec §8.4.1 / proposal 0052: implementation attribution rows on
# every Langfuse Trace. The two rows source from the §5.1
# attributes; the always-emit invariant inherits from §5.1 so the
diff --git a/tests/unit/test_observability_otel.py b/tests/unit/test_observability_otel.py
index 52132e1..3a9448d 100644
--- a/tests/unit/test_observability_otel.py
+++ b/tests/unit/test_observability_otel.py
@@ -3341,3 +3341,86 @@ async def _mid_passthrough(_s: _MidS) -> dict[str, int]:
assert "openarmature.user.group" not in dict(inv_spans[0].attributes or {}), (
"invocation span MUST NOT receive augmenter's key when inside a fan-out instance"
)
+
+
+# ---------------------------------------------------------------------------
+# Callable parallel branches (proposal 0075, observability §5.7). Mirrors
+# spec conformance fixture 110 (otel-callable-branch-span), which is not yet
+# in the pinned submodule: a callable branch renders as ONE per-branch
+# dispatch span keyed by branch_name with NO inner-node spans; a when-skipped
+# branch emits no span.
+# ---------------------------------------------------------------------------
+
+
+class _CallableBranchState(State):
+ run_vector: bool = False
+ vector_result: int = 0
+ fts_result: int = 0
+ keyword_result: int = 0
+
+
+async def test_callable_branch_renders_one_dispatch_span_skipped_emits_none() -> None:
+ from openarmature.graph import BranchSpec
+
+ async def vector(_s: _CallableBranchState) -> dict[str, int]:
+ return {"vector_result": 1}
+
+ async def fts(_s: _CallableBranchState) -> dict[str, int]:
+ return {"fts_result": 2}
+
+ async def keyword(_s: _CallableBranchState) -> dict[str, int]:
+ return {"keyword_result": 3}
+
+ exporter = InMemorySpanExporter()
+ observer = OTelObserver(span_processor=SimpleSpanProcessor(exporter))
+ g = (
+ GraphBuilder(_CallableBranchState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "vector": BranchSpec(call=vector, when=lambda s: s.run_vector),
+ "fts": BranchSpec(call=fts),
+ "keyword": BranchSpec(call=keyword),
+ },
+ error_policy="fail_fast",
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ g.attach_observer(observer)
+ await cast("Any", g).invoke(_CallableBranchState()) # run_vector False -> vector skipped
+ await cast("Any", g).drain()
+
+ spans = exporter.get_finished_spans()
+
+ def _sid(span: ReadableSpan) -> int:
+ ctx = span.context
+ assert ctx is not None
+ return ctx.span_id
+
+ def _children(span: ReadableSpan) -> list[ReadableSpan]:
+ return [s for s in spans if s.parent is not None and s.parent.span_id == _sid(span)]
+
+ node_spans = [s for s in spans if s.name == "retrieve"]
+ assert len(node_spans) == 1
+ node = node_spans[0]
+
+ # The skipped `vector` branch emits NO span.
+ assert [s for s in spans if s.name == "vector"] == []
+
+ # Each dispatched callable branch -> exactly one dispatch span keyed by
+ # branch_name, carrying parent_node_name, parented under the NODE span,
+ # with NO inner-node spans (children == []).
+ for branch in ("fts", "keyword"):
+ branch_spans = [s for s in spans if s.name == branch]
+ assert len(branch_spans) == 1, f"branch {branch!r}: expected one span, got {len(branch_spans)}"
+ bs = branch_spans[0]
+ attrs = dict(bs.attributes or {})
+ assert attrs.get("openarmature.node.branch_name") == branch
+ assert attrs.get("openarmature.parallel_branches.parent_node_name") == "retrieve"
+ assert bs.parent is not None and bs.parent.span_id == _sid(node)
+ assert _children(bs) == []
+
+ # The NODE span's children are exactly the two dispatched branch spans.
+ assert sorted(c.name for c in _children(node)) == ["fts", "keyword"]
diff --git a/tests/unit/test_parallel_branches.py b/tests/unit/test_parallel_branches.py
index 3637563..f2a45f3 100644
--- a/tests/unit/test_parallel_branches.py
+++ b/tests/unit/test_parallel_branches.py
@@ -31,9 +31,13 @@
END,
BranchSpec,
CompiledGraph,
+ FailureIsolatedEvent,
GraphBuilder,
MappingReferencesUndeclaredField,
+ NodeEvent,
+ ObserverEvent,
ParallelBranchesBranchFailed,
+ ParallelBranchesInvalidBranchSpec,
ParallelBranchesNoBranches,
State,
append,
@@ -69,6 +73,12 @@ class ParentState(State):
gamma_result: int = 0
+class ConditionalState(State):
+ run_vector: bool = False
+ alpha_result: int = 0
+ beta_result: int = 0
+
+
def _build_alpha_succeeds() -> CompiledGraph[AlphaState]:
async def a(_state: AlphaState) -> Mapping[str, Any]:
return {"a_out": 1}
@@ -629,3 +639,443 @@ async def test_fail_fast_cancellation_drain_absorbs_residual_exceptions() -> Non
# picks deterministically from the FIRST_EXCEPTION wait — one of
# the two branches surfaces.
assert excinfo.value.branch_name in {"alpha", "beta"}
+
+
+# ---------------------------------------------------------------------------
+# Inline-callable branches + conditional ``when`` (proposal 0075, §11.1.1 /
+# §11.4 / §11.10)
+# ---------------------------------------------------------------------------
+
+
+class _CategorizedError(RuntimeError):
+ """A test exception carrying a ``category`` so the cause-chain
+ classifier (and a branch FailureIsolationMiddleware's ``catch``)
+ resolves it the way the framework resolves engine-categorized errors."""
+
+ def __init__(self, message: str, category: str) -> None:
+ super().__init__(message)
+ self.category = category
+
+
+async def _capture_events(events: list[ObserverEvent]) -> Any:
+ """Build an observer that appends every delivered event to ``events``."""
+
+ async def observe(event: ObserverEvent) -> None:
+ events.append(event)
+
+ return observe
+
+
+async def test_callable_branches_merge_to_disjoint_parent_fields() -> None:
+ # §11.1.1 / §11.4: two inline-callable branches (no subgraph, no
+ # projection) run concurrently; each returns a parent-shaped partial
+ # that merges into a disjoint parent field via the parent reducer.
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ async def fts(_state: ParentState) -> Mapping[str, Any]:
+ return {"beta_result": 2}
+
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "vector": BranchSpec(call=vector),
+ "fts": BranchSpec(call=fts),
+ },
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ final = await compiled.invoke(ParentState())
+ await compiled.drain()
+ assert final.alpha_result == 1
+ assert final.beta_result == 2
+
+
+async def test_callable_branch_emits_one_started_completed_pair_keyed_by_branch_name() -> None:
+ # graph-engine §6 / observability §5.7: a callable branch has no inner
+ # nodes, so it IS the unit — it emits exactly one started/completed pair
+ # keyed by its branch_name (node_name == branch_name), not the per-node
+ # stream a subgraph branch produces.
+ events: list[ObserverEvent] = []
+
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ return {"alpha_result": 7}
+
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node("retrieve", branches={"vector": BranchSpec(call=vector)})
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ compiled.attach_observer(await _capture_events(events))
+ await compiled.invoke(ParentState())
+ await compiled.drain()
+
+ branch_events = [e for e in events if isinstance(e, NodeEvent) and e.branch_name == "vector"]
+ assert [e.phase for e in branch_events] == ["started", "completed"]
+ assert all(e.node_name == "vector" for e in branch_events)
+ # Emitted at the pb NODE's own namespace (not a descended branch
+ # namespace), carrying no parallel_branches_config — that shape (a
+ # branch_name at the node's namespace, no config) is what identifies a
+ # callable branch to the observers, which render it as the branch's
+ # single dispatch span with no inner-node span.
+ assert all(e.namespace == ("retrieve",) for e in branch_events)
+ assert all(e.parallel_branches_config is None for e in branch_events)
+ completed = branch_events[1]
+ assert completed.error is None
+ assert isinstance(completed.post_state, ParentState)
+ assert completed.post_state.alpha_result == 7
+
+
+async def test_node_event_branch_count_excludes_when_skipped_branches() -> None:
+ # Proposal 0075: the NODE event's parallel_branches_config.branch_count is
+ # the number of branches that DISPATCH (when-skipped branches excluded),
+ # while branch_names stays the full declared set. The two answer different
+ # questions ("how many ran" vs "what was declared").
+ events: list[ObserverEvent] = []
+
+ async def vector(_state: ConditionalState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ async def fts(_state: ConditionalState) -> Mapping[str, Any]:
+ return {"beta_result": 2}
+
+ compiled = (
+ GraphBuilder(ConditionalState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "vector": BranchSpec(call=vector, when=lambda s: s.run_vector),
+ "fts": BranchSpec(call=fts),
+ },
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ compiled.attach_observer(await _capture_events(events))
+ await compiled.invoke(ConditionalState(run_vector=False)) # vector skipped
+ await compiled.drain()
+
+ node_started = next(
+ e
+ for e in events
+ if isinstance(e, NodeEvent)
+ and e.node_name == "retrieve"
+ and e.phase == "started"
+ and e.parallel_branches_config is not None
+ )
+ config = node_started.parallel_branches_config
+ assert config is not None
+ assert config.branch_count == 1 # only fts dispatched
+ assert config.branch_names == ("vector", "fts") # full declared set, insertion order
+
+
+async def test_callable_branch_event_attempt_index_tracks_node_retry() -> None:
+ # PR #175 review: under node-level retry the callable branch's
+ # started/completed pair carries the NODE's active attempt index (the same
+ # value the NODE's own event uses), not a hardcoded 0. A flaky callable
+ # fails the first node attempt and succeeds on the retry.
+ events: list[ObserverEvent] = []
+ calls = {"n": 0}
+
+ async def flaky(_state: ParentState) -> Mapping[str, Any]:
+ calls["n"] += 1
+ if calls["n"] == 1:
+ raise RuntimeError("transient")
+ return {"alpha_result": 1}
+
+ node_retry = RetryMiddleware(
+ RetryConfig(
+ max_attempts=2,
+ classifier=lambda _exc, _state: True, # retry any failure
+ backoff=deterministic_backoff(0.0),
+ )
+ )
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(call=flaky)},
+ middleware=[node_retry],
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ compiled.attach_observer(await _capture_events(events))
+ final = await compiled.invoke(ParentState())
+ await compiled.drain()
+
+ assert final.alpha_result == 1 # succeeded on the second node attempt
+ branch_started = [
+ e for e in events if isinstance(e, NodeEvent) and e.branch_name == "vector" and e.phase == "started"
+ ]
+ assert [e.attempt_index for e in branch_started] == [0, 1]
+
+
+async def test_when_false_skips_branch_no_contribution_no_event() -> None:
+ # §11.10: a branch whose ``when`` returns false is skipped entirely —
+ # not dispatched, no contribution (its field stays at the default), and
+ # no observer events. The sibling with no ``when`` runs normally.
+ events: list[ObserverEvent] = []
+
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ async def fts(_state: ParentState) -> Mapping[str, Any]:
+ return {"beta_result": 2}
+
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "vector": BranchSpec(call=vector, when=lambda _s: False),
+ "fts": BranchSpec(call=fts),
+ },
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ compiled.attach_observer(await _capture_events(events))
+ final = await compiled.invoke(ParentState())
+ await compiled.drain()
+
+ assert final.alpha_result == 0 # skipped -> no contribution -> default
+ assert final.beta_result == 2
+ assert [e for e in events if isinstance(e, NodeEvent) and e.branch_name == "vector"] == []
+
+
+async def test_when_true_dispatches_branch() -> None:
+ # §11.10: a ``when`` reading dispatch-time parent state; true dispatches.
+ async def vector(_state: ConditionalState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ compiled = (
+ GraphBuilder(ConditionalState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(call=vector, when=lambda s: s.run_vector)},
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ final = await compiled.invoke(ConditionalState(run_vector=True))
+ await compiled.drain()
+ assert final.alpha_result == 1
+
+
+async def test_callable_branch_failure_isolation_degrades_and_emits_event() -> None:
+ # §11.7: per-leg failure isolation on a callable branch is the existing
+ # branch-middleware contract. A callable that raises a categorized error
+ # is caught by its FailureIsolationMiddleware, degrades to the configured
+ # update, and emits a FailureIsolatedEvent whose category resolves to the
+ # originating error; the degraded branch "succeeds", so fail_fast is NOT
+ # triggered and the sibling completes.
+ events: list[ObserverEvent] = []
+
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ raise _CategorizedError("vector store down", "provider_unavailable")
+
+ async def fts(_state: ParentState) -> Mapping[str, Any]:
+ return {"beta_result": 2}
+
+ isolation = FailureIsolationMiddleware(
+ degraded_update={"alpha_result": -1},
+ event_name="vector_isolated",
+ )
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "vector": BranchSpec(call=vector, middleware=(isolation,)),
+ "fts": BranchSpec(call=fts),
+ },
+ error_policy="fail_fast",
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ compiled.attach_observer(await _capture_events(events))
+ final = await compiled.invoke(ParentState())
+ await compiled.drain()
+
+ assert final.alpha_result == -1
+ assert final.beta_result == 2
+ iso = [e for e in events if isinstance(e, FailureIsolatedEvent)]
+ assert len(iso) == 1
+ assert iso[0].event_name == "vector_isolated"
+ assert iso[0].caught_exception.category == "provider_unavailable"
+
+
+async def test_mixed_subgraph_and_callable_branches() -> None:
+ # §11.1.1: a node MAY mix subgraph branches and callable branches freely.
+ async def fts(_state: ParentState) -> Mapping[str, Any]:
+ return {"beta_result": 2}
+
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "alpha": BranchSpec(
+ subgraph=_build_alpha_succeeds(),
+ outputs={"alpha_result": "a_out"},
+ ),
+ "fts": BranchSpec(call=fts),
+ },
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ final = await compiled.invoke(ParentState())
+ await compiled.drain()
+ assert final.alpha_result == 1
+ assert final.beta_result == 2
+
+
+async def test_all_branches_skipped_is_noop() -> None:
+ # §11.10: all-skipped is a valid no-op — the node contributes nothing
+ # and the parent state is unchanged. Distinct from the compile-time
+ # parallel_branches_no_branches (an empty DECLARED mapping).
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(call=vector, when=lambda _s: False)},
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ final = await compiled.invoke(ParentState(alpha_result=99))
+ await compiled.drain()
+ assert final.alpha_result == 99 # untouched no-op
+
+
+async def test_all_branches_skipped_collect_sets_empty_errors_field() -> None:
+ # §11.10 under collect: an all-skipped node still completes; with an
+ # errors_field declared, no branch ran so the field is the empty list.
+ async def vector(_state: ParentWithErrors) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ compiled = (
+ GraphBuilder(ParentWithErrors)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(call=vector, when=lambda _s: False)},
+ error_policy="collect",
+ errors_field="branch_errors",
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ final = await compiled.invoke(ParentWithErrors())
+ await compiled.drain()
+ assert final.alpha_result == 0
+ assert final.branch_errors == []
+
+
+async def test_callable_branch_unisolated_failure_fail_fast() -> None:
+ # A callable branch that raises with no isolating middleware propagates
+ # like a subgraph branch: wrapped as ParallelBranchesBranchFailed
+ # carrying the branch_name, with the originating error in the chain.
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ raise _CategorizedError("boom", "provider_unavailable")
+
+ compiled = (
+ GraphBuilder(ParentState)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(call=vector)},
+ error_policy="fail_fast",
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ with pytest.raises(ParallelBranchesBranchFailed) as excinfo:
+ await compiled.invoke(ParentState())
+ await compiled.drain()
+ assert excinfo.value.branch_name == "vector"
+
+
+async def test_callable_branch_failure_collect_records_error() -> None:
+ # Under collect, a failing callable branch records into errors_field
+ # exactly like a subgraph branch (node_exception category over the wrap),
+ # and the sibling's contribution still merges.
+ async def vector(_state: ParentWithErrors) -> Mapping[str, Any]:
+ raise _CategorizedError("boom", "provider_unavailable")
+
+ async def fts(_state: ParentWithErrors) -> Mapping[str, Any]:
+ return {"beta_result": 2}
+
+ compiled = (
+ GraphBuilder(ParentWithErrors)
+ .set_entry("retrieve")
+ .add_parallel_branches_node(
+ "retrieve",
+ branches={
+ "vector": BranchSpec(call=vector),
+ "fts": BranchSpec(call=fts),
+ },
+ error_policy="collect",
+ errors_field="branch_errors",
+ )
+ .add_edge("retrieve", END)
+ .compile()
+ )
+ final = await compiled.invoke(ParentWithErrors())
+ await compiled.drain()
+ assert final.beta_result == 2
+ assert len(final.branch_errors) == 1
+ assert final.branch_errors[0]["branch_name"] == "vector"
+
+
+# --- builder validation: exactly one of subgraph / call (§11.1.1) ---
+
+
+def test_branch_with_both_subgraph_and_call_rejected() -> None:
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ builder: GraphBuilder[ParentState] = GraphBuilder(ParentState)
+ with pytest.raises(ParallelBranchesInvalidBranchSpec) as excinfo:
+ builder.add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(subgraph=_build_alpha_succeeds(), call=vector)},
+ )
+ assert excinfo.value.category == "parallel_branches_invalid_branch_spec"
+
+
+def test_branch_with_neither_subgraph_nor_call_rejected() -> None:
+ builder: GraphBuilder[ParentState] = GraphBuilder(ParentState)
+ with pytest.raises(ParallelBranchesInvalidBranchSpec):
+ builder.add_parallel_branches_node("retrieve", branches={"vector": BranchSpec()})
+
+
+def test_callable_branch_with_inputs_or_outputs_rejected() -> None:
+ async def vector(_state: ParentState) -> Mapping[str, Any]:
+ return {"alpha_result": 1}
+
+ builder: GraphBuilder[ParentState] = GraphBuilder(ParentState)
+ with pytest.raises(ParallelBranchesInvalidBranchSpec):
+ builder.add_parallel_branches_node(
+ "retrieve",
+ branches={"vector": BranchSpec(call=vector, outputs={"alpha_result": "x"})},
+ )