Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The
### Fixed

- **`current_fan_out_index()` inside fan-out instance middleware** now returns the executing instance's index (and `current_fan_out_index_chain()` its lineage) instead of `None`. Previously the engine set the fan-out lineage ContextVars per node, inside the inner subgraph, so they were still unset in `instance_middleware`, which wraps the subgraph from outside; they are now set around the instance-middleware chain. The documented `instance_middleware` use (`RetryMiddleware`) does not read the index, so no shipped behavior changes. This corrects the value seen by custom instance middleware that reads the index or calls `set_invocation_metadata`.
- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under their own branch span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged.

## [0.15.0] — 2026-06-22

Expand Down
124 changes: 114 additions & 10 deletions src/openarmature/observability/langfuse/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ class _InvState:
parallel_branches_parent_node_name: dict[tuple[str, ...], str] = field(
default_factory=dict[tuple[str, ...], str]
)
# Per proposal 0044: per-branch dispatch-span observations synthesized from
# the first inner event of each branch, keyed ``prefix + (branch_name,)``
# (prefix = the parallel-branches NODE namespace). Inner branch-node
# observations parent under this dispatch instead of the shared pb NODE
# span; closed when the pb NODE's completed event fires.
parallel_branches_branch_spans: dict[tuple[str, ...], _OpenObservation] = field(
default_factory=dict[tuple[str, ...], _OpenObservation]
)
# Declared branch-name set per pb NODE namespace (from the NODE's
# parallel_branches_config), so an inner branch event matches only the node
# that actually declares its branch.
parallel_branches_branch_names: dict[tuple[str, ...], frozenset[str]] = field(
default_factory=dict[tuple[str, ...], frozenset[str]]
)
# Side-cache: accumulator for `metadata.detached_child_trace_ids`
# on dispatch observations that spawn detached children. Keyed by
# the dispatch observation's prefix (the fan-out node's namespace,
Expand Down Expand Up @@ -475,6 +489,9 @@ def _open_started_observation(self, event: NodeEvent) -> None:
inv_state.parallel_branches_parent_node_name[event.namespace] = (
event.parallel_branches_config.parent_node_name
)
inv_state.parallel_branches_branch_names[event.namespace] = frozenset(
event.parallel_branches_config.branch_names
)

key = self._key_for(event)
if key in inv_state.open_observations:
Expand Down Expand Up @@ -541,10 +558,15 @@ def _handle_completed(self, event: NodeEvent) -> None:
# rather than appending to the previous iteration's
# accumulator and overwriting the prior link metadata.
inv_state.detached_child_trace_ids.pop(event.namespace, None)
# Per proposal 0045: clean up the pb cache on a pb NODE's own
# completion. Same shape as the fan-out cleanup above.
# Per proposals 0044/0045: on a pb NODE's own completion, close the
# per-branch dispatch observations synthesized for it (children-before-
# parents) and clear the pb caches. Same shape as the fan-out cleanup.
if event.parallel_branches_config is not None:
for prefix in list(inv_state.parallel_branches_branch_spans.keys()):
if len(prefix) > len(event.namespace) and prefix[: len(event.namespace)] == event.namespace:
self._close_parallel_branches_branch_dispatch_observation(inv_state, prefix)
inv_state.parallel_branches_parent_node_name.pop(event.namespace, None)
inv_state.parallel_branches_branch_names.pop(event.namespace, None)

