diff --git a/CHANGELOG.md b/CHANGELOG.md index 99f7cdd..d73a94e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). The ### Fixed - **`current_fan_out_index()` inside fan-out instance middleware** now returns the executing instance's index (and `current_fan_out_index_chain()` its lineage) instead of `None`. The engine set the fan-out lineage ContextVars per-node, inside the inner subgraph, which left them unset in `instance_middleware` that wraps the subgraph from outside; they are now set around the instance-middleware chain. The documented `instance_middleware` use (`RetryMiddleware`) does not read the index, so no shipped behavior changes. This corrects the value seen by custom instance middleware that reads the index or calls `set_invocation_metadata`. +- **Langfuse per-branch dispatch-span observation** (observability §4.3 / §8.4.2, proposals 0042 / 0044). The Langfuse observer now synthesizes a per-branch Span observation under a `parallel_branches` dispatcher node, so each branch's inner observations nest under that branch's own span (a three-level dispatcher / per-branch-span / inner-nodes tree) instead of parenting directly under the dispatcher. The per-branch observation carries the OA-emitted `branch_name` alongside the caller baseline metadata and any per-branch augmentation, and the Generation observation now carries `branch_name` (and, inside a fan-out instance, `fan_out_index`) too. The OTel observer already produced this shape (proposal 0044 shipped OTel-only in v0.11.0); this brings the Langfuse mapping into line. Callable branches (proposal 0075) are unchanged. 
## [0.15.0] — 2026-06-22 diff --git a/src/openarmature/observability/langfuse/observer.py b/src/openarmature/observability/langfuse/observer.py index 025574e..05af4c4 100644 --- a/src/openarmature/observability/langfuse/observer.py +++ b/src/openarmature/observability/langfuse/observer.py @@ -256,6 +256,20 @@ class _InvState: parallel_branches_parent_node_name: dict[tuple[str, ...], str] = field( default_factory=dict[tuple[str, ...], str] ) + # Per proposal 0044: per-branch dispatch-span observations synthesized from + # the first inner event of each branch, keyed ``prefix + (branch_name,)`` + # (prefix = the parallel-branches NODE namespace). Inner branch-node + # observations parent under this dispatch instead of the shared pb NODE + # span; closed when the pb NODE's completed event fires. + parallel_branches_branch_spans: dict[tuple[str, ...], _OpenObservation] = field( + default_factory=dict[tuple[str, ...], _OpenObservation] + ) + # Declared branch-name set per pb NODE namespace (from the NODE's + # parallel_branches_config), so an inner branch event matches only the node + # that actually declares its branch. + parallel_branches_branch_names: dict[tuple[str, ...], frozenset[str]] = field( + default_factory=dict[tuple[str, ...], frozenset[str]] + ) # Side-cache: accumulator for `metadata.detached_child_trace_ids` # on dispatch observations that spawn detached children. 
Keyed by # the dispatch observation's prefix (the fan-out node's namespace, @@ -475,6 +489,9 @@ def _open_started_observation(self, event: NodeEvent) -> None: inv_state.parallel_branches_parent_node_name[event.namespace] = ( event.parallel_branches_config.parent_node_name ) + inv_state.parallel_branches_branch_names[event.namespace] = frozenset( + event.parallel_branches_config.branch_names + ) key = self._key_for(event) if key in inv_state.open_observations: @@ -541,10 +558,15 @@ def _handle_completed(self, event: NodeEvent) -> None: # rather than appending to the previous iteration's # accumulator and overwriting the prior link metadata. inv_state.detached_child_trace_ids.pop(event.namespace, None) - # Per proposal 0045: clean up the pb cache on a pb NODE's own - # completion. Same shape as the fan-out cleanup above. + # Per proposals 0044/0045: on a pb NODE's own completion, close the + # per-branch dispatch observations synthesized for it (children-before- + # parents) and clear the pb caches. Same shape as the fan-out cleanup. if event.parallel_branches_config is not None: + for prefix in list(inv_state.parallel_branches_branch_spans.keys()): + if len(prefix) > len(event.namespace) and prefix[: len(event.namespace)] == event.namespace: + self._close_parallel_branches_branch_dispatch_observation(inv_state, prefix) inv_state.parallel_branches_parent_node_name.pop(event.namespace, None) + inv_state.parallel_branches_branch_names.pop(event.namespace, None) key = self._key_for(event) observation = inv_state.open_observations.pop(key, None) @@ -892,6 +914,14 @@ def _resolve_parent_observation_id(self, inv_state: _InvState, event: NodeEvent) # 3. Leaf node observation at any matching ancestor prefix, # walked longest-first. # 4. None — the Trace itself becomes the implicit parent. + # Per proposal 0044: an inner branch node parents under its per-branch + # dispatch observation (longest-first; innermost branch wins). 
+ if event.branch_name is not None: + for prefix_len in range(len(event.namespace) - 1, 0, -1): + prefix = event.namespace[:prefix_len] + branch_dispatch = inv_state.parallel_branches_branch_spans.get(prefix + (event.branch_name,)) + if branch_dispatch is not None: + return branch_dispatch.handle.id if event.fan_out_index is not None and event.namespace: instance_key = event.namespace[:1] + (str(event.fan_out_index),) dispatch = inv_state.fan_out_instance_observations.get(instance_key) @@ -1014,6 +1044,24 @@ def _sync_subgraph_observations( ): self._open_fan_out_instance_dispatch_observation(inv_state, correlation_id, prefix, event) continue + # Per proposal 0044: synthesize a per-branch dispatch observation + # under the pb NODE for an inner branch event, so inner branch + # nodes parent under it rather than the shared pb NODE span. Mirror + # of the fan-out per-instance arm above; gated on the branch + # belonging to the pb node declared at this prefix. + if ( + event.branch_name is not None + and prefix in inv_state.parallel_branches_parent_node_name + and event.branch_name in inv_state.parallel_branches_branch_names.get(prefix, frozenset()) + ): + # Synthesize once per branch: _sync runs on every inner node's + # started event, so guard against re-opening (a second open would + # orphan the first observation and split the branch's nodes). + if prefix + (event.branch_name,) not in inv_state.parallel_branches_branch_spans: + self._open_parallel_branches_branch_dispatch_observation( + inv_state, correlation_id, prefix, event + ) + continue # A parallel-branches or fan-out NODE prefix already has its own # leaf observation (from the NODE's own started event), unlike a # transparent subgraph wrapper. Don't synthesize a duplicate @@ -1085,7 +1133,7 @@ def _open_fan_out_instance_dispatch_observation( ) -> None: # Non-detached per-instance dispatch lives in the parent # Trace under the fan-out node's own Span observation. 
- fan_out_open = self._find_fan_out_node_observation(inv_state, prefix) + fan_out_open = self._find_node_observation(inv_state, prefix) parent_observation_id = fan_out_open.handle.id if fan_out_open is not None else None parent_node_name = inv_state.fan_out_parent_node_name.get(prefix, prefix[-1]) # Per-instance dispatch is synthesized from the first inner @@ -1117,6 +1165,46 @@ def _open_fan_out_instance_dispatch_observation( branch_name_chain=event.branch_name_chain[:chain_len], ) + def _open_parallel_branches_branch_dispatch_observation( + self, + inv_state: _InvState, + correlation_id: str | None, + prefix: tuple[str, ...], + event: NodeEvent, + ) -> None: + # Per-branch dispatch lives under the parallel-branches NODE's own Span + # observation (mirror of the fan-out per-instance dispatch). + pb_open = self._find_node_observation(inv_state, prefix) + parent_observation_id = pb_open.handle.id if pb_open is not None else None + parent_node_name = inv_state.parallel_branches_parent_node_name.get(prefix, prefix[-1]) + # Synthesized from the first inner event in the branch subtree; inherit + # scalar metadata from it. branch_name is the OA-emitted §8.4.2 row; the + # caller's branchName augmentation rides in via the caller metadata. 
+ metadata: dict[str, Any] = { + "namespace": list(prefix), + "step": event.step, + "attempt_index": 0, + "parallel_branches_parent_node_name": parent_node_name, + "branch_name": event.branch_name, + "subgraph_name": _subgraph_identity_at(event, len(prefix)), + } + if correlation_id is not None: + metadata["correlation_id"] = correlation_id + _apply_caller_metadata(metadata, event.caller_invocation_metadata) + handle = self.client.span( + trace_id=inv_state.trace_id, + name=event.branch_name, + metadata=metadata, + parent_observation_id=parent_observation_id, + ) + branch_key = prefix + (cast("str", event.branch_name),) + chain_len = len(prefix) + inv_state.parallel_branches_branch_spans[branch_key] = _OpenObservation( + handle=handle, + fan_out_index_chain=event.fan_out_index_chain[:chain_len], + branch_name_chain=event.branch_name_chain[:chain_len], + ) + def _open_detached_subgraph_trace( self, inv_state: _InvState, @@ -1270,7 +1358,7 @@ def _open_detached_fan_out_instance_trace( # instance's entry. ids_list = inv_state.detached_child_trace_ids.setdefault(prefix, []) ids_list.append(detached_trace_id) - fan_out_open = self._find_fan_out_node_observation(inv_state, prefix) + fan_out_open = self._find_node_observation(inv_state, prefix) if fan_out_open is not None: # `detached: True` per §8.4.2 (proposal 0042) — the # parent-side fan-out node observation marks itself when @@ -1341,14 +1429,23 @@ def _close_fan_out_instance_dispatch_observation( return observation.handle.end() - def _find_fan_out_node_observation( + def _close_parallel_branches_branch_dispatch_observation( + self, inv_state: _InvState, prefix: tuple[str, ...] + ) -> None: + observation = inv_state.parallel_branches_branch_spans.pop(prefix, None) + if observation is None: + return + observation.handle.end() + + def _find_node_observation( self, inv_state: _InvState, prefix: tuple[str, ...] ) -> _OpenObservation | None: - # Find the fan-out node's open leaf observation at the given - # prefix. 
Retry middleware wrapping a fan-out bumps the - # attempt_index; this scans for any entry at ``prefix`` with - # ``fan_out_index is None``. Only one such entry is open at a - # time (retry opens and closes within an attempt's lifecycle). + # Find a NODE's own open leaf observation at the given prefix (the + # fan-out or parallel-branches NODE, whose per-instance / per-branch + # dispatches parent under it). Retry middleware wrapping the node bumps + # the attempt_index; this scans for any entry at ``prefix`` with + # ``fan_out_index is None``. Only one such entry is open at a time + # (retry opens and closes within an attempt's lifecycle). for key, observation in inv_state.open_observations.items(): if key[0] == prefix and key[2] is None: return observation @@ -1719,6 +1816,13 @@ def _typed_event_metadata( metadata: dict[str, Any] = {} if correlation_id is not None: metadata["correlation_id"] = correlation_id + # §8.4.2: the OA-emitted fan_out_index / branch_name scoping rows, + # mirroring _observation_metadata so a Generation inside a fan-out + # instance or branch carries the same scoping as its node observation. 
+ if event.fan_out_index is not None: + metadata["fan_out_index"] = event.fan_out_index + if event.branch_name is not None: + metadata["branch_name"] = event.branch_name metadata["system"] = event.provider active_prompt = event.active_prompt if active_prompt is not None: diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 8e5bea6..988618e 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -293,6 +293,7 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "024-langfuse-prompt-linkage", "027-langfuse-caller-supplied-metadata", "029-caller-metadata-fan-out-per-instance", + "030-caller-metadata-parallel-branches-per-branch", "031-langfuse-subgraph-span-hierarchy", "032-langfuse-fan-out-per-instance-spans", "033-langfuse-detached-trace-mode", @@ -313,14 +314,7 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: for fixture_ids, reason in ( # The Langfuse-mapping fixtures are fixture-tested by the sibling # conformance runner test_observability_langfuse.py -- see - # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 030 stays - # below: deferred in that file (needs a LangfuseObserver per-branch - # dispatch-span src change), unit-tested for now. - ( - ("030-caller-metadata-parallel-branches-per-branch",), - "proposal 0040 per-branch caller metadata; covered by " - "test_observability_otel.py; deferred in test_observability_langfuse.py", - ), + # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 
( ( "043-get-invocation-metadata-roundtrip", diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index a0c02d0..879ae08 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -103,16 +103,11 @@ # collected results -- and augment_metadata_from_field drives the # per-instance set_invocation_metadata. "029-caller-metadata-fan-out-per-instance", - # 030 (parallel-branches per-branch) stays deferred: a SRC gap, not a - # harness one. The expected trace needs a per-branch dispatch-span - # observation (inner branch spans parent under it), which §4.3 + §8.4.2 - # (proposal 0042 observation.metadata.branch_name) + proposal 0044 - # mandate. The OTel observer synthesizes it - # (parallel_branches_branch_spans); the LangfuseObserver does not yet - # (0044 shipped OTel-only in v0.11.0). Un-defers once that synthesis is - # ported to the Langfuse observer (a src change). Sibling-skip behavior - # IS verified by the OTel unit test - # ``test_metadata_augmentation_in_parallel_branches_skips_sibling``. + # 030 (parallel-branches per-branch): the LangfuseObserver now + # synthesizes the per-branch dispatch-span observation (§4.3 + §8.4.2 + + # proposal 0044) that inner branch nodes parent under, ported from the + # OTel observer's parallel_branches_branch_spans machinery. + "030-caller-metadata-parallel-branches-per-branch", # 039 (nested-lineage augmentation, proposal 0045) stays deferred: the # three cases need harness extensions the existing primitives lack. 
# Cases 1 + 3 (nested fan-out / fan-out-in-serial) need the fan-out diff --git a/tests/unit/test_observability_langfuse.py b/tests/unit/test_observability_langfuse.py index 48b5a7d..f037d03 100644 --- a/tests/unit/test_observability_langfuse.py +++ b/tests/unit/test_observability_langfuse.py @@ -1195,6 +1195,57 @@ async def keyword(_s: _S) -> Any: assert (branch_obs[0].metadata or {}).get("branch_name") == branch +async def test_parallel_branches_subgraph_branch_one_dispatch_observation() -> None: + # A subgraph branch with multiple inner nodes synthesizes exactly ONE + # per-branch dispatch observation; both inner nodes parent under it (not a + # fresh dispatch per inner started event). Guards the proposal-0044 + # synthesis idempotency. + from openarmature.graph import BranchSpec + + class _MultiBranchState(State): + a: str = "" + b: str = "" + + class _MultiParentState(State): + out: str = "" + + async def _node_a(_s: _MultiBranchState) -> dict[str, Any]: + return {"a": "a"} + + async def _node_b(_s: _MultiBranchState) -> dict[str, Any]: + return {"b": "b"} + + branch = ( + GraphBuilder(_MultiBranchState) + .add_node("node_a", _node_a) + .add_node("node_b", _node_b) + .add_edge("node_a", "node_b") + .add_edge("node_b", END) + .set_entry("node_a") + .compile() + ) + graph = ( + GraphBuilder(_MultiParentState) + .add_parallel_branches_node( + "dispatch", + branches={"only": BranchSpec(subgraph=branch, outputs={"out": "b"})}, + ) + .add_edge("dispatch", END) + .set_entry("dispatch") + .compile() + ) + graph, client, _ = _attach(graph) + await graph.invoke(_MultiParentState()) + await graph.drain() + + trace = next(iter(client.traces.values())) + branch_obs = [o for o in trace.observations if o.name == "only"] + assert len(branch_obs) == 1, f"expected one per-branch observation, got {len(branch_obs)}" + inner = [o for o in trace.observations if o.name in ("node_a", "node_b")] + assert len(inner) == 2, f"expected two inner observations, got {len(inner)}" + assert 
all(o.parent_observation_id == branch_obs[0].id for o in inner) + + # Spec §8.4.1 / proposal 0052: implementation attribution rows on # every Langfuse Trace. The two rows source from the §5.1 # attributes; the always-emit invariant inherits from §5.1 so the