Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 250 additions & 23 deletions tests/conformance/test_observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
"068-llm-completion-event-response-model-distinct-from-request",
"071-llm-failure-event-call-id-distinct-from-completion-event",
"072-llm-failure-event-mutual-exclusion-with-completion-event",
# Fixture-harness catch-up tier 2a: trace-shape Langfuse fixtures
# driven through a LangfuseObserver + InMemoryLangfuseClient recorder.
# 022/031/032 assert the Trace + observation tree (proposal 0031/0035/
# 0061); 035/036 the caller-invocation-id -> trace.id derivation
# (proposal 0039); 059 the implementation-attribution trace metadata
# (proposal 0052). 023/024 (Langfuse Generation) are tier 2b.
"022-langfuse-basic-trace",
"031-langfuse-subgraph-span-hierarchy",
"032-langfuse-fan-out-per-instance-spans",
"035-caller-invocation-id-uuid",
"036-caller-invocation-id-non-uuid",
"059-implementation-attribution-langfuse",
# proposal 0052 attribution fixture (case 1) + proposal 0061
# (case 2: the §5.1 attribution lands on the detached trace's own
# openarmature.invocation span). Wired together now that 0061
Expand Down Expand Up @@ -286,17 +298,17 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
_UNIT_TESTED_FIXTURES: dict[str, str] = {
fixture_id: reason
for fixture_ids, reason in (
# Fixture-harness catch-up tier 2a wired the trace-shape Langfuse
# fixtures (022/031/032), the invocation-id fixtures (035/036), and the
# attribution fixture (059). 023/024 (Langfuse Generation) are tier 2b;
# 033 (detached multi-trace) is tier 4.
(
("022-langfuse-basic-trace", "023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"),
"proposal 0031 Langfuse mapping; covered by test_observability_langfuse.py",
("023-langfuse-generation-rendering", "024-langfuse-prompt-linkage"),
"proposal 0031 Langfuse generation/prompt-linkage; covered by test_observability_langfuse.py",
),
(
(
"031-langfuse-subgraph-span-hierarchy",
"032-langfuse-fan-out-per-instance-spans",
"033-langfuse-detached-trace-mode",
),
"proposal 0035/0061 Langfuse span hierarchy; covered by test_observability_langfuse.py",
("033-langfuse-detached-trace-mode",),
"proposal 0035/0061 Langfuse detached-trace mode; covered by test_observability_langfuse.py",
),
(
(
Expand All @@ -310,10 +322,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
("030-caller-metadata-parallel-branches-per-branch",),
"proposal 0040 per-branch caller metadata; covered by test_observability_otel.py",
),
(
("035-caller-invocation-id-uuid", "036-caller-invocation-id-non-uuid"),
"proposal 0039 invocation_id derivation; covered by test_observability_langfuse_adapter.py",
),
(
("037-langfuse-trace-input-output",),
"proposal 0043 trace input/output; covered by test_observability_langfuse.py",
Expand All @@ -327,10 +335,6 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None:
),
"proposal 0048 get_invocation_metadata; covered by test_observability_metadata.py",
),
(
("059-implementation-attribution-langfuse",),
"proposal 0052 implementation attribution; covered by test_observability_langfuse.py",
),
# Fixture-harness catch-up tier 1 wired the rest of the 0057/0058
# family into _SUPPORTED_FIXTURES; these three stay here, each blocked
# on a spec-side fixture change that python picks up at the v0.16.0 pin
Expand Down Expand Up @@ -540,6 +544,18 @@ async def test_observability_fixture(fixture_path: Path) -> None:
await _run_fixture_058(spec)
elif fixture_id == "084-langfuse-session-user-promotion":
await _run_fixture_084(spec)
elif fixture_id in {
"022-langfuse-basic-trace",
"031-langfuse-subgraph-span-hierarchy",
"032-langfuse-fan-out-per-instance-spans",
"059-implementation-attribution-langfuse",
}:
await _run_langfuse_trace_fixture(spec)
elif fixture_id in {
"035-caller-invocation-id-uuid",
"036-caller-invocation-id-non-uuid",
}:
await _run_invocation_id_fixture(spec)
elif fixture_id in {
"012-otel-llm-payload-default-off",
"013-otel-llm-payload-enabled",
Expand Down Expand Up @@ -2609,6 +2625,202 @@ async def _run_fixture_084(spec: Mapping[str, Any]) -> None:
raise AssertionError(f"case {case_name!r}: {e}") from e


_LANGFUSE_MATCHER_SUBKEYS = frozenset({"harness_parameterized", "non_empty_string"})


def _langfuse_value_matches(
actual: Any,
expected: Any,
*,
bindings: dict[str, Any],
params: Mapping[str, Any],
) -> bool:
"""Match a Langfuse trace/observation value against a fixture expectation:
an inline placeholder token, the assertion sub-key dict, or plain equality.
"""
# The value-matcher idioms are the conformance-adapter §5.10 vocabulary.
if isinstance(expected, str) and expected.startswith("<") and expected.endswith(">"):
return _langfuse_placeholder_matches(actual, expected, bindings)
# A NON-empty mapping whose keys are all matcher sub-keys is an assertion
# dict; an empty dict (or a dict with other keys) is matched by equality.
if (
isinstance(expected, Mapping)
and expected
and set(cast("Mapping[str, Any]", expected)).issubset(_LANGFUSE_MATCHER_SUBKEYS)
):
return _langfuse_matcher_subkeys_match(actual, cast("Mapping[str, Any]", expected), params)
return bool(actual == expected)


def _langfuse_placeholder_matches(actual: Any, token: str, bindings: dict[str, Any]) -> bool:
"""Inline placeholder tokens: ``<any-string>`` (non-empty), ``<uuid-hex>``
(32-hex dashes-stripped), and first-occurrence binding tokens like
``<corr_id_1>`` (bind on first sighting, assert equality after -- the
correlation-id-consistency check). The §5.10 ``<uuid>`` (canonical) token
is added when a wired fixture first needs it.
"""
if token == "<any-string>":
return isinstance(actual, str) and actual != ""
if token == "<uuid-hex>":
return isinstance(actual, str) and re.fullmatch(r"[0-9a-f]{32}", actual) is not None
if token in bindings:
return actual == bindings[token]
if actual is None:
return False
bindings[token] = actual
return True


def _langfuse_matcher_subkeys_match(actual: Any, spec: Mapping[str, Any], params: Mapping[str, Any]) -> bool:
"""Assertion sub-keys (059): ``non_empty_string`` and ``harness_parameterized``
(value equals the named harness-injected parameter)."""
if spec.get("non_empty_string") is True and not (isinstance(actual, str) and actual != ""):
return False
if "harness_parameterized" in spec:
param_name = cast("str", spec["harness_parameterized"])
if actual != params.get(param_name):
return False
return True


def _assert_langfuse_trace_shape(
trace: Any,
expected: Mapping[str, Any],
*,
bindings: dict[str, Any],
params: Mapping[str, Any],
) -> None:
"""Assert a Langfuse Trace's id / name / metadata / observation tree against
the fixture's ``expected.langfuse_trace`` block. Each clause is asserted only
when present (059 asserts metadata only; 022/031/032 assert all four).
"""
if "id" in expected:
# python's in-memory LangfuseTrace.id is the RAW invocation_id (the
# §8.4.1 verbatim OA-side id); the fixture asserts the DERIVED Langfuse
# trace id (uuid-hex / sha256[:16]). Bridge via langfuse_trace_id, the
# impl's own derivation rule (trace_id.py).
from openarmature.observability.langfuse import langfuse_trace_id

derived_id = langfuse_trace_id(trace.id)
assert _langfuse_value_matches(derived_id, expected["id"], bindings=bindings, params=params), (
f"derived trace.id {derived_id!r} (from raw {trace.id!r}) did not match {expected['id']!r}"
)
if "name" in expected:
assert _langfuse_value_matches(trace.name, expected["name"], bindings=bindings, params=params), (
f"trace.name {trace.name!r} did not match {expected['name']!r}"
)
for key, val in cast("dict[str, Any]", expected.get("metadata") or {}).items():
assert _langfuse_value_matches(trace.metadata.get(key), val, bindings=bindings, params=params), (
f"trace.metadata.{key} {trace.metadata.get(key)!r} did not match {val!r}"
)
observations = cast("list[dict[str, Any]] | None", expected.get("observations"))
if observations is not None:
_assert_langfuse_observation_tree(trace, observations, bindings=bindings, params=params)


async def _run_langfuse_trace_fixture(spec: Mapping[str, Any]) -> None:
"""Driver for the trace-shape Langfuse fixtures: 022/031/032 (single-dict)
and 059 (cases). Each builds a graph via the adapter, records into an
InMemoryLangfuseClient, and asserts the Trace + observation tree.
"""
if "cases" in spec:
for case in cast("list[dict[str, Any]]", spec["cases"]):
case_name = cast("str", case["name"])
try:
await _run_langfuse_trace_case(case)
except AssertionError as e:
raise AssertionError(f"case {case_name!r}: {e}") from e
else:
await _run_langfuse_trace_case(spec)


async def _run_langfuse_trace_case(case: Mapping[str, Any]) -> None:
import openarmature
from openarmature.observability.langfuse import InMemoryLangfuseClient, LangfuseObserver

_patch_unsupported_directives(case)
client = InMemoryLangfuseClient()
lf_kwargs: dict[str, Any] = {"client": client}
cfg = cast("dict[str, Any]", case.get("langfuse_observer_config") or case.get("langfuse_observer") or {})
if "disable_state_payload" in cfg:
lf_kwargs["disable_state_payload"] = bool(cfg["disable_state_payload"])
if "disable_provider_payload" in cfg:
lf_kwargs["disable_provider_payload"] = bool(cfg["disable_provider_payload"])
observer = LangfuseObserver(**lf_kwargs)

subgraphs = _compile_subgraphs(case)
built = build_graph(case, subgraphs=dict(subgraphs), trace=[])
compiled = built.builder.compile()
compiled.attach_observer(observer)
initial_state = built.initial_state(case.get("initial_state", {}))
try:
await compiled.invoke(initial_state)
await compiled.drain()
finally:
observer.shutdown()

assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}"
trace = next(iter(client.traces.values()))
bindings: dict[str, Any] = {}
params = {"implementation_name": openarmature.__implementation_name__}
expected = cast("dict[str, Any]", case["expected"]["langfuse_trace"])
_assert_langfuse_trace_shape(trace, expected, bindings=bindings, params=params)


async def _run_invocation_id_fixture(spec: Mapping[str, Any]) -> None:
"""Driver for the caller-invocation-id fixtures (035/036). Builds a simple
calls_llm graph, invokes with ``invocation_id=caller_invocation_id``, and
asserts the Langfuse ``trace.id`` equals the fixture's pinned derivation
(python derives it; the harness checks the result) plus 036's raw id in
``trace.metadata``.
"""
for case in cast("list[dict[str, Any]]", spec["cases"]):
case_name = cast("str", case["name"])
try:
await _run_invocation_id_case(case)
except AssertionError as e:
raise AssertionError(f"case {case_name!r}: {e}") from e


async def _run_invocation_id_case(case: Mapping[str, Any]) -> None:
from openarmature.observability.langfuse import (
InMemoryLangfuseClient,
LangfuseObserver,
langfuse_trace_id,
)

graph, state_cls, provider = _build_simple_llm_graph(case, populate_caller_metadata=False)
client = InMemoryLangfuseClient()
observer = LangfuseObserver(client=client)
graph.attach_observer(observer)
state = _make_state_instance(case, state_cls)
caller_id = cast("str", case["caller_invocation_id"])
try:
await graph.invoke(state, invocation_id=caller_id)
await graph.drain()
finally:
observer.shutdown()
await provider.aclose()

Comment thread
chris-colinsky marked this conversation as resolved.
assert len(client.traces) == 1, f"expected 1 Langfuse trace; got {len(client.traces)}"
trace = next(iter(client.traces.values()))
expected_trace = cast("dict[str, Any]", case["expected"]["langfuse_trace"])
# The fixture's trace.id is the DERIVED Langfuse id; the in-memory recorder
# keys by the raw invocation_id. Bridge via the impl's langfuse_trace_id.
derived_id = langfuse_trace_id(trace.id)
assert derived_id == expected_trace["id"], (
f"derived trace.id {derived_id!r} (from raw {trace.id!r}) != {expected_trace['id']!r}"
)
for key, val in cast("dict[str, Any]", expected_trace.get("metadata") or {}).items():
actual = trace.metadata.get(key)
# The real SDK derives trace.id and preserves the raw invocation_id in
# metadata for reverse lookup; the in-memory recorder instead keeps the
# raw id AS trace.id. Recover it from there when metadata omits it (036).
if actual is None and key == "invocation_id":
actual = trace.id
assert actual == val, f"trace.metadata.{key} {actual!r} != {val!r}"


# ---------------------------------------------------------------------------
# Fixture 010 — log correlation
#
Expand Down Expand Up @@ -3544,15 +3756,23 @@ async def _update_body(_s: Any, _payload: dict[str, Any] = update_block) -> dict


def _assert_langfuse_observation_tree(
trace: Any, expected: list[dict[str, Any]], parent_id: str | None = None
trace: Any,
expected: list[dict[str, Any]],
parent_id: str | None = None,
*,
bindings: dict[str, Any] | None = None,
params: Mapping[str, Any] | None = None,
) -> None:
"""Recursively match expected observations against the trace's flat
observation list (linked by parent_observation_id). type + name are
matched exactly; level / input / output exactly when present;
metadata is subset-matched."""
matched exactly; level / input / output exactly when present; metadata is
subset-matched. When ``bindings``/``params`` are supplied, metadata values
go through the value-matcher (placeholder tokens + sub-key matchers);
otherwise they are compared exactly (the tool-fixture path)."""
# Mutable copy: each matched observation is consumed so two
# same-shape expected siblings can't both bind to one actual.
remaining = list(trace.children_of(parent_id))
use_matcher = bindings is not None and params is not None
for exp in expected:
exp_type = cast("str", exp["type"])
exp_name = cast("str | None", exp.get("name"))
Expand All @@ -3574,12 +3794,19 @@ def _assert_langfuse_observation_tree(
f"{exp_name!r}: output {match.output!r} != {exp['output']!r}"
)
for key, val in cast("dict[str, Any]", exp.get("metadata") or {}).items():
assert match.metadata.get(key) == val, (
f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}"
)
if use_matcher:
assert _langfuse_value_matches(
match.metadata.get(key), val, bindings=bindings or {}, params=params or {}
), f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} did not match {val!r}"
else:
assert match.metadata.get(key) == val, (
f"{exp_name!r}: metadata.{key} {match.metadata.get(key)!r} != {val!r}"
)
children = cast("list[dict[str, Any]] | None", exp.get("children"))
if children:
_assert_langfuse_observation_tree(trace, children, parent_id=match.id)
_assert_langfuse_observation_tree(
trace, children, parent_id=match.id, bindings=bindings, params=params
)


async def _run_tool_fixture(spec: Mapping[str, Any]) -> None:
Expand Down