diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 8cefcfa..8e5bea6 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -284,13 +284,15 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: # sibling conformance runner tests/conformance/test_observability_langfuse.py # (NOT unit tests). They are skipped here and asserted there; the coverage guard # counts them as accounted. 035/036 (invocation-id) + 059 (attribution) were -# relocated here from this file by the fixture-harness catch-up. +# relocated here from this file by the fixture-harness catch-up; 029 (fan-out +# per-instance) was un-deferred into the sibling runner the same way. _LANGFUSE_HARNESS_FIXTURES: frozenset[str] = frozenset( { "022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage", "027-langfuse-caller-supplied-metadata", + "029-caller-metadata-fan-out-per-instance", "031-langfuse-subgraph-span-hierarchy", "032-langfuse-fan-out-per-instance-spans", "033-langfuse-detached-trace-mode", @@ -311,13 +313,9 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: for fixture_ids, reason in ( # The Langfuse-mapping fixtures are fixture-tested by the sibling # conformance runner test_observability_langfuse.py -- see - # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 029/030 - # stay below: deferred in that file (harness gaps), unit-tested for now. - ( - ("029-caller-metadata-fan-out-per-instance",), - "proposal 0040 fan-out per-instance caller metadata; unit-tested; " - "deferred in test_observability_langfuse.py", - ), + # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 030 stays + # below: deferred in that file (needs a LangfuseObserver per-branch + # dispatch-span src change), unit-tested for now. ( ("030-caller-metadata-parallel-branches-per-branch",), "proposal 0040 per-branch caller metadata; covered by " diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 7d5b829..a0c02d0 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -94,48 +94,36 @@ "035-caller-invocation-id-uuid", "036-caller-invocation-id-non-uuid", "059-implementation-attribution-langfuse", - # 029 + 030 stay deferred in v0.11.0: - # - 029 (fan-out per-instance): fixture omits ``collect_field`` - # and ``target_field`` on the fan_out cfg, plus the inner - # subgraph omits a ``state:`` block — both are required by - # the cross-cap adapter. The augmentation behavior IS - # verified end-to-end by the unit test - # ``test_observability_langfuse.py::test_metadata_augmentation_in_fan_out_isolates_per_instance`` - # plus the OTel counterpart. - # - 030 (parallel-branches per-branch): the expected trace - # requires a per-branch dispatch span the Langfuse mapping - # doesn't synthesize today; the spec direction is in - # ``discuss-otel-parallel-branches-dispatch-span``. - # Sibling-skip behavior IS verified by the OTel unit test - # ``test_metadata_augmentation_in_parallel_branches_skips_sibling``. - # Both fixtures land once spec settles the dispatch-span - # shape AND the adapter learns to infer fan-out aggregation - # defaults from inner subgraphs. - # 039 (nested-lineage augmentation) — proposal 0045 v0.37.0. - # Stays deferred in PR 11. The three cases require harness - # extensions the existing primitives don't cover: - # - Case 1 + 3 (nested fan-out + fan-out-in-serial): the - # ``augment_metadata_from_field`` middleware captures the - # items list at fixture-build time from ``initial_state``; - # nested fan-outs read items from the OUTER instance's - # subgraph state (the ``inner_seed`` field threaded through - # per_product), which the current item-capture path can't - # resolve. Needs a runtime-state item-list lookup. - # - Case 2 (pb-inside-fan-out): introduces a new directive - # ``augment_metadata_from_outer_item: {key: field}`` — the - # pb branch's middleware sources the augmentation value - # from the SURROUNDING outer fan-out instance's item, not - # from a static value or the branch's own state. Needs a - # new middleware factory + a way to capture the outer-item - # reference at fixture-build time. - # 0045's behavioral contract IS exercised at unit level via the - # OTel observer's - # ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span`` - # regression test, and the cross-impl coverage the spec relies - # on for nested cases is already in place. Activating 039 against the - # Langfuse harness is a follow-up PR — the harness extensions - # above are non-trivial and orthogonal to the engine + observer - # changes 0045 actually mandates. + # 029 (fan-out per-instance): the fixture omits collect_field/ + # target_field on the fan_out cfg and the inner subgraph omits a + # state: block, both required by the cross-cap adapter. The harness + # synthesizes the inner state (_infer_state_fields_from_nodes) and a + # throwaway aggregation sink (_synthesize_fan_out_aggregation) -- 029 + # asserts per-instance span metadata + sibling isolation, never the + # collected results -- and augment_metadata_from_field drives the + # per-instance set_invocation_metadata. + "029-caller-metadata-fan-out-per-instance", + # 030 (parallel-branches per-branch) stays deferred: a SRC gap, not a + # harness one. The expected trace needs a per-branch dispatch-span + # observation (inner branch spans parent under it), which §4.3 + §8.4.2 + # (proposal 0042 observation.metadata.branch_name) + proposal 0044 + # mandate. The OTel observer synthesizes it + # (parallel_branches_branch_spans); the LangfuseObserver does not yet + # (0044 shipped OTel-only in v0.11.0). Un-defers once that synthesis is + # ported to the Langfuse observer (a src change). Sibling-skip behavior + # IS verified by the OTel unit test + # ``test_metadata_augmentation_in_parallel_branches_skips_sibling``. + # 039 (nested-lineage augmentation, proposal 0045) stays deferred: the + # three cases need harness extensions the existing primitives lack. + # Cases 1 + 3 (nested fan-out / fan-out-in-serial) need the fan-out + # augment middleware to read items_field from the executing subgraph's + # RUNTIME state (the outer instance's threaded inner_seed), not the + # build-time initial_state the current _make_augment_instance_middleware + # captures. Case 2 (pb-inside-fan-out) needs a new + # augment_metadata_from_outer_item factory AND depends on 030's + # per-branch dispatch span landing first. 0045's contract IS exercised + # at unit level via the OTel observer's + # ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span``. } ) @@ -222,6 +210,69 @@ def _normalize_fan_out_subgraph_keys(spec: dict[str, Any]) -> None: fan_out_cfg["subgraph"] = fan_out_cfg.pop("inner_subgraph") +def _synthesize_fan_out_aggregation(spec: dict[str, Any]) -> None: + """Synthesize a throwaway aggregation sink for a ``fan_out`` block that omits + ``collect_field`` / ``target_field`` (fixture 029): collect the inner + subgraph's ``stores_response_in`` value into a fresh outer list field, and + declare the adapter's other required fan-out fields (``item_field``, the + ``items_field`` source, the inner state). + + Call AFTER ``_normalize_fan_out_subgraph_keys`` so the ``subgraph(s)`` keys + are already resolved. + """ + # 029 asserts per-instance span metadata + sibling isolation, never the + # collected results, so the sink only satisfies the adapter's collect/target + # requirement. The inner subgraph (spec["subgraphs"]) is shared across a + # fixture's cases, so its state seed below uses setdefault to stay + # idempotent; the outer state / fan_out writes are per-case. + subgraphs = cast("dict[str, Any]", spec.get("subgraphs") or {}) + state_block = cast("dict[str, Any]", spec.get("state") or {}) + state_fields = cast("dict[str, Any]", state_block.get("fields") or {}) + for node_name, node_spec in cast("dict[str, Any]", spec.get("nodes") or {}).items(): + if not isinstance(node_spec, dict): + continue + fan_out_cfg = cast("dict[str, Any] | None", cast("dict[str, Any]", node_spec).get("fan_out")) + if fan_out_cfg is None or ("collect_field" in fan_out_cfg and "target_field" in fan_out_cfg): + continue + sub_name = cast("str | None", fan_out_cfg.get("subgraph")) + sub_spec = cast("dict[str, Any]", subgraphs.get(sub_name or "", {})) + inferred = _infer_state_fields_from_nodes(cast("dict[str, Any]", sub_spec.get("nodes") or {})) + # Any inferred field works as the collected value; the sink is never + # asserted, so the first one is fine. + collect_field = next(iter(inferred), None) + if collect_field is None: + continue + # The sink is node-scoped (one outer field per fan-out node). The item + # slot is a fixed inner-state field name on purpose: distinct fan-outs + # have distinct inner subgraphs, and a subgraph shared between two + # fan-outs reuses the one slot, so scoping it per node would mismatch. + sink = f"oa_fan_out_sink_{node_name}" + item_field = "oa_fan_out_item" + fan_out_cfg.setdefault("collect_field", collect_field) + fan_out_cfg.setdefault("target_field", sink) + # items_field mode also requires item_field (where the engine places + # each item in the inner state). The augment middleware reads the item + # back out of this slot at runtime. + fan_out_cfg.setdefault("item_field", item_field) + # Ensure the inner state declares item_field (+ the inferred response + # fields) whether or not the subgraph shipped its own state block -- + # State is strict, so the engine's write to item_field needs it declared. + sub_state = cast("dict[str, Any]", sub_spec.setdefault("state", {})) + sub_fields = cast("dict[str, Any]", sub_state.setdefault("fields", {})) + for fname, fdef in inferred.items(): + sub_fields.setdefault(fname, fdef) + sub_fields.setdefault(item_field, {"type": "dict", "default": {}}) + state_fields[sink] = {"type": "list", "reducer": "append", "default": []} + # The items_field source (e.g. products) must be declared on the outer + # state; 029 ships it only via initial_state, so declare it here. + items_field = cast("str | None", fan_out_cfg.get("items_field")) + if items_field is not None: + state_fields.setdefault(items_field, {"type": "list", "default": []}) + if state_fields: + state_block["fields"] = state_fields + spec["state"] = state_block + + def _build_augment_middlewares( case: Mapping[str, Any], ) -> tuple[ @@ -244,7 +295,6 @@ def _build_augment_middlewares( """ fan_out_mw: dict[str, list[Any]] = {} branch_mw: dict[str, dict[str, list[Any]]] = {} - initial_state = cast("dict[str, Any]", case.get("initial_state") or {}) for node_name, node_spec_any in cast("dict[str, Any]", case.get("nodes") or {}).items(): if not isinstance(node_spec_any, dict): @@ -254,11 +304,12 @@ def _build_augment_middlewares( if fan_out_cfg is not None: augment_field_map = cast("dict[str, str] | None", fan_out_cfg.get("augment_metadata_from_field")) if augment_field_map: - items_field = cast("str | None", fan_out_cfg.get("items_field")) - items_list = ( - cast("list[dict[str, Any]]", initial_state.get(items_field, [])) if items_field else [] - ) - fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, items_list)] + item_field = cast("str | None", fan_out_cfg.get("item_field")) + if item_field is None: + raise ValueError( + f"fan-out node {node_name!r}: augment_metadata_from_field requires item_field" + ) + fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, item_field)] pb_cfg = cast("dict[str, Any] | None", node_spec.get("parallel_branches")) if pb_cfg is not None: branches_cfg = cast("dict[str, dict[str, Any]]", pb_cfg.get("branches") or {}) @@ -272,25 +323,28 @@ def _build_augment_middlewares( return fan_out_mw, branch_mw -def _make_augment_instance_middleware(field_map: dict[str, str], items: list[dict[str, Any]]) -> Any: - """Per-instance fan-out middleware that calls - ``set_invocation_metadata`` with per-item entries pulled from - ``items[current_fan_out_index()][field_path]``. Captures ``items`` - at fixture-build time so each instance reads the same list.""" +def _make_augment_instance_middleware(field_map: dict[str, str], item_field: str) -> Any: + """Per-instance fan-out middleware that reads the instance's own item from + the ``item_field`` slot of its subgraph state and calls + ``set_invocation_metadata`` with the mapped entries.""" + # Reads runtime state, not a build-time list indexed by + # current_fan_out_index(): instance middleware wraps the inner subgraph from + # OUTSIDE, but the fan_out_index ContextVar is set deeper (inside the inner + # node execution), so it is None here. The engine has already placed each + # item in item_field by the time the chain runs, and set_invocation_metadata + # lands before the inner spans open, so the instance's dispatch + inner spans + # all carry the augmentation. class _AugmentInstanceMW: async def __call__(self, state: Any, next_: Any, /) -> Any: - from openarmature.observability.correlation import ( # noqa: PLC0415 - current_fan_out_index, - ) from openarmature.observability.metadata import ( # noqa: PLC0415 set_invocation_metadata, ) - idx = current_fan_out_index() - if idx is not None and 0 <= idx < len(items): - item = items[idx] - entries = {key: item[field_path] for key, field_path in field_map.items()} + item = getattr(state, item_field, None) + if isinstance(item, Mapping): + item_map = cast("Mapping[str, Any]", item) + entries = {key: item_map[field_path] for key, field_path in field_map.items()} set_invocation_metadata(**entries) return await next_(state) @@ -657,6 +711,7 @@ async def _run_case(case: Mapping[str, Any]) -> None: # references. Pure key normalization; semantics unchanged. if isinstance(case, dict): _normalize_fan_out_subgraph_keys(case) + _synthesize_fan_out_aggregation(case) # Per proposal 0040 fixtures 029 / 030: synthesize the # augmentation middlewares that drive the per-instance / # per-branch ``set_invocation_metadata`` calls. Both flow into