From 9a4e2b28479d38a8981ee6a8cf6eaed8ff8c0a3c Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 24 Jun 2026 08:55:58 -0700 Subject: [PATCH 1/2] Wire tier-2a Langfuse trace-shape fixtures Move six fixtures (022/031/032 Langfuse trace + observation tree, 035/036 caller-invocation-id derivation, 059 implementation attribution) from _UNIT_TESTED_FIXTURES into _SUPPORTED_FIXTURES, driven through a LangfuseObserver + InMemoryLangfuseClient recorder. Second tier of the fixture-harness catch-up; test-only, no library change, no pin bump. Adds a Langfuse-trace runner plus a value-matcher for the placeholder tokens (, , first-occurrence binding) and the assertion sub-key matchers (harness_parameterized, non_empty_string), and an invocation-id runner. The fixture trace.id is the derived Langfuse id, so the harness bridges the recorder's raw invocation_id through the impl's own langfuse_trace_id. No deferrals; 023/024 (Langfuse Generation) are tier 2b. --- tests/conformance/test_observability.py | 269 ++++++++++++++++++++++-- 1 file changed, 246 insertions(+), 23 deletions(-) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 24283f5..6f32430 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -170,6 +170,18 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "068-llm-completion-event-response-model-distinct-from-request", "071-llm-failure-event-call-id-distinct-from-completion-event", "072-llm-failure-event-mutual-exclusion-with-completion-event", + # Fixture-harness catch-up tier 2a: trace-shape Langfuse fixtures + # driven through a LangfuseObserver + InMemoryLangfuseClient recorder. + # 022/031/032 assert the Trace + observation tree (proposal 0031/0035/ + # 0061); 035/036 the caller-invocation-id -> trace.id derivation + # (proposal 0039); 059 the implementation-attribution trace metadata + # (proposal 0052). 023/024 (Langfuse Generation) are tier 2b. + "022-langfuse-basic-trace", + "031-langfuse-subgraph-span-hierarchy", + "032-langfuse-fan-out-per-instance-spans", + "035-caller-invocation-id-uuid", + "036-caller-invocation-id-non-uuid", + "059-implementation-attribution-langfuse", # proposal 0052 attribution fixture (case 1) + proposal 0061 # (case 2: the §5.1 attribution lands on the detached trace's own # openarmature.invocation span). Wired together now that 0061 @@ -286,17 +298,17 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: _UNIT_TESTED_FIXTURES: dict[str, str] = { fixture_id: reason for fixture_ids, reason in ( + # Fixture-harness catch-up tier 2a wired the trace-shape Langfuse + # fixtures (022/031/032), the invocation-id fixtures (035/036), and the + # attribution fixture (059). 023/024 (Langfuse Generation) are tier 2b; + # 033 (detached multi-trace) is tier 4. ( - ("022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"), - "proposal 0031 Langfuse mapping; covered by test_observability_langfuse.py", + ("023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"), + "proposal 0031 Langfuse generation/prompt-linkage; covered by test_observability_langfuse.py", ), ( - ( - "031-langfuse-subgraph-span-hierarchy", - "032-langfuse-fan-out-per-instance-spans", - "033-langfuse-detached-trace-mode", - ), - "proposal 0035/0061 Langfuse span hierarchy; covered by test_observability_langfuse.py", + ("033-langfuse-detached-trace-mode",), + "proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py", ), ( ( @@ -310,10 +322,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: ("030-caller-metadata-parallel-branches-per-branch",), "proposal 0040 per-branch caller metadata; covered by test_observability_otel.py", ), - ( - ("035-caller-invocation-id-uuid", "036-caller-invocation-id-non-uuid"), - "proposal 0039 invocation_id derivation; covered by test_observability_langfuse_adapter.py", - ), ( ("037-langfuse-trace-input-output",), "proposal 0043 trace input/output; covered by test_observability_langfuse.py", @@ -327,10 +335,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: ), "proposal 0048 get_invocation_metadata; covered by test_observability_metadata.py", ), - ( - ("059-implementation-attribution-langfuse",), - "proposal 0052 implementation attribution; covered by test_observability_langfuse.py", - ), # Fixture-harness catch-up tier 1 wired the rest of the 0057/0058 # family into _SUPPORTED_FIXTURES; these three stay here, each blocked # on a spec-side fixture change that python picks up at the v0.16.0 pin @@ -540,6 +544,18 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_058(spec) elif fixture_id == "084-langfuse-session-user-promotion": await _run_fixture_084(spec) + elif fixture_id in { + "022-langfuse-basic-trace", + "031-langfuse-subgraph-span-hierarchy", + "032-langfuse-fan-out-per-instance-spans", + "059-implementation-attribution-langfuse", + }: + await _run_langfuse_trace_fixture(spec) + elif fixture_id in { + "035-caller-invocation-id-uuid", + "036-caller-invocation-id-non-uuid", + }: + await _run_invocation_id_fixture(spec) elif fixture_id in { "012-otel-llm-payload-default-off", "013-otel-llm-payload-enabled", @@ -2609,6 +2625,198 @@ async def _run_fixture_084(spec: Mapping[str, Any]) -> None: raise AssertionError(f"case {case_name!r}: {e}") from e +_LANGFUSE_MATCHER_SUBKEYS = frozenset({"harness_parameterized", "non_empty_string"}) + + +def _langfuse_value_matches( + actual: Any, + expected: Any, + *, + bindings: dict[str, Any], + params: Mapping[str, Any], +) -> bool: + """Match a Langfuse trace/observation value against a fixture expectation: + an inline placeholder token, the assertion sub-key dict, or plain equality. + """ + # The value-matcher idioms are the conformance-adapter §5.10 vocabulary. + if isinstance(expected, str) and expected.startswith("<") and expected.endswith(">"): + return _langfuse_placeholder_matches(actual, expected, bindings) + # A NON-empty mapping whose keys are all matcher sub-keys is an assertion + # dict; an empty dict (or a dict with other keys) is matched by equality. + if ( + isinstance(expected, Mapping) + and expected + and set(cast("Mapping[str, Any]", expected)).issubset(_LANGFUSE_MATCHER_SUBKEYS) + ): + return _langfuse_matcher_subkeys_match(actual, cast("Mapping[str, Any]", expected), params) + return bool(actual == expected) + + +def _langfuse_placeholder_matches(actual: Any, token: str, bindings: dict[str, Any]) -> bool: + """Inline placeholder tokens: ```` (non-empty), ```` + (32-hex dashes-stripped), and first-occurrence binding tokens like + ```` (bind on first sighting, assert equality after -- the + correlation-id-consistency check). The §5.10 ```` (canonical) token + is added when a wired fixture first needs it. + """ + if token == "": + return isinstance(actual, str) and actual != "" + if token == "": + return isinstance(actual, str) and re.fullmatch(r"[0-9a-f]{32}", actual) is not None + if token in bindings: + return actual == bindings[token] + if actual is None: + return False + bindings[token] = actual + return True + + +def _langfuse_matcher_subkeys_match(actual: Any, spec: Mapping[str, Any], params: Mapping[str, Any]) -> bool: + """Assertion sub-keys (059): ``non_empty_string`` and ``harness_parameterized`` + (value equals the named harness-injected parameter).""" + if spec.get("non_empty_string") is True and not (isinstance(actual, str) and actual != ""): + return False + if "harness_parameterized" in spec: + param_name = cast("str", spec["harness_parameterized"]) + if actual != params.get(param_name): + return False + return True + + +def _assert_langfuse_trace_shape( + trace: Any, + expected: Mapping[str, Any], + *, + bindings: dict[str, Any], + params: Mapping[str, Any], +) -> None: + """Assert a Langfuse Trace's id / name / metadata / observation tree against + the fixture's ``expected.langfuse_trace`` block. Each clause is asserted only + when present (059 asserts metadata only; 022/031/032 assert all four). + """ + if "id" in expected: + # python's in-memory LangfuseTrace.id is the RAW invocation_id (the + # §8.4.1 verbatim OA-side id); the fixture asserts the DERIVED Langfuse + # trace id (uuid-hex / sha256[:16]). Bridge via langfuse_trace_id, the + # impl's own derivation rule (trace_id.py). + from openarmature.observability.langfuse import langfuse_trace_id + + derived_id = langfuse_trace_id(trace.id) + assert _langfuse_value_matches(derived_id, expected["id"], bindings=bindings, params=params), ( + f"derived trace.id {derived_id!r} (from raw {trace.id!r}) did not match {expected['id']!r}" + ) + if "name" in expected: + assert _langfuse_value_matches(trace.name, expected["name"], bindings=bindings, params=params), ( + f"trace.name {trace.name!r} did not match {expected['name']!r}" + ) + for key, val in cast("dict[str, Any]", expected.get("metadata") or {}).items(): + assert _langfuse_value_matches(trace.metadata.get(key), val, bindings=bindings, params=params), ( + f"trace.metadata.{key} {trace.metadata.get(key)!r} did not match {val!r}" + ) + observations = cast("list[dict[str, Any]] | None", expected.get("observations")) + if observations is not None: + _assert_langfuse_observation_tree(trace, observations, bindings=bindings, params=params) + + +async def _run_langfuse_trace_fixture(spec: Mapping[str, Any]) -> None: + """Driver for the trace-shape Langfuse fixtures: 022/031/032 (single-dict) + and 059 (cases). Each builds a graph via the adapter, records into an + InMemoryLangfuseClient, and asserts the Trace + observation tree. + """ + if "cases" in spec: + for case in cast("list[dict[str, Any]]", spec["cases"]): + case_name = cast("str", case["name"]) + try: + await _run_langfuse_trace_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + else: + await _run_langfuse_trace_case(spec) + + +async def _run_langfuse_trace_case(case: Mapping[str, Any]) -> None: + import openarmature + from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver + + _patch_unsupported_directives(case) + client = InMemoryLangfuseClient() + lf_kwargs: dict[str, Any] = {"client": client} + cfg = cast("dict[str, Any]", case.get("langfuse_observer_config") or case.get("langfuse_observer") or {}) + if "disable_state_payload" in cfg: + lf_kwargs["disable_state_payload"] = bool(cfg["disable_state_payload"]) + if "disable_provider_payload" in cfg: + lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"]) + observer = LangfuseObserver(**lf_kwargs) + + subgraphs = _compile_subgraphs(case) + built = build_graph(case, subgraphs=dict(subgraphs), trace=[]) + compiled = built.builder.compile() + compiled.attach_observer(observer) + initial_state = built.initial_state(case.get("initial_state", {})) + await compiled.invoke(initial_state) + await compiled.drain() + observer.shutdown() + + assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" + trace = next(iter(client.traces.values())) + bindings: dict[str, Any] = {} + params = {"implementation_name": openarmature.__implementation_name__} + expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) + _assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params) + + +async def _run_invocation_id_fixture(spec: Mapping[str, Any]) -> None: + """Driver for the caller-invocation-id fixtures (035/036). Builds a simple + calls_llm graph, invokes with ``invocation_id=caller_invocation_id``, and + asserts the Langfuse ``trace.id`` equals the fixture's pinned derivation + (python derives it; the harness checks the result) plus 036's raw id in + ``trace.metadata``. + """ + for case in cast("list[dict[str, Any]]", spec["cases"]): + case_name = cast("str", case["name"]) + try: + await _run_invocation_id_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_invocation_id_case(case: Mapping[str, Any]) -> None: + from openarmature.observability.langfuse import ( + InMemoryLangfuseClient, + LangfuseObserver, + langfuse_trace_id, + ) + + graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False) + client = InMemoryLangfuseClient() + graph.attach_observer(LangfuseObserver(client=client)) + state = _make_state_instance(case, state_cls) + caller_id = cast("str", case["caller_invocation_id"]) + try: + await graph.invoke(state, invocation_id=caller_id) + await graph.drain() + finally: + await provider.aclose() + + assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" + trace = next(iter(client.traces.values())) + expected_trace = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) + # The fixture's trace.id is the DERIVED Langfuse id; the in-memory recorder + # keys by the raw invocation_id. Bridge via the impl's langfuse_trace_id. + derived_id = langfuse_trace_id(trace.id) + assert derived_id == expected_trace["id"], ( + f"derived trace.id {derived_id!r} (from raw {trace.id!r}) != {expected_trace['id']!r}" + ) + for key, val in cast("dict[str, Any]", expected_trace.get("metadata") or {}).items(): + actual = trace.metadata.get(key) + # The real SDK derives trace.id and preserves the raw invocation_id in + # metadata for reverse lookup; the in-memory recorder instead keeps the + # raw id AS trace.id. Recover it from there when metadata omits it (036). + if actual is None and key == "invocation_id": + actual = trace.id + assert actual == val, f"trace.metadata.{key} {actual!r} != {val!r}" + + # --------------------------------------------------------------------------- # Fixture 010 — log correlation # @@ -3544,15 +3752,23 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict def _assert_langfuse_observation_tree( - trace: Any, expected: list[dict[str, Any]], parent_id: str | None = None + trace: Any, + expected: list[dict[str, Any]], + parent_id: str | None = None, + *, + bindings: dict[str, Any] | None = None, + params: Mapping[str, Any] | None = None, ) -> None: """Recursively match expected observations against the trace's flat observation list (linked by parent_observation_id). type + name are - matched exactly; level / input / output exactly when present; - metadata is subset-matched.""" + matched exactly; level / input / output exactly when present; metadata is + subset-matched. When ``bindings``/``params`` are supplied, metadata values + go through the value-matcher (placeholder tokens + sub-key matchers); + otherwise they are compared exactly (the tool-fixture path).""" # Mutable copy: each matched observation is consumed so two # same-shape expected siblings can't both bind to one actual. remaining = list(trace.children_of(parent_id)) + use_matcher = bindings is not None or params is not None for exp in expected: exp_type = cast("str", exp["type"]) exp_name = cast("str | None", exp.get("name")) @@ -3574,12 +3790,19 @@ def _assert_langfuse_observation_tree( f"{exp_name!r}: output {match.output!r} != {exp['output']!r}" ) for key, val in cast("dict[str, Any]", exp.get("metadata") or {}).items(): - assert match.metadata.get(key) == val, ( - f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}" - ) + if use_matcher: + assert _langfuse_value_matches( + match.metadata.get(key), val, bindings=bindings or {}, params=params or {} + ), f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} did not match {val!r}" + else: + assert match.metadata.get(key) == val, ( + f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}" + ) children = cast("list[dict[str, Any]] | None", exp.get("children")) if children: - _assert_langfuse_observation_tree(trace, children, parent_id=match.id) + _assert_langfuse_observation_tree( + trace, children, parent_id=match.id, bindings=bindings, params=params + ) async def _run_tool_fixture(spec: Mapping[str, Any]) -> None: From b583b0a44c3d3540f792e3d90e5bfeb5fd7f11bb Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 24 Jun 2026 12:06:16 -0700 Subject: [PATCH 2/2] Apply review fixes to tier-2a harness Address review feedback on the tier-2a fixture wiring: - _run_langfuse_trace_case wraps invoke/drain in try/finally so the observer is always shut down, even if the graph raises. - _run_invocation_id_case now holds the LangfuseObserver reference and shuts it down in finally, matching the other Langfuse runners. - _assert_langfuse_observation_tree enables the value-matcher only when both bindings and params are provided (and, not or), so a partial call degrades to exact match instead of half-enabling the matcher. --- tests/conformance/test_observability.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 6f32430..77a4db4 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -2753,9 +2753,11 @@ async def _run_langfuse_trace_case(case: Mapping[str, Any]) -> None: compiled = built.builder.compile() compiled.attach_observer(observer) initial_state = built.initial_state(case.get("initial_state", {})) - await compiled.invoke(initial_state) - await compiled.drain() - observer.shutdown() + try: + await compiled.invoke(initial_state) + await compiled.drain() + finally: + observer.shutdown() assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" trace = next(iter(client.traces.values())) @@ -2789,13 +2791,15 @@ async def _run_invocation_id_case(case: Mapping[str, Any]) -> None: graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False) client = InMemoryLangfuseClient() - graph.attach_observer(LangfuseObserver(client=client)) + observer = LangfuseObserver(client=client) + graph.attach_observer(observer) state = _make_state_instance(case, state_cls) caller_id = cast("str", case["caller_invocation_id"]) try: await graph.invoke(state, invocation_id=caller_id) await graph.drain() finally: + observer.shutdown() await provider.aclose() assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" @@ -3768,7 +3772,7 @@ def _assert_langfuse_observation_tree( # Mutable copy: each matched observation is consumed so two # same-shape expected siblings can't both bind to one actual. remaining = list(trace.children_of(parent_id)) - use_matcher = bindings is not None or params is not None + use_matcher = bindings is not None and params is not None for exp in expected: exp_type = cast("str", exp["type"]) exp_name = cast("str | None", exp.get("name"))