diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index 1940eca..4bd2b77 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -16,6 +16,8 @@ from pathlib import Path from typing import Optional, Any +import base64 +import binascii import json import typer from typer.core import TyperGroup @@ -627,6 +629,160 @@ def build_harness_overrides( return overrides +# Fixed ADK app name for the run_sse path. The harness loader serves its single +# agent under any app name, so a stable constant keeps the CLI decoupled from the +# deployed HARNESS_NAME. +_HARNESS_RUN_SSE_APP = "harness" + + +def _user_id_from_token(token: str) -> str | None: + """Return the OIDC ``sub`` claim from a JWT bearer token, else ``None``. + + A custom_jwt harness is called with an OIDC id_token whose ``sub`` is the + authenticated user's stable id — use it as the run's user_id so sessions are + tied to the real identity. A key_auth token is an opaque api key (not a JWT, + no ``sub``), so this returns ``None`` and the caller falls back to a random id. + """ + parts = token.split(".") + if len(parts) != 3: + return None # not a JWT (e.g. a key_auth api key) + payload = parts[1] + payload += "=" * (-len(payload) % 4) # restore base64 padding + try: + claims = json.loads(base64.urlsafe_b64decode(payload)) + except (ValueError, binascii.Error): + return None # malformed JWT payload → fall back to random + sub = claims.get("sub") + return sub if isinstance(sub, str) and sub else None + + +def _harness_run_sse( + *, + base_url: str, + token: str, + prompt: str, + session_id: str, + overrides: dict, + raw: bool, +) -> Any: + """Invoke a deployed harness via the ADK ``/run_sse`` endpoint (streaming). + + app_name is the fixed ``"harness"``; user_id is the JWT ``sub`` when the token + is an OIDC id_token, else a random id; session_id is the caller's. When + ``overrides`` is non-empty it is sent as the ``harness`` field so the runtime + streams a spawned (overridden) agent; otherwise the base agent. + """ + import requests + + app_name = _HARNESS_RUN_SSE_APP + sub = _user_id_from_token(token) + user_id = sub or f"u-{uuid.uuid4().hex[:12]}" + user_id_origin = "jwt sub" if sub else "random" + headers = {"Content-Type": "application/json"} + if token: + headers["Authorization"] = f"Bearer {token}" + console.print( + f"[blue]run_sse: app_name={app_name}, user_id={user_id} ({user_id_origin}), " + f"session_id={session_id}[/blue]" + ) + + # ADK /run_sse requires an existing session; create it (ignore "exists"). + session_url = f"{base_url}/apps/{app_name}/users/{user_id}/sessions/{session_id}" + try: + sr = requests.post(session_url, json={}, headers=headers, timeout=60) + except requests.RequestException as e: + console.print(f"[red]❌ Session create failed: {e}[/red]") + raise typer.Exit(1) + if sr.status_code not in (200, 400, 409): + console.print( + f"[red]❌ Session create HTTP {sr.status_code}: {sr.text[:300]}[/red]" + ) + raise typer.Exit(1) + + body: dict[str, Any] = { + "app_name": app_name, + "user_id": user_id, + "session_id": session_id, + "new_message": {"role": "user", "parts": [{"text": prompt}]}, + "streaming": True, + } + if overrides: + body["harness"] = overrides + console.print(f"[blue]Using one-time overrides: {overrides}[/blue]") + try: + resp = requests.post( + f"{base_url}/run_sse", + json=body, + headers=headers, + timeout=300, + stream=True, + ) + except requests.RequestException as e: + console.print(f"[red]❌ run_sse request failed: {e}[/red]") + raise typer.Exit(1) + if resp.status_code != 200: + console.print( + f"[red]❌ run_sse HTTP {resp.status_code}: {resp.text[:500]}[/red]" + ) + raise typer.Exit(1) + + def _answer_text(event: dict) -> str: + # Only the answer text; the model's "thought" (reasoning) parts stay hidden + # behind the thinking spinner. + parts = (event.get("content") or {}).get("parts") or [] + return "".join( + p["text"] + for p in parts + if isinstance(p, dict) and p.get("text") and not p.get("thought") + ) + + streamed = [] + final_answer = "" + answer_open = False # first answer token seen → spinner stopped, reply started + # A spinner covers the wait (model latency + reasoning) until the answer starts. + status = None if raw else console.status("thinking", spinner="dots") + if status is not None: + status.start() + try: + for line in resp.iter_lines(decode_unicode=True): + if not line: + continue + if raw: + print(line) # builtin print: no rich wrapping/markup on raw JSON + continue + event = _normalize_stream_event(line) + if not isinstance(event, dict): + continue + if event.get("error"): + if status is not None and not answer_open: + status.stop() + console.print(f"\n[red]Error: {event['error']}[/red]") + continue + answer = _answer_text(event) + # `partial=False` is the final aggregate (repeats everything) — keep it + # as a fallback but don't print it; partial events stream the deltas. + if event.get("partial") is False: + final_answer = answer or final_answer + continue + if answer: + if not answer_open: + if status is not None: + status.stop() # leave the thinking phase + console.print("") # blank line before the reply + answer_open = True + console.print(answer, end="", style="green") + streamed.append(answer) + finally: + if status is not None and not answer_open: + status.stop() + + if not streamed and final_answer: + console.print("") + console.print(final_answer, end="", style="green") + console.print("") + return "".join(streamed) or final_answer + + @invoke_app.command("harness") def harness_command( name: str = typer.Argument( @@ -640,7 +796,9 @@ def harness_command( "agentkit_user", "--user-id", help="user_id for the run." ), session_id: str = typer.Option( - "agentkit_sample_session", "--session-id", help="session_id for the run." + None, + "--session-id", + help="session_id for the run (random if unset).", ), max_llm_calls: int = typer.Option( None, @@ -675,7 +833,12 @@ def harness_command( help="Bearer token override (e.g. OAuth JWT for custom_jwt harnesses).", ), raw: bool = typer.Option( - False, "--raw", help="Print the raw InvokeHarnessResponse JSON." + False, "--raw", help="Print the raw response (InvokeHarnessResponse / SSE)." + ), + protocol: str = typer.Option( + "run_sse", + "--protocol", + help="Transport: 'run_sse' (ADK /run_sse, default) or 'invoke' (POST /harness/invoke).", ), ) -> Any: """Invoke a deployed harness by name (resolved via the harness.json registry). @@ -712,7 +875,13 @@ def harness_command( ) raise typer.Exit(1) - invoke_url = entry["url"].rstrip("/") + "/harness/invoke" + if protocol not in ("invoke", "run_sse"): + console.print( + f"[red]Error: --protocol must be 'invoke' or 'run_sse', got '{protocol}'.[/red]" + ) + raise typer.Exit(1) + + base_url = entry["url"].rstrip("/") # Inbound credential selection is auth-type aware — the registry records how each # harness was deployed (harness/deploy.py _record_harness): a custom_jwt harness is @@ -722,6 +891,7 @@ def harness_command( # the data plane's JWT path; refresh failure errors (re-login), never silent. # 3. key_auth harness → the static "key"; a key_auth authorizer would reject a JWT, # so we never force the id_token onto it. + # Both transports (invoke and run_sse) share this resolution. from agentkit.auth.errors import AuthError from agentkit.auth.sso import load_session @@ -744,6 +914,30 @@ def harness_command( if not token: token = entry.get("key") or "" + # No session given → mint a random one (both transports behave identically; + # creating it is idempotent). + session_id = session_id or f"s-{uuid.uuid4().hex[:12]}" + + if protocol == "run_sse": + # run_sse supports the same overrides (sent as the `harness` field); only + # --max-llm-calls is invoke-only (not part of the ADK run_sse request). + if max_llm_calls is not None: + console.print( + "[yellow]Note: --max-llm-calls is ignored with --protocol " + "run_sse.[/yellow]" + ) + return _harness_run_sse( + base_url=base_url, + token=token, + prompt=message, + session_id=session_id, + overrides=build_harness_overrides( + system_prompt, model_name, tools, skills, runtime + ), + raw=raw, + ) + + invoke_url = base_url + "/harness/invoke" req_headers = {"Content-Type": "application/json"} if token: req_headers["Authorization"] = f"Bearer {token}" diff --git a/docs/content/2.agentkit-cli/5.harness.md b/docs/content/2.agentkit-cli/5.harness.md index 33e4761..60847b3 100644 --- a/docs/content/2.agentkit-cli/5.harness.md +++ b/docs/content/2.agentkit-cli/5.harness.md @@ -160,46 +160,54 @@ agentkit deploy --harness my-harness --region cn-beijing --yes `agentkit invoke harness "" [options]` 在 `--directory`(默认当前目录)下的 `harness.json` 注册表中按名查找运行时并发起调用。通过 `--protocol` 选择传输方式: -- `invoke`(默认):`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。 -- `run_sse`:调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`,`user_id` 由 CLI 随机生成(临时占位),`session_id` 取 `--session-id`。 +- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成。渲染上:等待与推理阶段显示一个 `thinking` loading 动画(模型 thought 不展示),收到答案后空一行逐字流式输出回答正文;工具事件与末尾重复的聚合事件不显示;`--raw` 可打印底层原始 SSE。 +- `invoke`:`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。 + +两种方式的 `session_id` 都取 `--session-id`,**未指定时随机生成 `s-`**(行为一致);`run_sse` 调用前会创建该 session(**幂等**:已存在也不影响)。 两种方式都支持一次性覆写(`--system-prompt` / `--model-name` / `--tools` / `--skills` / `--runtime`);覆写为**叠加在本次调用上的临时值**,不写回 spec、不改动运行时(工具 / 技能为增量加入)。 +**鉴权(两种传输方式统一)**:`--apikey/-ak` 显式传入时优先;否则对 **custom_jwt** 的 Harness 自动读取 `agentkit login` 会话中的 OIDC id_token 作为 Bearer(自动续期,401 时强制刷新并重试一次)——因此 custom_jwt 的 Harness **无需手动传 `-ak`**;对 **key_auth** 的 Harness 使用注册表中的静态 `key`(key_auth 鉴权器会拒绝 JWT,故不会强塞 id_token)。 + | 参数 | 说明 | 默认值 / 取值 | | :--- | :--- | :--- | | `` | **(必填)** 已部署的 Harness 名称(注册表键)。 | — | | `` | **(必填)** 发送给 Harness 的提示词。 | — | -| `--protocol` | 传输方式。 | `invoke`(默认)/ `run_sse` | +| `--protocol` | 传输方式。 | `run_sse`(默认)/ `invoke` | | `--directory` | `harness.json` 注册表所在目录。 | 当前目录 | -| `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 由 CLI 随机生成)。 | `agentkit_user` | -| `--session-id` | 运行的 session_id。 | `agentkit_sample_session` | +| `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 取 JWT `sub` 或随机生成)。 | `agentkit_user` | +| `--session-id` | 运行的 session_id(两种传输方式一致)。 | 未指定时随机 `s-` | | `--max-llm-calls` | 本次调用的最大 LLM 调用次数(**仅 `invoke`**)。 | 不限 | | `--system-prompt` | 覆写系统提示词。 | — | | `--model-name` | 覆写模型名。 | — | | `--tools` | 覆写工具(逗号分隔,增量加入)。 | — | | `--skills` | 覆写技能(逗号分隔,增量加入)。 | — | | `--runtime` | 覆写运行时后端。 | `adk` / `codex` | -| `--apikey`, `-ak` | Bearer Token 覆写(如 custom_jwt 的用户 JWT)。 | 注册表 `key` | +| `--apikey`, `-ak` | Bearer Token 覆写(如 custom_jwt 的用户 JWT)。 | 自动鉴权或注册表 `key` | | `--raw` | 打印原始响应(`InvokeHarnessResponse` / SSE)。 | `false` | > `--max-llm-calls` 不属于 ADK `run_sse` 请求,故仅在 `invoke` 模式生效;在 `run_sse` 模式传入会被忽略并提示。 ```bash -# 基础调用(invoke,默认) +# 基础调用(run_sse,默认,流式) agentkit invoke harness my-harness "今天杭州天气如何?" -# 一次性覆写系统提示词(invoke) +# 一次性覆写系统提示词 agentkit invoke harness my-harness "2+2=?" --system-prompt "请简洁作答。" -# 限制本次最大 LLM 调用次数(仅 invoke) -agentkit invoke harness my-harness "规划一次多步研究任务。" --max-llm-calls 10 +# 指定 session(未指定则随机生成;同名 session 幂等复用以延续上下文) +agentkit invoke harness my-harness "继续上文" --session-id sess-001 -# 流式调用(run_sse),并覆写模型与工具 +# 覆写模型与工具 agentkit invoke harness my-harness "帮我查一下杭州天气" \ - --protocol run_sse --model-name doubao-seed-1-6-250615 --tools web_search,web_fetch \ - --session-id sess-001 + --model-name doubao-seed-1-6-250615 --tools web_search,web_fetch -# OAuth(custom_jwt)的 Harness:携带用户池签发的 JWT +# 改用 invoke(非流式 /harness/invoke),并限制最大 LLM 调用次数 +agentkit invoke harness my-harness "规划一次多步研究任务。" --protocol invoke --max-llm-calls 10 + +# OAuth(custom_jwt)的 Harness:默认自动使用 `agentkit login` 的 JWT,无需 -ak +agentkit invoke harness my-harness "你好" +# 如需覆写凭证可显式传入 agentkit invoke harness my-harness "你好" -ak "" ``` @@ -265,3 +273,53 @@ agentkit deploy --harness my-harness \ agentkit list harness agentkit list harness --output json --region cn-beijing ``` + +## 8. list sessions —— 列出某 Harness 的会话 + +`agentkit list sessions --harness ` 列出某个已部署 harness 运行时上、**某个用户**的对话会话。它通过 `harness.json` 解析出运行时地址与凭证,直接调用运行时的 ADK 数据面端点: + +```text +GET {harness_url}/apps/harness/users/{user_id}/sessions +``` + +> 与 `list harness`(控制面,AK/SK 调火山 `ListRuntimes`)不同,本命令是**数据面**调用,寻址方式与 `invoke harness` 一致(按名从 `harness.json` 解析、用 harness 凭证鉴权)。 + +### user_id 的确定方式 + +ADK 的会话列表接口把 `user_id` 作为**必填路径参数**,没有"列出所有用户会话"的能力。因此 user_id 必须能被确定,按以下优先级: + +1. 显式 `--user-id `; +2. 未指定时,从 harness 凭证的 JWT `sub` 解出—— + - **custom_jwt(OAuth)harness**:默认使用 `agentkit login` 的 id_token(或 `-ak` 传入的 JWT),其 `sub` 即用户身份; + - **key_auth harness**:凭证是不透明 API Key、不是 JWT,**无法解出 user_id**,此时必须显式 `--user-id`,否则快速失败。 + +### 参数 + +| 参数 | 说明 | 默认值 | +| :--- | :--- | :--- | +| `--harness ` | **(必填)** harness 名称,从 `harness.json` 解析。 | — | +| `--user-id ` | 要查询的用户;缺省时尝试从凭证 JWT 的 `sub` 推导。 | 由 JWT 推导 | +| `--apikey`, `-ak ` | Bearer token(如 custom_jwt harness 的 OIDC JWT),覆盖 registry 中的凭证。 | 注册表凭证 | +| `--directory ` | `harness.json` 所在目录。 | `.`(当前目录) | +| `--output` | 输出格式 `table` / `json` / `yaml`。 | `table` | +| `--quiet`, `-q` | 仅打印会话 id。 | `false` | +| `--no-color`, `-nc` | 关闭彩色输出。 | `false` | + +表格输出包含 `SessionId`、`Events`(事件数)、`LastUpdate`(最近更新时间)等列。 + +### 例子 + +```bash +# custom_jwt(OAuth)harness:已 agentkit login,user_id 自动取自 JWT sub +agentkit list sessions --harness my-harness + +# 显式指定用户(key_auth harness 必须这样) +agentkit list sessions --harness my-harness --user-id alice + +# 传入 JWT 覆盖凭证(user_id 取自该 JWT 的 sub) +agentkit list sessions --harness my-harness -ak "" + +# 仅打印会话 id / JSON 输出 +agentkit list sessions --harness my-harness --user-id alice --quiet +agentkit list sessions --harness my-harness --user-id alice --output json +``` diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index 7d08315..18fe1be 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -44,6 +44,15 @@ def _run_harness(args): return runner.invoke(app, ["invoke", "harness", *args]) +def _run_invoke(args): + """Run the harness subcommand pinned to the ``invoke`` transport. + + ``--protocol`` now defaults to ``run_sse``; these tests exercise the + ``/harness/invoke`` path, so they request it explicitly. + """ + return _run_harness([*args, "--protocol", "invoke"]) + + class _FakeResponse: def __init__(self, payload, status_code=200): self._payload = payload @@ -96,13 +105,13 @@ def test_build_harness_overrides_empty_when_unset(): def test_unknown_harness_fails(tmp_path): _write_registry(tmp_path, {"other": {"url": "https://x", "key": "k"}}) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "not found in registry" in result.output def test_no_registry_fails(tmp_path): - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "not found in registry" in result.output @@ -122,7 +131,7 @@ def test_harness_invoke_posts_correct_request(tmp_path, monkeypatch): "output": "PINEAPPLE", }) - result = _run_harness( + result = _run_invoke( [ "first", "What should you reply?", @@ -145,6 +154,8 @@ def test_harness_invoke_posts_correct_request(tmp_path, monkeypatch): assert body["prompt"] == "What should you reply?" assert body["harness_name"] == "first" assert body["run_agent_request"]["user_id"] == "agentkit_user" + # No --session-id → random s-, consistent with the run_sse path. + assert body["run_agent_request"]["session_id"].startswith("s-") assert body["run_agent_request"]["max_llm_calls"] == 7 # Partial overrides only (model_fields_set semantics). assert body["harness"] == {"system_prompt": "Reply PINEAPPLE."} @@ -155,7 +166,7 @@ def test_harness_invoke_no_overrides_omits_harness_key(tmp_path, monkeypatch): captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert "harness" not in captured["json"] @@ -176,7 +187,7 @@ def test_harness_error_field_is_surfaced(tmp_path, monkeypatch): }, ) - result = _run_harness( + result = _run_invoke( ["first", "hi", "--directory", str(tmp_path), "--tools", "bogus"] ) assert result.exit_code == 1 @@ -189,7 +200,7 @@ def test_harness_invoke_http_error_fails(tmp_path, monkeypatch): captured = {} _patch_post(monkeypatch, captured, payload={"detail": "boom"}, status_code=500) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "HTTP 500" in result.output @@ -199,7 +210,7 @@ def test_apikey_overrides_registry_key(tmp_path, monkeypatch): captured = {} _patch_post(monkeypatch, captured) - result = _run_harness( + result = _run_invoke( ["first", "hi", "--directory", str(tmp_path), "--apikey", "jwt-token"] ) assert result.exit_code == 0, result.output @@ -282,7 +293,7 @@ def test_harness_invoke_uses_login_id_token_as_bearer(monkeypatch, tmp_path): _write_registry(tmp_path, {"first": {"url": "https://h.example.com"}}) # no static key captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == f"Bearer {tok}" @@ -292,7 +303,7 @@ def test_harness_invoke_apikey_overrides_login_id_token(monkeypatch, tmp_path): _write_registry(tmp_path, {"first": {"url": "https://h.example.com"}}) captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path), "--apikey", "explicit-key"]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path), "--apikey", "explicit-key"]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == "Bearer explicit-key" @@ -317,7 +328,7 @@ def fake_post(url, json=None, headers=None, timeout=None): return _FakeResponse({"output": "ok"}, 401 if len(calls) == 1 else 200) monkeypatch.setattr("requests.post", fake_post) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert calls == [f"Bearer {tok1}", f"Bearer {tok2}"] # one refresh + one retry @@ -334,7 +345,7 @@ def boom(self, rt): raise AuthError("token endpoint rejected the request") monkeypatch.setattr(sess_mod.OAuthClient, "refresh", boom) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "login" in result.output.lower() @@ -346,7 +357,7 @@ def test_harness_invoke_keyauth_uses_key_even_when_logged_in(monkeypatch, tmp_pa _write_registry(tmp_path, {"first": {"url": "https://h.example.com", "key": "ak", "runtime_id": "r-1"}}) captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == "Bearer ak" @@ -360,7 +371,7 @@ def test_harness_invoke_custom_jwt_entry_uses_login_id_token(monkeypatch, tmp_pa }}) captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == f"Bearer {tok}" @@ -382,7 +393,231 @@ def fake_post(url, json=None, headers=None, timeout=None): return _FakeResponse({"detail": "denied"}, 401) monkeypatch.setattr("requests.post", fake_post) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert len(calls) == 2 # original + exactly one refresh-retry, no third assert "HTTP 401" in result.output + + +# --- run_sse protocol -------------------------------------------------------- + + +def test_harness_run_sse_streams_answer(tmp_path, monkeypatch): + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse_lines = [ + 'data: {"content":{"parts":[{"text":"KI"}],"role":"model"},"partial":true}', + 'data: {"content":{"parts":[{"text":"WI"}],"role":"model"},"partial":true}', + # final aggregate repeats everything (incl. a thought part); must be skipped + 'data: {"content":{"parts":[{"text":"reasoning","thought":true},' + '{"text":"KIWI"}],"role":"model"},"partial":false}', + ] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse_lines) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness( + [ + "first", + "hello", + "--directory", + str(tmp_path), + "--protocol", + "run_sse", + "--session-id", + "s-1", + ] + ) + assert result.exit_code == 0, result.output + # Answer streamed from the deltas; thought text and the final aggregate are + # not double-printed. + assert "KIWI" in result.output + assert "reasoning" not in result.output + + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + assert run_call["url"] == "https://x/run_sse" + assert run_call["json"]["app_name"] == "harness" # fixed + assert run_call["json"]["session_id"] == "s-1" # caller-provided + assert run_call["json"]["user_id"].startswith("u-") # random, CLI-generated + assert "harness" not in run_call["json"] # no overrides passed + + sess_call = next(c for c in calls if "/sessions/" in c["url"]) + assert sess_call["url"] == "https://x/apps/harness/users/" + run_call["json"]["user_id"] + "/sessions/s-1" + + +def test_harness_run_sse_sends_overrides(tmp_path, monkeypatch): + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse = ['data: {"content":{"parts":[{"text":"PINEAPPLE"}]},"partial":true}'] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness( + [ + "first", + "x", + "--directory", + str(tmp_path), + "--protocol", + "run_sse", + "--system-prompt", + "Reply PINEAPPLE.", + "--tools", + "web_search", + ] + ) + assert result.exit_code == 0, result.output + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + # Overrides are sent as the `harness` field (mirrors HarnessOverrides shape). + assert run_call["json"]["harness"] == { + "system_prompt": "Reply PINEAPPLE.", + "tools": "web_search", + } + + +def _make_jwt(sub): + """Build an unsigned JWT whose payload carries the given ``sub`` claim.""" + import base64 + + def _seg(obj): + raw = json.dumps(obj).encode() + return base64.urlsafe_b64encode(raw).rstrip(b"=").decode() + + return f"{_seg({'alg': 'none'})}.{_seg({'sub': sub})}.sig" + + +def test_run_sse_user_id_from_jwt_sub(tmp_path, monkeypatch): + """With a JWT bearer token, user_id is the token's ``sub`` (not random).""" + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse = ['data: {"content":{"parts":[{"text":"OK"}]},"partial":true}'] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + token = _make_jwt("user-abc-123") + result = _run_harness( + [ + "first", + "hi", + "--directory", + str(tmp_path), + "--protocol", + "run_sse", + "--session-id", + "s-1", + "--apikey", + token, + ] + ) + assert result.exit_code == 0, result.output + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + assert run_call["json"]["user_id"] == "user-abc-123" + # Session is created under the same (sub-derived) user_id. + sess_call = next(c for c in calls if "/sessions/" in c["url"]) + assert sess_call["url"] == "https://x/apps/harness/users/user-abc-123/sessions/s-1" + + +def test_user_id_from_token_helper(): + assert cli_invoke._user_id_from_token(_make_jwt("u-42")) == "u-42" + assert cli_invoke._user_id_from_token("opaque-api-key") is None + assert cli_invoke._user_id_from_token("") is None + + +def test_default_protocol_is_run_sse_with_random_session(tmp_path, monkeypatch): + """No --protocol and no --session-id → run_sse with a freshly minted session.""" + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse = ['data: {"content":{"parts":[{"text":"OK"}]},"partial":true}'] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + assert result.exit_code == 0, result.output + # Default transport is run_sse (not /harness/invoke). + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + sid = run_call["json"]["session_id"] + assert sid.startswith("s-") # random, not the invoke-path default + # The session is created (idempotently) before the run, under that id. + sess_call = next(c for c in calls if "/sessions/" in c["url"]) + assert sess_call["url"].endswith(f"/sessions/{sid}") + + +def test_run_sse_hides_reasoning_shows_answer(tmp_path, monkeypatch): + """Reasoning (thought) stays behind the spinner; only the answer is printed.""" + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + sse = [ + 'data: {"content":{"parts":[{"text":"let me think","thought":true}]},"partial":true}', + 'data: {"content":{"parts":[{"text":"FINAL"}]},"partial":true}', + ] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness( + ["first", "hi", "--directory", str(tmp_path), "--session-id", "s-1"] + ) + assert result.exit_code == 0, result.output + out = result.output + assert "FINAL" in out + assert "let me think" not in out # reasoning is not dumped to the user + + +def test_harness_invalid_protocol_fails(tmp_path): + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + result = _run_harness( + ["first", "hi", "--directory", str(tmp_path), "--protocol", "bogus"] + ) + assert result.exit_code == 1 + assert "must be 'invoke' or 'run_sse'" in result.output