Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "0",
"metadata": {},
"outputs": [],
"source": [
"from agentex import Agentex\n",
"\n",
"client = Agentex(base_url=\"http://localhost:5003\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"AGENT_NAME = \"s010-multiturn\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"# # (Optional) Create a new task. If you don't create a new task, each message will be sent to a new task. The server will create the task for you.\n",
"\n",
"# import uuid\n",
"\n",
"# TASK_ID = str(uuid.uuid4())[:8]\n",
"\n",
"# rpc_response = client.agents.rpc_by_name(\n",
"# agent_name=AGENT_NAME,\n",
"# method=\"task/create\",\n",
"# params={\n",
"# \"name\": f\"{TASK_ID}-task\",\n",
"# \"params\": {}\n",
"# }\n",
"# )\n",
"\n",
"# task = rpc_response.result\n",
"# print(task)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3",
"metadata": {},
"outputs": [],
"source": [
"# Test non streaming response\n",
"from agentex.types import TextContent\n",
"\n",
"# The response is expected to be a list of TaskMessage objects, which is a union of the following types:\n",
"# - TextContent: A message with just text content \n",
"# - DataContent: A message with JSON-serializable data content\n",
"# - ToolRequestContent: A message with a tool request, which contains a JSON-serializable request to call a tool\n",
"# - ToolResponseContent: A message with a tool response, which contains response object from a tool call in its content\n",
"\n",
"# When processing the message/send response, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n",
"\n",
"rpc_response = client.agents.send_message(\n",
" agent_name=AGENT_NAME,\n",
" params={\n",
" \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n",
" \"stream\": False\n",
" }\n",
")\n",
"\n",
"if not rpc_response or not rpc_response.result:\n",
" raise ValueError(\"No result in response\")\n",
"\n",
"# Extract and print just the text content from the response\n",
"for task_message in rpc_response.result:\n",
" content = task_message.content\n",
" if isinstance(content, TextContent):\n",
" text = content.content\n",
" print(text)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": [
"# Test streaming response\n",
"from agentex.types import TextContent\n",
"from agentex.types.text_delta import TextDelta\n",
"from agentex.types.task_message_update import StreamTaskMessageFull, StreamTaskMessageDelta\n",
"\n",
"# The result object of message/send will be a TaskMessageUpdate which is a union of the following types:\n",
"# - StreamTaskMessageStart:\n",
"#     - An indicator that a streaming message was started, doesn't contain any useful content\n",
"# - StreamTaskMessageDelta:\n",
"#     - A delta of a streaming message, contains the text delta to aggregate\n",
"# - StreamTaskMessageDone:\n",
"#     - An indicator that a streaming message was done, doesn't contain any useful content\n",
"# - StreamTaskMessageFull:\n",
"#     - A non-streaming message, there is nothing to aggregate, since this contains the full message, not deltas\n",
"\n",
"# When processing StreamTaskMessageDelta, if you are expecting more than TextDeltas, such as DataDelta, ToolRequestDelta, or ToolResponseDelta, you can process them as well\n",
"# When processing StreamTaskMessageFull, if you are expecting more than TextContent, such as DataContent, ToolRequestContent, or ToolResponseContent, you can process them as well\n",
"\n",
"for agent_rpc_response_chunk in client.agents.send_message_stream(\n",
"    agent_name=AGENT_NAME,\n",
"    params={\n",
"        \"content\": {\"type\": \"text\", \"author\": \"user\", \"content\": \"Hello what can you do?\"},\n",
"        \"stream\": True\n",
"    }\n",
"):\n",
"    # We know that the result of the message/send when stream is set to True will be a TaskMessageUpdate\n",
"    task_message_update = agent_rpc_response_chunk.result\n",
"    # Print only the text deltas as they arrive or any full messages\n",
"    if isinstance(task_message_update, StreamTaskMessageDelta):\n",
"        delta = task_message_update.delta\n",
"        if isinstance(delta, TextDelta):\n",
"            print(delta.text_delta, end=\"\", flush=True)\n",
"        else:\n",
"            # Report the type of the delta actually received in this chunk\n",
"            print(f\"Found non-text {type(delta)} object in streaming message.\")\n",
"    elif isinstance(task_message_update, StreamTaskMessageFull):\n",
"        content = task_message_update.content\n",
"        if isinstance(content, TextContent):\n",
"            print(content.content)\n",
"        else:\n",
"            # Report the type of the full-message content actually received\n",
"            print(f\"Found non-text {type(content)} object in full message.\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
4 changes: 2 additions & 2 deletions examples/tutorials/00_sync/010_multiturn/dev.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
Expand All @@ -158,7 +158,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
"version": "3.14.2"
}
},
"nbformat": 4,
Expand Down
11 changes: 5 additions & 6 deletions src/agentex/lib/adk/_modules/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,13 @@ def _tracing_service(self) -> TracingService:
if self._tracing_service_lazy is None or (loop_id is not None and loop_id != self._bound_loop_id):
import httpx

# Disable keepalive so each span HTTP call gets a fresh TCP
# connection. Reused connections carry asyncio primitives bound
# to the event loop that created them; in sync-ACP / streaming
# contexts the loop context can shift between calls, causing
# "bound to a different event loop" RuntimeErrors.
# Keepalive ON: connections are reused within a single event
# loop, eliminating the TLS-handshake-per-span penalty under
# load. Cross-loop safety is preserved by rebuilding the
# client whenever loop_id changes (the conditional above).
agentex_client = create_async_agentex_client(
http_client=httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=0),
limits=httpx.Limits(max_keepalive_connections=20),
),
)
tracer = AsyncTracer(agentex_client)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Any, Dict, override
import asyncio
import weakref
from typing import TYPE_CHECKING, Any, Dict, override

