Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 174 additions & 9 deletions tests/conformance/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
"035-caller-invocation-id-uuid",
"036-caller-invocation-id-non-uuid",
"059-implementation-attribution-langfuse",
# Tier 2b: Langfuse Generation observation (proposal 0031 §8.4.3/§8.4.4)
# -- model / modelParameters / usage / input-output payload (with
# truncation) and prompt-entity linkage.
"023-langfuse-generation-rendering",
"024-langfuse-prompt-linkage",
# proposal 0052 attribution fixture (case 1) + proposal 0061
# (case 2: the §5.1 attribution lands on the detached trace's own
# openarmature.invocation span). Wired together now that 0061
Expand Down Expand Up @@ -298,14 +303,10 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
_UNIT_TESTED_FIXTURES: dict[str, str] = {
fixture_id: reason
for fixture_ids, reason in (
# Fixture-harness catch-up tier 2a wired the trace-shape Langfuse
# fixtures (022/031/032), the invocation-id fixtures (035/036), and the
# attribution fixture (059). 023/024 (Langfuse Generation) are tier 2b;
# 033 (detached multi-trace) is tier 4.
(
("023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"),
"proposal 0031 Langfuse generation/prompt-linkage; covered by test_observability_langfuse.py",
),
# Fixture-harness catch-up tier 2 wired the trace-shape Langfuse
# fixtures (022/031/032), invocation-id (035/036), attribution (059) in
# 2a, and the Langfuse Generation fixtures (023/024) in 2b. 033 (detached
# multi-trace) is tier 4.
(
("033-langfuse-detached-trace-mode",),
"proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py",
Expand Down Expand Up @@ -556,6 +557,11 @@ async def test_observability_fixture(fixture_path: Path) -> None:
"036-caller-invocation-id-non-uuid",
}:
await _run_invocation_id_fixture(spec)
elif fixture_id in {
"023-langfuse-generation-rendering",
"024-langfuse-prompt-linkage",
}:
await _run_langfuse_generation_fixture(spec)
elif fixture_id in {
"012-otel-llm-payload-default-off",
"013-otel-llm-payload-enabled",
Expand Down Expand Up @@ -2649,6 +2655,19 @@ def _langfuse_value_matches(
and set(cast("Mapping[str, Any]", expected)).issubset(_LANGFUSE_MATCHER_SUBKEYS)
):
return _langfuse_matcher_subkeys_match(actual, cast("Mapping[str, Any]", expected), params)
# A regular NON-empty nested mapping (e.g. 024 metadata.prompt): recurse per
# key so inner tokens (rendered_hash: <any-string>) still apply. Subset over
# keys -- every expected key must be present and match; actual MAY carry
# extras. An empty expected dict falls through to exact equality below
# (rather than vacuously matching any mapping).
Comment on lines +2658 to +2662
if isinstance(expected, Mapping) and expected:
if not isinstance(actual, Mapping):
return False
actual_map = cast("Mapping[str, Any]", actual)
return all(
k in actual_map and _langfuse_value_matches(actual_map[k], v, bindings=bindings, params=params)
for k, v in cast("Mapping[str, Any]", expected).items()
)
return bool(actual == expected)


Expand Down Expand Up @@ -2820,6 +2839,60 @@ async def _run_invocation_id_case(case: Mapping[str, Any]) -> None:
actual = trace.id
assert actual == val, f"trace.metadata.{key} {actual!r} != {val!r}"

# The fixture's top-level verbatim invocation_id clause (the §5.1
# caller_invocation_id_verbatim_on_attribute invariant): on the OTel side it
# is the openarmature.invocation_id span attribute; in the Langfuse runner
# the verbatim id surfaces as the in-memory recorder's raw trace.id.
expected_invocation_id = cast("dict[str, Any]", case["expected"]).get("invocation_id")
if expected_invocation_id is not None:
assert trace.id == expected_invocation_id, (
f"verbatim invocation_id: raw trace.id {trace.id!r} != {expected_invocation_id!r}"
)


async def _run_langfuse_generation_fixture(spec: Mapping[str, Any]) -> None:
"""Driver for the Langfuse Generation fixtures (023 generation rendering +
truncation, 024 prompt linkage). Builds a calls_llm graph, records into an
InMemoryLangfuseClient under the fixture's observer config, and asserts the
Generation observation nested under the node span.
"""
for case in cast("list[dict[str, Any]]", spec["cases"]):
case_name = cast("str", case["name"])
try:
await _run_langfuse_generation_case(case)
except AssertionError as e:
raise AssertionError(f"case {case_name!r}: {e}") from e


async def _run_langfuse_generation_case(case: Mapping[str, Any]) -> None:
import openarmature
from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver

graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False)
client = InMemoryLangfuseClient()
cfg = cast("dict[str, Any]", case.get("langfuse_observer") or {})
lf_kwargs: dict[str, Any] = {"client": client}
Comment on lines +2871 to +2874
if "disable_provider_payload" in cfg:
lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"])
if "payload_byte_cap" in cfg:
lf_kwargs["payload_byte_cap"] = int(cfg["payload_byte_cap"])
observer = LangfuseObserver(**lf_kwargs)
graph.attach_observer(observer)
state = _make_state_instance(case, state_cls)
try:
await graph.invoke(state)
await graph.drain()
finally:
observer.shutdown()
await provider.aclose()

assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}"
trace = next(iter(client.traces.values()))
bindings: dict[str, Any] = {}
params = {"implementation_name": openarmature.__implementation_name__}
expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"])
_assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params)


