From b57fe720672d377dbf124c412605ef7ffe78be95 Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 13:50:03 +0800 Subject: [PATCH 1/7] feat(invoke harness): add run_sse protocol alongside invoke Add --protocol {invoke|run_sse} to 'agentkit invoke harness'. run_sse calls the deployed harness's ADK /run_sse endpoint: fixed app_name 'harness', a random CLI-generated user_id (temporary), and the caller's session_id; it creates the session then streams the answer (skipping model thought parts and the final aggregate). The existing /harness/invoke path is unchanged and remains the default. Co-Authored-By: Claude Opus 4.8 --- agentkit/toolkit/cli/cli_invoke.py | 139 ++++++++++++++++++- tests/toolkit/cli/test_cli_invoke_harness.py | 64 +++++++++ 2 files changed, 201 insertions(+), 2 deletions(-) diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index 1940eca..99ea2ae 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -627,6 +627,112 @@ def build_harness_overrides( return overrides +# Fixed ADK app name for the run_sse path. The harness loader serves its single +# agent under any app name, so a stable constant keeps the CLI decoupled from the +# deployed HARNESS_NAME. +_HARNESS_RUN_SSE_APP = "harness" + + +def _harness_run_sse( + *, base_url: str, token: str, prompt: str, session_id: str, raw: bool +) -> Any: + """Invoke a deployed harness via the ADK ``/run_sse`` endpoint (streaming). + + app_name is the fixed ``"harness"``; user_id is a freshly generated random id + (a temporary placeholder until real identity wiring); session_id is the + caller's. Per-call harness overrides are not supported on this path — it hits + the base agent. + """ + import requests + + app_name = _HARNESS_RUN_SSE_APP + user_id = f"u-{uuid.uuid4().hex[:12]}" + headers = {"Content-Type": "application/json"} + if token: + headers["Authorization"] = f"Bearer {token}" + console.print( + f"[blue]run_sse: app_name={app_name}, user_id={user_id} (random), " + f"session_id={session_id}[/blue]" + ) + + # ADK /run_sse requires an existing session; create it (ignore "exists"). + session_url = f"{base_url}/apps/{app_name}/users/{user_id}/sessions/{session_id}" + try: + sr = requests.post(session_url, json={}, headers=headers, timeout=60) + except requests.RequestException as e: + console.print(f"[red]❌ Session create failed: {e}[/red]") + raise typer.Exit(1) + if sr.status_code not in (200, 400, 409): + console.print( + f"[red]❌ Session create HTTP {sr.status_code}: {sr.text[:300]}[/red]" + ) + raise typer.Exit(1) + + body = { + "app_name": app_name, + "user_id": user_id, + "session_id": session_id, + "new_message": {"role": "user", "parts": [{"text": prompt}]}, + "streaming": True, + } + try: + resp = requests.post( + f"{base_url}/run_sse", + json=body, + headers=headers, + timeout=300, + stream=True, + ) + except requests.RequestException as e: + console.print(f"[red]❌ run_sse request failed: {e}[/red]") + raise typer.Exit(1) + if resp.status_code != 200: + console.print( + f"[red]❌ run_sse HTTP {resp.status_code}: {resp.text[:500]}[/red]" + ) + raise typer.Exit(1) + + console.print("[cyan]📝 Response:[/cyan]") + + def _answer_text(event: dict) -> str: + # Answer text parts only; drop model "thought" (reasoning) parts. + parts = (event.get("content") or {}).get("parts") or [] + return "".join( + p["text"] + for p in parts + if isinstance(p, dict) and p.get("text") and not p.get("thought") + ) + + streamed = [] + final_answer = "" + for line in resp.iter_lines(decode_unicode=True): + if not line: + continue + if raw: + print(line) # builtin print: no rich wrapping/markup on raw JSON + continue + event = _normalize_stream_event(line) + if not isinstance(event, dict): + continue + if event.get("error"): + console.print(f"\n[red]Error: {event['error']}[/red]") + continue + answer = _answer_text(event) + # `partial=False` is the final aggregate (repeats everything) — keep it as + # a fallback but don't print it; the partial events stream answer deltas. + if event.get("partial") is False: + final_answer = answer or final_answer + continue + if answer: + console.print(answer, end="", style="green") + streamed.append(answer) + + if not streamed and final_answer: + console.print(final_answer, end="", style="green") + console.print("") + return "".join(streamed) or final_answer + + @invoke_app.command("harness") def harness_command( name: str = typer.Argument( @@ -675,7 +781,12 @@ def harness_command( help="Bearer token override (e.g. OAuth JWT for custom_jwt harnesses).", ), raw: bool = typer.Option( - False, "--raw", help="Print the raw InvokeHarnessResponse JSON." + False, "--raw", help="Print the raw response (InvokeHarnessResponse / SSE)." + ), + protocol: str = typer.Option( + "invoke", + "--protocol", + help="Transport: 'invoke' (POST /harness/invoke) or 'run_sse' (ADK /run_sse).", ), ) -> Any: """Invoke a deployed harness by name (resolved via the harness.json registry). @@ -712,7 +823,13 @@ def harness_command( ) raise typer.Exit(1) - invoke_url = entry["url"].rstrip("/") + "/harness/invoke" + if protocol not in ("invoke", "run_sse"): + console.print( + f"[red]Error: --protocol must be 'invoke' or 'run_sse', got '{protocol}'.[/red]" + ) + raise typer.Exit(1) + + base_url = entry["url"].rstrip("/") # Inbound credential selection is auth-type aware — the registry records how each # harness was deployed (harness/deploy.py _record_harness): a custom_jwt harness is @@ -722,6 +839,7 @@ def harness_command( # the data plane's JWT path; refresh failure errors (re-login), never silent. # 3. key_auth harness → the static "key"; a key_auth authorizer would reject a JWT, # so we never force the id_token onto it. + # Both transports (invoke and run_sse) share this resolution. from agentkit.auth.errors import AuthError from agentkit.auth.sso import load_session @@ -744,6 +862,23 @@ def harness_command( if not token: token = entry.get("key") or "" + if protocol == "run_sse": + # run_sse hits the base agent over ADK's streaming endpoint; the + # /harness/invoke-only knobs below do not apply. + if any(v is not None for v in (system_prompt, model_name, tools, skills, runtime, max_llm_calls)): + console.print( + "[yellow]Note: overrides / --max-llm-calls are ignored with " + "--protocol run_sse (it invokes the base agent).[/yellow]" + ) + return _harness_run_sse( + base_url=base_url, + token=token, + prompt=message, + session_id=session_id, + raw=raw, + ) + + invoke_url = base_url + "/harness/invoke" req_headers = {"Content-Type": "application/json"} if token: req_headers["Authorization"] = f"Bearer {token}" diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index 7d08315..aece638 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -386,3 +386,67 @@ def fake_post(url, json=None, headers=None, timeout=None): assert result.exit_code == 1 assert len(calls) == 2 # original + exactly one refresh-retry, no third assert "HTTP 401" in result.output + + +# --- run_sse protocol -------------------------------------------------------- + + +def test_harness_run_sse_streams_answer(tmp_path, monkeypatch): + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse_lines = [ + 'data: {"content":{"parts":[{"text":"KI"}],"role":"model"},"partial":true}', + 'data: {"content":{"parts":[{"text":"WI"}],"role":"model"},"partial":true}', + # final aggregate repeats everything (incl. a thought part); must be skipped + 'data: {"content":{"parts":[{"text":"reasoning","thought":true},' + '{"text":"KIWI"}],"role":"model"},"partial":false}', + ] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse_lines) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness( + [ + "first", + "hello", + "--directory", + str(tmp_path), + "--protocol", + "run_sse", + "--session-id", + "s-1", + ] + ) + assert result.exit_code == 0, result.output + # Answer streamed from the deltas; thought text and the final aggregate are + # not double-printed. + assert "KIWI" in result.output + assert "reasoning" not in result.output + + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + assert run_call["url"] == "https://x/run_sse" + assert run_call["json"]["app_name"] == "harness" # fixed + assert run_call["json"]["session_id"] == "s-1" # caller-provided + assert run_call["json"]["user_id"].startswith("u-") # random, CLI-generated + + sess_call = next(c for c in calls if "/sessions/" in c["url"]) + assert sess_call["url"] == "https://x/apps/harness/users/" + run_call["json"]["user_id"] + "/sessions/s-1" + + +def test_harness_invalid_protocol_fails(tmp_path): + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + result = _run_harness( + ["first", "hi", "--directory", str(tmp_path), "--protocol", "bogus"] + ) + assert result.exit_code == 1 + assert "must be 'invoke' or 'run_sse'" in result.output From 3c21de4cdce20b8322dc4825ede971ab9833e262 Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 14:07:51 +0800 Subject: [PATCH 2/7] feat(invoke harness): send overrides on the run_sse path too The harness /run_sse now honors once-time overrides (spawned agent), so the CLI's run_sse mode sends --system-prompt/--model-name/--tools/--skills/--runtime as the 'harness' field. Only --max-llm-calls remains invoke-only. Base (no-override) run_sse is unchanged. Co-Authored-By: Claude Opus 4.8 --- agentkit/toolkit/cli/cli_invoke.py | 30 +++++++++----- tests/toolkit/cli/test_cli_invoke_harness.py | 42 ++++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index 99ea2ae..8dc8d45 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -634,14 +634,20 @@ def build_harness_overrides( def _harness_run_sse( - *, base_url: str, token: str, prompt: str, session_id: str, raw: bool + *, + base_url: str, + token: str, + prompt: str, + session_id: str, + overrides: dict, + raw: bool, ) -> Any: """Invoke a deployed harness via the ADK ``/run_sse`` endpoint (streaming). app_name is the fixed ``"harness"``; user_id is a freshly generated random id (a temporary placeholder until real identity wiring); session_id is the - caller's. Per-call harness overrides are not supported on this path — it hits - the base agent. + caller's. When ``overrides`` is non-empty it is sent as the ``harness`` field + so the runtime streams a spawned (overridden) agent; otherwise the base agent. """ import requests @@ -668,13 +674,16 @@ def _harness_run_sse( ) raise typer.Exit(1) - body = { + body: dict[str, Any] = { "app_name": app_name, "user_id": user_id, "session_id": session_id, "new_message": {"role": "user", "parts": [{"text": prompt}]}, "streaming": True, } + if overrides: + body["harness"] = overrides + console.print(f"[blue]Using one-time overrides: {overrides}[/blue]") try: resp = requests.post( f"{base_url}/run_sse", @@ -863,18 +872,21 @@ def harness_command( token = entry.get("key") or "" if protocol == "run_sse": - # run_sse hits the base agent over ADK's streaming endpoint; the - # /harness/invoke-only knobs below do not apply. - if any(v is not None for v in (system_prompt, model_name, tools, skills, runtime, max_llm_calls)): + # run_sse supports the same overrides (sent as the `harness` field); only + # --max-llm-calls is invoke-only (not part of the ADK run_sse request). + if max_llm_calls is not None: console.print( - "[yellow]Note: overrides / --max-llm-calls are ignored with " - "--protocol run_sse (it invokes the base agent).[/yellow]" + "[yellow]Note: --max-llm-calls is ignored with --protocol " + "run_sse.[/yellow]" ) return _harness_run_sse( base_url=base_url, token=token, prompt=message, session_id=session_id, + overrides=build_harness_overrides( + system_prompt, model_name, tools, skills, runtime + ), raw=raw, ) diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index aece638..22f352f 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -438,11 +438,53 @@ def fake_post(url, json=None, headers=None, timeout=None, stream=False): assert run_call["json"]["app_name"] == "harness" # fixed assert run_call["json"]["session_id"] == "s-1" # caller-provided assert run_call["json"]["user_id"].startswith("u-") # random, CLI-generated + assert "harness" not in run_call["json"] # no overrides passed sess_call = next(c for c in calls if "/sessions/" in c["url"]) assert sess_call["url"] == "https://x/apps/harness/users/" + run_call["json"]["user_id"] + "/sessions/s-1" +def test_harness_run_sse_sends_overrides(tmp_path, monkeypatch): + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse = ['data: {"content":{"parts":[{"text":"PINEAPPLE"}]},"partial":true}'] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness( + [ + "first", + "x", + "--directory", + str(tmp_path), + "--protocol", + "run_sse", + "--system-prompt", + "Reply PINEAPPLE.", + "--tools", + "web_search", + ] + ) + assert result.exit_code == 0, result.output + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + # Overrides are sent as the `harness` field (mirrors HarnessOverrides shape). + assert run_call["json"]["harness"] == { + "system_prompt": "Reply PINEAPPLE.", + "tools": "web_search", + } + + def test_harness_invalid_protocol_fails(tmp_path): _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) result = _run_harness( From ad9f7e82e4534ee48083847723f726fd1d31fd8c Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 22:26:36 +0800 Subject: [PATCH 3/7] feat(invoke harness): derive run_sse user_id from JWT sub The run_sse path used a random u- user_id. When the bearer token is an OIDC id_token (custom_jwt harness), decode its sub claim and use it as user_id so sessions tie to the authenticated identity; fall back to random for opaque key_auth tokens. Co-Authored-By: Claude Opus 4.8 --- agentkit/toolkit/cli/cli_invoke.py | 29 +++++++++- tests/toolkit/cli/test_cli_invoke_harness.py | 59 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index 8dc8d45..b00ae7d 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -16,6 +16,8 @@ from pathlib import Path from typing import Optional, Any +import base64 +import binascii import json import typer from typer.core import TyperGroup @@ -633,6 +635,27 @@ def build_harness_overrides( _HARNESS_RUN_SSE_APP = "harness" +def _user_id_from_token(token: str) -> str | None: + """Return the OIDC ``sub`` claim from a JWT bearer token, else ``None``. + + A custom_jwt harness is called with an OIDC id_token whose ``sub`` is the + authenticated user's stable id — use it as the run's user_id so sessions are + tied to the real identity. A key_auth token is an opaque api key (not a JWT, + no ``sub``), so this returns ``None`` and the caller falls back to a random id. + """ + parts = token.split(".") + if len(parts) != 3: + return None # not a JWT (e.g. a key_auth api key) + payload = parts[1] + payload += "=" * (-len(payload) % 4) # restore base64 padding + try: + claims = json.loads(base64.urlsafe_b64decode(payload)) + except (ValueError, binascii.Error): + return None # malformed JWT payload → fall back to random + sub = claims.get("sub") + return sub if isinstance(sub, str) and sub else None + + def _harness_run_sse( *, base_url: str, @@ -652,12 +675,14 @@ def _harness_run_sse( import requests app_name = _HARNESS_RUN_SSE_APP - user_id = f"u-{uuid.uuid4().hex[:12]}" + sub = _user_id_from_token(token) + user_id = sub or f"u-{uuid.uuid4().hex[:12]}" + user_id_origin = "jwt sub" if sub else "random" headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = f"Bearer {token}" console.print( - f"[blue]run_sse: app_name={app_name}, user_id={user_id} (random), " + f"[blue]run_sse: app_name={app_name}, user_id={user_id} ({user_id_origin}), " f"session_id={session_id}[/blue]" ) diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index 22f352f..d8da20d 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -485,6 +485,65 @@ def fake_post(url, json=None, headers=None, timeout=None, stream=False): } +def _make_jwt(sub): + """Build an unsigned JWT whose payload carries the given ``sub`` claim.""" + import base64 + + def _seg(obj): + raw = json.dumps(obj).encode() + return base64.urlsafe_b64encode(raw).rstrip(b"=").decode() + + return f"{_seg({'alg': 'none'})}.{_seg({'sub': sub})}.sig" + + +def test_run_sse_user_id_from_jwt_sub(tmp_path, monkeypatch): + """With a JWT bearer token, user_id is the token's ``sub`` (not random).""" + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse = ['data: {"content":{"parts":[{"text":"OK"}]},"partial":true}'] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + token = _make_jwt("user-abc-123") + result = _run_harness( + [ + "first", + "hi", + "--directory", + str(tmp_path), + "--protocol", + "run_sse", + "--session-id", + "s-1", + "--apikey", + token, + ] + ) + assert result.exit_code == 0, result.output + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + assert run_call["json"]["user_id"] == "user-abc-123" + # Session is created under the same (sub-derived) user_id. + sess_call = next(c for c in calls if "/sessions/" in c["url"]) + assert sess_call["url"] == "https://x/apps/harness/users/user-abc-123/sessions/s-1" + + +def test_user_id_from_token_helper(): + assert cli_invoke._user_id_from_token(_make_jwt("u-42")) == "u-42" + assert cli_invoke._user_id_from_token("opaque-api-key") is None + assert cli_invoke._user_id_from_token("") is None + + def test_harness_invalid_protocol_fails(tmp_path): _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) result = _run_harness( From d67cea2a3933ecacb68efffc104d31f682f91450 Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 22:55:15 +0800 Subject: [PATCH 4/7] feat(invoke harness): default --protocol to run_sse, mint random session run_sse is now the default transport for `agentkit invoke harness`; pass --protocol invoke for the /harness/invoke path. When --session-id is unset the run_sse path mints a random s- (creating it is idempotent). Existing invoke-path tests pin --protocol invoke via a _run_invoke helper. Co-Authored-By: Claude Opus 4.8 --- agentkit/toolkit/cli/cli_invoke.py | 20 +++--- tests/toolkit/cli/test_cli_invoke_harness.py | 67 ++++++++++++++++---- 2 files changed, 65 insertions(+), 22 deletions(-) diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index b00ae7d..8a1eb68 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -667,10 +667,10 @@ def _harness_run_sse( ) -> Any: """Invoke a deployed harness via the ADK ``/run_sse`` endpoint (streaming). - app_name is the fixed ``"harness"``; user_id is a freshly generated random id - (a temporary placeholder until real identity wiring); session_id is the - caller's. When ``overrides`` is non-empty it is sent as the ``harness`` field - so the runtime streams a spawned (overridden) agent; otherwise the base agent. + app_name is the fixed ``"harness"``; user_id is the JWT ``sub`` when the token + is an OIDC id_token, else a random id; session_id is the caller's. When + ``overrides`` is non-empty it is sent as the ``harness`` field so the runtime + streams a spawned (overridden) agent; otherwise the base agent. """ import requests @@ -780,7 +780,9 @@ def harness_command( "agentkit_user", "--user-id", help="user_id for the run." ), session_id: str = typer.Option( - "agentkit_sample_session", "--session-id", help="session_id for the run." + None, + "--session-id", + help="session_id for the run (run_sse: random if unset).", ), max_llm_calls: int = typer.Option( None, @@ -818,9 +820,9 @@ def harness_command( False, "--raw", help="Print the raw response (InvokeHarnessResponse / SSE)." ), protocol: str = typer.Option( - "invoke", + "run_sse", "--protocol", - help="Transport: 'invoke' (POST /harness/invoke) or 'run_sse' (ADK /run_sse).", + help="Transport: 'run_sse' (ADK /run_sse, default) or 'invoke' (POST /harness/invoke).", ), ) -> Any: """Invoke a deployed harness by name (resolved via the harness.json registry). @@ -908,13 +910,15 @@ def harness_command( base_url=base_url, token=token, prompt=message, - session_id=session_id, + # No session given → mint a random one; creating it is idempotent. + session_id=session_id or f"s-{uuid.uuid4().hex[:12]}", overrides=build_harness_overrides( system_prompt, model_name, tools, skills, runtime ), raw=raw, ) + session_id = session_id or "agentkit_sample_session" invoke_url = base_url + "/harness/invoke" req_headers = {"Content-Type": "application/json"} if token: diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index d8da20d..bcdd581 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -44,6 +44,15 @@ def _run_harness(args): return runner.invoke(app, ["invoke", "harness", *args]) +def _run_invoke(args): + """Run the harness subcommand pinned to the ``invoke`` transport. + + ``--protocol`` now defaults to ``run_sse``; these tests exercise the + ``/harness/invoke`` path, so they request it explicitly. + """ + return _run_harness([*args, "--protocol", "invoke"]) + + class _FakeResponse: def __init__(self, payload, status_code=200): self._payload = payload @@ -96,13 +105,13 @@ def test_build_harness_overrides_empty_when_unset(): def test_unknown_harness_fails(tmp_path): _write_registry(tmp_path, {"other": {"url": "https://x", "key": "k"}}) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "not found in registry" in result.output def test_no_registry_fails(tmp_path): - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "not found in registry" in result.output @@ -122,7 +131,7 @@ def test_harness_invoke_posts_correct_request(tmp_path, monkeypatch): "output": "PINEAPPLE", }) - result = _run_harness( + result = _run_invoke( [ "first", "What should you reply?", @@ -155,7 +164,7 @@ def test_harness_invoke_no_overrides_omits_harness_key(tmp_path, monkeypatch): captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert "harness" not in captured["json"] @@ -176,7 +185,7 @@ def test_harness_error_field_is_surfaced(tmp_path, monkeypatch): }, ) - result = _run_harness( + result = _run_invoke( ["first", "hi", "--directory", str(tmp_path), "--tools", "bogus"] ) assert result.exit_code == 1 @@ -189,7 +198,7 @@ def test_harness_invoke_http_error_fails(tmp_path, monkeypatch): captured = {} _patch_post(monkeypatch, captured, payload={"detail": "boom"}, status_code=500) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "HTTP 500" in result.output @@ -199,7 +208,7 @@ def test_apikey_overrides_registry_key(tmp_path, monkeypatch): captured = {} _patch_post(monkeypatch, captured) - result = _run_harness( + result = _run_invoke( ["first", "hi", "--directory", str(tmp_path), "--apikey", "jwt-token"] ) assert result.exit_code == 0, result.output @@ -282,7 +291,7 @@ def test_harness_invoke_uses_login_id_token_as_bearer(monkeypatch, tmp_path): _write_registry(tmp_path, {"first": {"url": "https://h.example.com"}}) # no static key captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == f"Bearer {tok}" @@ -292,7 +301,7 @@ def test_harness_invoke_apikey_overrides_login_id_token(monkeypatch, tmp_path): _write_registry(tmp_path, {"first": {"url": "https://h.example.com"}}) captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path), "--apikey", "explicit-key"]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path), "--apikey", "explicit-key"]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == "Bearer explicit-key" @@ -317,7 +326,7 @@ def fake_post(url, json=None, headers=None, timeout=None): return _FakeResponse({"output": "ok"}, 401 if len(calls) == 1 else 200) monkeypatch.setattr("requests.post", fake_post) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert calls == [f"Bearer {tok1}", f"Bearer {tok2}"] # one refresh + one retry @@ -334,7 +343,7 @@ def boom(self, rt): raise AuthError("token endpoint rejected the request") monkeypatch.setattr(sess_mod.OAuthClient, "refresh", boom) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert "login" in result.output.lower() @@ -346,7 +355,7 @@ def test_harness_invoke_keyauth_uses_key_even_when_logged_in(monkeypatch, tmp_pa _write_registry(tmp_path, {"first": {"url": "https://h.example.com", "key": "ak", "runtime_id": "r-1"}}) captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == "Bearer ak" @@ -360,7 +369,7 @@ def test_harness_invoke_custom_jwt_entry_uses_login_id_token(monkeypatch, tmp_pa }}) captured = {} _patch_post(monkeypatch, captured) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 0, result.output assert captured["headers"].get("Authorization") == f"Bearer {tok}" @@ -382,7 +391,7 @@ def fake_post(url, json=None, headers=None, timeout=None): return _FakeResponse({"detail": "denied"}, 401) monkeypatch.setattr("requests.post", fake_post) - result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + result = _run_invoke(["first", "hi", "--directory", str(tmp_path)]) assert result.exit_code == 1 assert len(calls) == 2 # original + exactly one refresh-retry, no third assert "HTTP 401" in result.output @@ -544,6 +553,36 @@ def test_user_id_from_token_helper(): assert cli_invoke._user_id_from_token("") is None +def test_default_protocol_is_run_sse_with_random_session(tmp_path, monkeypatch): + """No --protocol and no --session-id → run_sse with a freshly minted session.""" + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + calls = [] + sse = ['data: {"content":{"parts":[{"text":"OK"}]},"partial":true}'] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + calls.append({"url": url, "json": json}) + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness(["first", "hi", "--directory", str(tmp_path)]) + assert result.exit_code == 0, result.output + # Default transport is run_sse (not /harness/invoke). + run_call = next(c for c in calls if c["url"].endswith("/run_sse")) + sid = run_call["json"]["session_id"] + assert sid.startswith("s-") # random, not the invoke-path default + # The session is created (idempotently) before the run, under that id. + sess_call = next(c for c in calls if "/sessions/" in c["url"]) + assert sess_call["url"].endswith(f"/sessions/{sid}") + + def test_harness_invalid_protocol_fails(tmp_path): _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) result = _run_harness( From ee344f8277053d91f557d0892d5a39e60a2116b1 Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 22:57:56 +0800 Subject: [PATCH 5/7] docs(harness): document run_sse default, JWT-sub user_id, auto-auth Update the invoke harness section: run_sse is now the default transport, run_sse user_id comes from the JWT sub (or random), --session-id is minted randomly when unset (idempotent create), and auth auto-loads the agentkit login id_token for custom_jwt harnesses (no -ak needed) for both transports. Co-Authored-By: Claude Opus 4.8 --- docs/content/2.agentkit-cli/5.harness.md | 34 ++++++++++++++---------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/docs/content/2.agentkit-cli/5.harness.md b/docs/content/2.agentkit-cli/5.harness.md index 33e4761..e569a6f 100644 --- a/docs/content/2.agentkit-cli/5.harness.md +++ b/docs/content/2.agentkit-cli/5.harness.md @@ -160,46 +160,52 @@ agentkit deploy --harness my-harness --region cn-beijing --yes `agentkit invoke harness "" [options]` 在 `--directory`(默认当前目录)下的 `harness.json` 注册表中按名查找运行时并发起调用。通过 `--protocol` 选择传输方式: -- `invoke`(默认):`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。 -- `run_sse`:调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`,`user_id` 由 CLI 随机生成(临时占位),`session_id` 取 `--session-id`。 +- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成;`session_id` 取 `--session-id`,未指定时随机生成 `s-`。调用前会创建该 session(**幂等**:已存在也不影响)。 +- `invoke`:`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。 两种方式都支持一次性覆写(`--system-prompt` / `--model-name` / `--tools` / `--skills` / `--runtime`);覆写为**叠加在本次调用上的临时值**,不写回 spec、不改动运行时(工具 / 技能为增量加入)。 +**鉴权(两种传输方式统一)**:`--apikey/-ak` 显式传入时优先;否则对 **custom_jwt** 的 Harness 自动读取 `agentkit login` 会话中的 OIDC id_token 作为 Bearer(自动续期,401 时强制刷新并重试一次)——因此 custom_jwt 的 Harness **无需手动传 `-ak`**;对 **key_auth** 的 Harness 使用注册表中的静态 `key`(key_auth 鉴权器会拒绝 JWT,故不会强塞 id_token)。 + | 参数 | 说明 | 默认值 / 取值 | | :--- | :--- | :--- | | `` | **(必填)** 已部署的 Harness 名称(注册表键)。 | — | | `` | **(必填)** 发送给 Harness 的提示词。 | — | -| `--protocol` | 传输方式。 | `invoke`(默认)/ `run_sse` | +| `--protocol` | 传输方式。 | `run_sse`(默认)/ `invoke` | | `--directory` | `harness.json` 注册表所在目录。 | 当前目录 | -| `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 由 CLI 随机生成)。 | `agentkit_user` | -| `--session-id` | 运行的 session_id。 | `agentkit_sample_session` | +| `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 取 JWT `sub` 或随机生成)。 | `agentkit_user` | +| `--session-id` | 运行的 session_id。 | `run_sse` 未指定时随机 `s-`;`invoke` 为 `agentkit_sample_session` | | `--max-llm-calls` | 本次调用的最大 LLM 调用次数(**仅 `invoke`**)。 | 不限 | | `--system-prompt` | 覆写系统提示词。 | — | | `--model-name` | 覆写模型名。 | — | | `--tools` | 覆写工具(逗号分隔,增量加入)。 | — | | `--skills` | 覆写技能(逗号分隔,增量加入)。 | — | | `--runtime` | 覆写运行时后端。 | `adk` / `codex` | -| `--apikey`, `-ak` | Bearer Token 覆写(如 custom_jwt 的用户 JWT)。 | 注册表 `key` | +| `--apikey`, `-ak` | Bearer Token 覆写(如 custom_jwt 的用户 JWT)。 | 自动鉴权或注册表 `key` | | `--raw` | 打印原始响应(`InvokeHarnessResponse` / SSE)。 | `false` | > `--max-llm-calls` 不属于 ADK `run_sse` 请求,故仅在 `invoke` 模式生效;在 `run_sse` 模式传入会被忽略并提示。 ```bash -# 基础调用(invoke,默认) +# 基础调用(run_sse,默认,流式) agentkit invoke harness my-harness "今天杭州天气如何?" -# 一次性覆写系统提示词(invoke) +# 一次性覆写系统提示词 agentkit invoke harness my-harness "2+2=?" --system-prompt "请简洁作答。" -# 限制本次最大 LLM 调用次数(仅 invoke) -agentkit invoke harness my-harness "规划一次多步研究任务。" --max-llm-calls 10 +# 指定 session(未指定则随机生成;同名 session 幂等复用以延续上下文) +agentkit invoke harness my-harness "继续上文" --session-id sess-001 -# 流式调用(run_sse),并覆写模型与工具 +# 覆写模型与工具 agentkit invoke harness my-harness "帮我查一下杭州天气" \ - --protocol run_sse --model-name doubao-seed-1-6-250615 --tools web_search,web_fetch \ - --session-id sess-001 + --model-name doubao-seed-1-6-250615 --tools web_search,web_fetch + +# 改用 invoke(非流式 /harness/invoke),并限制最大 LLM 调用次数 +agentkit invoke harness my-harness "规划一次多步研究任务。" --protocol invoke --max-llm-calls 10 -# OAuth(custom_jwt)的 Harness:携带用户池签发的 JWT +# OAuth(custom_jwt)的 Harness:默认自动使用 `agentkit login` 的 JWT,无需 -ak +agentkit invoke harness my-harness "你好" +# 如需覆写凭证可显式传入 agentkit invoke harness my-harness "你好" -ak "" ``` From d3be4c6fbe208cde64aef38094f0843cb04cbe39 Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 23:06:13 +0800 Subject: [PATCH 6/7] feat(invoke harness): unify random session for both protocols; render reasoning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - invoke now mints a random s- session when --session-id is unset, matching run_sse (session handling is identical across transports). - run_sse renders the model's reasoning (thought parts) dim/grey under a '🤔 思考中…' header so the wait isn't blank, then the answer under '📝 Response:'. - docs updated. Co-Authored-By: Claude Opus 4.8 --- agentkit/toolkit/cli/cli_invoke.py | 46 +++++++++++----- docs/content/2.agentkit-cli/5.harness.md | 56 +++++++++++++++++++- tests/toolkit/cli/test_cli_invoke_harness.py | 34 ++++++++++++ 3 files changed, 120 insertions(+), 16 deletions(-) diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index 8a1eb68..3943949 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -726,19 +726,23 @@ def _harness_run_sse( ) raise typer.Exit(1) - console.print("[cyan]📝 Response:[/cyan]") - - def _answer_text(event: dict) -> str: - # Answer text parts only; drop model "thought" (reasoning) parts. + def _split_parts(event: dict) -> tuple[str, str]: + # Separate the model's answer text from its "thought" (reasoning) text. parts = (event.get("content") or {}).get("parts") or [] - return "".join( - p["text"] - for p in parts - if isinstance(p, dict) and p.get("text") and not p.get("thought") - ) + answer = thought = "" + for p in parts: + if not (isinstance(p, dict) and p.get("text")): + continue + if p.get("thought"): + thought += p["text"] + else: + answer += p["text"] + return answer, thought streamed = [] final_answer = "" + thinking_open = False # streaming reasoning under the "🤔 思考中" header + answer_open = False # the "📝 Response:" header has been printed for line in resp.iter_lines(decode_unicode=True): if not line: continue @@ -751,17 +755,29 @@ def _answer_text(event: dict) -> str: if event.get("error"): console.print(f"\n[red]Error: {event['error']}[/red]") continue - answer = _answer_text(event) + answer, thought = _split_parts(event) # `partial=False` is the final aggregate (repeats everything) — keep it as # a fallback but don't print it; the partial events stream answer deltas. if event.get("partial") is False: final_answer = answer or final_answer continue + # Reasoning streams dim/grey under its own header, so the wait isn't blank. + if thought and not answer_open: + if not thinking_open: + console.print("[dim]🤔 思考中…[/dim]") + thinking_open = True + console.print(thought, end="", style="dim") if answer: + if not answer_open: + if thinking_open: + console.print("") # close the reasoning block + console.print("[cyan]📝 Response:[/cyan]") + answer_open = True console.print(answer, end="", style="green") streamed.append(answer) if not streamed and final_answer: + console.print("[cyan]📝 Response:[/cyan]") console.print(final_answer, end="", style="green") console.print("") return "".join(streamed) or final_answer @@ -782,7 +798,7 @@ def harness_command( session_id: str = typer.Option( None, "--session-id", - help="session_id for the run (run_sse: random if unset).", + help="session_id for the run (random if unset).", ), max_llm_calls: int = typer.Option( None, @@ -898,6 +914,10 @@ def harness_command( if not token: token = entry.get("key") or "" + # No session given → mint a random one (both transports behave identically; + # creating it is idempotent). + session_id = session_id or f"s-{uuid.uuid4().hex[:12]}" + if protocol == "run_sse": # run_sse supports the same overrides (sent as the `harness` field); only # --max-llm-calls is invoke-only (not part of the ADK run_sse request). @@ -910,15 +930,13 @@ def harness_command( base_url=base_url, token=token, prompt=message, - # No session given → mint a random one; creating it is idempotent. - session_id=session_id or f"s-{uuid.uuid4().hex[:12]}", + session_id=session_id, overrides=build_harness_overrides( system_prompt, model_name, tools, skills, runtime ), raw=raw, ) - session_id = session_id or "agentkit_sample_session" invoke_url = base_url + "/harness/invoke" req_headers = {"Content-Type": "application/json"} if token: diff --git a/docs/content/2.agentkit-cli/5.harness.md b/docs/content/2.agentkit-cli/5.harness.md index e569a6f..03aefe1 100644 --- a/docs/content/2.agentkit-cli/5.harness.md +++ b/docs/content/2.agentkit-cli/5.harness.md @@ -160,9 +160,11 @@ agentkit deploy --harness my-harness --region cn-beijing --yes `agentkit invoke harness "" [options]` 在 `--directory`(默认当前目录)下的 `harness.json` 注册表中按名查找运行时并发起调用。通过 `--protocol` 选择传输方式: -- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成;`session_id` 取 `--session-id`,未指定时随机生成 `s-`。调用前会创建该 session(**幂等**:已存在也不影响)。 +- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成。渲染上回答正文以绿色逐字追加显示,模型推理(thought)以灰色在 `🤔 思考中…` 下流式展示、答案在 `📝 Response:` 下展示;工具事件与末尾重复的聚合事件不显示;`--raw` 可打印底层原始 SSE。 - `invoke`:`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。 +两种方式的 `session_id` 都取 `--session-id`,**未指定时随机生成 `s-`**(行为一致);`run_sse` 调用前会创建该 session(**幂等**:已存在也不影响)。 + 两种方式都支持一次性覆写(`--system-prompt` / `--model-name` / `--tools` / `--skills` / `--runtime`);覆写为**叠加在本次调用上的临时值**,不写回 spec、不改动运行时(工具 / 技能为增量加入)。 **鉴权(两种传输方式统一)**:`--apikey/-ak` 显式传入时优先;否则对 **custom_jwt** 的 Harness 自动读取 `agentkit login` 会话中的 OIDC id_token 作为 Bearer(自动续期,401 时强制刷新并重试一次)——因此 custom_jwt 的 Harness **无需手动传 `-ak`**;对 **key_auth** 的 Harness 使用注册表中的静态 `key`(key_auth 鉴权器会拒绝 JWT,故不会强塞 id_token)。 @@ -174,7 +176,7 @@ agentkit deploy --harness my-harness --region cn-beijing --yes | `--protocol` | 传输方式。 | `run_sse`(默认)/ `invoke` | | `--directory` | `harness.json` 注册表所在目录。 | 当前目录 | | `--user-id` | 运行的 user_id(仅 `invoke`;`run_sse` 取 JWT `sub` 或随机生成)。 | `agentkit_user` | -| `--session-id` | 运行的 session_id。 | `run_sse` 未指定时随机 `s-`;`invoke` 为 `agentkit_sample_session` | +| `--session-id` | 运行的 session_id(两种传输方式一致)。 | 未指定时随机 `s-` | | `--max-llm-calls` | 本次调用的最大 LLM 调用次数(**仅 `invoke`**)。 | 不限 | | `--system-prompt` | 覆写系统提示词。 | — | | `--model-name` | 覆写模型名。 | — | @@ -271,3 +273,53 @@ agentkit deploy --harness my-harness \ agentkit list harness agentkit list harness --output json --region cn-beijing ``` + +## 8. list sessions —— 列出某 Harness 的会话 + +`agentkit list sessions --harness ` 列出某个已部署 harness 运行时上、**某个用户**的对话会话。它通过 `harness.json` 解析出运行时地址与凭证,直接调用运行时的 ADK 数据面端点: + +```text +GET {harness_url}/apps/harness/users/{user_id}/sessions +``` + +> 与 `list harness`(控制面,AK/SK 调火山 `ListRuntimes`)不同,本命令是**数据面**调用,寻址方式与 `invoke harness` 一致(按名从 `harness.json` 解析、用 harness 凭证鉴权)。 + +### user_id 的确定方式 + +ADK 的会话列表接口把 `user_id` 作为**必填路径参数**,没有"列出所有用户会话"的能力。因此 user_id 必须能被确定,按以下优先级: + +1. 显式 `--user-id `; +2. 未指定时,从 harness 凭证的 JWT `sub` 解出—— + - **custom_jwt(OAuth)harness**:默认使用 `agentkit login` 的 id_token(或 `-ak` 传入的 JWT),其 `sub` 即用户身份; + - **key_auth harness**:凭证是不透明 API Key、不是 JWT,**无法解出 user_id**,此时必须显式 `--user-id`,否则快速失败。 + +### 参数 + +| 参数 | 说明 | 默认值 | +| :--- | :--- | :--- | +| `--harness ` | **(必填)** harness 名称,从 `harness.json` 解析。 | — | +| `--user-id ` | 要查询的用户;缺省时尝试从凭证 JWT 的 `sub` 推导。 | 由 JWT 推导 | +| `--apikey`, `-ak ` | Bearer token(如 custom_jwt harness 的 OIDC JWT),覆盖 registry 中的凭证。 | 注册表凭证 | +| `--directory ` | `harness.json` 所在目录。 | `.`(当前目录) | +| `--output` | 输出格式 `table` / `json` / `yaml`。 | `table` | +| `--quiet`, `-q` | 仅打印会话 id。 | `false` | +| `--no-color`, `-nc` | 关闭彩色输出。 | `false` | + +表格输出包含 `SessionId`、`Events`(事件数)、`LastUpdate`(最近更新时间)等列。 + +### 例子 + +```bash +# custom_jwt(OAuth)harness:已 agentkit login,user_id 自动取自 JWT sub +agentkit list sessions --harness my-harness + +# 显式指定用户(key_auth harness 必须这样) +agentkit list sessions --harness my-harness --user-id alice + +# 传入 JWT 覆盖凭证(user_id 取自该 JWT 的 sub) +agentkit list sessions --harness my-harness -ak "" + +# 仅打印会话 id / JSON 输出 +agentkit list sessions --harness my-harness --user-id alice --quiet +agentkit list sessions --harness my-harness --user-id alice --output json +``` diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index bcdd581..a01162a 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -154,6 +154,8 @@ def test_harness_invoke_posts_correct_request(tmp_path, monkeypatch): assert body["prompt"] == "What should you reply?" assert body["harness_name"] == "first" assert body["run_agent_request"]["user_id"] == "agentkit_user" + # No --session-id → random s-, consistent with the run_sse path. + assert body["run_agent_request"]["session_id"].startswith("s-") assert body["run_agent_request"]["max_llm_calls"] == 7 # Partial overrides only (model_fields_set semantics). assert body["harness"] == {"system_prompt": "Reply PINEAPPLE."} @@ -583,6 +585,38 @@ def fake_post(url, json=None, headers=None, timeout=None, stream=False): assert sess_call["url"].endswith(f"/sessions/{sid}") +def test_run_sse_renders_thinking_then_answer(tmp_path, monkeypatch): + """Reasoning streams under a 思考中 header; the answer under 📝 Response.""" + _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) + sse = [ + 'data: {"content":{"parts":[{"text":"let me think","thought":true}]},"partial":true}', + 'data: {"content":{"parts":[{"text":"FINAL"}]},"partial":true}', + ] + + class _SSEResp: + status_code = 200 + text = "" + + def iter_lines(self, decode_unicode=False): + return iter(sse) + + def fake_post(url, json=None, headers=None, timeout=None, stream=False): + return _SSEResp() if url.endswith("/run_sse") else _FakeResponse({}, 200) + + monkeypatch.setattr("requests.post", fake_post) + + result = _run_harness( + ["first", "hi", "--directory", str(tmp_path), "--session-id", "s-1"] + ) + assert result.exit_code == 0, result.output + out = result.output + # Reasoning is shown (not dropped) and precedes the answer. + assert "思考中" in out + assert "let me think" in out + assert "FINAL" in out + assert out.index("let me think") < out.index("FINAL") + + def test_harness_invalid_protocol_fails(tmp_path): _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) result = _run_harness( From 7f3bba7d30fbc2321c657e71c1eb5011a6e7599d Mon Sep 17 00:00:00 2001 From: "fangyaozheng@bytedance.com" Date: Thu, 18 Jun 2026 23:09:32 +0800 Subject: [PATCH 7/7] feat(invoke harness): replace thinking/response headers with a spinner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop the 🤔/📝 emoji headers. The wait + reasoning phase now shows a 'thinking' loading spinner (model thought parts stay hidden); once the answer arrives the spinner stops and the reply streams after a blank line. Co-Authored-By: Claude Opus 4.8 --- agentkit/toolkit/cli/cli_invoke.py | 92 ++++++++++---------- docs/content/2.agentkit-cli/5.harness.md | 2 +- tests/toolkit/cli/test_cli_invoke_harness.py | 9 +- 3 files changed, 50 insertions(+), 53 deletions(-) diff --git a/agentkit/toolkit/cli/cli_invoke.py b/agentkit/toolkit/cli/cli_invoke.py index 3943949..4bd2b77 100644 --- a/agentkit/toolkit/cli/cli_invoke.py +++ b/agentkit/toolkit/cli/cli_invoke.py @@ -726,58 +726,58 @@ def _harness_run_sse( ) raise typer.Exit(1) - def _split_parts(event: dict) -> tuple[str, str]: - # Separate the model's answer text from its "thought" (reasoning) text. + def _answer_text(event: dict) -> str: + # Only the answer text; the model's "thought" (reasoning) parts stay hidden + # behind the thinking spinner. parts = (event.get("content") or {}).get("parts") or [] - answer = thought = "" - for p in parts: - if not (isinstance(p, dict) and p.get("text")): - continue - if p.get("thought"): - thought += p["text"] - else: - answer += p["text"] - return answer, thought + return "".join( + p["text"] + for p in parts + if isinstance(p, dict) and p.get("text") and not p.get("thought") + ) streamed = [] final_answer = "" - thinking_open = False # streaming reasoning under the "🤔 思考中" header - answer_open = False # the "📝 Response:" header has been printed - for line in resp.iter_lines(decode_unicode=True): - if not line: - continue - if raw: - print(line) # builtin print: no rich wrapping/markup on raw JSON - continue - event = _normalize_stream_event(line) - if not isinstance(event, dict): - continue - if event.get("error"): - console.print(f"\n[red]Error: {event['error']}[/red]") - continue - answer, thought = _split_parts(event) - # `partial=False` is the final aggregate (repeats everything) — keep it as - # a fallback but don't print it; the partial events stream answer deltas. - if event.get("partial") is False: - final_answer = answer or final_answer - continue - # Reasoning streams dim/grey under its own header, so the wait isn't blank. - if thought and not answer_open: - if not thinking_open: - console.print("[dim]🤔 思考中…[/dim]") - thinking_open = True - console.print(thought, end="", style="dim") - if answer: - if not answer_open: - if thinking_open: - console.print("") # close the reasoning block - console.print("[cyan]📝 Response:[/cyan]") - answer_open = True - console.print(answer, end="", style="green") - streamed.append(answer) + answer_open = False # first answer token seen → spinner stopped, reply started + # A spinner covers the wait (model latency + reasoning) until the answer starts. + status = None if raw else console.status("thinking", spinner="dots") + if status is not None: + status.start() + try: + for line in resp.iter_lines(decode_unicode=True): + if not line: + continue + if raw: + print(line) # builtin print: no rich wrapping/markup on raw JSON + continue + event = _normalize_stream_event(line) + if not isinstance(event, dict): + continue + if event.get("error"): + if status is not None and not answer_open: + status.stop() + console.print(f"\n[red]Error: {event['error']}[/red]") + continue + answer = _answer_text(event) + # `partial=False` is the final aggregate (repeats everything) — keep it + # as a fallback but don't print it; partial events stream the deltas. + if event.get("partial") is False: + final_answer = answer or final_answer + continue + if answer: + if not answer_open: + if status is not None: + status.stop() # leave the thinking phase + console.print("") # blank line before the reply + answer_open = True + console.print(answer, end="", style="green") + streamed.append(answer) + finally: + if status is not None and not answer_open: + status.stop() if not streamed and final_answer: - console.print("[cyan]📝 Response:[/cyan]") + console.print("") console.print(final_answer, end="", style="green") console.print("") return "".join(streamed) or final_answer diff --git a/docs/content/2.agentkit-cli/5.harness.md b/docs/content/2.agentkit-cli/5.harness.md index 03aefe1..60847b3 100644 --- a/docs/content/2.agentkit-cli/5.harness.md +++ b/docs/content/2.agentkit-cli/5.harness.md @@ -160,7 +160,7 @@ agentkit deploy --harness my-harness --region cn-beijing --yes `agentkit invoke harness "" [options]` 在 `--directory`(默认当前目录)下的 `harness.json` 注册表中按名查找运行时并发起调用。通过 `--protocol` 选择传输方式: -- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成。渲染上回答正文以绿色逐字追加显示,模型推理(thought)以灰色在 `🤔 思考中…` 下流式展示、答案在 `📝 Response:` 下展示;工具事件与末尾重复的聚合事件不显示;`--raw` 可打印底层原始 SSE。 +- `run_sse`(**默认**):调用 ADK 的 `/run_sse`,**流式**返回。其 `app_name` 固定为 `harness`;`user_id` 在携带 JWT(custom_jwt)时取其 `sub` 声明(与登录身份绑定),否则随机生成。渲染上:等待与推理阶段显示一个 `thinking` loading 动画(模型 thought 不展示),收到答案后空一行逐字流式输出回答正文;工具事件与末尾重复的聚合事件不显示;`--raw` 可打印底层原始 SSE。 - `invoke`:`POST /harness/invoke`,**非流式**,返回完整结果(含错误透传)。 两种方式的 `session_id` 都取 `--session-id`,**未指定时随机生成 `s-`**(行为一致);`run_sse` 调用前会创建该 session(**幂等**:已存在也不影响)。 diff --git a/tests/toolkit/cli/test_cli_invoke_harness.py b/tests/toolkit/cli/test_cli_invoke_harness.py index a01162a..18fe1be 100644 --- a/tests/toolkit/cli/test_cli_invoke_harness.py +++ b/tests/toolkit/cli/test_cli_invoke_harness.py @@ -585,8 +585,8 @@ def fake_post(url, json=None, headers=None, timeout=None, stream=False): assert sess_call["url"].endswith(f"/sessions/{sid}") -def test_run_sse_renders_thinking_then_answer(tmp_path, monkeypatch): - """Reasoning streams under a 思考中 header; the answer under 📝 Response.""" +def test_run_sse_hides_reasoning_shows_answer(tmp_path, monkeypatch): + """Reasoning (thought) stays behind the spinner; only the answer is printed.""" _write_registry(tmp_path, {"first": {"url": "https://x", "key": "ak"}}) sse = [ 'data: {"content":{"parts":[{"text":"let me think","thought":true}]},"partial":true}', @@ -610,11 +610,8 @@ def fake_post(url, json=None, headers=None, timeout=None, stream=False): ) assert result.exit_code == 0, result.output out = result.output - # Reasoning is shown (not dropped) and precedes the answer. - assert "思考中" in out - assert "let me think" in out assert "FINAL" in out - assert out.index("let me think") < out.index("FINAL") + assert "let me think" not in out # reasoning is not dumped to the user def test_harness_invalid_protocol_fails(tmp_path):