Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 112 additions & 31 deletions src/agentex/lib/core/tracing/span_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
_DEFAULT_MAX_SIZE = 0
# Total attempts per batch for a *transient* failure (1 == no retry).
_DEFAULT_MAX_RETRIES = 1
# Max number of batch-export HTTP requests in flight at once. The export
# backend (EGP) processes each upsert_batch in ~150ms but serves many requests
# concurrently; issuing one batch at a time caps per-pod egress at ~1/latency.
# Sending several concurrently lets a pod keep up with span production under
# load. ``1`` restores the old strictly-serial behavior.
_DEFAULT_CONCURRENCY = 8
# HTTP statuses worth retrying at the queue level. These are explicit
# backpressure / transient signals; everything else (esp. 401/403/4xx auth and
# validation errors) is a permanent failure that re-enqueuing cannot fix. Note
Expand Down Expand Up @@ -76,15 +82,23 @@ class AsyncSpanQueue:
"""Background FIFO queue for async span processing.

Span events are enqueued synchronously (non-blocking) and drained by a
background task. Items are processed in batches: all START events in a
batch are flushed concurrently, then all END events, so that per-span
start-before-end ordering is preserved while HTTP calls for independent
spans execute in parallel.

Once the drain loop picks up the first item, it lingers up to
``linger_ms`` waiting for more items to coalesce into the same batch.
Without the linger the drain almost always returned size-1 batches under
real agent workloads, because spans typically arrive a few ms apart.
background task. The drain coalesces ready events into batches and
*dispatches* each batch's export as its own task, so up to ``concurrency``
batch requests can be in flight at once. This matters because each
``upsert_batch`` HTTP call takes tens-to-hundreds of ms server-side; issuing
them one at a time caps a pod's egress at ~1/latency and lets a backlog
build under load.

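Ordering guarantee: a span's START export always finishes (successfully or
not) before its END export is issued. END batches wait on every START batch
that was in flight when they were dispatched; because a span's START is
always enqueued before its END, that span's START send is either still in
flight (and waited on) or already finished. The barrier is coarse: an END
batch also waits on in-flight STARTs of unrelated spans, while START batches
never wait on each other. Caveat: the guarantee assumes retries are disabled
(the default ``max_retries=1``); a re-enqueued START can be overtaken by the
END of the same span.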

Once the drain loop picks up the first item, it lingers up to ``linger_ms``
waiting for more items to coalesce into the same batch. Without the linger
the drain almost always returned size-1 batches under real agent workloads,
because spans typically arrive a few ms apart.

Reliability:
- ``max_size`` bounds the queue. When full, new events are dropped and
Expand All @@ -101,6 +115,7 @@ def __init__(
linger_ms: int | None = None,
max_size: int | None = None,
max_retries: int | None = None,
concurrency: int | None = None,
) -> None:
resolved_max_size = (
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size)
Expand All @@ -115,6 +130,17 @@ def __init__(
if max_retries is None
else max(1, max_retries)
)
self._concurrency = (
_read_int_env("AGENTEX_SPAN_QUEUE_CONCURRENCY", _DEFAULT_CONCURRENCY, minimum=1)
if concurrency is None
else max(1, concurrency)
)
# Bounds concurrent export HTTP requests.
self._send_sema = asyncio.Semaphore(self._concurrency)
# Outstanding dispatched send tasks, and the subset that are START
# sends (END sends wait on these to preserve per-span ordering).
self._inflight: set[asyncio.Task[None]] = set()
self._inflight_starts: set[asyncio.Task[None]] = set()
# Total spans dropped for any reason (full queue, shutdown, permanent
# failure, exhausted retries). Surfaced for metrics/observability so
# span loss stops being silent.
Expand Down Expand Up @@ -169,6 +195,11 @@ def _ensure_drain_running(self) -> None:

async def _drain_loop(self) -> None:
while True:
# Backpressure: cap the number of in-flight send tasks so the drain
# does not run unboundedly ahead of the exporters.
while len(self._inflight) >= self._concurrency:
await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED)

# Block until at least one item is available.
first = await self._queue.get()
batch: list[_SpanQueueItem] = [first]
Expand Down Expand Up @@ -196,22 +227,47 @@ async def _drain_loop(self) -> None:
except asyncio.QueueEmpty:
break

try:
# Separate START and END events. Processing all STARTs before
# ENDs ensures that on_span_start completes before on_span_end
# for any span whose both events land in the same batch.
starts = [i for i in batch if i.event_type == SpanEventType.START]
ends = [i for i in batch if i.event_type == SpanEventType.END]

if starts:
await self._process_items(starts)
if ends:
await self._process_items(ends)
finally:
for _ in batch:
self._queue.task_done()
# Release span data for GC.
batch.clear()
# Separate START and END events and dispatch each as its own send
# task. Dispatching STARTs first (so they are registered before the
# END snapshot) guarantees an END never outruns a START of the same
# span whose events land in this batch.
starts = [i for i in batch if i.event_type == SpanEventType.START]
ends = [i for i in batch if i.event_type == SpanEventType.END]
if starts:
self._dispatch(starts, SpanEventType.START)
if ends:
# Re-check backpressure before the second dispatch so a batch
# carrying both event types can't push _inflight past the cap.
while len(self._inflight) >= self._concurrency:
await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED)
self._dispatch(ends, SpanEventType.END)
Comment thread
smoreinis marked this conversation as resolved.

def _dispatch(self, items: list[_SpanQueueItem], event_type: SpanEventType) -> None:
    """Start a background send task for ``items`` and track it until it finishes.

    An END send is handed a snapshot of the START tasks that are in flight
    at dispatch time and waits for them before exporting, which keeps a
    span's START ahead of its END. START sends get an empty barrier.

    Args:
        items: Same-event-type queue items to export together.
        event_type: Event type shared by ``items``; selects the barrier and
            whether the task is also tracked as an in-flight START.
    """
    # Take the snapshot before the new task is registered anywhere.
    if event_type == SpanEventType.END:
        prerequisites = tuple(self._inflight_starts)
    else:
        prerequisites = ()

    send_task = asyncio.create_task(self._run_send(items, prerequisites))

    # Every send is tracked in ``_inflight``; START sends are additionally
    # tracked in ``_inflight_starts`` so later END sends can wait on them.
    registries = [self._inflight]
    if event_type == SpanEventType.START:
        registries.append(self._inflight_starts)

    for registry in registries:
        registry.add(send_task)
        # Self-removal on completion keeps the sets limited to live tasks.
        send_task.add_done_callback(registry.discard)

async def _run_send(self, items: list[_SpanQueueItem], barrier: tuple[asyncio.Task[None], ...]) -> None:
    """Export ``items`` once every task in ``barrier`` has finished.

    Args:
        items: Queue items passed to ``_process_items`` as one batch.
        barrier: Send tasks that must finish before this batch is exported;
            an empty tuple means the export starts immediately.
    """
    try:
        if len(barrier) > 0:
            # Only completion matters here: ``return_exceptions=True`` keeps
            # a failed prerequisite from aborting this send.
            await asyncio.gather(*barrier, return_exceptions=True)
        await self._process_items(items)
    finally:
        # Exactly one ``task_done()`` per item, whatever the outcome, so a
        # ``queue.join()`` (as used by shutdown) returns only after every
        # send has finished.
        mark_done = self._queue.task_done
        for _ in items:
            mark_done()

async def _process_items(self, items: list[_SpanQueueItem]) -> None:
"""Dispatch a batch of same-event-type items to each processor in one call.
Expand Down Expand Up @@ -243,10 +299,12 @@ async def _handle(
) -> None:
spans = [item.span for item in items]
try:
if event_type == SpanEventType.START:
await p.on_spans_start(spans)
else:
await p.on_spans_end(spans)
# Hold a concurrency slot only for the duration of the HTTP call.
async with self._send_sema:
if event_type == SpanEventType.START:
await p.on_spans_start(spans)
else:
await p.on_spans_end(spans)
except Exception as exc:
self._handle_failure(p, items, event_type, exc)

Expand Down Expand Up @@ -288,7 +346,14 @@ def _handle_failure(

def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
"""Put a single failed item back on the queue, scoped to the processor
that failed, with an incremented attempt count."""
that failed, with an incremented attempt count.

NOTE: a re-enqueued START goes to the *back* of the queue. If an END
for the same span is dispatched concurrently before this START is picked
up again, the END's barrier snapshot won't contain it, breaking the
START-before-END guarantee for that span. This is benign at the default
``max_retries=1`` (retries disabled) but must be addressed before
enabling retries by default."""
try:
self._queue.put_nowait(
_SpanQueueItem(
Expand All @@ -307,21 +372,37 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:

async def shutdown(self, timeout: float = 30.0) -> None:
    """Flush outstanding span events, then stop the drain loop and send tasks.

    Args:
        timeout: Maximum number of seconds to wait for the queue to be fully
            processed before giving up and cancelling remaining sends.
    """
    self._stopping = True

    # Nothing queued, no live drain task, and no sends in flight: done.
    drain_idle = self._drain_task is None or self._drain_task.done()
    if drain_idle and self._queue.empty() and not self._inflight:
        return

    timed_out = False
    try:
        # ``join()`` unblocks once each enqueued (and re-enqueued) item has
        # been marked done by the send task that handled it.
        await asyncio.wait_for(self._queue.join(), timeout=timeout)
    except asyncio.TimeoutError:
        timed_out = True
        logger.warning(
            "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize()
        )

    # Stop the drain loop; the cancellation it raises is expected.
    drain_task = self._drain_task
    if drain_task is not None and not drain_task.done():
        drain_task.cancel()
        try:
            await drain_task
        except asyncio.CancelledError:
            pass

    # Reap send tasks that are still tracked. After a clean join they are
    # already finishing; after a timeout, cancel them so shutdown cannot hang.
    pending_sends = list(self._inflight)
    if not pending_sends:
        return
    if timed_out:
        for send_task in pending_sends:
            send_task.cancel()
    await asyncio.gather(*pending_sends, return_exceptions=True)


_default_span_queue: AsyncSpanQueue | None = None

Expand Down
119 changes: 116 additions & 3 deletions tests/lib/core/tracing/test_span_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,10 @@ async def block_first(spans: list[Span]) -> None:
proc.on_spans_start = AsyncMock(side_effect=block_first)
proc.on_spans_end = AsyncMock()

# max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills
# the queue; items 2 and 3 are dropped.
queue = AsyncSpanQueue(max_size=1, linger_ms=0)
# max_size=1, no linger, concurrency=1: the drain dispatches item-0 and
# then blocks at the in-flight cap; item-1 fills the queue; items 2 and 3
# are dropped.
queue = AsyncSpanQueue(max_size=1, linger_ms=0, concurrency=1)

queue.enqueue(SpanEventType.START, _make_span("s0"), [proc])
await asyncio.sleep(0.02) # let the drain pick up s0 and block
Expand Down Expand Up @@ -536,6 +537,118 @@ async def always_503(spans: list[Span]) -> None:
assert queue.dropped_spans == 1


class TestAsyncSpanQueueConcurrency:
    """Bounded-concurrency export behavior of ``AsyncSpanQueue``.

    The queue should keep several batch requests in flight at once (up to
    ``concurrency``) so per-pod egress is not capped at a single request,
    while a span's START send must still complete before its END send.
    """

    @staticmethod
    def _overlap_tracking_processor(hold_seconds: float):
        """Return ``(processor, stats)`` for measuring overlapping START calls.

        Each ``on_spans_start`` call stays active for ``hold_seconds``;
        ``stats["peak"]`` records the highest number of calls that were
        active at the same time.
        """
        stats = {"active": 0, "peak": 0}
        lock = asyncio.Lock()

        async def slow_start(spans: list[Span]) -> None:
            async with lock:
                stats["active"] += 1
                stats["peak"] = max(stats["peak"], stats["active"])
            await asyncio.sleep(hold_seconds)
            async with lock:
                stats["active"] -= 1

        proc = AsyncMock()
        proc.on_spans_start = AsyncMock(side_effect=slow_start)
        proc.on_spans_end = AsyncMock()
        return proc, stats

    async def test_batches_dispatched_concurrently_up_to_bound(self):
        proc, stats = self._overlap_tracking_processor(0.05)

        # With batch_size=1 every span becomes its own batch/send, and
        # concurrency=4 is the ceiling on simultaneous in-flight sends.
        queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4)
        for i in range(8):
            queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])

        await queue.shutdown()

        max_seen = stats["peak"]
        assert proc.on_spans_start.call_count == 8
        assert 2 <= max_seen <= 4, f"expected bounded concurrency (2..4), saw {max_seen}"

    async def test_concurrency_one_serializes(self):
        proc, stats = self._overlap_tracking_processor(0.03)

        queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=1)
        for i in range(4):
            queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])

        await queue.shutdown()

        max_seen = stats["peak"]
        assert max_seen == 1, f"concurrency=1 must serialize sends, saw {max_seen}"

    async def test_concurrent_is_faster_than_serial(self):
        async def slow_start(spans: list[Span]) -> None:
            await asyncio.sleep(0.05)

        proc = AsyncMock()
        proc.on_spans_start = AsyncMock(side_effect=slow_start)
        proc.on_spans_end = AsyncMock()

        queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=8)
        for i in range(8):
            queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])

        began = time.monotonic()
        await queue.shutdown()
        elapsed = time.monotonic() - began

        # Eight 50 ms sends issued strictly one after another.
        serial = 8 * 0.05
        assert elapsed < serial * 0.5, f"concurrent drain took {elapsed:.3f}s; serial would be {serial:.3f}s"

    async def test_end_waits_for_start_of_same_span(self):
        """Per-span ordering: with concurrency enabled, a span's END upsert is
        still not sent until the START upsert for that span has completed."""
        log: list[tuple[str, str]] = []

        def recording_handler(enter_tag: str, exit_tag: str, hold_seconds: float):
            # Logs entry, holds for ``hold_seconds``, then logs exit.
            async def handler(spans: list[Span]) -> None:
                log.append((enter_tag, spans[0].id))
                await asyncio.sleep(hold_seconds)
                log.append((exit_tag, spans[0].id))

            return handler

        proc = AsyncMock()
        proc.on_spans_start = AsyncMock(side_effect=recording_handler("start_enter", "start_exit", 0.05))
        proc.on_spans_end = AsyncMock(side_effect=recording_handler("end_enter", "end_exit", 0.01))

        queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4)
        queue.enqueue(SpanEventType.START, _make_span("A"), [proc])
        await asyncio.sleep(0.01)  # give the START send time to begin and park in its sleep
        queue.enqueue(SpanEventType.END, _make_span("A"), [proc])

        await queue.shutdown()

        # The END handler may only be entered after the START handler exited.
        start_exit = log.index(("start_exit", "A"))
        end_enter = log.index(("end_enter", "A"))
        assert start_exit < end_enter, f"END began before START completed: {log}"


class TestAsyncSpanQueueIntegration:
async def test_integration_with_async_trace(self):
call_log: list[tuple[str, str]] = []
Expand Down
Loading