# ---------------------------------------------------------------------------
# Fixture 010 — log correlation
Expand Down Expand Up @@ -3755,6 +3828,72 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict
return builder.compile(), state_cls, providers


def _assert_langfuse_generation_fields(
exp_name: str | None,
match: Any,
exp: Mapping[str, Any],
*,
bindings: dict[str, Any],
params: Mapping[str, Any],
) -> None:
"""Generation-observation fields beyond the base span shape (023/024):
model / modelParameters / usage, the input parse-or-truncation shapes, and
the prompt-entity link. Each is asserted only when present, so it is inert
for span / tool observations. The placeholder-capable fields go through the
value-matcher (consistent with metadata); usage is a typed integer record.
"""
if "model" in exp:
assert _langfuse_value_matches(match.model, exp["model"], bindings=bindings, params=params), (
f"{exp_name!r}: model {match.model!r} did not match {exp['model']!r}"
)
if "modelParameters" in exp:
assert _langfuse_value_matches(
match.model_parameters, exp["modelParameters"], bindings=bindings, params=params
), f"{exp_name!r}: modelParameters {match.model_parameters!r} != {exp['modelParameters']!r}"
if "usage" in exp:
u = cast("dict[str, Any]", exp["usage"])
got = None if match.usage is None else (match.usage.input, match.usage.output, match.usage.total)
assert got == (u["input"], u["output"], u["total"]), f"{exp_name!r}: usage {got!r} != {u!r}"
if "prompt_entity_link" in exp:
assert _langfuse_value_matches(
match.prompt_entity_link, exp["prompt_entity_link"], bindings=bindings, params=params
), (
f"{exp_name!r}: prompt_entity_link {match.prompt_entity_link!r} "
f"did not match {exp['prompt_entity_link']!r}"
)
if exp.get("prompt_entity_link_absent") is True:
assert match.prompt_entity_link is None, (
f"{exp_name!r}: expected no prompt_entity_link; got {match.prompt_entity_link!r}"
)
if "input_parses_as_messages" in exp:
# Under-cap input is the native message list (§8.7); compare directly.
assert match.input == exp["input_parses_as_messages"], (
f"{exp_name!r}: input {match.input!r} did not parse as {exp['input_parses_as_messages']!r}"
)
if exp.get("input_is_raw_string_with_marker") is True:
# Over-cap input falls through to the raw truncated string + §5.5.5 marker.
assert isinstance(match.input, str) and re.search(r"\[truncated, \d+ bytes total\]", match.input), (
f"{exp_name!r}: expected a raw truncated string with the marker; got {match.input!r}"
)


