From aa258b8fb5127c3b0406b74e8e2eb5ec3e740ec5 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 24 Jun 2026 16:06:22 -0700 Subject: [PATCH 1/2] Un-defer fan-out per-instance fixture 029 029's fan_out omits the collect_field/target_field/item_field the cross-cap adapter requires, and its inner subgraph ships no state block. Synthesize those into a throwaway aggregation sink (029 asserts per-instance span metadata + sibling isolation, never the collected results) and seed the inner state, then activate 029 in the Langfuse conformance runner. Rewrite _make_augment_instance_middleware to read each instance's item from its item_field slot in runtime state. The prior approach indexed a build-time list by current_fan_out_index(), which returns None inside instance middleware (the ContextVar is set deeper, in node execution), so the augmentation never fired -- latent because 029/039 were its only users and both were deferred. Sharpen the 030/039 deferral notes: 030 needs a LangfuseObserver per-branch dispatch-span src change, 039 needs a runtime-state item lookup plus a new augment_metadata_from_outer_item directive. --- tests/conformance/test_observability.py | 14 +- .../test_observability_langfuse.py | 166 +++++++++++------- 2 files changed, 112 insertions(+), 68 deletions(-) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 8cefcfa..8e5bea6 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -284,13 +284,15 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # sibling conformance runner tests/conformance/test_observability_langfuse.py # (NOT unit tests). They are skipped here and asserted there; the coverage guard # counts them as accounted. 035/036 (invocation-id) + 059 (attribution) were -# relocated here from this file by the fixture-harness catch-up. +# relocated here from this file by the fixture-harness catch-up; 029 (fan-out +# per-instance) was un-deferred into the sibling runner the same way. _LANGFUSE_HARNESS_FIXTURES: frozenset[str] = frozenset( { "022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage", "027-langfuse-caller-supplied-metadata", + "029-caller-metadata-fan-out-per-instance", "031-langfuse-subgraph-span-hierarchy", "032-langfuse-fan-out-per-instance-spans", "033-langfuse-detached-trace-mode", @@ -311,13 +313,9 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: for fixture_ids, reason in ( # The Langfuse-mapping fixtures are fixture-tested by the sibling # conformance runner test_observability_langfuse.py -- see - # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 029/030 - # stay below: deferred in that file (harness gaps), unit-tested for now. - ( - ("029-caller-metadata-fan-out-per-instance",), - "proposal 0040 fan-out per-instance caller metadata; unit-tested; " - "deferred in test_observability_langfuse.py", - ), + # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 030 stays + # below: deferred in that file (needs a LangfuseObserver per-branch + # dispatch-span src change), unit-tested for now. ( ("030-caller-metadata-parallel-branches-per-branch",), "proposal 0040 per-branch caller metadata; covered by " diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 7d5b829..7c79464 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -94,48 +94,36 @@ "035-caller-invocation-id-uuid", "036-caller-invocation-id-non-uuid", "059-implementation-attribution-langfuse", - # 029 + 030 stay deferred in v0.11.0: - # - 029 (fan-out per-instance): fixture omits ``collect_field`` - # and ``target_field`` on the fan_out cfg, plus the inner - # subgraph omits a ``state:`` block — both are required by - # the cross-cap adapter. The augmentation behavior IS - # verified end-to-end by the unit test - # ``test_observability_langfuse.py::test_metadata_augmentation_in_fan_out_isolates_per_instance`` - # plus the OTel counterpart. - # - 030 (parallel-branches per-branch): the expected trace - # requires a per-branch dispatch span the Langfuse mapping - # doesn't synthesize today; the spec direction is in - # ``discuss-otel-parallel-branches-dispatch-span``. - # Sibling-skip behavior IS verified by the OTel unit test - # ``test_metadata_augmentation_in_parallel_branches_skips_sibling``. - # Both fixtures land once spec settles the dispatch-span - # shape AND the adapter learns to infer fan-out aggregation - # defaults from inner subgraphs. - # 039 (nested-lineage augmentation) — proposal 0045 v0.37.0. - # Stays deferred in PR 11. The three cases require harness - # extensions the existing primitives don't cover: - # - Case 1 + 3 (nested fan-out + fan-out-in-serial): the - # ``augment_metadata_from_field`` middleware captures the - # items list at fixture-build time from ``initial_state``; - # nested fan-outs read items from the OUTER instance's - # subgraph state (the ``inner_seed`` field threaded through - # per_product), which the current item-capture path can't - # resolve. Needs a runtime-state item-list lookup. - # - Case 2 (pb-inside-fan-out): introduces a new directive - # ``augment_metadata_from_outer_item: {key: field}`` — the - # pb branch's middleware sources the augmentation value - # from the SURROUNDING outer fan-out instance's item, not - # from a static value or the branch's own state. Needs a - # new middleware factory + a way to capture the outer-item - # reference at fixture-build time. - # 0045's behavioral contract IS exercised at unit level via the - # OTel observer's - # ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span`` - # regression test, and the cross-impl coverage the spec relies - # on for nested cases is already in place. Activating 039 against the - # Langfuse harness is a follow-up PR — the harness extensions - # above are non-trivial and orthogonal to the engine + observer - # changes 0045 actually mandates. + # 029 (fan-out per-instance): the fixture omits collect_field/ + # target_field on the fan_out cfg and the inner subgraph omits a + # state: block, both required by the cross-cap adapter. The harness + # synthesizes the inner state (_infer_state_fields_from_nodes) and a + # throwaway aggregation sink (_synthesize_fan_out_aggregation) -- 029 + # asserts per-instance span metadata + sibling isolation, never the + # collected results -- and augment_metadata_from_field drives the + # per-instance set_invocation_metadata. + "029-caller-metadata-fan-out-per-instance", + # 030 (parallel-branches per-branch) stays deferred: a SRC gap, not a + # harness one. The expected trace needs a per-branch dispatch-span + # observation (inner branch spans parent under it), which §4.3 + §8.4.2 + # (proposal 0042 observation.metadata.branch_name) + proposal 0044 + # mandate. The OTel observer synthesizes it + # (parallel_branches_branch_spans); the LangfuseObserver does not yet + # (0044 shipped OTel-only in v0.11.0). Un-defers once that synthesis is + # ported to the Langfuse observer (a src change). Sibling-skip behavior + # IS verified by the OTel unit test + # ``test_metadata_augmentation_in_parallel_branches_skips_sibling``. + # 039 (nested-lineage augmentation, proposal 0045) stays deferred: the + # three cases need harness extensions the existing primitives lack. + # Cases 1 + 3 (nested fan-out / fan-out-in-serial) need the fan-out + # augment middleware to read items_field from the executing subgraph's + # RUNTIME state (the outer instance's threaded inner_seed), not the + # build-time initial_state the current _make_augment_instance_middleware + # captures. Case 2 (pb-inside-fan-out) needs a new + # augment_metadata_from_outer_item factory AND depends on 030's + # per-branch dispatch span landing first. 0045's contract IS exercised + # at unit level via the OTel observer's + # ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span``. } ) @@ -222,6 +210,64 @@ def _normalize_fan_out_subgraph_keys(spec: dict[str, Any]) -> None: fan_out_cfg["subgraph"] = fan_out_cfg.pop("inner_subgraph") +def _synthesize_fan_out_aggregation(spec: dict[str, Any]) -> None: + """Synthesize a throwaway aggregation sink for a ``fan_out`` block that omits + ``collect_field`` / ``target_field`` (fixture 029): collect the inner + subgraph's ``stores_response_in`` value into a fresh outer list field, and + declare the adapter's other required fan-out fields (``item_field``, the + ``items_field`` source, the inner state). + + Call AFTER ``_normalize_fan_out_subgraph_keys`` so the ``subgraph(s)`` keys + are already resolved. + """ + # 029 asserts per-instance span metadata + sibling isolation, never the + # collected results, so the sink only satisfies the adapter's collect/target + # requirement. The inner subgraph (spec["subgraphs"]) is shared across a + # fixture's cases, so its state seed below uses setdefault to stay + # idempotent; the outer state / fan_out writes are per-case. + subgraphs = cast("dict[str, Any]", spec.get("subgraphs") or {}) + state_block = cast("dict[str, Any]", spec.get("state") or {}) + state_fields = cast("dict[str, Any]", state_block.get("fields") or {}) + for node_name, node_spec in cast("dict[str, Any]", spec.get("nodes") or {}).items(): + if not isinstance(node_spec, dict): + continue + fan_out_cfg = cast("dict[str, Any] | None", cast("dict[str, Any]", node_spec).get("fan_out")) + if fan_out_cfg is None or ("collect_field" in fan_out_cfg and "target_field" in fan_out_cfg): + continue + sub_name = cast("str | None", fan_out_cfg.get("subgraph")) + sub_spec = cast("dict[str, Any]", subgraphs.get(sub_name or "", {})) + inferred = _infer_state_fields_from_nodes(cast("dict[str, Any]", sub_spec.get("nodes") or {})) + # Any inferred field works as the collected value; the sink is never + # asserted, so the first one is fine. + collect_field = next(iter(inferred), None) + if collect_field is None: + continue + # The sink is node-scoped (one outer field per fan-out node). The item + # slot is a fixed inner-state field name on purpose: distinct fan-outs + # have distinct inner subgraphs, and a subgraph shared between two + # fan-outs reuses the one slot, so scoping it per node would mismatch. + sink = f"oa_fan_out_sink_{node_name}" + item_field = "oa_fan_out_item" + fan_out_cfg.setdefault("collect_field", collect_field) + fan_out_cfg.setdefault("target_field", sink) + # items_field mode also requires item_field (where the engine places + # each item in the inner state). Seed an explicit inner state block -- + # the inferred response fields plus the item slot -- so + # _build_inner_subgraph_with_llm uses it. The augment middleware reads + # the item back out of this slot at runtime. + fan_out_cfg.setdefault("item_field", item_field) + sub_spec.setdefault("state", {"fields": {**inferred, item_field: {"type": "dict", "default": {}}}}) + state_fields[sink] = {"type": "list", "reducer": "append", "default": []} + # The items_field source (e.g. products) must be declared on the outer + # state; 029 ships it only via initial_state, so declare it here. + items_field = cast("str | None", fan_out_cfg.get("items_field")) + if items_field is not None: + state_fields.setdefault(items_field, {"type": "list", "default": []}) + if state_fields: + state_block["fields"] = state_fields + spec["state"] = state_block + + def _build_augment_middlewares( case: Mapping[str, Any], ) -> tuple[ @@ -244,7 +290,6 @@ def _build_augment_middlewares( """ fan_out_mw: dict[str, list[Any]] = {} branch_mw: dict[str, dict[str, list[Any]]] = {} - initial_state = cast("dict[str, Any]", case.get("initial_state") or {}) for node_name, node_spec_any in cast("dict[str, Any]", case.get("nodes") or {}).items(): if not isinstance(node_spec_any, dict): @@ -254,11 +299,8 @@ def _build_augment_middlewares( if fan_out_cfg is not None: augment_field_map = cast("dict[str, str] | None", fan_out_cfg.get("augment_metadata_from_field")) if augment_field_map: - items_field = cast("str | None", fan_out_cfg.get("items_field")) - items_list = ( - cast("list[dict[str, Any]]", initial_state.get(items_field, [])) if items_field else [] - ) - fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, items_list)] + item_field = cast("str | None", fan_out_cfg.get("item_field")) + fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, item_field)] pb_cfg = cast("dict[str, Any] | None", node_spec.get("parallel_branches")) if pb_cfg is not None: branches_cfg = cast("dict[str, dict[str, Any]]", pb_cfg.get("branches") or {}) @@ -272,25 +314,28 @@ def _build_augment_middlewares( return fan_out_mw, branch_mw -def _make_augment_instance_middleware(field_map: dict[str, str], items: list[dict[str, Any]]) -> Any: - """Per-instance fan-out middleware that calls - ``set_invocation_metadata`` with per-item entries pulled from - ``items[current_fan_out_index()][field_path]``. Captures ``items`` - at fixture-build time so each instance reads the same list.""" +def _make_augment_instance_middleware(field_map: dict[str, str], item_field: str | None) -> Any: + """Per-instance fan-out middleware that reads the instance's own item from + the ``item_field`` slot of its subgraph state and calls + ``set_invocation_metadata`` with the mapped entries.""" + # Reads runtime state, not a build-time list indexed by + # current_fan_out_index(): instance middleware wraps the inner subgraph from + # OUTSIDE, but the fan_out_index ContextVar is set deeper (inside the inner + # node execution), so it is None here. The engine has already placed each + # item in item_field by the time the chain runs, and set_invocation_metadata + # lands before the inner spans open, so the instance's dispatch + inner spans + # all carry the augmentation. class _AugmentInstanceMW: async def __call__(self, state: Any, next_: Any, /) -> Any: - from openarmature.observability.correlation import ( # noqa: PLC0415 - current_fan_out_index, - ) from openarmature.observability.metadata import ( # noqa: PLC0415 set_invocation_metadata, ) - idx = current_fan_out_index() - if idx is not None and 0 <= idx < len(items): - item = items[idx] - entries = {key: item[field_path] for key, field_path in field_map.items()} + item = getattr(state, item_field, None) if item_field else None + if isinstance(item, Mapping): + item_map = cast("Mapping[str, Any]", item) + entries = {key: item_map[field_path] for key, field_path in field_map.items()} set_invocation_metadata(**entries) return await next_(state) @@ -657,6 +702,7 @@ async def _run_case(case: Mapping[str, Any]) -> None: # references. Pure key normalization; semantics unchanged. if isinstance(case, dict): _normalize_fan_out_subgraph_keys(case) + _synthesize_fan_out_aggregation(case) # Per proposal 0040 fixtures 029 / 030: synthesize the # augmentation middlewares that drive the per-instance / # per-branch ``set_invocation_metadata`` calls. Both flow into From 62eb95ecf0dd67f3e8d7ee6579e09bf89ef42505 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 24 Jun 2026 16:14:39 -0700 Subject: [PATCH 2/2] Harden fan-out item_field synthesis From CoPilot review of #188: - Make the inner-state synthesis additive: ensure item_field and the inferred response fields are declared whether or not the subgraph shipped its own state block. setdefault on the whole state block skipped a fixture that declares inner state without the item slot, which would fail strict-State validation when the engine writes it. - Fail fast when augment_metadata_from_field is present but item_field is missing, rather than building a silent no-op middleware that hides a harness/fixture mismatch. Tighten the middleware signature to a non-optional item_field. --- .../test_observability_langfuse.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 7c79464..a0c02d0 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -251,12 +251,17 @@ def _synthesize_fan_out_aggregation(spec: dict[str, Any]) -> None: fan_out_cfg.setdefault("collect_field", collect_field) fan_out_cfg.setdefault("target_field", sink) # items_field mode also requires item_field (where the engine places - # each item in the inner state). Seed an explicit inner state block -- - # the inferred response fields plus the item slot -- so - # _build_inner_subgraph_with_llm uses it. The augment middleware reads - # the item back out of this slot at runtime. + # each item in the inner state). The augment middleware reads the item + # back out of this slot at runtime. fan_out_cfg.setdefault("item_field", item_field) - sub_spec.setdefault("state", {"fields": {**inferred, item_field: {"type": "dict", "default": {}}}}) + # Ensure the inner state declares item_field (+ the inferred response + # fields) whether or not the subgraph shipped its own state block -- + # State is strict, so the engine's write to item_field needs it declared. + sub_state = cast("dict[str, Any]", sub_spec.setdefault("state", {})) + sub_fields = cast("dict[str, Any]", sub_state.setdefault("fields", {})) + for fname, fdef in inferred.items(): + sub_fields.setdefault(fname, fdef) + sub_fields.setdefault(item_field, {"type": "dict", "default": {}}) state_fields[sink] = {"type": "list", "reducer": "append", "default": []} # The items_field source (e.g. products) must be declared on the outer # state; 029 ships it only via initial_state, so declare it here. @@ -300,6 +305,10 @@ def _build_augment_middlewares( augment_field_map = cast("dict[str, str] | None", fan_out_cfg.get("augment_metadata_from_field")) if augment_field_map: item_field = cast("str | None", fan_out_cfg.get("item_field")) + if item_field is None: + raise ValueError( + f"fan-out node {node_name!r}: augment_metadata_from_field requires item_field" + ) fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, item_field)] pb_cfg = cast("dict[str, Any] | None", node_spec.get("parallel_branches")) if pb_cfg is not None: @@ -314,7 +323,7 @@ def _build_augment_middlewares( return fan_out_mw, branch_mw -def _make_augment_instance_middleware(field_map: dict[str, str], item_field: str | None) -> Any: +def _make_augment_instance_middleware(field_map: dict[str, str], item_field: str) -> Any: """Per-instance fan-out middleware that reads the instance's own item from the ``item_field`` slot of its subgraph state and calls ``set_invocation_metadata`` with the mapped entries.""" @@ -332,7 +341,7 @@ async def __call__(self, state: Any, next_: Any, /) -> Any: set_invocation_metadata, ) - item = getattr(state, item_field, None) if item_field else None + item = getattr(state, item_field, None) if isinstance(item, Mapping): item_map = cast("Mapping[str, Any]", item) entries = {key: item_map[field_path] for key, field_path in field_map.items()}