key = self._key_for(event)
observation = inv_state.open_observations.pop(key, None)
Expand Down Expand Up @@ -892,6 +914,14 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent)
# 3. Leaf node observation at any matching ancestor prefix,
# walked longest-first.
# 4. None — the Trace itself becomes the implicit parent.
# Per proposal 0044 (checked before the fan-out lookup below): an inner
# branch node parents under its per-branch dispatch observation. Ancestor
# prefixes are tried longest-first, so the innermost matching branch wins.
if event.branch_name is not None:
for prefix_len in range(len(event.namespace) - 1, 0, -1):
prefix = event.namespace[:prefix_len]
branch_dispatch = inv_state.parallel_branches_branch_spans.get(prefix + (event.branch_name,))
if branch_dispatch is not None:
return branch_dispatch.handle.id
if event.fan_out_index is not None and event.namespace:
instance_key = event.namespace[:1] + (str(event.fan_out_index),)
dispatch = inv_state.fan_out_instance_observations.get(instance_key)
Expand Down Expand Up @@ -1014,6 +1044,24 @@ def _sync_subgraph_observations(
):
self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event)
continue
# Per proposal 0044: synthesize a per-branch dispatch observation
# under the pb NODE for an inner branch event, so inner branch
# nodes parent under it rather than the shared pb NODE span. Mirror
# of the fan-out per-instance arm above; gated on the branch
# belonging to the pb node declared at this prefix.
if (
event.branch_name is not None
and prefix in inv_state.parallel_branches_parent_node_name
and event.branch_name in inv_state.parallel_branches_branch_names.get(prefix, frozenset())
):
# Synthesize once per branch: _sync runs on every inner node's
# started event, so guard against re-opening (a second open would
# orphan the first observation and split the branch's nodes).
if prefix + (event.branch_name,) not in inv_state.parallel_branches_branch_spans:
self._open_parallel_branches_branch_dispatch_observation(
inv_state, correlation_id, prefix, event
)
continue
# A parallel-branches or fan-out NODE prefix already has its own
# leaf observation (from the NODE's own started event), unlike a
# transparent subgraph wrapper. Don't synthesize a duplicate
Expand Down Expand Up @@ -1085,7 +1133,7 @@ def _open_fan_out_instance_dispatch_observation(
) -> None:
# Non-detached per-instance dispatch lives in the parent
# Trace under the fan-out node's own Span observation.
fan_out_open = self._find_fan_out_node_observation(inv_state, prefix)
fan_out_open = self._find_node_observation(inv_state, prefix)
parent_observation_id = fan_out_open.handle.id if fan_out_open is not None else None
parent_node_name = inv_state.fan_out_parent_node_name.get(prefix, prefix[-1])
Comment thread
chris-colinsky marked this conversation as resolved.
# Per-instance dispatch is synthesized from the first inner
Expand Down Expand Up @@ -1117,6 +1165,46 @@ def _open_fan_out_instance_dispatch_observation(
branch_name_chain=event.branch_name_chain[:chain_len],
)

def _open_parallel_branches_branch_dispatch_observation(
    self,
    inv_state: _InvState,
    correlation_id: str | None,
    prefix: tuple[str, ...],
    event: NodeEvent,
) -> None:
    """Open the per-branch dispatch Span for the branch ``event`` belongs to.

    The Span is created under the parallel-branches NODE's own open
    observation at ``prefix`` (or with no parent observation when none is
    open) and recorded in ``inv_state.parallel_branches_branch_spans`` under
    ``prefix + (branch_name,)``. Mirror of the fan-out per-instance dispatch.

    Args:
        inv_state: Per-invocation state holding the open observations and
            the parallel-branches caches.
        correlation_id: Added to the metadata only when not ``None``.
        prefix: Namespace of the parallel-branches NODE.
        event: The inner branch event the dispatch is synthesized from; its
            ``step``, ``branch_name``, caller metadata and lineage chains
            are read here.
    """
    # Callers only reach this with a branch event, so narrow the optional
    # branch name once and reuse it for the span name and the cache key.
    branch_name = cast("str", event.branch_name)
    depth = len(prefix)

    node_observation = self._find_node_observation(inv_state, prefix)
    if node_observation is None:
        parent_id = None
    else:
        parent_id = node_observation.handle.id

    # Scalar metadata is inherited from the synthesizing event. branch_name
    # is the OA-emitted §8.4.2 row; the caller's own augmentation is layered
    # on afterwards by _apply_caller_metadata.
    dispatch_metadata: dict[str, Any] = {
        "namespace": list(prefix),
        "step": event.step,
        "attempt_index": 0,
        "parallel_branches_parent_node_name": (
            inv_state.parallel_branches_parent_node_name.get(prefix, prefix[-1])
        ),
        "branch_name": branch_name,
        "subgraph_name": _subgraph_identity_at(event, depth),
    }
    if correlation_id is not None:
        dispatch_metadata["correlation_id"] = correlation_id
    _apply_caller_metadata(dispatch_metadata, event.caller_invocation_metadata)

    span_handle = self.client.span(
        trace_id=inv_state.trace_id,
        name=branch_name,
        metadata=dispatch_metadata,
        parent_observation_id=parent_id,
    )
    # Lineage chains are truncated to the pb NODE's depth.
    inv_state.parallel_branches_branch_spans[prefix + (branch_name,)] = _OpenObservation(
        handle=span_handle,
        fan_out_index_chain=event.fan_out_index_chain[:depth],
        branch_name_chain=event.branch_name_chain[:depth],
    )