def _obs_selection_matches(obs: Any, exp_metadata: Mapping[str, Any]) -> bool:
"""Read-only disambiguator for same-(type, name) sibling observations: an
actual is a candidate when its scalar expected-metadata values match.

Only scalars (str / int / float / bool) are used: placeholder tokens are
shared across siblings (correlation_id) so they don't disambiguate, and
running the value-matcher here would fire its binding side effects during
selection; sequences (namespace) are left to the value-matcher's list/tuple
handling. fan_out_index / step are the fields that actually distinguish.
"""
for key, val in exp_metadata.items():
is_placeholder = isinstance(val, str) and val.startswith("<") and val.endswith(">")
if isinstance(val, (str, int, float)) and not is_placeholder and obs.metadata.get(key) != val:
return False
return True


def _assert_langfuse_observation_tree(
trace: Any,
expected: list[dict[str, Any]],
Expand All @@ -3776,8 +3915,18 @@ def _assert_langfuse_observation_tree(
for exp in expected:
exp_type = cast("str", exp["type"])
exp_name = cast("str | None", exp.get("name"))
# Disambiguate same-(type, name) siblings (e.g. 032's per-instance
# "process" spans) by their scalar metadata, not list/emission order, so
# the assertions can't bind the wrong sibling if emission order shifts.
exp_meta = cast("dict[str, Any]", exp.get("metadata") or {})
match = next(
(o for o in remaining if o.type == exp_type and (exp_name is None or o.name == exp_name)),
(
o
for o in remaining
if o.type == exp_type
and (exp_name is None or o.name == exp_name)
and _obs_selection_matches(o, exp_meta)
),
None,
)
assert match is not None, (
Expand All @@ -3802,6 +3951,7 @@ def _assert_langfuse_observation_tree(
assert match.metadata.get(key) == val, (
f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}"
)
_assert_langfuse_generation_fields(exp_name, match, exp, bindings=bindings or {}, params=params or {})
children = cast("list[dict[str, Any]] | None", exp.get("children"))
if children:
_assert_langfuse_observation_tree(
Expand Down Expand Up @@ -4146,6 +4296,13 @@ def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> l
for m in messages_spec:
role = m.get("role")
content = m.get("content")
# content_repeat synthesis (023 case 2 / fixture 014, mirroring the OTel
# _materialize_messages helper): N repetitions of a single char to drive
# payload truncation. The fixtures use a single-byte ASCII char, so the
# char count equals the byte count.
cr = cast("Mapping[str, Any] | None", m.get("content_repeat"))
if cr is not None:
content = cast("str", cr["char"]) * int(cr["bytes"])
if role == "system":
out.append(SystemMessage(content=_require_text_content(role, content)))
elif role == "user":
Expand Down Expand Up @@ -4179,6 +4336,13 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any:
rendered = rendered.replace("{{" + key + "}}", str(value)).replace("{{ " + key + " }}", str(value))
messages: list[Message] = [UserMessage(content=rendered)]
now = datetime.now(UTC)
# A backend that exposes a Langfuse Prompt reference (024 case 1,
# mock_with_langfuse_reference) surfaces it as the langfuse_prompt
# observability entity; the observer reads it to link the Generation.
observability_entities: dict[str, Any] | None = None
reference = entry.get("langfuse_prompt_reference")
if reference is not None:
observability_entities = {"langfuse_prompt": reference}
return PromptResult(
name=cast("str", entry["name"]),
version=cast("str", entry["version"]),
Expand All @@ -4189,6 +4353,7 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any:
variables=variables,
fetched_at=now,
rendered_at=now,
observability_entities=observability_entities,
)


Expand Down