Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions tests/conformance/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,15 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
# sibling conformance runner tests/conformance/test_observability_langfuse.py
# (NOT unit tests). They are skipped here and asserted there; the coverage guard
# counts them as accounted. 035/036 (invocation-id) + 059 (attribution) were
# relocated here from this file by the fixture-harness catch-up.
# relocated here from this file by the fixture-harness catch-up; 029 (fan-out
# per-instance) was un-deferred into the sibling runner the same way.
_LANGFUSE_HARNESS_FIXTURES: frozenset[str] = frozenset(
{
"022-langfuse-basic-trace",
"023-langfuse-generation-rendering",
"024-langfuse-prompt-linkage",
"027-langfuse-caller-supplied-metadata",
"029-caller-metadata-fan-out-per-instance",
"031-langfuse-subgraph-span-hierarchy",
"032-langfuse-fan-out-per-instance-spans",
"033-langfuse-detached-trace-mode",
Expand All @@ -311,13 +313,9 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
for fixture_ids, reason in (
# The Langfuse-mapping fixtures are fixture-tested by the sibling
# conformance runner test_observability_langfuse.py -- see
# _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 029/030
# stay below: deferred in that file (harness gaps), unit-tested for now.
(
("029-caller-metadata-fan-out-per-instance",),
"proposal 0040 fan-out per-instance caller metadata; unit-tested; "
"deferred in test_observability_langfuse.py",
),
# _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 030 stays
# below: deferred in that file (needs a LangfuseObserver per-branch
# dispatch-span src change), unit-tested for now.
(
("030-caller-metadata-parallel-branches-per-branch",),
"proposal 0040 per-branch caller metadata; covered by "
Expand Down
175 changes: 115 additions & 60 deletions tests/conformance/test_observability_langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,48 +94,36 @@
"035-caller-invocation-id-uuid",
"036-caller-invocation-id-non-uuid",
"059-implementation-attribution-langfuse",
# 029 + 030 stay deferred in v0.11.0:
# - 029 (fan-out per-instance): fixture omits ``collect_field``
# and ``target_field`` on the fan_out cfg, plus the inner
# subgraph omits a ``state:`` block — both are required by
# the cross-cap adapter. The augmentation behavior IS
# verified end-to-end by the unit test
# ``test_observability_langfuse.py::test_metadata_augmentation_in_fan_out_isolates_per_instance``
# plus the OTel counterpart.
# - 030 (parallel-branches per-branch): the expected trace
# requires a per-branch dispatch span the Langfuse mapping
# doesn't synthesize today; the spec direction is in
# ``discuss-otel-parallel-branches-dispatch-span``.
# Sibling-skip behavior IS verified by the OTel unit test
# ``test_metadata_augmentation_in_parallel_branches_skips_sibling``.
# Both fixtures land once spec settles the dispatch-span
# shape AND the adapter learns to infer fan-out aggregation
# defaults from inner subgraphs.
# 039 (nested-lineage augmentation) — proposal 0045 v0.37.0.
# Stays deferred in PR 11. The three cases require harness
# extensions the existing primitives don't cover:
# - Case 1 + 3 (nested fan-out + fan-out-in-serial): the
# ``augment_metadata_from_field`` middleware captures the
# items list at fixture-build time from ``initial_state``;
# nested fan-outs read items from the OUTER instance's
# subgraph state (the ``inner_seed`` field threaded through
# per_product), which the current item-capture path can't
# resolve. Needs a runtime-state item-list lookup.
# - Case 2 (pb-inside-fan-out): introduces a new directive
# ``augment_metadata_from_outer_item: {key: field}`` — the
# pb branch's middleware sources the augmentation value
# from the SURROUNDING outer fan-out instance's item, not
# from a static value or the branch's own state. Needs a
# new middleware factory + a way to capture the outer-item
# reference at fixture-build time.
# 0045's behavioral contract IS exercised at unit level via the
# OTel observer's
# ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span``
# regression test, and the cross-impl coverage the spec relies
# on for nested cases is already in place. Activating 039 against the
# Langfuse harness is a follow-up PR — the harness extensions
# above are non-trivial and orthogonal to the engine + observer
# changes 0045 actually mandates.
# 029 (fan-out per-instance): the fixture omits collect_field/
# target_field on the fan_out cfg and the inner subgraph omits a
# state: block, both required by the cross-cap adapter. The harness
# synthesizes the inner state (_infer_state_fields_from_nodes) and a
# throwaway aggregation sink (_synthesize_fan_out_aggregation) -- 029
# asserts per-instance span metadata + sibling isolation, never the
# collected results -- and augment_metadata_from_field drives the
# per-instance set_invocation_metadata.
"029-caller-metadata-fan-out-per-instance",
# 030 (parallel-branches per-branch) stays deferred: a SRC gap, not a
# harness one. The expected trace needs a per-branch dispatch-span
# observation (inner branch spans parent under it), which §4.3 + §8.4.2
# (proposal 0042 observation.metadata.branch_name) + proposal 0044
# mandate. The OTel observer synthesizes it
# (parallel_branches_branch_spans); the LangfuseObserver does not yet
# (0044 shipped OTel-only in v0.11.0). Un-defers once that synthesis is
# ported to the Langfuse observer (a src change). Sibling-skip behavior
# IS verified by the OTel unit test
# ``test_metadata_augmentation_in_parallel_branches_skips_sibling``.
# 039 (nested-lineage augmentation, proposal 0045) stays deferred: the
# three cases need harness extensions the existing primitives lack.
# Cases 1 + 3 (nested fan-out / fan-out-in-serial) need the fan-out
# augment middleware to read items_field from the executing subgraph's
# RUNTIME state (the outer instance's threaded inner_seed), not the
# build-time initial_state the current _make_augment_instance_middleware
# captures. Case 2 (pb-inside-fan-out) needs a new
# augment_metadata_from_outer_item factory AND depends on 030's
# per-branch dispatch span landing first. 0045's contract IS exercised
# at unit level via the OTel observer's
# ``test_nested_fan_out_augmentation_reaches_outer_instance_dispatch_span``.
}
)

Expand Down Expand Up @@ -222,6 +210,69 @@ def _normalize_fan_out_subgraph_keys(spec: dict[str, Any]) -> None:
fan_out_cfg["subgraph"] = fan_out_cfg.pop("inner_subgraph")


def _synthesize_fan_out_aggregation(spec: dict[str, Any]) -> None:
"""Synthesize a throwaway aggregation sink for a ``fan_out`` block that omits
``collect_field`` / ``target_field`` (fixture 029): collect the inner
subgraph's ``stores_response_in`` value into a fresh outer list field, and
declare the adapter's other required fan-out fields (``item_field``, the
``items_field`` source, the inner state).

Call AFTER ``_normalize_fan_out_subgraph_keys`` so the ``subgraph(s)`` keys
are already resolved.
"""
# 029 asserts per-instance span metadata + sibling isolation, never the
# collected results, so the sink only satisfies the adapter's collect/target
# requirement. The inner subgraph (spec["subgraphs"]) is shared across a
# fixture's cases, so its state seed below uses setdefault to stay
# idempotent; the outer state / fan_out writes are per-case.
subgraphs = cast("dict[str, Any]", spec.get("subgraphs") or {})
state_block = cast("dict[str, Any]", spec.get("state") or {})
state_fields = cast("dict[str, Any]", state_block.get("fields") or {})
for node_name, node_spec in cast("dict[str, Any]", spec.get("nodes") or {}).items():
if not isinstance(node_spec, dict):
continue
fan_out_cfg = cast("dict[str, Any] | None", cast("dict[str, Any]", node_spec).get("fan_out"))
if fan_out_cfg is None or ("collect_field" in fan_out_cfg and "target_field" in fan_out_cfg):
continue
sub_name = cast("str | None", fan_out_cfg.get("subgraph"))
sub_spec = cast("dict[str, Any]", subgraphs.get(sub_name or "", {}))
inferred = _infer_state_fields_from_nodes(cast("dict[str, Any]", sub_spec.get("nodes") or {}))
# Any inferred field works as the collected value; the sink is never
# asserted, so the first one is fine.
collect_field = next(iter(inferred), None)
if collect_field is None:
continue
# The sink is node-scoped (one outer field per fan-out node). The item
# slot is a fixed inner-state field name on purpose: distinct fan-outs
# have distinct inner subgraphs, and a subgraph shared between two
# fan-outs reuses the one slot, so scoping it per node would mismatch.
sink = f"oa_fan_out_sink_{node_name}"
item_field = "oa_fan_out_item"
fan_out_cfg.setdefault("collect_field", collect_field)
fan_out_cfg.setdefault("target_field", sink)
# items_field mode also requires item_field (where the engine places
# each item in the inner state). The augment middleware reads the item
# back out of this slot at runtime.
fan_out_cfg.setdefault("item_field", item_field)
# Ensure the inner state declares item_field (+ the inferred response
# fields) whether or not the subgraph shipped its own state block --
# State is strict, so the engine's write to item_field needs it declared.
sub_state = cast("dict[str, Any]", sub_spec.setdefault("state", {}))
sub_fields = cast("dict[str, Any]", sub_state.setdefault("fields", {}))
for fname, fdef in inferred.items():
sub_fields.setdefault(fname, fdef)
sub_fields.setdefault(item_field, {"type": "dict", "default": {}})
state_fields[sink] = {"type": "list", "reducer": "append", "default": []}
# The items_field source (e.g. products) must be declared on the outer
# state; 029 ships it only via initial_state, so declare it here.
items_field = cast("str | None", fan_out_cfg.get("items_field"))
if items_field is not None:
state_fields.setdefault(items_field, {"type": "list<dict>", "default": []})
if state_fields:
state_block["fields"] = state_fields
spec["state"] = state_block


def _build_augment_middlewares(
case: Mapping[str, Any],
) -> tuple[
Expand All @@ -244,7 +295,6 @@ def _build_augment_middlewares(
"""
fan_out_mw: dict[str, list[Any]] = {}
branch_mw: dict[str, dict[str, list[Any]]] = {}
initial_state = cast("dict[str, Any]", case.get("initial_state") or {})

for node_name, node_spec_any in cast("dict[str, Any]", case.get("nodes") or {}).items():
if not isinstance(node_spec_any, dict):
Expand All @@ -254,11 +304,12 @@ def _build_augment_middlewares(
if fan_out_cfg is not None:
augment_field_map = cast("dict[str, str] | None", fan_out_cfg.get("augment_metadata_from_field"))
if augment_field_map:
items_field = cast("str | None", fan_out_cfg.get("items_field"))
items_list = (
cast("list[dict[str, Any]]", initial_state.get(items_field, [])) if items_field else []
)
fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, items_list)]
item_field = cast("str | None", fan_out_cfg.get("item_field"))
if item_field is None:
raise ValueError(
f"fan-out node {node_name!r}: augment_metadata_from_field requires item_field"
)
fan_out_mw[node_name] = [_make_augment_instance_middleware(augment_field_map, item_field)]
Comment thread
chris-colinsky marked this conversation as resolved.
pb_cfg = cast("dict[str, Any] | None", node_spec.get("parallel_branches"))
if pb_cfg is not None:
branches_cfg = cast("dict[str, dict[str, Any]]", pb_cfg.get("branches") or {})
Expand All @@ -272,25 +323,28 @@ def _build_augment_middlewares(
return fan_out_mw, branch_mw


def _make_augment_instance_middleware(field_map: dict[str, str], items: list[dict[str, Any]]) -> Any:
"""Per-instance fan-out middleware that calls
``set_invocation_metadata`` with per-item entries pulled from
``items[current_fan_out_index()][field_path]``. Captures ``items``
at fixture-build time so each instance reads the same list."""
def _make_augment_instance_middleware(field_map: dict[str, str], item_field: str) -> Any:
"""Per-instance fan-out middleware that reads the instance's own item from
the ``item_field`` slot of its subgraph state and calls
``set_invocation_metadata`` with the mapped entries."""

# Reads runtime state, not a build-time list indexed by
# current_fan_out_index(): instance middleware wraps the inner subgraph from
# OUTSIDE, but the fan_out_index ContextVar is set deeper (inside the inner
# node execution), so it is None here. The engine has already placed each
# item in item_field by the time the chain runs, and set_invocation_metadata
# lands before the inner spans open, so the instance's dispatch + inner spans
# all carry the augmentation.
class _AugmentInstanceMW:
async def __call__(self, state: Any, next_: Any, /) -> Any:
from openarmature.observability.correlation import ( # noqa: PLC0415
current_fan_out_index,
)
from openarmature.observability.metadata import ( # noqa: PLC0415
set_invocation_metadata,
)

idx = current_fan_out_index()
if idx is not None and 0 <= idx < len(items):
item = items[idx]
entries = {key: item[field_path] for key, field_path in field_map.items()}
item = getattr(state, item_field, None)
if isinstance(item, Mapping):
item_map = cast("Mapping[str, Any]", item)
entries = {key: item_map[field_path] for key, field_path in field_map.items()}
set_invocation_metadata(**entries)
return await next_(state)

Expand Down Expand Up @@ -657,6 +711,7 @@ async def _run_case(case: Mapping[str, Any]) -> None:
# references. Pure key normalization; semantics unchanged.
if isinstance(case, dict):
_normalize_fan_out_subgraph_keys(case)
_synthesize_fan_out_aggregation(case)
# Per proposal 0040 fixtures 029 / 030: synthesize the
# augmentation middlewares that drive the per-instance /
# per-branch ``set_invocation_metadata`` calls. Both flow into
Expand Down