diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 77a4db4..bda2627 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -182,6 +182,11 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "035-caller-invocation-id-uuid", "036-caller-invocation-id-non-uuid", "059-implementation-attribution-langfuse", + # Tier 2b: Langfuse Generation observation (proposal 0031 §8.4.3/§8.4.4) + # -- model / modelParameters / usage / input-output payload (with + # truncation) and prompt-entity linkage. + "023-langfuse-generation-rendering", + "024-langfuse-prompt-linkage", # proposal 0052 attribution fixture (case 1) + proposal 0061 # (case 2: the §5.1 attribution lands on the detached trace's own # openarmature.invocation span). Wired together now that 0061 @@ -298,14 +303,10 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: _UNIT_TESTED_FIXTURES: dict[str, str] = { fixture_id: reason for fixture_ids, reason in ( - # Fixture-harness catch-up tier 2a wired the trace-shape Langfuse - # fixtures (022/031/032), the invocation-id fixtures (035/036), and the - # attribution fixture (059). 023/024 (Langfuse Generation) are tier 2b; - # 033 (detached multi-trace) is tier 4. - ( - ("023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"), - "proposal 0031 Langfuse generation/prompt-linkage; covered by test_observability_langfuse.py", - ), + # Fixture-harness catch-up tier 2 wired the trace-shape Langfuse + # fixtures (022/031/032), invocation-id (035/036), attribution (059) in + # 2a, and the Langfuse Generation fixtures (023/024) in 2b. 033 (detached + # multi-trace) is tier 4. ( ("033-langfuse-detached-trace-mode",), "proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py", @@ -556,6 +557,11 @@ async def test_observability_fixture(fixture_path: Path) -> None: "036-caller-invocation-id-non-uuid", }: await _run_invocation_id_fixture(spec) + elif fixture_id in { + "023-langfuse-generation-rendering", + "024-langfuse-prompt-linkage", + }: + await _run_langfuse_generation_fixture(spec) elif fixture_id in { "012-otel-llm-payload-default-off", "013-otel-llm-payload-enabled", @@ -2649,6 +2655,19 @@ def _langfuse_value_matches( and set(cast("Mapping[str, Any]", expected)).issubset(_LANGFUSE_MATCHER_SUBKEYS) ): return _langfuse_matcher_subkeys_match(actual, cast("Mapping[str, Any]", expected), params) + # A regular NON-empty nested mapping (e.g. 024 metadata.prompt): recurse per + # key so inner tokens (rendered_hash: ) still apply. Subset over + # keys -- every expected key must be present and match; actual MAY carry + # extras. An empty expected dict falls through to exact equality below + # (rather than vacuously matching any mapping). + if isinstance(expected, Mapping) and expected: + if not isinstance(actual, Mapping): + return False + actual_map = cast("Mapping[str, Any]", actual) + return all( + k in actual_map and _langfuse_value_matches(actual_map[k], v, bindings=bindings, params=params) + for k, v in cast("Mapping[str, Any]", expected).items() + ) return bool(actual == expected) @@ -2820,6 +2839,60 @@ async def _run_invocation_id_case(case: Mapping[str, Any]) -> None: actual = trace.id assert actual == val, f"trace.metadata.{key} {actual!r} != {val!r}" + # The fixture's top-level verbatim invocation_id clause (the §5.1 + # caller_invocation_id_verbatim_on_attribute invariant): on the OTel side it + # is the openarmature.invocation_id span attribute; in the Langfuse runner + # the verbatim id surfaces as the in-memory recorder's raw trace.id. + expected_invocation_id = cast("dict[str, Any]", case["expected"]).get("invocation_id") + if expected_invocation_id is not None: + assert trace.id == expected_invocation_id, ( + f"verbatim invocation_id: raw trace.id {trace.id!r} != {expected_invocation_id!r}" + ) + + +async def _run_langfuse_generation_fixture(spec: Mapping[str, Any]) -> None: + """Driver for the Langfuse Generation fixtures (023 generation rendering + + truncation, 024 prompt linkage). Builds a calls_llm graph, records into an + InMemoryLangfuseClient under the fixture's observer config, and asserts the + Generation observation nested under the node span. + """ + for case in cast("list[dict[str, Any]]", spec["cases"]): + case_name = cast("str", case["name"]) + try: + await _run_langfuse_generation_case(case) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_langfuse_generation_case(case: Mapping[str, Any]) -> None: + import openarmature + from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver + + graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False) + client = InMemoryLangfuseClient() + cfg = cast("dict[str, Any]", case.get("langfuse_observer") or {}) + lf_kwargs: dict[str, Any] = {"client": client} + if "disable_provider_payload" in cfg: + lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"]) + if "payload_byte_cap" in cfg: + lf_kwargs["payload_byte_cap"] = int(cfg["payload_byte_cap"]) + observer = LangfuseObserver(**lf_kwargs) + graph.attach_observer(observer) + state = _make_state_instance(case, state_cls) + try: + await graph.invoke(state) + await graph.drain() + finally: + observer.shutdown() + await provider.aclose() + + assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}" + trace = next(iter(client.traces.values())) + bindings: dict[str, Any] = {} + params = {"implementation_name": openarmature.__implementation_name__} + expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"]) + _assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params) + # --------------------------------------------------------------------------- # Fixture 010 — log correlation @@ -3755,6 +3828,72 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict return builder.compile(), state_cls, providers +def _assert_langfuse_generation_fields( + exp_name: str | None, + match: Any, + exp: Mapping[str, Any], + *, + bindings: dict[str, Any], + params: Mapping[str, Any], +) -> None: + """Generation-observation fields beyond the base span shape (023/024): + model / modelParameters / usage, the input parse-or-truncation shapes, and + the prompt-entity link. Each is asserted only when present, so it is inert + for span / tool observations. The placeholder-capable fields go through the + value-matcher (consistent with metadata); usage is a typed integer record. + """ + if "model" in exp: + assert _langfuse_value_matches(match.model, exp["model"], bindings=bindings, params=params), ( + f"{exp_name!r}: model {match.model!r} did not match {exp['model']!r}" + ) + if "modelParameters" in exp: + assert _langfuse_value_matches( + match.model_parameters, exp["modelParameters"], bindings=bindings, params=params + ), f"{exp_name!r}: modelParameters {match.model_parameters!r} != {exp['modelParameters']!r}" + if "usage" in exp: + u = cast("dict[str, Any]", exp["usage"]) + got = None if match.usage is None else (match.usage.input, match.usage.output, match.usage.total) + assert got == (u["input"], u["output"], u["total"]), f"{exp_name!r}: usage {got!r} != {u!r}" + if "prompt_entity_link" in exp: + assert _langfuse_value_matches( + match.prompt_entity_link, exp["prompt_entity_link"], bindings=bindings, params=params + ), ( + f"{exp_name!r}: prompt_entity_link {match.prompt_entity_link!r} " + f"did not match {exp['prompt_entity_link']!r}" + ) + if exp.get("prompt_entity_link_absent") is True: + assert match.prompt_entity_link is None, ( + f"{exp_name!r}: expected no prompt_entity_link; got {match.prompt_entity_link!r}" + ) + if "input_parses_as_messages" in exp: + # Under-cap input is the native message list (§8.7); compare directly. + assert match.input == exp["input_parses_as_messages"], ( + f"{exp_name!r}: input {match.input!r} did not parse as {exp['input_parses_as_messages']!r}" + ) + if exp.get("input_is_raw_string_with_marker") is True: + # Over-cap input falls through to the raw truncated string + §5.5.5 marker. + assert isinstance(match.input, str) and re.search(r"\[truncated, \d+ bytes total\]", match.input), ( + f"{exp_name!r}: expected a raw truncated string with the marker; got {match.input!r}" + ) + + +def _obs_selection_matches(obs: Any, exp_metadata: Mapping[str, Any]) -> bool: + """Read-only disambiguator for same-(type, name) sibling observations: an + actual is a candidate when its scalar expected-metadata values match. + + Only scalars (str / int / float / bool) are used: placeholder tokens are + shared across siblings (correlation_id) so they don't disambiguate, and + running the value-matcher here would fire its binding side effects during + selection; sequences (namespace) are left to the value-matcher's list/tuple + handling. fan_out_index / step are the fields that actually distinguish. + """ + for key, val in exp_metadata.items(): + is_placeholder = isinstance(val, str) and val.startswith("<") and val.endswith(">") + if isinstance(val, (str, int, float)) and not is_placeholder and obs.metadata.get(key) != val: + return False + return True + + def _assert_langfuse_observation_tree( trace: Any, expected: list[dict[str, Any]], @@ -3776,8 +3915,18 @@ def _assert_langfuse_observation_tree( for exp in expected: exp_type = cast("str", exp["type"]) exp_name = cast("str | None", exp.get("name")) + # Disambiguate same-(type, name) siblings (e.g. 032's per-instance + # "process" spans) by their scalar metadata, not list/emission order, so + # the assertions can't bind the wrong sibling if emission order shifts. + exp_meta = cast("dict[str, Any]", exp.get("metadata") or {}) match = next( - (o for o in remaining if o.type == exp_type and (exp_name is None or o.name == exp_name)), + ( + o + for o in remaining + if o.type == exp_type + and (exp_name is None or o.name == exp_name) + and _obs_selection_matches(o, exp_meta) + ), None, ) assert match is not None, ( @@ -3802,6 +3951,7 @@ def _assert_langfuse_observation_tree( assert match.metadata.get(key) == val, ( f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}" ) + _assert_langfuse_generation_fields(exp_name, match, exp, bindings=bindings or {}, params=params or {}) children = cast("list[dict[str, Any]] | None", exp.get("children")) if children: _assert_langfuse_observation_tree( @@ -4146,6 +4296,13 @@ def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> l for m in messages_spec: role = m.get("role") content = m.get("content") + # content_repeat synthesis (023 case 2 / fixture 014, mirroring the OTel + # _materialize_messages helper): N repetitions of a single char to drive + # payload truncation. The fixtures use a single-byte ASCII char, so the + # char count equals the byte count. + cr = cast("Mapping[str, Any] | None", m.get("content_repeat")) + if cr is not None: + content = cast("str", cr["char"]) * int(cr["bytes"]) if role == "system": out.append(SystemMessage(content=_require_text_content(role, content))) elif role == "user": @@ -4179,6 +4336,13 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: rendered = rendered.replace("{{" + key + "}}", str(value)).replace("{{ " + key + " }}", str(value)) messages: list[Message] = [UserMessage(content=rendered)] now = datetime.now(UTC) + # A backend that exposes a Langfuse Prompt reference (024 case 1, + # mock_with_langfuse_reference) surfaces it as the langfuse_prompt + # observability entity; the observer reads it to link the Generation. + observability_entities: dict[str, Any] | None = None + reference = entry.get("langfuse_prompt_reference") + if reference is not None: + observability_entities = {"langfuse_prompt": reference} return PromptResult( name=cast("str", entry["name"]), version=cast("str", entry["version"]), @@ -4189,6 +4353,7 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: variables=variables, fetched_at=now, rendered_at=now, + observability_entities=observability_entities, )