From aee70b6630474add503ab1982e2b9cd6a354ff08 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 23 Jun 2026 18:46:01 -0700 Subject: [PATCH 1/2] Wire tier-1 typed-event conformance fixtures Move 10 LlmCompletionEvent/LlmFailedEvent fixtures (060-065, 067, 068, 071, 072) from _UNIT_TESTED_FIXTURES into _SUPPORTED_FIXTURES so the conformance harness runs the spec's own YAML fixtures rather than python's hand-written equivalents. First tier of the fixture-harness catch-up; strengthens the cross-impl conformance signal with no library change. Four of the family stay unit-tested, each blocked on a spec-side fixture change to be picked up at the next pin bump: 066 (single-member prompt group, corrected upstream), 069 (asserts an undeclared request model), 070 (missing tool_call_id is non-constructible here, validated at construction not the call boundary), 073 (asserts the vendor error.type where python surfaces the exception class name). --- tests/conformance/test_observability.py | 613 ++++++++++++++++++++---- 1 file changed, 520 insertions(+), 93 deletions(-) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index cf44a9b..007e938 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -147,6 +147,27 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: "054-llm-completion-event-fan-out-index-population", "055-llm-completion-event-branch-name-population", "056-llm-completion-event-strict-serial-ordering", + # proposal 0057 LlmCompletionEvent field population (060-068) + + # proposal 0058 LlmFailedEvent (069-073). Driven through the typed- + # event-collector runner (the same machinery as 050-056) plus a + # multi-node-chain variant for 067/071. Fixture-harness catch-up + # tier 1. 
Four of the family stay unit-tested for now (see + # _UNIT_TESTED_FIXTURES): 066 (corrected >=2-member group ships at spec + # v0.74.1, picked up at the v0.16.0 pin), 069 (asserts a request model + # the fixture doesn't declare), 070 (missing-tool_call_id message is + # non-constructible in python -- enforced at construction, not the + # complete() boundary), 073 (fixture asserts the vendor body error.type + # verbatim, but python's error_type is the OA exception class name). + "060-llm-completion-event-input-messages-populated", + "061-llm-completion-event-output-content-populated", + "062-llm-completion-event-request-params-populated", + "063-llm-completion-event-request-extras-populated", + "064-llm-completion-event-active-prompt-populated", + "065-llm-completion-event-active-prompt-null", + "067-llm-completion-event-call-id-always-present-and-distinct", + "068-llm-completion-event-response-model-distinct-from-request", + "071-llm-failure-event-call-id-distinct-from-completion-event", + "072-llm-failure-event-mutual-exclusion-with-completion-event", # proposal 0052 attribution fixture (case 1) + proposal 0061 # (case 2: the §5.1 attribution lands on the detached trace's own # openarmature.invocation span). Wired together now that 0061 @@ -308,32 +329,52 @@ def _reset_otel_global_tracer_provider(restore_to: object) -> None: ("059-implementation-attribution-langfuse",), "proposal 0052 implementation attribution; covered by test_observability_langfuse.py", ), + # Fixture-harness catch-up tier 1 wired the rest of the 0057/0058 + # family into _SUPPORTED_FIXTURES; these four stay here, each blocked + # on a spec-side fixture change that python picks up at the v0.16.0 pin + # bump. 
( - ( - "060-llm-completion-event-input-messages-populated", - "061-llm-completion-event-output-content-populated", - "062-llm-completion-event-request-params-populated", - "063-llm-completion-event-request-extras-populated", - "064-llm-completion-event-active-prompt-populated", - "066-llm-completion-event-active-prompt-group-populated", - "067-llm-completion-event-call-id-always-present-and-distinct", - "068-llm-completion-event-response-model-distinct-from-request", - ), - "proposal 0057 LlmCompletionEvent fields; covered by test_llm_provider.py", + ("066-llm-completion-event-active-prompt-group-populated",), + # At the current v0.70.1 pin the fixture's group has a single + # member, which python's PromptGroup (prompt-management §10, + # >=2 members) correctly rejects. The corrected >=2-member fixture + # ships at spec v0.74.1; wire it with the v0.16.0 pin bump. + "proposal 0057 active_prompt_group; corrected >=2-member fixture " + "ships at spec v0.74.1, wired with the v0.16.0 pin bump; covered by " + "test_llm_provider.py", ), ( - ("065-llm-completion-event-active-prompt-null",), - "proposal 0057 active_prompt null case; covered by test_observability_otel.py", + ("069-llm-failure-event-dispatch-on-provider-unavailable",), + # Asserts model "gpt-test" on the failed event but declares no + # request-side model the harness can bind (no calls_llm.model, and + # the 503 body carries no model). Needs a spec fixture fix to + # declare the request model, cf. 068. 
+ "proposal 0058 LlmFailedEvent; fixture asserts a request model it " + "doesn't declare; covered by test_llm_provider.py", ), ( - ( - "069-llm-failure-event-dispatch-on-provider-unavailable", - "070-llm-failure-event-dispatch-on-provider-invalid-request", - "071-llm-failure-event-call-id-distinct-from-completion-event", - "072-llm-failure-event-mutual-exclusion-with-completion-event", - "073-llm-failure-event-error-type-vendor-specific", - ), - "proposal 0058 LlmFailedEvent; covered by test_llm_provider.py", + ("070-llm-failure-event-dispatch-on-provider-invalid-request",), + # The fixture's malformed message (tool role, no tool_call_id) is + # non-constructible in python: ToolMessage.tool_call_id is a required + # field, so the "MUST be present" rule (llm-provider §3) is enforced + # at construction, not the complete() boundary. python drives + # provider_invalid_request via the unmatched-tool_call_id shape. + "proposal 0058 provider_invalid_request; fixture's missing-" + "tool_call_id message is non-constructible in python; covered by " + "test_llm_provider.py", + ), + ( + ("073-llm-failure-event-error-type-vendor-specific",), + # The fixture asserts the vendor body ``error.type`` verbatim per + # case (rate_limit_exceeded / RateLimitError) plus a null case. + # python deliberately sources error_type from the OA exception + # class name (e.g. "ProviderRateLimit") -- a spec-permitted + # "exception class name" style, but it never echoes the body type + # nor emits null. The behavior is contract-conformant; the fixture + # over-constrains beyond the permissive field contract. 
+ "proposal 0058 LlmFailedEvent.error_type; python uses the exception " + "class name, the fixture asserts the vendor body error.type; covered " + "by test_llm_provider.py", ), ) for fixture_id in fixture_ids @@ -477,6 +518,22 @@ async def test_observability_fixture(fixture_path: Path) -> None: await _run_fixture_055(spec) elif fixture_id == "056-llm-completion-event-strict-serial-ordering": await _run_fixture_056(spec) + elif fixture_id in { + "060-llm-completion-event-input-messages-populated", + "061-llm-completion-event-output-content-populated", + "062-llm-completion-event-request-params-populated", + "063-llm-completion-event-request-extras-populated", + "064-llm-completion-event-active-prompt-populated", + "065-llm-completion-event-active-prompt-null", + "068-llm-completion-event-response-model-distinct-from-request", + }: + await _run_typed_event_cases(spec) + elif fixture_id == "072-llm-failure-event-mutual-exclusion-with-completion-event": + await _run_typed_event_cases(spec, expect_failure=True) + elif fixture_id == "067-llm-completion-event-call-id-always-present-and-distinct": + await _run_typed_event_chain_cases(spec) + elif fixture_id == "071-llm-failure-event-call-id-distinct-from-completion-event": + await _run_typed_event_chain_cases(spec, expect_failure=True) elif fixture_id == "058-implementation-attribution-otel": await _run_fixture_058(spec) elif fixture_id == "084-langfuse-session-user-promotion": @@ -2939,25 +2996,7 @@ async def _run_llm_payload_case(case: Mapping[str, Any]) -> None: # ---- RuntimeConfig from the calls_llm.config block config_spec = cast("dict[str, Any] | None", calls_llm_spec.get("config")) - runtime_config: RuntimeConfig | None = None - if config_spec: - extras = cast("dict[str, Any]", config_spec.get("extras") or {}) - runtime_config_kwargs: dict[str, Any] = { - k: v - for k, v in config_spec.items() - if k - in { - "temperature", - "max_tokens", - "top_p", - "seed", - "frequency_penalty", - "presence_penalty", - 
"stop_sequences", - } - } - runtime_config_kwargs.update(extras) - runtime_config = RuntimeConfig(**runtime_config_kwargs) + runtime_config: RuntimeConfig | None = _build_runtime_config(config_spec) # ---- Provider knobs (provider.genai_system override) provider_spec = cast("dict[str, Any] | None", case.get("provider")) @@ -3740,15 +3779,75 @@ def _build_simple_llm_graph( and MUST call ``await provider.aclose()`` after invoke completes to release the underlying httpx.AsyncClient connection pool. """ - import json - - import httpx - from openarmature.graph import END, GraphBuilder - from openarmature.llm import OpenAIProvider, UserMessage + from openarmature.llm import OpenAIProvider from .adapter import build_state_cls + transport = _make_mock_transport(case) + state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) + state_cls = build_state_cls("LlmTypedFixtureState", state_fields) + + nodes = cast("dict[str, Any]", case["nodes"]) + entry_name = cast("str", case["entry"]) + node_spec = cast("dict[str, Any]", nodes[entry_name]) + calls_llm_spec = cast("dict[str, Any]", node_spec["calls_llm"]) + stores_in = cast("str", calls_llm_spec.get("stores_response_in", "msg")) + + # Bind the provider to the request-side model. Priority: the node's + # declared ``calls_llm.model`` (the requested identifier per spec + # §5.5.7 -- 068 needs this to differ from the provider-returned + # response_model), else the model the first mock response reports + # (050-056 path), else a default. 
+ bound_model = ( + cast("str | None", calls_llm_spec.get("model")) + or _mock_model_from_first_response(case) + or "test-model" + ) + provider = OpenAIProvider( + base_url="http://mock-llm.test", + model=bound_model, + api_key="test", + transport=transport, + populate_caller_metadata=populate_caller_metadata, + ) + + runtime_config = _build_runtime_config(cast("dict[str, Any] | None", calls_llm_spec.get("config"))) + + # A node may render a prompt before the call (064): the rendered + # PromptResult is stamped active for the complete() call and supplies + # the messages when the node declares none explicitly. + renders_prompt_name = cast("str | None", node_spec.get("renders_prompt")) + prompt_result = _render_prompt_result(case, renders_prompt_name) if renders_prompt_name else None + + messages_spec = cast("list[dict[str, str]]", calls_llm_spec.get("messages", [])) + if messages_spec: + messages = _materialize_typed_messages(messages_spec) + elif prompt_result is not None: + messages = list(prompt_result.messages) + else: + messages = [] + + async def ask_body(_s: Any) -> dict[str, str]: + response = await _complete_with_optional_prompt( + provider, messages, config=runtime_config, prompt_result=prompt_result + ) + return {stores_in: response.message.content or ""} + + builder = ( + GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name) + ) + return builder.compile(), state_cls, provider + + +def _make_mock_transport(case: Mapping[str, Any]) -> Any: + """Build an httpx.MockTransport that replays the case's ``mock_llm`` + response queue in order, one response popped per request. 
+ """ + import json + + import httpx + mock_responses = list(cast("list[dict[str, Any]]", case.get("mock_llm") or [])) def _handler(_request: httpx.Request) -> httpx.Response: @@ -3762,41 +3861,304 @@ def _handler(_request: httpx.Request) -> httpx.Response: headers={"Content-Type": "application/json"}, ) - transport = httpx.MockTransport(_handler) - # Bind the provider to the model the mock responses report (see - # the first response body). The typed event's ``model`` field - # carries the provider's bound identifier (the REQUEST-side - # model per spec §5.5.7); hard-coding "test-model" mismatches - # fixtures whose expected events name a specific model. + return httpx.MockTransport(_handler) + + +def _build_runtime_config(config_spec: Mapping[str, Any] | None) -> Any: + """Build a RuntimeConfig from a fixture's ``calls_llm.config`` block, or + None when absent. + """ + if not config_spec: + return None + from openarmature.llm.response import RuntimeConfig + + # The canonical sampling keys (observability §5.5.2) map to RuntimeConfig + # fields; everything under ``extras`` is the provider-specific extras bag + # (062 request_params, 063 request_extras). Mirrors the LLM-payload runner. + extras = cast("dict[str, Any]", config_spec.get("extras") or {}) + kwargs: dict[str, Any] = { + k: v + for k, v in config_spec.items() + if k + in { + "temperature", + "max_tokens", + "top_p", + "seed", + "frequency_penalty", + "presence_penalty", + "stop_sequences", + } + } + kwargs.update(extras) + return RuntimeConfig(**kwargs) + + +def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> list[Any]: + """Build typed Message objects from a fixture's ``calls_llm.messages`` list, + for the system / user / assistant roles the typed-event fixtures use. 
+ """ + from openarmature.llm import AssistantMessage, SystemMessage, UserMessage + + # 060 sends a system + user pair the event echoes back in full, so dropping + # non-user roles would under-populate input_messages. + out: list[Any] = [] + for m in messages_spec: + role = m.get("role") + content = cast("str", m.get("content") or "") + if role == "system": + out.append(SystemMessage(content=content)) + elif role == "user": + out.append(UserMessage(content=content)) + elif role == "assistant": + out.append(AssistantMessage(content=content)) + else: + raise AssertionError(f"unsupported message role in typed-event fixture: {role!r}") + return out + + +def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: + """Build a PromptResult from ``prompt_backend.prompts.`` rendered + against ``render_variables``. + """ + import hashlib + from datetime import UTC, datetime + + from openarmature.llm import UserMessage + from openarmature.prompts import PromptResult + + # The renders_prompt: directive (064): the 5-field identity (name / version + # / label / template_hash / rendered_hash) is what the event's active_prompt + # asserts; the rendered messages drive the actual call. 
+ prompts = cast("dict[str, dict[str, Any]]", case["prompt_backend"]["prompts"]) + entry = prompts[prompt_name] + variables = cast("dict[str, Any]", case.get("render_variables") or {}) + template = cast("str", entry.get("template", "")) + rendered = template + for key, value in variables.items(): + rendered = rendered.replace("{{" + key + "}}", str(value)).replace("{{ " + key + " }}", str(value)) + rendered_hash = "sha256:" + hashlib.sha256(rendered.encode("utf-8")).hexdigest()[:32] + now = datetime.now(UTC) + return PromptResult( + name=cast("str", entry["name"]), + version=cast("str", entry["version"]), + label=cast("str", entry["label"]), + template_hash=cast("str", entry["template_hash"]), + rendered_hash=rendered_hash, + messages=[UserMessage(content=rendered)], + variables=variables, + fetched_at=now, + rendered_at=now, + ) + + +async def _complete_with_optional_prompt( + provider: Any, + messages: Sequence[Any], + *, + config: Any, + prompt_result: Any, +) -> Any: + """Call ``provider.complete`` inside the active-prompt context when the node + rendered a prompt, otherwise call it directly. + """ + if prompt_result is not None: + from openarmature.prompts import with_active_prompt + + # Inside with_active_prompt so the provider stamps active_prompt onto + # the typed event. + with with_active_prompt(prompt_result): + return await provider.complete(messages, config=config) + return await provider.complete(messages, config=config) + + +def _build_chain_llm_graph( + case: Mapping[str, Any], + *, + populate_caller_metadata: bool, +) -> tuple[Any, type[Any], Any]: + """Build a multi-node graph where every node with a ``calls_llm`` block + calls one shared provider against one mock-response queue, wired per the + case's ``edges``. Used by the chain fixtures 067 (three success calls) and + 071 (success then failure). Returns ``(compiled, state_cls, provider)``; + the caller owns ``provider.aclose()``. 
+ """ + from openarmature.graph import END, GraphBuilder + from openarmature.llm import OpenAIProvider + + from .adapter import build_state_cls + bound_model = _mock_model_from_first_response(case) or "test-model" provider = OpenAIProvider( base_url="http://mock-llm.test", model=bound_model, api_key="test", - transport=transport, + transport=_make_mock_transport(case), populate_caller_metadata=populate_caller_metadata, ) state_fields = cast("dict[str, dict[str, Any]]", case["state"]["fields"]) - state_cls = build_state_cls("LlmTypedFixtureState", state_fields) + state_cls = build_state_cls("LlmChainFixtureState", state_fields) nodes = cast("dict[str, Any]", case["nodes"]) - entry_name = cast("str", case["entry"]) - calls_llm_spec = cast("dict[str, Any]", nodes[entry_name]["calls_llm"]) - stores_in = cast("str", calls_llm_spec.get("stores_response_in", "msg")) - messages_spec = cast("list[dict[str, str]]", calls_llm_spec.get("messages", [])) - messages = [UserMessage(content=m["content"]) for m in messages_spec if m.get("role") == "user"] + builder = GraphBuilder(state_cls) - async def ask_body(_s: Any) -> dict[str, str]: - response = await provider.complete(messages) - return {stores_in: response.message.content or ""} + def _make_node_body(messages: list[Any], stores_in: str, config: Any) -> Any: + async def _body(_s: Any) -> dict[str, str]: + response = await provider.complete(messages, config=config) + return {stores_in: response.message.content or ""} + + return _body + + for node_name, raw in nodes.items(): + node = cast("dict[str, Any]", raw) + if "calls_llm" not in node: + raise AssertionError( + f"_build_chain_llm_graph only supports calls_llm nodes; {node_name!r} has none" + ) + calls_llm_spec = cast("dict[str, Any]", node["calls_llm"]) + stores_in = cast("str", calls_llm_spec.get("stores_response_in", "msg")) + messages = _materialize_typed_messages( + cast("list[dict[str, str]]", calls_llm_spec.get("messages", [])) + ) + config = 
_build_runtime_config(cast("dict[str, Any] | None", calls_llm_spec.get("config"))) + builder.add_node(node_name, _make_node_body(messages, stores_in, config)) + + for edge in cast("list[dict[str, str]]", case.get("edges") or []): + target = edge["to"] + builder.add_edge(edge["from"], END if target == "END" else target) + builder.set_entry(cast("str", case["entry"])) - builder = ( - GraphBuilder(state_cls).add_node(entry_name, ask_body).add_edge(entry_name, END).set_entry(entry_name) - ) return builder.compile(), state_cls, provider +def _assert_expected_error_if_present(case: Mapping[str, Any], exc: Exception) -> None: + """When the case carries an ``expected_error`` block (071/072), assert the + raised exception's cause chain carries the declared ``category`` AND + originates at the declared ``raised_from`` node. + """ + # category (llm-provider §7) sits on the inner LlmProviderError; raised_from + # sits on the engine's NodeException wrapper as node_name -- both in one + # chain. Complements contains_event: LlmFailedEvent: per 0058's exception- + # flow-preserved contract both must hold (the event fires AND the exception + # still raises), so this is not a substitute for the event check. 
+ expected_error = cast("dict[str, Any] | None", case.get("expected_error")) + if not expected_error: + return + category = cast("str", expected_error["category"]) + raised_from = cast("str | None", expected_error.get("raised_from")) + found_category = False + node_match = raised_from is None + err: Any = exc + while err is not None: + if getattr(err, "category", None) == category: + found_category = True + if raised_from is not None and getattr(err, "node_name", None) == raised_from: + node_match = True + err = getattr(err, "__cause__", None) + if not found_category: + raise AssertionError( + f"expected_error category {category!r} not found in the raised exception cause chain" + ) + if not node_match: + raise AssertionError( + f"expected_error raised_from {raised_from!r} not found " + f"(no NodeException for that node in the cause chain)" + ) + + +def _assert_call_id_invariants( + case: Mapping[str, Any], + collectors: Mapping[str, _TypedEventCollector], +) -> None: + """Machine-check the call_id presence/distinctness invariants that 067/071 + declare in their ``invariants`` block. A no-op for cases with no call_id + invariant. + """ + # The fixtures' ``expected`` blocks assert only event counts, which don't + # capture the per-call call_id freshness contract they're named for; this + # closes that gap. Scoped to the terminal events (LlmCompletionEvent / + # LlmFailedEvent): the per-attempt LlmRetryAttemptEvent shares its call's + # call_id, so including it would false-collide. + invariants = cast("dict[str, Any]", case.get("invariants") or {}) + if not any("call_id" in key for key in invariants): + return + # Gather terminal events across every collector, deduped by identity, so a + # filtered-only collector still yields the call_ids (no silent no-op). 
+ seen: set[int] = set() + ids: list[str] = [] + for collector in collectors.values(): + for event in collector.events: + if type(event).__name__ in {"LlmCompletionEvent", "LlmFailedEvent"} and id(event) not in seen: + seen.add(id(event)) + ids.append(cast("str", event.call_id)) + for cid in ids: + assert isinstance(cid, str) and cid != "", ( + f"call_id invariant: every terminal event's call_id MUST be a non-empty string; got {cid!r}" + ) + assert len(ids) == len(set(ids)), ( + f"call_id invariant: terminal-event call_ids MUST be pairwise distinct; got {ids!r}" + ) + + +async def _run_typed_event_chain_case( + case: Mapping[str, Any], + *, + expect_failure: bool = False, +) -> None: + """Runner for the multi-node-chain typed-event cases (067 success chain, + 071 success-then-failure chain). Mirrors _run_typed_event_fixture_case but + builds the graph via _build_chain_llm_graph. + """ + collectors, populate_caller_metadata = _parse_typed_observers(case) + graph, state_cls, provider = _build_chain_llm_graph( + case, populate_caller_metadata=populate_caller_metadata + ) + try: + extra: _AllEventsCollector | None = None + if expect_failure and not any(c.filter_event_type is None for c in collectors.values()): + extra = _AllEventsCollector() + final, exc = await _invoke_typed_fixture(case, collectors, graph, state_cls, extra_observer=extra) + if expect_failure: + assert exc is not None, "failure-path chain fixture expected an exception" + _assert_expected_error_if_present(case, exc) + elif final is None: + raise AssertionError("expected a non-None final state on success path") + expected = cast("dict[str, Any]", case.get("expected") or {}) + observer_expectations = cast("dict[str, Any]", expected.get("observers") or {}) + for name, expectations in observer_expectations.items(): + collector = collectors.get(name) + if collector is None: + raise AssertionError(f"fixture references unknown observer {name!r}") + _assert_observer_expectations(name, collector, 
cast("dict[str, Any]", expectations)) + _assert_call_id_invariants(case, collectors) + finally: + await provider.aclose() + + +async def _run_typed_event_cases(spec: Mapping[str, Any], *, expect_failure: bool = False) -> None: + """Iterate the simple single-node typed-event cases (060-065, 068 success; + 072 failure), each through _run_typed_event_fixture_case. + """ + for case in cast("list[dict[str, Any]]", spec["cases"]): + case_name = cast("str", case["name"]) + try: + await _run_typed_event_fixture_case(case, expect_failure=expect_failure) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + +async def _run_typed_event_chain_cases(spec: Mapping[str, Any], *, expect_failure: bool = False) -> None: + """Iterate the multi-node-chain typed-event cases (067 success, 071 + failure).""" + for case in cast("list[dict[str, Any]]", spec["cases"]): + case_name = cast("str", case["name"]) + try: + await _run_typed_event_chain_case(case, expect_failure=expect_failure) + except AssertionError as e: + raise AssertionError(f"case {case_name!r}: {e}") from e + + def _make_state_instance(case: Mapping[str, Any], state_cls: type[Any]) -> Any: """Construct a State instance from the case's ``initial_state`` plus field defaults declared on the fixture state schema. @@ -3863,6 +4225,10 @@ async def _invoke_typed_fixture( # proposal 0063 (092-094) spelling for an exact-count assertion, # same shape as contains_exactly_n_events_of_type. "event_count", + # proposal 0058 (071/072): list form of the scalar event_count, one + # {event_type, count} entry per asserted type in the same observer + # block. 
+ "event_counts", "does_not_contain_event_of_type", "captured_event_field_values_cover", "every_captured_event_has", @@ -3920,6 +4286,14 @@ def _assert_observer_expectations( assert len(matching) == expected_count, ( f"observer {name!r}: expected exactly {expected_count} {type_name} events; got {len(matching)}" ) + if "event_counts" in spec: + for item in cast("list[dict[str, Any]]", spec["event_counts"]): + type_name = cast("str", item["event_type"]) + expected_count = int(cast("int", item["count"])) + matching = [e for e in events if type(e).__name__ == type_name] + assert len(matching) == expected_count, ( + f"observer {name!r}: expected {expected_count} {type_name} events; got {len(matching)}" + ) if "does_not_contain_event_of_type" in spec: type_name = cast("str", spec["does_not_contain_event_of_type"]) matching = [e for e in events if type(e).__name__ == type_name] @@ -3978,29 +4352,60 @@ def _assert_contains_event( """ type_name = cast("str", spec["event_type"]) expected_fields = cast("dict[str, Any]", spec.get("fields") or {}) + # ``fields_absent_keys`` (062, conformance-adapter §3.2): the named field + # MUST be a mapping AND none of the listed keys may appear in it. A + # matching event must satisfy both ``fields`` and ``fields_absent_keys``. 
+ absent_keys_spec = cast("dict[str, list[str]]", spec.get("fields_absent_keys") or {}) matching_type = [e for e in events if type(e).__name__ == type_name] assert matching_type, ( f"observer {observer_name!r}: contains_event expected at least one {type_name}; got none" ) for event in matching_type: - if _event_fields_match(event, expected_fields): + if _event_fields_match(event, expected_fields) and _event_fields_absent_keys(event, absent_keys_spec): return raise AssertionError( - f"observer {observer_name!r}: no {type_name} event matched fields {expected_fields!r}; " + f"observer {observer_name!r}: no {type_name} event matched fields {expected_fields!r} " + f"with absent keys {absent_keys_spec!r}; " f"captured: {[_event_to_repr(e) for e in matching_type]}" ) +def _event_fields_absent_keys(event: Any, absent_spec: Mapping[str, Sequence[str]]) -> bool: + """Return True when, for each ``field -> [keys]`` in ``absent_spec``, the + event's field is a mapping containing none of the listed keys. Raises when + the fixture names a field that doesn't exist on the event (typo guard, + matching ``_event_fields_match``). + """ + # Absence-is-meaningful (conformance-adapter §3.2): a key present with a + # null value still counts as present and fails the check. + for field_name, keys in absent_spec.items(): + if not hasattr(event, field_name): + raise AssertionError( + f"fields_absent_keys references field {field_name!r} that does not exist on " + f"{type(event).__name__}; check for typos in the fixture YAML" + ) + actual = getattr(event, field_name) + if not isinstance(actual, Mapping): + return False + actual_map = cast("Mapping[str, Any]", actual) + for key in keys: + if key in actual_map: + return False + return True + + def _event_fields_match(event: Any, expected: Mapping[str, Any]) -> bool: - """Return True when every key in ``expected`` matches the event's - field. 
Nested ``usage`` mappings compare against the Usage record - via attribute access; mapping equality otherwise uses ``==``. - - Raises AssertionError when the fixture names a field that doesn't - exist on the event type. Upstream filtering by event type means - a missing attribute signals a fixture-side typo (e.g., - ``node_nam: null`` instead of ``node_name: null``), not a None - value worth silently matching. + """Return True when every key in ``expected`` matches the event's field. + + Comparison delegates to ``_value_matches``, which handles the fixture + idioms: the ```` value-token, list-vs-tuple sequences + (``namespace``), and nested mappings compared against either a Mapping or + a record's attributes (``usage`` Usage, ``active_prompt`` PromptResult). + + Raises AssertionError when the fixture names a field that doesn't exist on + the event type. Upstream filtering by event type means a missing attribute + signals a fixture-side typo (e.g. ``node_nam: null`` instead of + ``node_name: null``), not a None value worth silently matching. """ for field_name, expected_value in expected.items(): if not hasattr(event, field_name): @@ -4008,28 +4413,48 @@ def _event_fields_match(event: Any, expected: Mapping[str, Any]) -> bool: f"fixture references field {field_name!r} that does not exist on " f"{type(event).__name__}; check for typos in the fixture YAML" ) - actual: Any = getattr(event, field_name) - # The 050 fixture's ``usage`` field expectation is a flat - # mapping; the typed event carries a Usage instance. Compare - # field-by-field. - if field_name == "usage" and isinstance(expected_value, Mapping) and actual is not None: - expected_mapping = cast("Mapping[str, Any]", expected_value) - for sub_name, sub_value in expected_mapping.items(): - if getattr(actual, sub_name, None) != sub_value: - return False - continue - # The 050 fixture's ``namespace`` expectation is a list; the - # typed event carries a tuple. Compare as sequences. 
- if isinstance(expected_value, list) and isinstance(actual, tuple): - actual_tuple = cast("tuple[Any, ...]", actual) - if list(actual_tuple) != expected_value: - return False - continue - if actual != expected_value: + if not _value_matches(getattr(event, field_name), expected_value): return False return True +def _value_matches(actual: Any, expected: Any) -> bool: + """Match one captured value against a fixture's expected value. + + - ````: any non-empty string (an empty string fails); used by + 064's ``rendered_hash``. + - A list expectation against a tuple (the event carries ``namespace`` as a + tuple) compares as sequences. + - A mapping expectation compares against either a Mapping or a record's + attributes (``usage`` -> Usage instance, ``active_prompt`` -> + PromptResult), recursing so inner tokens still apply. + - Everything else is plain equality (None matched exactly). + """ + # (conformance-adapter §3.2) matches any NON-EMPTY string; an + # empty string is non-null but MUST fail (spec ruling on Q3). 
+ if expected == "": + return isinstance(actual, str) and actual != "" + if isinstance(expected, list) and isinstance(actual, tuple): + actual = list(cast("tuple[Any, ...]", actual)) + if isinstance(expected, Mapping): + if actual is None: + return False + for key, sub_expected in cast("Mapping[str, Any]", expected).items(): + if isinstance(actual, Mapping): + actual_mapping = cast("Mapping[str, Any]", actual) + if key not in actual_mapping: + return False + sub_actual = actual_mapping[key] + elif hasattr(actual, key): + sub_actual = getattr(actual, key) + else: + return False + if not _value_matches(sub_actual, sub_expected): + return False + return True + return bool(actual == expected) + + def _event_to_repr(event: Any) -> dict[str, Any]: """Compact field dump for assertion error messages.""" keys = ("invocation_id", "node_name", "namespace", "model", "provider", "finish_reason") @@ -4283,6 +4708,7 @@ async def _run_typed_event_fixture_case( expected = cast("dict[str, Any]", case.get("expected") or {}) if expect_failure: assert exc is not None, "failure-path fixture expected an exception" + _assert_expected_error_if_present(case, exc) node_completed = cast("dict[str, Any] | None", expected.get("node_completed_event_carries_error")) if node_completed: # Source for the assertion: an unfiltered named collector @@ -4304,6 +4730,7 @@ async def _run_typed_event_fixture_case( if collector is None: raise AssertionError(f"fixture references unknown observer {name!r}") _assert_observer_expectations(name, collector, cast("dict[str, Any]", expectations)) + _assert_call_id_invariants(case, collectors) finally: # _build_simple_llm_graph hands ownership of the provider's # httpx.AsyncClient to the runner; close it to release the From 005ad21d16d284ef12a07915ec3c97dfe2e0bc57 Mon Sep 17 00:00:00 2001 From: chris-colinsky Date: Tue, 23 Jun 2026 20:06:01 -0700 Subject: [PATCH 2/2] Apply review fixes to tier-1 fixture harness Address review feedback on the tier-1 fixture wiring: - 
_render_prompt_result derives rendered_hash via the canonical compute_rendered_hash(messages) helper instead of a bespoke truncated SHA, dropping the hashlib import. - _build_runtime_config is annotated RuntimeConfig | None (via a TYPE_CHECKING import) instead of Any, restoring type information at the provider.complete call sites. - _materialize_typed_messages asserts system/user content is a present, non-empty string instead of coercing to empty, so a fixture mistake fails on the real field rather than as a downstream model ValueError. --- tests/conformance/test_observability.py | 37 +++++++++++++++++-------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/tests/conformance/test_observability.py b/tests/conformance/test_observability.py index 007e938..24283f5 100644 --- a/tests/conformance/test_observability.py +++ b/tests/conformance/test_observability.py @@ -54,6 +54,8 @@ if TYPE_CHECKING: from opentelemetry.sdk.trace import ReadableSpan + from openarmature.llm.response import RuntimeConfig + # OTel SDK 1.x makes ``set_tracer_provider`` one-shot: once a non-default # provider is set, subsequent calls are no-ops (the SDK logs a warning @@ -2961,7 +2963,6 @@ async def _run_llm_payload_case(case: Mapping[str, Any]) -> None: from openarmature.graph import END, GraphBuilder from openarmature.llm import OpenAIProvider - from openarmature.llm.response import RuntimeConfig from .adapter import build_state_cls from .harness.llm_attribute_assertions import ( @@ -3864,7 +3865,7 @@ def _handler(_request: httpx.Request) -> httpx.Response: return httpx.MockTransport(_handler) -def _build_runtime_config(config_spec: Mapping[str, Any] | None) -> Any: +def _build_runtime_config(config_spec: Mapping[str, Any] | None) -> RuntimeConfig | None: """Build a RuntimeConfig from a fixture's ``calls_llm.config`` block, or None when absent. 
""" @@ -3894,6 +3895,18 @@ def _build_runtime_config(config_spec: Mapping[str, Any] | None) -> Any: return RuntimeConfig(**kwargs) +def _require_text_content(role: object, content: object) -> str: + """Assert a fixture message's ``content`` is a present, non-empty string and + return it (the system/user roles require this). + """ + # Assert with the role + value rather than coercing, so a fixture mistake + # surfaces on the real field instead of a downstream model ValueError. + assert isinstance(content, str) and content != "", ( + f"{role} message content MUST be a present non-empty string; got {content!r}" + ) + return content + + def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> list[Any]: """Build typed Message objects from a fixture's ``calls_llm.messages`` list, for the system / user / assistant roles the typed-event fixtures use. @@ -3905,13 +3918,14 @@ def _materialize_typed_messages(messages_spec: Sequence[Mapping[str, Any]]) -> l out: list[Any] = [] for m in messages_spec: role = m.get("role") - content = cast("str", m.get("content") or "") + content = m.get("content") if role == "system": - out.append(SystemMessage(content=content)) + out.append(SystemMessage(content=_require_text_content(role, content))) elif role == "user": - out.append(UserMessage(content=content)) + out.append(UserMessage(content=_require_text_content(role, content))) elif role == "assistant": - out.append(AssistantMessage(content=content)) + # Assistant content is optional (tool-call-only messages carry none). + out.append(AssistantMessage(content=cast("str", content or ""))) else: raise AssertionError(f"unsupported message role in typed-event fixture: {role!r}") return out @@ -3921,11 +3935,10 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: """Build a PromptResult from ``prompt_backend.prompts.`` rendered against ``render_variables``. 
""" - import hashlib from datetime import UTC, datetime - from openarmature.llm import UserMessage - from openarmature.prompts import PromptResult + from openarmature.llm import Message, UserMessage + from openarmature.prompts import PromptResult, compute_rendered_hash # The renders_prompt: directive (064): the 5-field identity (name / version # / label / template_hash / rendered_hash) is what the event's active_prompt @@ -3937,15 +3950,15 @@ def _render_prompt_result(case: Mapping[str, Any], prompt_name: str) -> Any: rendered = template for key, value in variables.items(): rendered = rendered.replace("{{" + key + "}}", str(value)).replace("{{ " + key + " }}", str(value)) - rendered_hash = "sha256:" + hashlib.sha256(rendered.encode("utf-8")).hexdigest()[:32] + messages: list[Message] = [UserMessage(content=rendered)] now = datetime.now(UTC) return PromptResult( name=cast("str", entry["name"]), version=cast("str", entry["version"]), label=cast("str", entry["label"]), template_hash=cast("str", entry["template_hash"]), - rendered_hash=rendered_hash, - messages=[UserMessage(content=rendered)], + rendered_hash=compute_rendered_hash(messages), + messages=messages, variables=variables, fetched_at=now, rendered_at=now,