diff --git a/examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb b/examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb new file mode 100644 index 000000000..d82cf5775 --- /dev/null +++ b/examples/tutorials/00_sync/010_multiturn/.ipynb_checkpoints/dev-checkpoint.ipynb @@ -0,0 +1,166 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "0", + "metadata": {}, + "outputs": [], + "source": [ + "from agentex import Agentex\n", + "\n", + "client = Agentex(base_url=\"http://localhost:5003\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "AGENT_NAME = \"s010-multiturn\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "# # (Optional) Create a new task. If you don't create a new task, each message will be sent to a new task. The server will create the task for you.\n", + "\n", + "# import uuid\n", + "\n", + "# TASK_ID = str(uuid.uuid4())[:8]\n", + "\n", + "# rpc_response = client.agents.rpc_by_name(\n", + "# agent_name=AGENT_NAME,\n", + "# method=\"task/create\",\n", + "# params={\n", + "# \"name\": f\"{TASK_ID}-task\",\n", + "# \"params\": {}\n", + "# }\n", + "# )\n", + "\n", + "# task = rpc_response.result\n", + "# print(task)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "# Test non streaming response\n", + "from agentex.types import TextContent\n", + "\n", + "# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n", + "# - TextContent: A message with just text content \n", + "# - DataContent: A message with JSON-serializable data content\n", + "# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n", + "# - ToolResponseContent: A message 
with a tool response, which contains the response object from a tool call in its content\n", + "\n", + "# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "rpc_response = client.agents.send_message(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": False\n", + " }\n", + ")\n", + "\n", + "if not rpc_response or not rpc_response.result:\n", + " raise ValueError(\"No result in response\")\n", + "\n", + "# Extract and print just the text content from the response\n", + "for task_message in rpc_response.result:\n", + " content = task_message.content\n", + " if isinstance(content, TextContent):\n", + " text = content.content\n", + " print(text)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "# Test streaming response\n", + "from agentex.types.text_delta import TextDelta\n", + "from agentex.types.task_message_update import StreamTaskMessageFull, StreamTaskMessageDelta\n", + "\n", + "# The result object of message/send will be a TaskMessageUpdate which is a union of the following types:\n", + "# - StreamTaskMessageStart: \n", + "# - An indicator that a streaming message was started, doesn't contain any useful content\n", + "# - StreamTaskMessageDelta: \n", + "# - A delta of a streaming message, contains the text delta to aggregate\n", + "# - StreamTaskMessageDone: \n", + "# - An indicator that a streaming message was done, doesn't contain any useful content\n", + "# - StreamTaskMessageFull: \n", + "# - A non-streaming message, there is nothing to aggregate, since this contains the full message, not deltas\n", + "\n", + "# When processing StreamTaskMessageDelta, if you are expecting more than TextDeltas, such as DataDelta, 
ToolRequestDelta, or ToolResponseDelta, you can process them as well\n", + "# When processing StreamTaskMessageFull, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n", + "\n", + "for agent_rpc_response_chunk in client.agents.send_message_stream(\n", + " agent_name=AGENT_NAME,\n", + " params={\n", + " \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n", + " \"stream\": True\n", + " }\n", + "):\n", + " # We know that the result of the message/send when stream is set to True will be a TaskMessageUpdate\n", + " task_message_update = agent_rpc_response_chunk.result\n", + " # Print only the text deltas as they arrive or any full messages\n", + " if isinstance(task_message_update, StreamTaskMessageDelta):\n", + " delta = task_message_update.delta\n", + " if isinstance(delta, TextDelta):\n", + " print(delta.text_delta, end=\"\", flush=True)\n", + " else:\n", + " print(f\"Found non-text {type(task_message)} object in streaming message.\")\n", + " elif isinstance(task_message_update, StreamTaskMessageFull):\n", + " content = task_message_update.content\n", + " if isinstance(content, TextContent):\n", + " print(content.content)\n", + " else:\n", + " print(f\"Found non-text {type(task_message)} object in full message.\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/tutorials/00_sync/010_multiturn/dev.ipynb 
b/examples/tutorials/00_sync/010_multiturn/dev.ipynb index d82cf5775..c7c50532f 100644 --- a/examples/tutorials/00_sync/010_multiturn/dev.ipynb +++ b/examples/tutorials/00_sync/010_multiturn/dev.ipynb @@ -144,7 +144,7 @@ ], "metadata": { "kernelspec": { - "display_name": ".venv", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -158,7 +158,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.9" + "version": "3.14.2" } }, "nbformat": 4, diff --git a/src/agentex/lib/adk/_modules/tracing.py b/src/agentex/lib/adk/_modules/tracing.py index 67150f01d..8694c2078 100644 --- a/src/agentex/lib/adk/_modules/tracing.py +++ b/src/agentex/lib/adk/_modules/tracing.py @@ -67,14 +67,13 @@ def _tracing_service(self) -> TracingService: if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id): import httpx - # Disable keepalive so each span HTTP call gets a fresh TCP - # connection. Reused connections carry asyncio primitives bound - # to the event loop that created them; in sync-ACP / streaming - # contexts the loop context can shift between calls, causing - # "bound to a different event loop" RuntimeErrors. + # Keepalive ON: connections are reused within a single event + # loop, eliminating the TLS-handshake-per-span penalty under + # load. Cross-loop safety is preserved by rebuilding the + # client whenever loop_id changes (the conditional above). 
agentex_client = create_async_agentex_client( http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), + limits=httpx.Limits(max_keepalive_connections=20), ), ) tracer = AsyncTracer(agentex_client) diff --git a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py index f91634b64..98d50546b 100644 --- a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py @@ -1,4 +1,6 @@ -from typing import Any, Dict, override +import asyncio +import weakref +from typing import TYPE_CHECKING, Any, Dict, override from agentex import Agentex from agentex.types.span import Span @@ -9,6 +11,9 @@ AsyncTracingProcessor, ) +if TYPE_CHECKING: + from agentex import AsyncAgentex + class AgentexSyncTracingProcessor(SyncTracingProcessor): def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 @@ -67,19 +72,40 @@ def shutdown(self) -> None: class AgentexAsyncTracingProcessor(AsyncTracingProcessor): def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 + # Per-event-loop client cache. httpx.AsyncClient is bound to the + # loop that created it, so in sync-ACP / streaming contexts (where + # the active loop can change between requests) we keep one client + # per loop instead of disabling keepalive entirely. The cache is a + # WeakKeyDictionary so a GC'd loop and its client are evicted + # automatically — using id() as a key would reuse entries when + # CPython recycles a freed loop's memory address. + self._clients_by_loop: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, "AsyncAgentex" + ] = weakref.WeakKeyDictionary() + + def _build_client(self) -> "AsyncAgentex": import httpx - # Disable keepalive so each span HTTP call gets a fresh TCP connection. 
- # Reused connections carry asyncio primitives bound to the event loop - # that created them; in sync-ACP / streaming contexts the loop context - # can shift between calls, causing "bound to a different event loop" - # RuntimeErrors. - self.client = create_async_agentex_client( + # Keepalive ON: connections are reused within a single event loop, + # eliminating the TLS-handshake-per-span penalty under load. + return create_async_agentex_client( http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), + limits=httpx.Limits(max_keepalive_connections=20), ), ) + @property + def client(self) -> "AsyncAgentex": + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return self._build_client() + client = self._clients_by_loop.get(loop) + if client is None: + client = self._build_client() + self._clients_by_loop[loop] = client + return client + # TODO(AGX1-199): Add batch create/update endpoints to Agentex API and use # them here instead of one HTTP call per span. # https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces diff --git a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py index 3a1c96c1b..a21eff7d3 100644 --- a/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio +import weakref from typing import cast, override import scale_gp_beta.lib.tracing as tracing @@ -92,23 +94,50 @@ def shutdown(self) -> None: class SGPAsyncTracingProcessor(AsyncTracingProcessor): def __init__(self, config: SGPTracingProcessorConfig): self.disabled = config.sgp_api_key == "" or config.sgp_account_id == "" + self._config = config + # Per-event-loop client cache. 
httpx.AsyncClient ties its connection + # pool to the loop it was created on; in sync-ACP / streaming contexts + # the active loop can change between requests. Caching per loop lets + # us keep keepalive on within each loop while staying safe across + # loops. The cache is a WeakKeyDictionary so a GC'd loop and its + # client are evicted automatically — using id() as a key would reuse + # entries when CPython recycles a freed loop's memory address. + self._clients_by_loop: weakref.WeakKeyDictionary[ + asyncio.AbstractEventLoop, AsyncSGPClient + ] = weakref.WeakKeyDictionary() + self.env_vars = EnvironmentVariables.refresh() + + def _build_client(self) -> AsyncSGPClient: import httpx - # Disable keepalive so each HTTP call gets a fresh TCP connection, - # avoiding "bound to a different event loop" errors in sync-ACP. - self.sgp_async_client = ( - AsyncSGPClient( - api_key=config.sgp_api_key, - account_id=config.sgp_account_id, - base_url=config.sgp_base_url, - http_client=httpx.AsyncClient( - limits=httpx.Limits(max_keepalive_connections=0), - ), - ) - if not self.disabled - else None + return AsyncSGPClient( + api_key=self._config.sgp_api_key, + account_id=self._config.sgp_account_id, + base_url=self._config.sgp_base_url, + # Keepalive ON: connections are reused within a single event loop, + # which removes the TLS-handshake-per-span penalty observed under + # load. Cross-loop safety is preserved by the per-loop cache. + http_client=httpx.AsyncClient( + limits=httpx.Limits(max_keepalive_connections=20), + ), ) - self.env_vars = EnvironmentVariables.refresh() + + def _get_client(self) -> AsyncSGPClient | None: + """Return the AsyncSGPClient bound to the current event loop, creating + one on first use. 
Returns None when the processor is disabled.""" + if self.disabled: + return None + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # Called from outside an event loop — should not happen on the + # hot path, but build a one-off client rather than crashing. + return self._build_client() + client = self._clients_by_loop.get(loop) + if client is None: + client = self._build_client() + self._clients_by_loop[loop] = client + return client @override async def on_span_start(self, span: Span) -> None: @@ -123,31 +152,29 @@ async def on_spans_start(self, spans: list[Span]) -> None: if not spans: return - sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans] - - if self.disabled: + client = self._get_client() + if client is None: logger.warning("SGP is disabled, skipping span upsert") return - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[s.to_request_params() for s in sgp_spans] - ) + + sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans] + await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans]) @override async def on_spans_end(self, spans: list[Span]) -> None: if not spans: return + client = self._get_client() + if client is None: + return + sgp_spans: list[SGPSpan] = [] for span in spans: sgp_span = _build_sgp_span(span, self.env_vars) sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr] sgp_spans.append(sgp_span) - - if self.disabled: - return - await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr] - items=[s.to_request_params() for s in sgp_spans] - ) + await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans]) @override async def shutdown(self) -> None: diff --git a/src/agentex/lib/core/tracing/span_queue.py b/src/agentex/lib/core/tracing/span_queue.py index d0d92669e..f9105718b 100644 --- a/src/agentex/lib/core/tracing/span_queue.py +++ b/src/agentex/lib/core/tracing/span_queue.py @@ -1,5 +1,6 
@@ from __future__ import annotations +import os import asyncio from enum import Enum from dataclasses import dataclass @@ -13,6 +14,47 @@ logger = make_logger(__name__) _DEFAULT_BATCH_SIZE = 50 +_DEFAULT_LINGER_MS = 100 +# 0 == unbounded (preserves prior behavior). A bound makes backpressure +# visible (dropped spans are counted) and caps worst-case memory. +_DEFAULT_MAX_SIZE = 0 +# Total attempts per batch for a *transient* failure (1 == no retry). +_DEFAULT_MAX_RETRIES = 1 +# HTTP statuses worth retrying at the queue level. These are explicit +# backpressure / transient signals; everything else (esp. 401/403/4xx auth and +# validation errors) is a permanent failure that re-enqueuing cannot fix. Note +# the underlying SGP client already retries these internally, so queue-level +# retry only helps when its budget is exhausted by a longer blip. +_RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + + +def _read_int_env(name: str, default: int, *, minimum: int = 0) -> int: + """Read a non-negative int from the environment, clamping to ``minimum`` + and falling back to ``default`` when unset or unparseable.""" + raw = os.environ.get(name) + if raw is None: + return default + try: + return max(minimum, int(raw)) + except ValueError: + logger.warning("Ignoring invalid %s=%r; using default %d", name, raw, default) + return default + + +def _read_linger_ms_env() -> int: + """Read AGENTEX_SPAN_QUEUE_LINGER_MS from the environment, falling back to + _DEFAULT_LINGER_MS when unset or unparseable. Negative values are clamped + to 0 (i.e. "drain immediately, no linger").""" + return _read_int_env("AGENTEX_SPAN_QUEUE_LINGER_MS", _DEFAULT_LINGER_MS) + + +def _is_retryable_exc(exc: BaseException) -> bool: + """A failure is retryable only when it carries an HTTP ``status_code`` in + the retryable set. 
Connection/timeout errors (no status_code) have already + been retried by the SGP client, and bare exceptions (programming bugs) must + never be retried — re-enqueuing them would spin forever.""" + status_code = getattr(exc, "status_code", None) + return isinstance(status_code, int) and status_code in _RETRYABLE_STATUS_CODES class SpanEventType(str, Enum): @@ -25,6 +67,9 @@ class _SpanQueueItem: event_type: SpanEventType span: Span processors: list[AsyncTracingProcessor] + # Number of times this item has already been dispatched. Used to bound + # re-enqueue on transient failures. + attempts: int = 0 class AsyncSpanQueue: @@ -35,13 +80,69 @@ class AsyncSpanQueue: batch are flushed concurrently, then all END events, so that per-span start-before-end ordering is preserved while HTTP calls for independent spans execute in parallel. + + Once the drain loop picks up the first item, it lingers up to + ``linger_ms`` waiting for more items to coalesce into the same batch. + Without the linger the drain almost always returned size-1 batches under + real agent workloads, because spans typically arrive a few ms apart. + + Reliability: + - ``max_size`` bounds the queue. When full, new events are dropped and + counted (see ``dropped_spans``) rather than growing memory without limit. + ``0`` keeps the queue unbounded. + - A batch that fails with a *transient* HTTP status (429/5xx) is + re-enqueued up to ``max_retries`` total attempts. Permanent failures + (auth/validation/bugs) are dropped and counted immediately. 
""" - def __init__(self, batch_size: int = _DEFAULT_BATCH_SIZE) -> None: - self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue() + def __init__( + self, + batch_size: int = _DEFAULT_BATCH_SIZE, + linger_ms: int | None = None, + max_size: int | None = None, + max_retries: int | None = None, + ) -> None: + resolved_max_size = ( + _read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size) + ) + self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size) self._drain_task: asyncio.Task[None] | None = None self._stopping = False self._batch_size = batch_size + self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms) + self._max_retries = ( + _read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1) + if max_retries is None + else max(1, max_retries) + ) + # Total spans dropped for any reason (full queue, shutdown, permanent + # failure, exhausted retries). Surfaced for metrics/observability so + # span loss stops being silent. + self._dropped_spans = 0 + + @property + def dropped_spans(self) -> int: + """Cumulative count of spans dropped (never delivered).""" + return self._dropped_spans + + @property + def depth(self) -> int: + """Current number of items waiting in the queue.""" + return self._queue.qsize() + + def _record_drop(self, count: int, reason: str) -> None: + if count <= 0: + return + self._dropped_spans += count + # Warn on the first drop and then sparsely, so a drop storm is visible + # without flooding the log. 
+ if self._dropped_spans == count or self._dropped_spans % 100 < count: + logger.warning( + "Span queue dropped %d span(s) (%s); %d dropped in total", + count, + reason, + self._dropped_spans, + ) def enqueue( self, @@ -50,10 +151,13 @@ def enqueue( processors: list[AsyncTracingProcessor], ) -> None: if self._stopping: - logger.warning("Span queue is shutting down, dropping %s event for span %s", event_type.value, span.id) + self._record_drop(1, "queue shutting down") return self._ensure_drain_running() - self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors)) + try: + self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors)) + except asyncio.QueueFull: + self._record_drop(1, "queue full") def _ensure_drain_running(self) -> None: if self._drain_task is None or self._drain_task.done(): @@ -69,12 +173,28 @@ async def _drain_loop(self) -> None: first = await self._queue.get() batch: list[_SpanQueueItem] = [first] - # Opportunistically grab more ready items (non-blocking). - while len(batch) < self._batch_size: - try: - batch.append(self._queue.get_nowait()) - except asyncio.QueueEmpty: - break + # Linger briefly so spans emitted within the window coalesce into + # one batch. Collection stops when the batch fills or when the + # linger window elapses, whichever comes first; an empty queue + # before the deadline does not end the wait. + if self._linger_ms > 0 and not self._stopping: + loop = asyncio.get_running_loop() + deadline = loop.time() + (self._linger_ms / 1000.0) + while len(batch) < self._batch_size: + remaining = deadline - loop.time() + if remaining <= 0: + break + try: + batch.append(await asyncio.wait_for(self._queue.get(), timeout=remaining)) + except asyncio.TimeoutError: + break + else: + # No linger — drain whatever is already queued and stop. 
+ while len(batch) < self._batch_size: + try: + batch.append(self._queue.get_nowait()) + except asyncio.QueueEmpty: + break try: # Separate START and END events. Processing all STARTs before @@ -93,8 +213,7 @@ async def _drain_loop(self) -> None: # Release span data for GC. batch.clear() - @staticmethod - async def _process_items(items: list[_SpanQueueItem]) -> None: + async def _process_items(self, items: list[_SpanQueueItem]) -> None: """Dispatch a batch of same-event-type items to each processor in one call. Groups spans by processor so each processor sees its full slice of the @@ -109,26 +228,78 @@ async def _process_items(items: list[_SpanQueueItem]) -> None: "_process_items requires all items to share the same event_type; " "callers must split START and END batches before dispatching." ) - by_processor: dict[AsyncTracingProcessor, list[Span]] = {} + by_processor: dict[AsyncTracingProcessor, list[_SpanQueueItem]] = {} for item in items: for p in item.processors: - by_processor.setdefault(p, []).append(item.span) + by_processor.setdefault(p, []).append(item) - async def _handle(p: AsyncTracingProcessor, spans: list[Span]) -> None: - try: - if event_type == SpanEventType.START: - await p.on_spans_start(spans) - else: - await p.on_spans_end(spans) - except Exception: - logger.exception( - "Tracing processor %s failed handling %d spans during %s", + await asyncio.gather(*[self._handle(p, batch, event_type) for p, batch in by_processor.items()]) + + async def _handle( + self, + p: AsyncTracingProcessor, + items: list[_SpanQueueItem], + event_type: SpanEventType, + ) -> None: + spans = [item.span for item in items] + try: + if event_type == SpanEventType.START: + await p.on_spans_start(spans) + else: + await p.on_spans_end(spans) + except Exception as exc: + self._handle_failure(p, items, event_type, exc) + + def _handle_failure( + self, + p: AsyncTracingProcessor, + items: list[_SpanQueueItem], + event_type: SpanEventType, + exc: Exception, + ) -> None: + # 
Re-enqueue transient failures, drop everything else. Re-enqueue is + # bounded by max_retries, so even during shutdown the queue's join() + # still terminates after a finite number of passes. + if _is_retryable_exc(exc): + retriable = [item for item in items if item.attempts + 1 < self._max_retries] + exhausted = len(items) - len(retriable) + if exhausted: + self._record_drop(exhausted, f"{type(p).__name__} retries exhausted during {event_type.value}") + for item in retriable: + self._reenqueue(item, p) + if retriable: + logger.warning( + "Tracing processor %s failed handling %d spans during %s (%s); re-enqueued %d for retry", type(p).__name__, - len(spans), + len(items), event_type.value, + type(exc).__name__, + len(retriable), ) + return + + self._record_drop(len(items), f"{type(p).__name__} permanent failure during {event_type.value}") + logger.exception( + "Tracing processor %s failed handling %d spans during %s", + type(p).__name__, + len(items), + event_type.value, + ) - await asyncio.gather(*[_handle(p, spans) for p, spans in by_processor.items()]) + def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None: + """Put a single failed item back on the queue, scoped to the processor + that failed, with an incremented attempt count.""" + try: + self._queue.put_nowait( + _SpanQueueItem( + event_type=item.event_type, + span=item.span, + processors=[p], + attempts=item.attempts + 1, + ) + ) + except asyncio.QueueFull: + self._record_drop(1, "queue full on retry") # ------------------------------------------------------------------ # Shutdown diff --git a/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py new file mode 100644 index 000000000..ec1ed5e88 --- /dev/null +++ b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import asyncio +import weakref +from unittest.mock import MagicMock, 
patch + +import pytest + +# AgentexAsyncTracingProcessor pulls in agentex.lib.adk via +# create_async_agentex_client, which in turn imports pydantic_ai at package +# init. Skip these tests cleanly when pydantic_ai isn't installed (the SDK +# dev venv state) so collection doesn't error out. +pytest.importorskip( + "pydantic_ai", + reason="agentex.lib.adk import chain requires pydantic_ai", +) + +# Import the processor module up front so unittest.mock.patch() can resolve +# attributes by string path. The tracing_processor_manager only loads this +# module lazily, so without this explicit import the patches below would fail +# with AttributeError at __enter__ time. +import agentex.lib.core.tracing.processors.agentex_tracing_processor # noqa: E402, F401 + +MODULE = "agentex.lib.core.tracing.processors.agentex_tracing_processor" + + +def _make_config() -> MagicMock: + """Empty config — AgentexTracingProcessorConfig is unused by __init__.""" + return MagicMock() + + +class TestAgentexAsyncTracingProcessor: + """Coverage for the per-event-loop client cache. The SGP processor has + matching tests; mirror them here so a regression in the Agentex side + (e.g. an accidental refactor that switches back to a plain dict, or + drops the lazy lookup) does not slip through unnoticed. + """ + + async def test_client_caches_per_event_loop(self): + """First access builds the client; subsequent accesses in the same + running loop must return the cached instance. + """ + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory: + mock_factory.side_effect = lambda **kwargs: MagicMock() + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + + # Construction must not eagerly build the client (no running loop + # guarantee at module import time). 
+ assert mock_factory.call_count == 0 + + c1 = processor.client + c2 = processor.client + c3 = processor.client + + assert mock_factory.call_count == 1, ( + f"Expected client to be built once per loop, but " + f"create_async_agentex_client was called {mock_factory.call_count} times" + ) + assert c1 is c2 is c3 + + async def test_client_keepalive_is_enabled(self): + """Regression guard: the per-loop client must use keepalive — the + whole reason for the per-loop cache. Verify max_keepalive_connections > 0. + """ + import httpx as _httpx + + captured_limits: list[_httpx.Limits] = [] + original_async_client = _httpx.AsyncClient + + def capture_limits(*args, **kwargs): + limits = kwargs.get("limits") + if limits is not None: + captured_limits.append(limits) + return original_async_client(*args, **kwargs) + + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory, patch( + "httpx.AsyncClient", side_effect=capture_limits + ): + mock_factory.side_effect = lambda **kwargs: MagicMock() + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + _ = processor.client + + assert len(captured_limits) == 1 + max_keepalive = captured_limits[0].max_keepalive_connections + assert max_keepalive is not None and max_keepalive > 0, ( + f"Agentex async client should have keepalive enabled, got " + f"max_keepalive_connections={max_keepalive}" + ) + + def test_cache_is_weakkeydict_and_evicts_dead_loops(self): + """Regression guard for the id()-reuse bug: the per-loop cache must + be a WeakKeyDictionary so a GC'd loop's entry is evicted. Otherwise + a new loop landing at the same memory address would reuse the dead + loop's client, reintroducing the "bound to a different event loop" + error the per-loop cache was built to prevent. 
+ """ + import gc + + with patch(f"{MODULE}.create_async_agentex_client"): + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + + # Storage type itself: WeakKeyDictionary, not plain dict. + assert isinstance(processor._clients_by_loop, weakref.WeakKeyDictionary) + + # End-to-end check: insert under a loop, drop the loop, the entry + # must vanish after GC. + loop = asyncio.new_event_loop() + try: + processor._clients_by_loop[loop] = MagicMock() + assert len(processor._clients_by_loop) == 1 + finally: + loop.close() + del loop + gc.collect() + assert len(processor._clients_by_loop) == 0, ( + "WeakKeyDictionary should have evicted the dead loop's entry; " + "remaining keys would cause stale-client reuse on id() recycling." + ) diff --git a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py index 4614fe540..090cd9a09 100644 --- a/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_sgp_tracing_processor.py @@ -1,6 +1,7 @@ from __future__ import annotations import uuid +import asyncio from datetime import UTC, datetime from unittest.mock import AsyncMock, MagicMock, patch @@ -129,19 +130,20 @@ def _make_processor(): processor = SGPAsyncTracingProcessor(_make_config()) - # Wire up the mock client after construction (constructor stores it) - processor.sgp_async_client = mock_async_client + # Force the per-loop cache to return the mock for whatever loop the + # test runs on, by stubbing _get_client directly. 
+ processor._get_client = lambda: mock_async_client # type: ignore[method-assign] - return processor, mock_create_span + return processor, mock_create_span, mock_async_client def test_processor_holds_no_per_span_state(self): """Stateless processor must not retain any per-span dict between lifecycle events.""" - processor, _ = self._make_processor() + processor, _, _ = self._make_processor() assert not hasattr(processor, "_spans") async def test_span_lifecycle_produces_two_upserts(self): """Each span produces one upsert_batch call on start and one on end.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): span = _make_span() @@ -149,7 +151,7 @@ async def test_span_lifecycle_produces_two_upserts(self): span.end_time = datetime.now(UTC) await processor.on_span_end(span) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 2 + assert mock_client.spans.upsert_batch.call_count == 2 async def test_span_end_without_prior_start_still_upserts(self): """Cross-pod Temporal case: END activity lands on a pod that never saw START. @@ -157,7 +159,7 @@ async def test_span_end_without_prior_start_still_upserts(self): Today this used to be a silent no-op. After the stateless refactor it must still upsert a complete span via upsert_batch. """ - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): span = _make_span() @@ -165,13 +167,13 @@ async def test_span_end_without_prior_start_still_upserts(self): # No on_span_start — END lands here for the first time. 
await processor.on_span_end(span) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 1 - items = processor.sgp_async_client.spans.upsert_batch.call_args.kwargs["items"] + assert mock_client.spans.upsert_batch.call_count == 1 + items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == 1 async def test_sgp_span_input_and_output_propagated_on_end(self): """on_span_end should send the span's current input and output via upsert_batch.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() captured: list[MagicMock] = [] @@ -196,7 +198,7 @@ def capture_create_span(**kwargs): span.end_time = datetime.now(UTC) await processor.on_span_end(span) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 2 # start + end + assert mock_client.spans.upsert_batch.call_count == 2 # start + end # The end-time SGPSpan should have end_time populated. end_span = captured[-1] assert end_span.end_time is not None @@ -207,36 +209,167 @@ def capture_create_span(**kwargs): async def test_on_spans_start_sends_single_upsert_for_batch(self): """Given N spans at once, on_spans_start should make ONE upsert_batch HTTP call.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() n = 10 spans = [_make_span() for _ in range(n)] with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): await processor.on_spans_start(spans) - assert processor.sgp_async_client.spans.upsert_batch.call_count == 1, ( + assert mock_client.spans.upsert_batch.call_count == 1, ( "Batched on_spans_start must make exactly one upsert_batch HTTP call" ) - items = processor.sgp_async_client.spans.upsert_batch.call_args.kwargs["items"] + items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n + async def test_get_client_caches_per_event_loop(self): + """The processor must keep one client per event loop, and reuse it + across calls within 
the same loop. This is what enables connection + keepalive instead of paying a TLS handshake per span. + """ + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.AsyncSGPClient" + ) as mock_sgp_cls: + mock_sgp_cls.side_effect = lambda **kwargs: MagicMock() + + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + # Construction should NOT eagerly build the client (no running + # loop guarantee at import time). + assert mock_sgp_cls.call_count == 0 + + c1 = processor._get_client() + c2 = processor._get_client() + c3 = processor._get_client() + + # First call builds the client; subsequent calls in the same + # loop return the cached one. + assert mock_sgp_cls.call_count == 1, ( + f"Expected client to be built once per loop, but AsyncSGPClient " + f"was called {mock_sgp_cls.call_count} times" + ) + assert c1 is c2 is c3 + + async def test_get_client_keepalive_is_enabled(self): + """Regression guard: the per-loop client must use keepalive (the whole + point of the per-loop cache). Verify max_keepalive_connections > 0. 
+ """ + import httpx as _httpx + + captured_limits: list[_httpx.Limits] = [] + + original_async_client = _httpx.AsyncClient + + def capture_limits(*args, **kwargs): + limits = kwargs.get("limits") + if limits is not None: + captured_limits.append(limits) + return original_async_client(*args, **kwargs) + + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.AsyncSGPClient" + ), patch("httpx.AsyncClient", side_effect=capture_limits): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + processor._get_client() + + assert len(captured_limits) == 1 + max_keepalive = captured_limits[0].max_keepalive_connections + assert max_keepalive is not None and max_keepalive > 0, ( + f"SGP async client should have keepalive enabled, got " + f"max_keepalive_connections={max_keepalive}" + ) + + def test_cache_is_weakkeydict_and_evicts_dead_loops(self): + """Regression guard for the id()-reuse bug: the per-loop cache must + be a WeakKeyDictionary so a GC'd loop's entry is evicted. Otherwise + a new loop landing at the same memory address would reuse the dead + loop's client, reintroducing the "bound to a different event loop" + error the per-loop cache was built to prevent. + """ + import gc + import weakref + + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.AsyncSGPClient"): + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor(_make_config()) + + # Storage type itself: WeakKeyDictionary, not plain dict. 
+ assert isinstance(processor._clients_by_loop, weakref.WeakKeyDictionary) + + # End-to-end check: insert under a loop, drop the loop, the entry + # must vanish after GC. + loop = asyncio.new_event_loop() + try: + processor._clients_by_loop[loop] = MagicMock() + assert len(processor._clients_by_loop) == 1 + finally: + loop.close() + del loop + gc.collect() + assert len(processor._clients_by_loop) == 0, ( + "WeakKeyDictionary should have evicted the dead loop's entry; " + "remaining keys would cause stale-client reuse on id() recycling." + ) + + async def test_disabled_processor_returns_none_client(self): + """When config is missing api_key/account_id, _get_client must return + None and no HTTP client must be constructed.""" + from agentex.lib.types.tracing import SGPTracingProcessorConfig + + mock_env = MagicMock() + mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None) + + with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch( + f"{MODULE}.AsyncSGPClient" + ) as mock_sgp_cls: + from agentex.lib.core.tracing.processors.sgp_tracing_processor import ( + SGPAsyncTracingProcessor, + ) + + processor = SGPAsyncTracingProcessor( + SGPTracingProcessorConfig(sgp_api_key="", sgp_account_id="") + ) + + assert processor._get_client() is None + assert mock_sgp_cls.call_count == 0 + async def test_on_spans_end_sends_single_upsert_for_batch(self): """Given N spans at once, on_spans_end should make ONE upsert_batch HTTP call.""" - processor, _ = self._make_processor() + processor, _, mock_client = self._make_processor() n = 10 spans = [_make_span() for _ in range(n)] with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()): await processor.on_spans_start(spans) - processor.sgp_async_client.spans.upsert_batch.reset_mock() + mock_client.spans.upsert_batch.reset_mock() for span in spans: span.end_time = datetime.now(UTC) await processor.on_spans_end(spans) - assert 
processor.sgp_async_client.spans.upsert_batch.call_count == 1, ( + assert mock_client.spans.upsert_batch.call_count == 1, ( "Batched on_spans_end must make exactly one upsert_batch HTTP call" ) - items = processor.sgp_async_client.spans.upsert_batch.call_args.kwargs["items"] + items = mock_client.spans.upsert_batch.call_args.kwargs["items"] assert len(items) == n diff --git a/tests/lib/core/tracing/test_span_queue.py b/tests/lib/core/tracing/test_span_queue.py index 0d6b2fd0a..2e68cf88d 100644 --- a/tests/lib/core/tracing/test_span_queue.py +++ b/tests/lib/core/tracing/test_span_queue.py @@ -205,9 +205,7 @@ async def slow_start(span: Span) -> None: await queue.shutdown() - assert max_concurrency > 1, ( - f"Expected concurrent processing, but max concurrency was {max_concurrency}" - ) + assert max_concurrency > 1, f"Expected concurrent processing, but max concurrency was {max_concurrency}" async def test_batch_faster_than_serial(self): """Batched drain should be significantly faster than serial for slow processors.""" @@ -253,7 +251,7 @@ async def test_mixed_event_types_raise_assertion(self): ] try: - await AsyncSpanQueue._process_items(mixed) + await AsyncSpanQueue()._process_items(mixed) except AssertionError: return else: @@ -288,9 +286,7 @@ async def capture_starts(spans: list[Span]) -> None: await queue.shutdown() # on_spans_start must have been called exactly once with all 5 spans. 
- assert proc.on_spans_start.call_count == 1, ( - f"Expected one batched call, got {proc.on_spans_start.call_count}" - ) + assert proc.on_spans_start.call_count == 1, f"Expected one batched call, got {proc.on_spans_start.call_count}" assert received == [ids] async def test_batched_end_dispatch_single_call_per_drain(self): @@ -315,6 +311,231 @@ async def capture_ends(spans: list[Span]) -> None: assert received == [ids] +class TestAsyncSpanQueueLinger: + """The drain loop should linger briefly after the first item arrives so + that concurrently-emitted spans coalesce into one batch, instead of each + span producing its own size-1 drain cycle. + """ + + async def test_linger_coalesces_staggered_enqueues_into_one_batch(self): + """Spans enqueued a few ms apart should land in the SAME drain batch + when the linger window is wider than the gap between them. + """ + received: list[list[str]] = [] + + async def capture_starts(spans: list[Span]) -> None: + received.append([s.id for s in spans]) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=capture_starts) + proc.on_spans_end = AsyncMock() + + # Linger of 100ms; we enqueue 3 items 20ms apart, well inside the window. + queue = AsyncSpanQueue(linger_ms=100) + + for i in range(3): + queue.enqueue(SpanEventType.START, _make_span(f"span-{i}"), [proc]) + await asyncio.sleep(0.02) + + await queue.shutdown() + + # All three should arrive in one batched call thanks to the linger. + assert proc.on_spans_start.call_count == 1, ( + f"Expected one batch from linger-coalesced enqueues, got " + f"{proc.on_spans_start.call_count} batches: {received}" + ) + assert received == [["span-0", "span-1", "span-2"]] + + async def test_linger_zero_drains_immediately(self): + """With linger_ms=0, the drain loop should NOT wait — staggered + enqueues produce separate batches (back-compat with prior behavior). 
+ """ + received: list[list[str]] = [] + + async def capture_starts(spans: list[Span]) -> None: + received.append([s.id for s in spans]) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=capture_starts) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(linger_ms=0) + + for i in range(3): + queue.enqueue(SpanEventType.START, _make_span(f"span-{i}"), [proc]) + # Give the drain loop time to pick up and process each one. + await asyncio.sleep(0.05) + + await queue.shutdown() + + # With no linger, each staggered enqueue produces its own batch. + assert proc.on_spans_start.call_count == 3, ( + f"Expected three size-1 batches without linger, got {proc.on_spans_start.call_count}: {received}" + ) + + async def test_linger_respects_batch_size_cap(self): + """The linger must not push batches over batch_size.""" + received: list[list[str]] = [] + + async def capture_starts(spans: list[Span]) -> None: + received.append([s.id for s in spans]) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=capture_starts) + proc.on_spans_end = AsyncMock() + + # Tight batch cap, linger wide enough to coalesce but not so large + # that the tail singleton stalls the test for hundreds of ms. + queue = AsyncSpanQueue(batch_size=3, linger_ms=50) + + ids = [f"span-{i}" for i in range(7)] + for i in ids: + queue.enqueue(SpanEventType.START, _make_span(i), [proc]) + + await queue.shutdown() + + # 7 spans / batch_size=3 ⇒ at least 3 batches (3, 3, 1). None should + # exceed the cap. 
+ for batch in received: + assert len(batch) <= 3, f"Batch exceeded cap: {batch}" + assert sum(len(b) for b in received) == 7 + + +class _FakeHTTPError(Exception): + """Mimics an SGP/httpx status error: carries a ``status_code`` attribute.""" + + def __init__(self, status_code: int) -> None: + self.status_code = status_code + super().__init__(f"HTTP {status_code}") + + +class TestAsyncSpanQueueDropObservability: + """Silent span loss should be counted so it is measurable, and a bounded + queue should shed load deterministically instead of growing without limit. + """ + + async def test_full_queue_drops_are_counted(self): + release = asyncio.Event() + + async def block_first(spans: list[Span]) -> None: + # Block the drain on its first batch so the queue can fill behind it. + await release.wait() + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=block_first) + proc.on_spans_end = AsyncMock() + + # max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills + # the queue; items 2 and 3 are dropped. + queue = AsyncSpanQueue(max_size=1, linger_ms=0) + + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await asyncio.sleep(0.02) # let the drain pick up s0 and block + queue.enqueue(SpanEventType.START, _make_span("s1"), [proc]) + queue.enqueue(SpanEventType.START, _make_span("s2"), [proc]) + queue.enqueue(SpanEventType.START, _make_span("s3"), [proc]) + + assert queue.dropped_spans == 2, f"expected 2 dropped, got {queue.dropped_spans}" + + release.set() + await queue.shutdown() + + async def test_no_drops_under_normal_load(self): + proc = _make_processor() + queue = AsyncSpanQueue() + for i in range(5): + queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc]) + await queue.shutdown() + assert queue.dropped_spans == 0 + + +class TestAsyncSpanQueueRetry: + """Transient HTTP failures (429/5xx) should be re-enqueued up to a bounded + number of attempts; auth/other errors must be dropped (and counted), never + retried. 
+ """ + + async def test_retryable_status_is_reenqueued_and_eventually_succeeds(self): + attempts = 0 + + async def fail_then_succeed(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + if attempts == 1: + raise _FakeHTTPError(503) + # second attempt succeeds + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=fail_then_succeed) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 2, "503 should be retried once, then succeed" + assert queue.dropped_spans == 0, "successful retry must not count as a drop" + + async def test_non_retryable_status_is_dropped_not_retried(self): + attempts = 0 + + async def always_401(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + raise _FakeHTTPError(401) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=always_401) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 1, "401 is non-retryable — must be tried exactly once" + assert queue.dropped_spans == 1 + + async def test_non_http_exception_is_not_retried(self): + """A plain bug (no status_code) must not be retried into an infinite + loop — preserves the original drain-continues-on-error contract.""" + attempts = 0 + + async def boom(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + raise RuntimeError("bug, not transient") + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=boom) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 1 + assert queue.dropped_spans == 1 + + async def test_retryable_exhausts_attempts_then_drops(self): + attempts = 0 + + 
async def always_503(spans: list[Span]) -> None: + nonlocal attempts + attempts += 1 + raise _FakeHTTPError(503) + + proc = AsyncMock() + proc.on_spans_start = AsyncMock(side_effect=always_503) + proc.on_spans_end = AsyncMock() + + queue = AsyncSpanQueue(max_retries=3, linger_ms=0) + queue.enqueue(SpanEventType.START, _make_span("s0"), [proc]) + await queue.shutdown() + + assert attempts == 3, "should try up to max_retries times" + assert queue.dropped_spans == 1 + + class TestAsyncSpanQueueIntegration: async def test_integration_with_async_trace(self): call_log: list[tuple[str, str]] = []