from agentex import Agentex
from agentex.types.span import Span
Expand All @@ -9,6 +11,9 @@
AsyncTracingProcessor,
)

if TYPE_CHECKING:
from agentex import AsyncAgentex


class AgentexSyncTracingProcessor(SyncTracingProcessor):
def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
Expand Down Expand Up @@ -67,19 +72,40 @@ def shutdown(self) -> None:

class AgentexAsyncTracingProcessor(AsyncTracingProcessor):
def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
# Per-event-loop client cache. httpx.AsyncClient is bound to the
# loop that created it, so in sync-ACP / streaming contexts (where
# the active loop can change between requests) we keep one client
# per loop instead of disabling keepalive entirely. The cache is a
# WeakKeyDictionary so a GC'd loop and its client are evicted
# automatically — using id() as a key would reuse entries when
# CPython recycles a freed loop's memory address.
self._clients_by_loop: weakref.WeakKeyDictionary[
asyncio.AbstractEventLoop, "AsyncAgentex"
] = weakref.WeakKeyDictionary()

def _build_client(self) -> "AsyncAgentex":
import httpx

# Disable keepalive so each span HTTP call gets a fresh TCP connection.
# Reused connections carry asyncio primitives bound to the event loop
# that created them; in sync-ACP / streaming contexts the loop context
# can shift between calls, causing "bound to a different event loop"
# RuntimeErrors.
self.client = create_async_agentex_client(
# Keepalive ON: connections are reused within a single event loop,
# eliminating the TLS-handshake-per-span penalty under load.
return create_async_agentex_client(
http_client=httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=0),
limits=httpx.Limits(max_keepalive_connections=20),
),
)

@property
def client(self) -> "AsyncAgentex":
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return self._build_client()
client = self._clients_by_loop.get(loop)
if client is None:
client = self._build_client()
self._clients_by_loop[loop] = client
return client

# TODO(AGX1-199): Add batch create/update endpoints to Agentex API and use
# them here instead of one HTTP call per span.
# https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces
Expand Down
79 changes: 53 additions & 26 deletions src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import asyncio
import weakref
from typing import cast, override

import scale_gp_beta.lib.tracing as tracing
Expand Down Expand Up @@ -92,23 +94,50 @@ def shutdown(self) -> None:
class SGPAsyncTracingProcessor(AsyncTracingProcessor):
def __init__(self, config: SGPTracingProcessorConfig):
self.disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
self._config = config
# Per-event-loop client cache. httpx.AsyncClient ties its connection
# pool to the loop it was created on; in sync-ACP / streaming contexts
# the active loop can change between requests. Caching per loop lets
# us keep keepalive on within each loop while staying safe across
# loops. The cache is a WeakKeyDictionary so a GC'd loop and its
# client are evicted automatically — using id() as a key would reuse
# entries when CPython recycles a freed loop's memory address.
self._clients_by_loop: weakref.WeakKeyDictionary[
asyncio.AbstractEventLoop, AsyncSGPClient
] = weakref.WeakKeyDictionary()
self.env_vars = EnvironmentVariables.refresh()

def _build_client(self) -> AsyncSGPClient:
import httpx

# Disable keepalive so each HTTP call gets a fresh TCP connection,
# avoiding "bound to a different event loop" errors in sync-ACP.
self.sgp_async_client = (
AsyncSGPClient(
api_key=config.sgp_api_key,
account_id=config.sgp_account_id,
base_url=config.sgp_base_url,
http_client=httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=0),
),
)
if not self.disabled
else None
return AsyncSGPClient(
api_key=self._config.sgp_api_key,
account_id=self._config.sgp_account_id,
base_url=self._config.sgp_base_url,
# Keepalive ON: connections are reused within a single event loop,
# which removes the TLS-handshake-per-span penalty observed under
# load. Cross-loop safety is preserved by the per-loop cache.
http_client=httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=20),
),
)
self.env_vars = EnvironmentVariables.refresh()

def _get_client(self) -> AsyncSGPClient | None:
"""Return the AsyncSGPClient bound to the current event loop, creating
one on first use. Returns None when the processor is disabled."""
if self.disabled:
return None
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# Called from outside an event loop — should not happen on the
# hot path, but build a one-off client rather than crashing.
return self._build_client()
client = self._clients_by_loop.get(loop)
if client is None:
client = self._build_client()
self._clients_by_loop[loop] = client
return client

@override
async def on_span_start(self, span: Span) -> None:
Expand All @@ -123,31 +152,29 @@ async def on_spans_start(self, spans: list[Span]) -> None:
if not spans:
return

sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]

if self.disabled:
client = self._get_client()
if client is None:
logger.warning("SGP is disabled, skipping span upsert")
return
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
items=[s.to_request_params() for s in sgp_spans]
)

sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])

@override
async def on_spans_end(self, spans: list[Span]) -> None:
if not spans:
return

client = self._get_client()
if client is None:
return

sgp_spans: list[SGPSpan] = []
for span in spans:
sgp_span = _build_sgp_span(span, self.env_vars)
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
sgp_spans.append(sgp_span)

if self.disabled:
return
await self.sgp_async_client.spans.upsert_batch( # type: ignore[union-attr]
items=[s.to_request_params() for s in sgp_spans]
)
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])

@override
async def shutdown(self) -> None:
Expand Down
Loading
Loading