Skip to content
200 changes: 197 additions & 3 deletions agentkit/toolkit/cli/cli_invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from pathlib import Path
from typing import Optional, Any
import base64
import binascii
import json
import typer
from typer.core import TyperGroup
Expand Down Expand Up @@ -627,6 +629,160 @@ def build_harness_overrides(
return overrides


# Fixed ADK app name for the run_sse path. The harness loader serves its single
# agent under any app name, so a stable constant keeps the CLI decoupled from the
# deployed HARNESS_NAME.
_HARNESS_RUN_SSE_APP = "harness"


def _user_id_from_token(token: str) -> str | None:
"""Return the OIDC ``sub`` claim from a JWT bearer token, else ``None``.

A custom_jwt harness is called with an OIDC id_token whose ``sub`` is the
authenticated user's stable id — use it as the run's user_id so sessions are
tied to the real identity. A key_auth token is an opaque api key (not a JWT,
no ``sub``), so this returns ``None`` and the caller falls back to a random id.
"""
parts = token.split(".")
if len(parts) != 3:
return None # not a JWT (e.g. a key_auth api key)
payload = parts[1]
payload += "=" * (-len(payload) % 4) # restore base64 padding
try:
claims = json.loads(base64.urlsafe_b64decode(payload))
except (ValueError, binascii.Error):
return None # malformed JWT payload → fall back to random
sub = claims.get("sub")
return sub if isinstance(sub, str) and sub else None


def _harness_run_sse(
*,
base_url: str,
token: str,
prompt: str,
session_id: str,
overrides: dict,
raw: bool,
) -> Any:
"""Invoke a deployed harness via the ADK ``/run_sse`` endpoint (streaming).

app_name is the fixed ``"harness"``; user_id is the JWT ``sub`` when the token
is an OIDC id_token, else a random id; session_id is the caller's. When
``overrides`` is non-empty it is sent as the ``harness`` field so the runtime
streams a spawned (overridden) agent; otherwise the base agent.
"""
import requests

app_name = _HARNESS_RUN_SSE_APP
sub = _user_id_from_token(token)
user_id = sub or f"u-{uuid.uuid4().hex[:12]}"
user_id_origin = "jwt sub" if sub else "random"
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
console.print(
f"[blue]run_sse: app_name={app_name}, user_id={user_id} ({user_id_origin}), "
f"session_id={session_id}[/blue]"
)

# ADK /run_sse requires an existing session; create it (ignore "exists").
session_url = f"{base_url}/apps/{app_name}/users/{user_id}/sessions/{session_id}"
try:
sr = requests.post(session_url, json={}, headers=headers, timeout=60)
except requests.RequestException as e:
console.print(f"[red]❌ Session create failed: {e}[/red]")
raise typer.Exit(1)
if sr.status_code not in (200, 400, 409):
console.print(
f"[red]❌ Session create HTTP {sr.status_code}: {sr.text[:300]}[/red]"
)
raise typer.Exit(1)

body: dict[str, Any] = {
"app_name": app_name,
"user_id": user_id,
"session_id": session_id,
"new_message": {"role": "user", "parts": [{"text": prompt}]},
"streaming": True,
}
if overrides:
body["harness"] = overrides
console.print(f"[blue]Using one-time overrides: {overrides}[/blue]")
try:
resp = requests.post(
f"{base_url}/run_sse",
json=body,
headers=headers,
timeout=300,
stream=True,
)
except requests.RequestException as e:
console.print(f"[red]❌ run_sse request failed: {e}[/red]")
raise typer.Exit(1)
if resp.status_code != 200:
console.print(
f"[red]❌ run_sse HTTP {resp.status_code}: {resp.text[:500]}[/red]"
)
raise typer.Exit(1)

def _answer_text(event: dict) -> str:
# Only the answer text; the model's "thought" (reasoning) parts stay hidden
# behind the thinking spinner.
parts = (event.get("content") or {}).get("parts") or []
return "".join(
p["text"]
for p in parts
if isinstance(p, dict) and p.get("text") and not p.get("thought")
)

streamed = []
final_answer = ""
answer_open = False # first answer token seen → spinner stopped, reply started
# A spinner covers the wait (model latency + reasoning) until the answer starts.
status = None if raw else console.status("thinking", spinner="dots")
if status is not None:
status.start()
try:
for line in resp.iter_lines(decode_unicode=True):
if not line:
continue
if raw:
print(line) # builtin print: no rich wrapping/markup on raw JSON
continue
event = _normalize_stream_event(line)
if not isinstance(event, dict):
continue
if event.get("error"):
if status is not None and not answer_open:
status.stop()
console.print(f"\n[red]Error: {event['error']}[/red]")
continue
answer = _answer_text(event)
# `partial=False` is the final aggregate (repeats everything) — keep it
# as a fallback but don't print it; partial events stream the deltas.
if event.get("partial") is False:
final_answer = answer or final_answer
continue
if answer:
if not answer_open:
if status is not None:
status.stop() # leave the thinking phase
console.print("") # blank line before the reply
answer_open = True
console.print(answer, end="", style="green")
streamed.append(answer)
finally:
if status is not None and not answer_open:
status.stop()

if not streamed and final_answer:
console.print("")
console.print(final_answer, end="", style="green")
console.print("")
return "".join(streamed) or final_answer


@invoke_app.command("harness")
def harness_command(
name: str = typer.Argument(
Expand All @@ -640,7 +796,9 @@ def harness_command(
"agentkit_user", "--user-id", help="user_id for the run."
),
session_id: str = typer.Option(
"agentkit_sample_session", "--session-id", help="session_id for the run."
None,
"--session-id",
help="session_id for the run (random if unset).",
),
max_llm_calls: int = typer.Option(
None,
Expand Down Expand Up @@ -675,7 +833,12 @@ def harness_command(
help="Bearer token override (e.g. OAuth JWT for custom_jwt harnesses).",
),
raw: bool = typer.Option(
False, "--raw", help="Print the raw InvokeHarnessResponse JSON."
False, "--raw", help="Print the raw response (InvokeHarnessResponse / SSE)."
),
protocol: str = typer.Option(
"run_sse",
"--protocol",
help="Transport: 'run_sse' (ADK /run_sse, default) or 'invoke' (POST /harness/invoke).",
),
) -> Any:
"""Invoke a deployed harness by name (resolved via the harness.json registry).
Expand Down Expand Up @@ -712,7 +875,13 @@ def harness_command(
)
raise typer.Exit(1)

invoke_url = entry["url"].rstrip("/") + "/harness/invoke"
if protocol not in ("invoke", "run_sse"):
console.print(
f"[red]Error: --protocol must be 'invoke' or 'run_sse', got '{protocol}'.[/red]"
)
raise typer.Exit(1)

base_url = entry["url"].rstrip("/")

# Inbound credential selection is auth-type aware — the registry records how each
# harness was deployed (harness/deploy.py _record_harness): a custom_jwt harness is
Expand All @@ -722,6 +891,7 @@ def harness_command(
# the data plane's JWT path; refresh failure errors (re-login), never silent.
# 3. key_auth harness → the static "key"; a key_auth authorizer would reject a JWT,
# so we never force the id_token onto it.
# Both transports (invoke and run_sse) share this resolution.
from agentkit.auth.errors import AuthError
from agentkit.auth.sso import load_session

Expand All @@ -744,6 +914,30 @@ def harness_command(
if not token:
token = entry.get("key") or ""

# No session given → mint a random one (both transports behave identically;
# creating it is idempotent).
session_id = session_id or f"s-{uuid.uuid4().hex[:12]}"

if protocol == "run_sse":
# run_sse supports the same overrides (sent as the `harness` field); only
# --max-llm-calls is invoke-only (not part of the ADK run_sse request).
if max_llm_calls is not None:
console.print(
"[yellow]Note: --max-llm-calls is ignored with --protocol "
"run_sse.[/yellow]"
)
return _harness_run_sse(
base_url=base_url,
token=token,
prompt=message,
session_id=session_id,
overrides=build_harness_overrides(
system_prompt, model_name, tools, skills, runtime
),
raw=raw,
)

invoke_url = base_url + "/harness/invoke"
req_headers = {"Content-Type": "application/json"}
if token:
req_headers["Authorization"] = f"Bearer {token}"
Expand Down
86 changes: 72 additions & 14 deletions docs/content/2.agentkit-cli/5.harness.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,46 +160,54 @@ agentkit deploy --harness my-harness --region cn-beijing --yes

`agentkit invoke harness <name> "<prompt>" [options]` 在 `--directory`(默认当前目录)下的 `harness.json` 注册表中按名查找运行时并发起调用。通过 `--protocol` 选择传输方式:

- `invoke`(默认):`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。
- `run_sse`:调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`,`user_id` 由 CLI 随机生成(临时占位),`session_id` 取 `--session-id`。
- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成。渲染上:等待与推理阶段显示一个 `thinking` loading 动画(模型 thought 不展示),收到答案后空一行逐字流式输出回答正文;工具事件与末尾重复的聚合事件不显示;`--raw` 可打印底层原始 SSE。
- `invoke`:`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。

两种方式的 `session_id` 都取 `--session-id`,**未指定时随机生成 `s-<id>`**(行为一致);`run_sse` 调用前会创建该 session(**幂等**:已存在也不影响)。

两种方式都支持一次性覆写(`--system-prompt` / `--model-name` / `--tools` / `--skills` / `--runtime`);覆写为**叠加在本次调用上的临时值**,不写回 spec、不改动运行时(工具 / 技能为增量加入)。

**鉴权(两种传输方式统一)**:`--apikey/-ak` 显式传入时优先;否则对 **custom_jwt** 的 Harness 自动读取 `agentkit login` 会话中的 OIDC id_token 作为 Bearer(自动续期,401 时强制刷新并重试一次)——因此 custom_jwt 的 Harness **无需手动传 `-ak`**;对 **key_auth** 的 Harness 使用注册表中的静态 `key`(key_auth 鉴权器会拒绝 JWT,故不会强塞 id_token)。

| 参数 | 说明 | 默认值 / 取值 |
| :--- | :--- | :--- |
| `<name>` | **(必填)** 已部署的 Harness 名称(注册表键)。 | — |
| `<message>` | **(必填)** 发送给 Harness 的提示词。 | — |
| `--protocol` | 传输方式。 | `invoke`(默认)/ `run_sse` |
| `--protocol` | 传输方式。 | `run_sse`(默认)/ `invoke` |
| `--directory` | `harness.json` 注册表所在目录。 | 当前目录 |
| `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 由 CLI 随机生成)。 | `agentkit_user` |
| `--session-id` | 运行的 session_id。 | `agentkit_sample_session` |
| `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 取 JWT `sub` 或随机生成)。 | `agentkit_user` |
| `--session-id` | 运行的 session_id(两种传输方式一致)。 | 未指定时随机 `s-<id>` |
| `--max-llm-calls` | 本次调用的最大 LLM 调用次数(**仅 `invoke`**)。 | 不限 |
| `--system-prompt` | 覆写系统提示词。 | — |
| `--model-name` | 覆写模型名。 | — |
| `--tools` | 覆写工具(逗号分隔,增量加入)。 | — |
| `--skills` | 覆写技能(逗号分隔,增量加入)。 | — |
| `--runtime` | 覆写运行时后端。 | `adk` / `codex` |
| `--apikey`, `-ak` | Bearer Token 覆写(如 custom_jwt 的用户 JWT)。 | 注册表 `key` |
| `--apikey`, `-ak` | Bearer Token 覆写(如 custom_jwt 的用户 JWT)。 | 自动鉴权或注册表 `key` |
| `--raw` | 打印原始响应(`InvokeHarnessResponse` / SSE)。 | `false` |

> `--max-llm-calls` 不属于 ADK `run_sse` 请求,故仅在 `invoke` 模式生效;在 `run_sse` 模式传入会被忽略并提示。

```bash
# 基础调用(invoke,默认)
# 基础调用(run_sse,默认,流式
agentkit invoke harness my-harness "今天杭州天气如何?"

# 一次性覆写系统提示词(invoke)
# 一次性覆写系统提示词
agentkit invoke harness my-harness "2+2=?" --system-prompt "请简洁作答。"

# 限制本次最大 LLM 调用次数(仅 invoke
agentkit invoke harness my-harness "规划一次多步研究任务。" --max-llm-calls 10
# 指定 session(未指定则随机生成;同名 session 幂等复用以延续上下文
agentkit invoke harness my-harness "继续上文" --session-id sess-001

# 流式调用(run_sse),并覆写模型与工具
# 覆写模型与工具
agentkit invoke harness my-harness "帮我查一下杭州天气" \
--protocol run_sse --model-name doubao-seed-1-6-250615 --tools web_search,web_fetch \
--session-id sess-001
--model-name doubao-seed-1-6-250615 --tools web_search,web_fetch

# OAuth(custom_jwt)的 Harness:携带用户池签发的 JWT
# 改用 invoke(非流式 /harness/invoke),并限制最大 LLM 调用次数
agentkit invoke harness my-harness "规划一次多步研究任务。" --protocol invoke --max-llm-calls 10

# OAuth(custom_jwt)的 Harness:默认自动使用 `agentkit login` 的 JWT,无需 -ak
agentkit invoke harness my-harness "你好"
# 如需覆写凭证可显式传入
agentkit invoke harness my-harness "你好" -ak "<user-oauth-jwt>"
```

Expand Down Expand Up @@ -265,3 +273,53 @@ agentkit deploy --harness my-harness \
agentkit list harness
agentkit list harness --output json --region cn-beijing
```

## 8. list sessions —— 列出某 Harness 的会话

`agentkit list sessions --harness <name>` 列出某个已部署 harness 运行时上、**某个用户**的对话会话。它通过 `harness.json` 解析出运行时地址与凭证,直接调用运行时的 ADK 数据面端点:

```text
GET {harness_url}/apps/harness/users/{user_id}/sessions
```

> 与 `list harness`(控制面,AK/SK 调火山 `ListRuntimes`)不同,本命令是**数据面**调用,寻址方式与 `invoke harness` 一致(按名从 `harness.json` 解析、用 harness 凭证鉴权)。

### user_id 的确定方式

ADK 的会话列表接口把 `user_id` 作为**必填路径参数**,没有"列出所有用户会话"的能力。因此 user_id 必须能被确定,按以下优先级:

1. 显式 `--user-id <id>`;
2. 未指定时,从 harness 凭证的 JWT `sub` 解出——
- **custom_jwt(OAuth)harness**:默认使用 `agentkit login` 的 id_token(或 `-ak` 传入的 JWT),其 `sub` 即用户身份;
- **key_auth harness**:凭证是不透明 API Key、不是 JWT,**无法解出 user_id**,此时必须显式 `--user-id`,否则快速失败。

### 参数

| 参数 | 说明 | 默认值 |
| :--- | :--- | :--- |
| `--harness <name>` | **(必填)** harness 名称,从 `harness.json` 解析。 | — |
| `--user-id <id>` | 要查询的用户;缺省时尝试从凭证 JWT 的 `sub` 推导。 | 由 JWT 推导 |
| `--apikey`, `-ak <token>` | Bearer token(如 custom_jwt harness 的 OIDC JWT),覆盖 registry 中的凭证。 | 注册表凭证 |
| `--directory <dir>` | `harness.json` 所在目录。 | `.`(当前目录) |
| `--output` | 输出格式 `table` / `json` / `yaml`。 | `table` |
| `--quiet`, `-q` | 仅打印会话 id。 | `false` |
| `--no-color`, `-nc` | 关闭彩色输出。 | `false` |

表格输出包含 `SessionId`、`Events`(事件数)、`LastUpdate`(最近更新时间)等列。

### 例子

```bash
# custom_jwt(OAuth)harness:已 agentkit login,user_id 自动取自 JWT sub
agentkit list sessions --harness my-harness

# 显式指定用户(key_auth harness 必须这样)
agentkit list sessions --harness my-harness --user-id alice

# 传入 JWT 覆盖凭证(user_id 取自该 JWT 的 sub)
agentkit list sessions --harness my-harness -ak "<oidc-jwt>"

# 仅打印会话 id / JSON 输出
agentkit list sessions --harness my-harness --user-id alice --quiet
agentkit list sessions --harness my-harness --user-id alice --output json
```
Loading