def _open_detached_subgraph_trace(
self,
inv_state: _InvState,
Expand Down Expand Up @@ -1270,7 +1358,7 @@ def _open_detached_fan_out_instance_trace(
# instance's entry.
ids_list = inv_state.detached_child_trace_ids.setdefault(prefix, [])
ids_list.append(detached_trace_id)
fan_out_open = self._find_fan_out_node_observation(inv_state, prefix)
fan_out_open = self._find_node_observation(inv_state, prefix)
if fan_out_open is not None:
# `detached: True` per §8.4.2 (proposal 0042) — the
Comment thread
chris-colinsky marked this conversation as resolved.
# parent-side fan-out node observation marks itself when
Expand Down Expand Up @@ -1341,14 +1429,23 @@ def _close_fan_out_instance_dispatch_observation(
return
observation.handle.end()

def _find_fan_out_node_observation(
def _close_parallel_branches_branch_dispatch_observation(
    self, inv_state: _InvState, prefix: tuple[str, ...]
) -> None:
    """End and forget the per-branch dispatch observation keyed by ``prefix``.

    The entry is removed from ``inv_state.parallel_branches_branch_spans``;
    if no entry exists under ``prefix`` the call is a no-op.
    """
    branch_span = inv_state.parallel_branches_branch_spans.pop(prefix, None)
    if branch_span is not None:
        branch_span.handle.end()

def _find_node_observation(
self, inv_state: _InvState, prefix: tuple[str, ...]
) -> _OpenObservation | None:
# Find the fan-out node's open leaf observation at the given
# prefix. Retry middleware wrapping a fan-out bumps the
# attempt_index; this scans for any entry at ``prefix`` with
# ``fan_out_index is None``. Only one such entry is open at a
# time (retry opens and closes within an attempt's lifecycle).
# Find a NODE's own open leaf observation at the given prefix (the
# fan-out or parallel-branches NODE, whose per-instance / per-branch
Comment thread
chris-colinsky marked this conversation as resolved.
# dispatches parent under it). Retry middleware wrapping the node bumps
# the attempt_index; this scans for any entry at ``prefix`` with
# ``fan_out_index is None``. Only one such entry is open at a time
# (retry opens and closes within an attempt's lifecycle).
for key, observation in inv_state.open_observations.items():
if key[0] == prefix and key[2] is None:
return observation
Expand Down Expand Up @@ -1719,6 +1816,13 @@ def _typed_event_metadata(
metadata: dict[str, Any] = {}
if correlation_id is not None:
metadata["correlation_id"] = correlation_id
# §8.4.2: the OA-emitted fan_out_index / branch_name scoping rows,
# mirroring _observation_metadata so a Generation inside a fan-out
# instance or branch carries the same scoping as its node observation.
if event.fan_out_index is not None:
metadata["fan_out_index"] = event.fan_out_index
if event.branch_name is not None:
metadata["branch_name"] = event.branch_name
metadata["system"] = event.provider
active_prompt = event.active_prompt
if active_prompt is not None:
Expand Down
10 changes: 2 additions & 8 deletions tests/conformance/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
"024-langfuse-prompt-linkage",
"027-langfuse-caller-supplied-metadata",
"029-caller-metadata-fan-out-per-instance",
"030-caller-metadata-parallel-branches-per-branch",
"031-langfuse-subgraph-span-hierarchy",
"032-langfuse-fan-out-per-instance-spans",
"033-langfuse-detached-trace-mode",
Expand All @@ -313,14 +314,7 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
for fixture_ids, reason in (
# The Langfuse-mapping fixtures are fixture-tested by the sibling
# conformance runner test_observability_langfuse.py -- see
# _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 030 stays
# below: deferred in that file (needs a LangfuseObserver per-branch
# dispatch-span src change), unit-tested for now.
(
("030-caller-metadata-parallel-branches-per-branch",),
"proposal 0040 per-branch caller metadata; covered by "
"test_observability_otel.py; deferred in test_observability_langfuse.py",
),
# _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only).
(
(
"043-get-invocation-metadata-roundtrip",
Expand Down
15 changes: 5 additions & 10 deletions tests/conformance/test_observability_langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,11 @@
# collected results -- and augment_metadata_from_field drives the
# per-instance set_invocation_metadata.
"029-caller-metadata-fan-out-per-instance",
# 030 (parallel-branches per-branch) stays deferred: a SRC gap, not a
# harness one. The expected trace needs a per-branch dispatch-span
# observation (inner branch spans parent under it), which §4.3 + §8.4.2
# (proposal 0042 observation.metadata.branch_name) + proposal 0044
# mandate. The OTel observer synthesizes it
# (parallel_branches_branch_spans); the LangfuseObserver does not yet
# (0044 shipped OTel-only in v0.11.0). Un-defers once that synthesis is
# ported to the Langfuse observer (a src change). Sibling-skip behavior
# IS verified by the OTel unit test
# ``test_metadata_augmentation_in_parallel_branches_skips_sibling``.
# 030 (parallel-branches per-branch): the LangfuseObserver now
# synthesizes the per-branch dispatch-span observation (§4.3 + §8.4.2 +
# proposal 0044) that inner branch nodes parent under, ported from the
# OTel observer's parallel_branches_branch_spans machinery.
"030-caller-metadata-parallel-branches-per-branch",
# 039 (nested-lineage augmentation, proposal 0045) stays deferred: the
# three cases need harness extensions the existing primitives lack.
# Cases 1 + 3 (nested fan-out / fan-out-in-serial) need the fan-out
Expand Down
51 changes: 51 additions & 0 deletions tests/unit/test_observability_langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,57 @@ async def keyword(_s: _S) -> Any:
assert (branch_obs[0].metadata or {}).get("branch_name") == branch


async def test_parallel_branches_subgraph_branch_one_dispatch_observation() -> None:
    """A multi-node subgraph branch yields exactly one per-branch dispatch.

    Both inner nodes must parent under that single observation rather than
    each inner started event opening a fresh dispatch. Guards the idempotency
    of the proposal-0044 synthesis.
    """
    from openarmature.graph import BranchSpec

    class _MultiBranchState(State):
        a: str = ""
        b: str = ""

    class _MultiParentState(State):
        out: str = ""

    async def _node_a(_s: _MultiBranchState) -> dict[str, Any]:
        return {"a": "a"}

    async def _node_b(_s: _MultiBranchState) -> dict[str, Any]:
        return {"b": "b"}

    # Two-node branch subgraph: node_a -> node_b -> END.
    branch_subgraph = (
        GraphBuilder(_MultiBranchState)
        .add_node("node_a", _node_a)
        .add_node("node_b", _node_b)
        .add_edge("node_a", "node_b")
        .add_edge("node_b", END)
        .set_entry("node_a")
        .compile()
    )
    # Parent graph with a single parallel-branches node holding that branch.
    only_branch = BranchSpec(subgraph=branch_subgraph, outputs={"out": "b"})
    parent_graph = (
        GraphBuilder(_MultiParentState)
        .add_parallel_branches_node("dispatch", branches={"only": only_branch})
        .add_edge("dispatch", END)
        .set_entry("dispatch")
        .compile()
    )
    parent_graph, client, _ = _attach(parent_graph)
    await parent_graph.invoke(_MultiParentState())
    await parent_graph.drain()

    first_trace = next(iter(client.traces.values()))
    branch_obs = []
    inner = []
    for recorded in first_trace.observations:
        if recorded.name == "only":
            branch_obs.append(recorded)
        if recorded.name in ("node_a", "node_b"):
            inner.append(recorded)

    assert len(branch_obs) == 1, f"expected one per-branch observation, got {len(branch_obs)}"
    assert len(inner) == 2, f"expected two inner observations, got {len(inner)}"
    for inner_observation in inner:
        assert inner_observation.parent_observation_id == branch_obs[0].id


# Spec §8.4.1 / proposal 0052: implementation attribution rows on
# every Langfuse Trace. The two rows source from the §5.1
# attributes; the always-emit invariant inherits from §5.1 so the
Expand Down