From 082f0b68fa8bb4111f6648cc38d4f29c842e14f0 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 24 Jun 2026 14:49:24 -0700 Subject: [PATCH 1/2] Relocate Langfuse fixtures to their own runner The Langfuse-mapping observability fixtures are fixture-tested by the dedicated sibling runner tests/conformance/test_observability_langfuse.py, not by unit tests. The _UNIT_TESTED_FIXTURES bucket in test_observability.py mislabeled them, so the fixture-harness catch-up wrongly wired 022/023/024/031/032 into test_observability.py -- duplicating coverage that already existed in the sibling runner -- and homed the genuinely-new 035/036/059 there too. Revert all Langfuse wiring from test_observability.py (back to its post-tier-1 state) and add a _LANGFUSE_HARNESS_FIXTURES bucket that the coverage guard counts and the dispatcher skips, recording that the sibling runner owns them. Relocate 035/036/059 into test_observability_langfuse.py, extending its runner for caller_invocation_id, the derived-trace.id bridge, 036's raw invocation_id, and the harness_parameterized / non_empty_string metadata matcher. Test-only; no behavior or pin change. --- tests/conformance/test_observability.py | 481 ++---------------- .../test_observability_langfuse.py | 65 ++- 2 files changed, 112 insertions(+), 434 deletions(-) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index bda2627..8cefcfa 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -170,23 +170,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "068-llm-completion-event-response-model-distinct-from-request", "071-llm-failure-event-call-id-distinct-from-completion-event", "072-llm-failure-event-mutual-exclusion-with-completion-event", - # Fixture-harness catch-up tier 2a: trace-shape Langfuse fixtures - # driven through a LangfuseObserver + InMemoryLangfuseClient recorder. - # 022/031/032 assert the Trace + observation tree (proposal 0031/0035/ - # 0061); 035/036 the caller-invocation-id -> trace.id derivation - # (proposal 0039); 059 the implementation-attribution trace metadata - # (proposal 0052). 023/024 (Langfuse Generation) are tier 2b. - "022-langfuse-basic-trace", - "031-langfuse-subgraph-span-hierarchy", - "032-langfuse-fan-out-per-instance-spans", - "035-caller-invocation-id-uuid", - "036-caller-invocation-id-non-uuid", - "059-implementation-attribution-langfuse", - # Tier 2b: Langfuse Generation observation (proposal 0031 §8.4.3/§8.4.4) - # -- model / modelParameters / usage / input-output payload (with - # truncation) and prompt-entity linkage. - "023-langfuse-generation-rendering", - "024-langfuse-prompt-linkage", # proposal 0052 attribution fixture (case 1) + proposal 0061 # (case 2: the §5.1 attribution lands on the detached trace's own # openarmature.invocation span). Wired together now that 0061 @@ -297,35 +280,48 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: } +# _LANGFUSE_HARNESS_FIXTURES — Langfuse-mapping fixtures fixture-tested by the +# sibling conformance runner tests/conformance/test_observability_langfuse.py +# (NOT unit tests). They are skipped here and asserted there; the coverage guard +# counts them as accounted. 035/036 (invocation-id) + 059 (attribution) were +# relocated here from this file by the fixture-harness catch-up. +_LANGFUSE_HARNESS_FIXTURES: frozenset[str] = frozenset( + { + "022-langfuse-basic-trace", + "023-langfuse-generation-rendering", + "024-langfuse-prompt-linkage", + "027-langfuse-caller-supplied-metadata", + "031-langfuse-subgraph-span-hierarchy", + "032-langfuse-fan-out-per-instance-spans", + "033-langfuse-detached-trace-mode", + "034-caller-metadata-open-span-update-serial", + "035-caller-invocation-id-uuid", + "036-caller-invocation-id-non-uuid", + "037-langfuse-trace-input-output", + "059-implementation-attribution-langfuse", + } +) + + # _UNIT_TESTED_FIXTURES — implemented behavior covered by the dedicated unit # suite rather than wired into this YAML harness. Value names the proposal + # the covering file. _UNIT_TESTED_FIXTURES: dict[str, str] = { fixture_id: reason for fixture_ids, reason in ( - # Fixture-harness catch-up tier 2 wired the trace-shape Langfuse - # fixtures (022/031/032), invocation-id (035/036), attribution (059) in - # 2a, and the Langfuse Generation fixtures (023/024) in 2b. 033 (detached - # multi-trace) is tier 4. + # The Langfuse-mapping fixtures are fixture-tested by the sibling + # conformance runner test_observability_langfuse.py -- see + # _LANGFUSE_HARNESS_FIXTURES, NOT here (they are not unit-only). 029/030 + # stay below: deferred in that file (harness gaps), unit-tested for now. ( - ("033-langfuse-detached-trace-mode",), - "proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py", - ), - ( - ( - "027-langfuse-caller-supplied-metadata", - "029-caller-metadata-fan-out-per-instance", - "034-caller-metadata-open-span-update-serial", - ), - "proposal 0034/0040 caller metadata; covered by test_observability_langfuse.py", + ("029-caller-metadata-fan-out-per-instance",), + "proposal 0040 fan-out per-instance caller metadata; unit-tested; " + "deferred in test_observability_langfuse.py", ), ( ("030-caller-metadata-parallel-branches-per-branch",), - "proposal 0040 per-branch caller metadata; covered by test_observability_otel.py", - ), - ( - ("037-langfuse-trace-input-output",), - "proposal 0043 trace input/output; covered by test_observability_langfuse.py", + "proposal 0040 per-branch caller metadata; covered by " + "test_observability_otel.py; deferred in test_observability_langfuse.py", ), ( ( @@ -437,6 +433,7 @@ def test_observability_fixture_coverage_is_complete() -> None: | _DEFERRED_FIXTURES.keys() | _UNIT_TESTED_FIXTURES.keys() | _CONVENTION_ONLY_FIXTURES.keys() + | _LANGFUSE_HARNESS_FIXTURES ) unaccounted = sorted(all_ids - accounted) assert not unaccounted, ( @@ -450,7 +447,12 @@ def test_observability_fixture_coverage_is_complete() -> None: stale = sorted(accounted - all_ids) assert not stale, f"accounting entries with no fixture file (remove): {stale}" # A fixture cannot be both run and documented-as-not-run. - not_run = _DEFERRED_FIXTURES.keys() | _UNIT_TESTED_FIXTURES.keys() | _CONVENTION_ONLY_FIXTURES.keys() + not_run = ( + _DEFERRED_FIXTURES.keys() + | _UNIT_TESTED_FIXTURES.keys() + | _CONVENTION_ONLY_FIXTURES.keys() + | _LANGFUSE_HARNESS_FIXTURES + ) overlap = sorted(set(_SUPPORTED_FIXTURES) & not_run) assert not overlap, f"fixtures both run and documented-as-not-run (pick one): {overlap}" @@ -463,6 +465,8 @@ def test_observability_fixture_coverage_is_complete() -> None: @pytest.mark.parametrize("fixture_path", _fixture_paths(), ids=_fixture_id) async def test_observability_fixture(fixture_path: Path) -> None: fixture_id = fixture_path.stem + if fixture_id in _LANGFUSE_HARNESS_FIXTURES: + pytest.skip(f"{fixture_id}: fixture-tested by tests/conformance/test_observability_langfuse.py") skip_reason = ( _DEFERRED_FIXTURES.get(fixture_id) or _UNIT_TESTED_FIXTURES.get(fixture_id) @@ -545,23 +549,6 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_058(spec) elif fixture_id == "084-langfuse-session-user-promotion": await _run_fixture_084(spec) - elif fixture_id in { - "022-langfuse-basic-trace", - "031-langfuse-subgraph-span-hierarchy", - "032-langfuse-fan-out-per-instance-spans", - "059-implementation-attribution-langfuse", - }: - await _run_langfuse_trace_fixture(spec) - elif fixture_id in { - "035-caller-invocation-id-uuid", - "036-caller-invocation-id-non-uuid", - }: - await _run_invocation_id_fixture(spec) - elif fixture_id in { - "023-langfuse-generation-rendering", - "024-langfuse-prompt-linkage", - }: - await _run_langfuse_generation_fixture(spec) elif fixture_id in { "012-otel-llm-payload-default-off", "013-otel-llm-payload-enabled", @@ -2631,269 +2618,6 @@ async def _run_fixture_084(spec: Mapping[str, Any]) -> None: raise AssertionError(f"case {case_name!r}: {e}") from e -_LANGFUSE_MATCHER_SUBKEYS = frozenset({"harness_parameterized", "non_empty_string"}) - - -def _langfuse_value_matches( - actual: Any, - expected: Any, - *, - bindings: dict[str, Any], - params: Mapping[str, Any], -) -> bool: - """Match a Langfuse trace/observation value against a fixture expectation: - an inline placeholder token, the assertion sub-key dict, or plain equality. - """ - # The value-matcher idioms are the conformance-adapter §5.10 vocabulary. - if isinstance(expected, str) and expected.startswith("<") and expected.endswith(">"): - return _langfuse_placeholder_matches(actual, expected, bindings) - # A NON-empty mapping whose keys are all matcher sub-keys is an assertion - # dict; an empty dict (or a dict with other keys) is matched by equality. - if ( - isinstance(expected, Mapping) - and expected - and set(cast("Mapping[str, Any]", expected)).issubset(_LANGFUSE_MATCHER_SUBKEYS) - ): - return _langfuse_matcher_subkeys_match(actual, cast("Mapping[str, Any]", expected), params) - # A regular NON-empty nested mapping (e.g. 024 metadata.prompt): recurse per - # key so inner tokens (rendered_hash: ) still apply. Subset over - # keys -- every expected key must be present and match; actual MAY carry - # extras. An empty expected dict falls through to exact equality below - # (rather than vacuously matching any mapping). - if isinstance(expected, Mapping) and expected: - if not isinstance(actual, Mapping): - return False - actual_map = cast("Mapping[str, Any]", actual) - return all( - k in actual_map and _langfuse_value_matches(actual_map[k], v, bindings=bindings, params=params) - for k, v in cast("Mapping[str, Any]", expected).items() - ) - return bool(actual == expected) - - -def _langfuse_placeholder_matches(actual: Any, token: str, bindings: dict[str, Any]) -> bool: - """Inline placeholder tokens: ```` (non-empty), ```` - (32-hex dashes-stripped), and first-occurrence binding tokens like - ```` (bind on first sighting, assert equality after -- the - correlation-id-consistency check). The §5.10 ```` (canonical) token - is added when a wired fixture first needs it. - """ - if token == "": - return isinstance(actual, str) and actual != "" - if token == "": - return isinstance(actual, str) and re.fullmatch(r"[0-9a-f]{32}", actual) is not None - if token in bindings: - return actual == bindings[token] - if actual is None: - return False - bindings[token] = actual - return True - - -def _langfuse_matcher_subkeys_match(actual: Any, spec: Mapping[str, Any], params: Mapping[str, Any]) -> bool: - """Assertion sub-keys (059): ``non_empty_string`` and ``harness_parameterized`` - (value equals the named harness-injected parameter).""" - if spec.get("non_empty_string") is True and not (isinstance(actual, str) and actual != ""): - return False - if "harness_parameterized" in spec: - param_name = cast("str", spec["harness_parameterized"]) - if actual != params.get(param_name): - return False - return True - - -def _assert_langfuse_trace_shape( - trace: Any, - expected: Mapping[str, Any], - *, - bindings: dict[str, Any], - params: Mapping[str, Any], -) -> None: - """Assert a Langfuse Trace's id / name / metadata / observation tree against - the fixture's ``expected.langfuse_trace`` block. Each clause is asserted only - when present (059 asserts metadata only; 022/031/032 assert all four). - """ - if "id" in expected: - # python's in-memory LangfuseTrace.id is the RAW invocation_id (the - # §8.4.1 verbatim OA-side id); the fixture asserts the DERIVED Langfuse - # trace id (uuid-hex / sha256[:16]). Bridge via langfuse_trace_id, the - # impl's own derivation rule (trace_id.py). - from openarmature.observability.langfuse import langfuse_trace_id - - derived_id = langfuse_trace_id(trace.id) - assert _langfuse_value_matches(derived_id, expected["id"], bindings=bindings, params=params), ( - f"derived trace.id {derived_id!r} (from raw {trace.id!r}) did not match {expected['id']!r}" - ) - if "name" in expected: - assert _langfuse_value_matches(trace.name, expected["name"], bindings=bindings, params=params), ( - f"trace.name {trace.name!r} did not match {expected['name']!r}" - ) - for key, val in cast("dict[str, Any]", expected.get("metadata") or {}).items(): - assert _langfuse_value_matches(trace.metadata.get(key), val, bindings=bindings, params=params), ( - f"trace.metadata.{key} {trace.metadata.get(key)!r} did not match {val!r}" - ) - observations = cast("list[dict[str, Any]] | None", expected.get("observations")) - if observations is not None: - _assert_langfuse_observation_tree(trace, observations, bindings=bindings, params=params) - - -async def _run_langfuse_trace_fixture(spec: Mapping[str, Any]) -> None: - """Driver for the trace-shape Langfuse fixtures: 022/031/032 (single-dict) - and 059 (cases). Each builds a graph via the adapter, records into an - InMemoryLangfuseClient, and asserts the Trace + observation tree. - """ - if "cases" in spec: - for case in cast("list[dict[str, Any]]", spec["cases"]): - case_name = cast("str", case["name"]) - try: - await _run_langfuse_trace_case(case) - except AssertionError as e: - raise AssertionError(f"case {case_name!r}: {e}") from e - else: - await _run_langfuse_trace_case(spec) - - -async def _run_langfuse_trace_case(case: Mapping[str, Any]) -> None: - import openarmature - from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver - - _patch_unsupported_directives(case) - client = InMemoryLangfuseClient() - lf_kwargs: dict[str, Any] = {"client": client} - cfg = cast("dict[str, Any]", case.get("langfuse_observer_config") or case.get("langfuse_observer") or {}) - if "disable_state_payload" in cfg: - lf_kwargs["disable_state_payload"] = bool(cfg["disable_state_payload"]) - if "disable_provider_payload" in cfg: - lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"]) - observer = LangfuseObserver(**lf_kwargs) - - subgraphs = _compile_subgraphs(case) - built = build_graph(case, subgraphs=dict(subgraphs), trace=[]) - compiled = built.builder.compile() - compiled.attach_observer(observer) - initial_state = built.initial_state(case.get("initial_state", {})) - try: - await compiled.invoke(initial_state) - await compiled.drain() - finally: - observer.shutdown() - - assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" - trace = next(iter(client.traces.values())) - bindings: dict[str, Any] = {} - params = {"implementation_name": openarmature.__implementation_name__} - expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) - _assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params) - - -async def _run_invocation_id_fixture(spec: Mapping[str, Any]) -> None: - """Driver for the caller-invocation-id fixtures (035/036). Builds a simple - calls_llm graph, invokes with ``invocation_id=caller_invocation_id``, and - asserts the Langfuse ``trace.id`` equals the fixture's pinned derivation - (python derives it; the harness checks the result) plus 036's raw id in - ``trace.metadata``. - """ - for case in cast("list[dict[str, Any]]", spec["cases"]): - case_name = cast("str", case["name"]) - try: - await _run_invocation_id_case(case) - except AssertionError as e: - raise AssertionError(f"case {case_name!r}: {e}") from e - - -async def _run_invocation_id_case(case: Mapping[str, Any]) -> None: - from openarmature.observability.langfuse import ( - InMemoryLangfuseClient, - LangfuseObserver, - langfuse_trace_id, - ) - - graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False) - client = InMemoryLangfuseClient() - observer = LangfuseObserver(client=client) - graph.attach_observer(observer) - state = _make_state_instance(case, state_cls) - caller_id = cast("str", case["caller_invocation_id"]) - try: - await graph.invoke(state, invocation_id=caller_id) - await graph.drain() - finally: - observer.shutdown() - await provider.aclose() - - assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" - trace = next(iter(client.traces.values())) - expected_trace = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) - # The fixture's trace.id is the DERIVED Langfuse id; the in-memory recorder - # keys by the raw invocation_id. Bridge via the impl's langfuse_trace_id. - derived_id = langfuse_trace_id(trace.id) - assert derived_id == expected_trace["id"], ( - f"derived trace.id {derived_id!r} (from raw {trace.id!r}) != {expected_trace['id']!r}" - ) - for key, val in cast("dict[str, Any]", expected_trace.get("metadata") or {}).items(): - actual = trace.metadata.get(key) - # The real SDK derives trace.id and preserves the raw invocation_id in - # metadata for reverse lookup; the in-memory recorder instead keeps the - # raw id AS trace.id. Recover it from there when metadata omits it (036). - if actual is None and key == "invocation_id": - actual = trace.id - assert actual == val, f"trace.metadata.{key} {actual!r} != {val!r}" - - # The fixture's top-level verbatim invocation_id clause (the §5.1 - # caller_invocation_id_verbatim_on_attribute invariant): on the OTel side it - # is the openarmature.invocation_id span attribute; in the Langfuse runner - # the verbatim id surfaces as the in-memory recorder's raw trace.id. - expected_invocation_id = cast("dict[str, Any]", case["expected"]).get("invocation_id") - if expected_invocation_id is not None: - assert trace.id == expected_invocation_id, ( - f"verbatim invocation_id: raw trace.id {trace.id!r} != {expected_invocation_id!r}" - ) - - -async def _run_langfuse_generation_fixture(spec: Mapping[str, Any]) -> None: - """Driver for the Langfuse Generation fixtures (023 generation rendering + - truncation, 024 prompt linkage). Builds a calls_llm graph, records into an - InMemoryLangfuseClient under the fixture's observer config, and asserts the - Generation observation nested under the node span. - """ - for case in cast("list[dict[str, Any]]", spec["cases"]): - case_name = cast("str", case["name"]) - try: - await _run_langfuse_generation_case(case) - except AssertionError as e: - raise AssertionError(f"case {case_name!r}: {e}") from e - - -async def _run_langfuse_generation_case(case: Mapping[str, Any]) -> None: - import openarmature - from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver - - graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False) - client = InMemoryLangfuseClient() - cfg = cast("dict[str, Any]", case.get("langfuse_observer") or {}) - lf_kwargs: dict[str, Any] = {"client": client} - if "disable_provider_payload" in cfg: - lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"]) - if "payload_byte_cap" in cfg: - lf_kwargs["payload_byte_cap"] = int(cfg["payload_byte_cap"]) - observer = LangfuseObserver(**lf_kwargs) - graph.attach_observer(observer) - state = _make_state_instance(case, state_cls) - try: - await graph.invoke(state) - await graph.drain() - finally: - observer.shutdown() - await provider.aclose() - - assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" - trace = next(iter(client.traces.values())) - bindings: dict[str, Any] = {} - params = {"implementation_name": openarmature.__implementation_name__} - expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) - _assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params) - - # --------------------------------------------------------------------------- # Fixture 010 — log correlation # @@ -3828,105 +3552,21 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict return builder.compile(), state_cls, providers -def _assert_langfuse_generation_fields( - exp_name: str | None, - match: Any, - exp: Mapping[str, Any], - *, - bindings: dict[str, Any], - params: Mapping[str, Any], -) -> None: - """Generation-observation fields beyond the base span shape (023/024): - model / modelParameters / usage, the input parse-or-truncation shapes, and - the prompt-entity link. Each is asserted only when present, so it is inert - for span / tool observations. The placeholder-capable fields go through the - value-matcher (consistent with metadata); usage is a typed integer record. - """ - if "model" in exp: - assert _langfuse_value_matches(match.model, exp["model"], bindings=bindings, params=params), ( - f"{exp_name!r}: model {match.model!r} did not match {exp['model']!r}" - ) - if "modelParameters" in exp: - assert _langfuse_value_matches( - match.model_parameters, exp["modelParameters"], bindings=bindings, params=params - ), f"{exp_name!r}: modelParameters {match.model_parameters!r} != {exp['modelParameters']!r}" - if "usage" in exp: - u = cast("dict[str, Any]", exp["usage"]) - got = None if match.usage is None else (match.usage.input, match.usage.output, match.usage.total) - assert got == (u["input"], u["output"], u["total"]), f"{exp_name!r}: usage {got!r} != {u!r}" - if "prompt_entity_link" in exp: - assert _langfuse_value_matches( - match.prompt_entity_link, exp["prompt_entity_link"], bindings=bindings, params=params - ), ( - f"{exp_name!r}: prompt_entity_link {match.prompt_entity_link!r} " - f"did not match {exp['prompt_entity_link']!r}" - ) - if exp.get("prompt_entity_link_absent") is True: - assert match.prompt_entity_link is None, ( - f"{exp_name!r}: expected no prompt_entity_link; got {match.prompt_entity_link!r}" - ) - if "input_parses_as_messages" in exp: - # Under-cap input is the native message list (§8.7); compare directly. - assert match.input == exp["input_parses_as_messages"], ( - f"{exp_name!r}: input {match.input!r} did not parse as {exp['input_parses_as_messages']!r}" - ) - if exp.get("input_is_raw_string_with_marker") is True: - # Over-cap input falls through to the raw truncated string + §5.5.5 marker. - assert isinstance(match.input, str) and re.search(r"\[truncated, \d+ bytes total\]", match.input), ( - f"{exp_name!r}: expected a raw truncated string with the marker; got {match.input!r}" - ) - - -def _obs_selection_matches(obs: Any, exp_metadata: Mapping[str, Any]) -> bool: - """Read-only disambiguator for same-(type, name) sibling observations: an - actual is a candidate when its scalar expected-metadata values match. - - Only scalars (str / int / float / bool) are used: placeholder tokens are - shared across siblings (correlation_id) so they don't disambiguate, and - running the value-matcher here would fire its binding side effects during - selection; sequences (namespace) are left to the value-matcher's list/tuple - handling. fan_out_index / step are the fields that actually distinguish. - """ - for key, val in exp_metadata.items(): - is_placeholder = isinstance(val, str) and val.startswith("<") and val.endswith(">") - if isinstance(val, (str, int, float)) and not is_placeholder and obs.metadata.get(key) != val: - return False - return True - - def _assert_langfuse_observation_tree( - trace: Any, - expected: list[dict[str, Any]], - parent_id: str | None = None, - *, - bindings: dict[str, Any] | None = None, - params: Mapping[str, Any] | None = None, + trace: Any, expected: list[dict[str, Any]], parent_id: str | None = None ) -> None: """Recursively match expected observations against the trace's flat observation list (linked by parent_observation_id). type + name are - matched exactly; level / input / output exactly when present; metadata is - subset-matched. When ``bindings``/``params`` are supplied, metadata values - go through the value-matcher (placeholder tokens + sub-key matchers); - otherwise they are compared exactly (the tool-fixture path).""" + matched exactly; level / input / output exactly when present; + metadata is subset-matched.""" # Mutable copy: each matched observation is consumed so two # same-shape expected siblings can't both bind to one actual. remaining = list(trace.children_of(parent_id)) - use_matcher = bindings is not None and params is not None for exp in expected: exp_type = cast("str", exp["type"]) exp_name = cast("str | None", exp.get("name")) - # Disambiguate same-(type, name) siblings (e.g. 032's per-instance - # "process" spans) by their scalar metadata, not list/emission order, so - # the assertions can't bind the wrong sibling if emission order shifts. - exp_meta = cast("dict[str, Any]", exp.get("metadata") or {}) match = next( - ( - o - for o in remaining - if o.type == exp_type - and (exp_name is None or o.name == exp_name) - and _obs_selection_matches(o, exp_meta) - ), + (o for o in remaining if o.type == exp_type and (exp_name is None or o.name == exp_name)), None, ) assert match is not None, ( @@ -3943,20 +3583,12 @@ def _assert_langfuse_observation_tree( f"{exp_name!r}: output {match.output!r} != {exp['output']!r}" ) for key, val in cast("dict[str, Any]", exp.get("metadata") or {}).items(): - if use_matcher: - assert _langfuse_value_matches( - match.metadata.get(key), val, bindings=bindings or {}, params=params or {} - ), f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} did not match {val!r}" - else: - assert match.metadata.get(key) == val, ( - f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}" - ) - _assert_langfuse_generation_fields(exp_name, match, exp, bindings=bindings or {}, params=params or {}) + assert match.metadata.get(key) == val, ( + f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}" + ) children = cast("list[dict[str, Any]] | None", exp.get("children")) if children: - _assert_langfuse_observation_tree( - trace, children, parent_id=match.id, bindings=bindings, params=params - ) + _assert_langfuse_observation_tree(trace, children, parent_id=match.id) async def _run_tool_fixture(spec: Mapping[str, Any]) -> None: @@ -4296,13 +3928,6 @@ def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> l for m in messages_spec: role = m.get("role") content = m.get("content") - # content_repeat synthesis (023 case 2 / fixture 014, mirroring the OTel - # _materialize_messages helper): N repetitions of a single char to drive - # payload truncation. The fixtures use a single-byte ASCII char, so the - # char count equals the byte count. - cr = cast("Mapping[str, Any] | None", m.get("content_repeat")) - if cr is not None: - content = cast("str", cr["char"]) * int(cr["bytes"]) if role == "system": out.append(SystemMessage(content=_require_text_content(role, content))) elif role == "user": @@ -4336,13 +3961,6 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: rendered = rendered.replace("{{" + key + "}}", str(value)).replace("{{ " + key + " }}", str(value)) messages: list[Message] = [UserMessage(content=rendered)] now = datetime.now(UTC) - # A backend that exposes a Langfuse Prompt reference (024 case 1, - # mock_with_langfuse_reference) surfaces it as the langfuse_prompt - # observability entity; the observer reads it to link the Generation. - observability_entities: dict[str, Any] | None = None - reference = entry.get("langfuse_prompt_reference") - if reference is not None: - observability_entities = {"langfuse_prompt": reference} return PromptResult( name=cast("str", entry["name"]), version=cast("str", entry["version"]), @@ -4353,7 +3971,6 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: variables=variables, fetched_at=now, rendered_at=now, - observability_entities=observability_entities, ) diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 570359c..1d2a067 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -86,6 +86,14 @@ # ``_DEFERRED_CASES`` rather than at the fixture level so the # four other cases run. "037-langfuse-trace-input-output", + # 035/036 — proposal 0039 caller-invocation-id -> trace.id derivation + # (UUID hex dashes-stripped / sha256-first-16 for a non-UUID). 059 — + # proposal 0052 implementation-attribution rows on trace.metadata. + # Wired here (the Langfuse conformance home) by the fixture-harness + # catch-up; previously unit-only. + "035-caller-invocation-id-uuid", + "036-caller-invocation-id-non-uuid", + "059-implementation-attribution-langfuse", # 029 + 030 stay deferred in v0.11.0: # - 029 (fan-out per-instance): fixture omits ``collect_field`` # and ``target_field`` on the fan_out cfg, plus the inner @@ -757,6 +765,11 @@ async def _run_case(case: Mapping[str, Any]) -> None: caller_metadata = cast("dict[str, Any] | None", case.get("caller_metadata")) if caller_metadata is not None: invoke_kwargs["metadata"] = caller_metadata + # Fixtures 035/036: caller-supplied invocation_id drives the trace.id + # derivation (UUID hex dashes-stripped / sha256-first-16 for a non-UUID). + caller_invocation_id = cast("str | None", case.get("caller_invocation_id")) + if caller_invocation_id is not None: + invoke_kwargs["invocation_id"] = caller_invocation_id # Resume cases run a two-phase flow (first invoke catches expected # error → resume invoke completes), then assert against both traces @@ -1237,10 +1250,29 @@ def _assert_trace( *, expected_invariants: dict[str, Any], ) -> None: - _assert_string_or_placeholder("trace.id", trace.id, expected.get("id")) + expected_id = expected.get("id") + if expected_id is not None and not _is_placeholder(expected_id): + # Fixtures 035/036: a LITERAL trace.id is the DERIVED Langfuse id; the + # in-memory recorder keys by the raw invocation_id, so bridge via the + # impl's langfuse_trace_id (the derivation the real SDK adapter uses). + from openarmature.observability.langfuse import langfuse_trace_id # noqa: PLC0415 + + derived = langfuse_trace_id(trace.id) + assert derived == expected_id, ( + f"trace.id: derived {derived!r} (from raw {trace.id!r}) != {expected_id!r}" + ) + else: + _assert_string_or_placeholder("trace.id", trace.id, expected_id) if "name" in expected: _assert_string_or_placeholder("trace.name", trace.name, expected.get("name")) - expected_metadata = cast("dict[str, Any]", expected.get("metadata") or {}) + expected_metadata = dict(cast("dict[str, Any]", expected.get("metadata") or {})) + # Fixture 036 asserts the raw invocation_id as metadata.invocation_id. The + # real SDK derives trace.id and preserves the raw in metadata; the in-memory + # recorder keeps the raw AS trace.id, so recover it from there. + if "invocation_id" in expected_metadata and "invocation_id" not in trace.metadata: + assert trace.id == expected_metadata.pop("invocation_id"), ( + f"trace.metadata.invocation_id: raw trace.id {trace.id!r} != expected" + ) _assert_metadata_subset("trace.metadata", trace.metadata, expected_metadata) # Proposal 0043 (§8.4.1 trace.input/output sourcing). Fixtures that # opt in supply these as YAML maps; older fixtures leave them absent. @@ -1395,6 +1427,25 @@ def _is_placeholder(value: Any) -> bool: return isinstance(value, str) and value.startswith("<") and value.endswith(">") +_METADATA_MATCHER_SUBKEYS = frozenset({"harness_parameterized", "non_empty_string"}) + + +def _assert_metadata_matcher_subkeys(label: str, actual: Any, spec: dict[str, Any]) -> None: + """Fixture 059 attribution matcher sub-keys: ``non_empty_string`` (the value + is a non-empty string) and ``harness_parameterized`` (the value equals the + named harness-injected parameter, e.g. the implementation name).""" + if spec.get("non_empty_string") is True: + assert isinstance(actual, str) and actual != "", f"{label}: expected non-empty string, got {actual!r}" + if "harness_parameterized" in spec: + import openarmature # noqa: PLC0415 + + params = {"implementation_name": openarmature.__implementation_name__} + param_name = cast("str", spec["harness_parameterized"]) + assert actual == params.get(param_name), ( + f"{label}: expected harness param {param_name!r}={params.get(param_name)!r}, got {actual!r}" + ) + + def _assert_metadata_subset( label: str, actual: Mapping[str, Any], @@ -1411,6 +1462,16 @@ def _assert_metadata_subset( f"{label}.{key}: expected placeholder {expected_value!r} match, got {actual_value!r}" ) continue + if ( + isinstance(expected_value, dict) + and expected_value + and set(cast("dict[str, Any]", expected_value)).issubset(_METADATA_MATCHER_SUBKEYS) + ): + # Fixture 059 attribution: assertion sub-keys, not a nested mapping. + _assert_metadata_matcher_subkeys( + f"{label}.{key}", actual_value, cast("dict[str, Any]", expected_value) + ) + continue if isinstance(expected_value, dict) and isinstance(actual_value, dict): _assert_metadata_subset( f"{label}.{key}", From 20e73a2ceae847f7a671441b6a79a45967cbf7d8 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Wed, 24 Jun 2026 15:02:35 -0700 Subject: [PATCH 2/2] Fix side-effect-in-assert in 036 trace.id check CodeQL py/side-effect-in-assert flagged the expected_metadata.pop() inside the assert: under python -O the assertion (and its pop) is stripped, leaving the invocation_id key in expected_metadata for the later _assert_metadata_subset to wrongly check. Bind the pop to a local before the assert, and include the expected value in the failure message. --- tests/conformance/test_observability_langfuse.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/conformance/test_observability_langfuse.py b/tests/conformance/test_observability_langfuse.py index 1d2a067..7d5b829 100644 --- a/tests/conformance/test_observability_langfuse.py +++ b/tests/conformance/test_observability_langfuse.py @@ -1270,8 +1270,9 @@ def _assert_trace( # real SDK derives trace.id and preserves the raw in metadata; the in-memory # recorder keeps the raw AS trace.id, so recover it from there. if "invocation_id" in expected_metadata and "invocation_id" not in trace.metadata: - assert trace.id == expected_metadata.pop("invocation_id"), ( - f"trace.metadata.invocation_id: raw trace.id {trace.id!r} != expected" + expected_invocation_id = expected_metadata.pop("invocation_id") + assert trace.id == expected_invocation_id, ( + f"trace.metadata.invocation_id: raw trace.id {trace.id!r} != {expected_invocation_id!r}" ) _assert_metadata_subset("trace.metadata", trace.metadata, expected_metadata) # Proposal 0043 (§8.4.1 trace.input/output sourcing). Fixtures that