Skip to content

Stream run events with resumable SSE#12

Open
mosquito wants to merge 6 commits into
masterfrom
feature/run-event-streaming
Open

Stream run events with resumable SSE#12
mosquito wants to merge 6 commits into
masterfrom
feature/run-event-streaming

Conversation

@mosquito

@mosquito mosquito commented Jul 2, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • contree run follows /v1/operations/{uuid}/events instead of polling /v1/operations/{uuid}. Output streams live; a TerminalSummary built from the SSE events lets the handler skip the final GET when the stream ends with a completion frame.
  • Adds GzipResponse (incremental Content-Encoding: gzip decoding via read1 + zlib.decompressobj(wbits=31)), plus iter_sse_events / decode_event_chunk parsers in client.py. Streams resume across mid-flight drops using Last-Event-Id.
  • operation wait now writes terminal ops into the session image_cache, and show ignores non-terminal cached entries so it never returns a stale in-progress snapshot.
  • cli_version() returns "editable" for uv sync / pip editable installs by consulting direct_url.json, so version strings don't lie in dev checkouts.

Review fixes (2nd commit)

Two issues raised by @insomnes:

  1. SSE retry no longer gives up. The old streamer capped at len(RETRY_DELAYS) attempts and returned an empty summary; the caller then did a single GET whose result could be non-terminal (EXECUTING), and downstream code treated that as final. Now _stream_events_until_close loops indefinitely — between every failed SSE cycle (connect error, mid-stream drop, or clean close without completion) it does GET /operations/{uuid} and, if terminal, parks the full op on TerminalSummary.fallback_op and returns. Only a completion event, a GET-detected terminal status, BrokenPipeError, or Ctrl+C ends the loop.

  2. BrokenPipeError is no longer misinterpreted as a remote network drop. BrokenPipeError is a subclass of ConnectionError, which was inside RETRYABLE_NETWORK_ERRORS — so contree run ... | head would trigger the retry path as if the remote stream broke. Now the streamer catches BrokenPipeError explicitly and re-raises; cmd_run catches it, sends DELETE /operations/{uuid} to cancel the remote op, silences further stdio writes by redirecting the stdout fd to /dev/null, and exits 141 (128 + SIGPIPE) — the Unix convention for SIGPIPE-terminated pipeline stages.

BrokenPipeError behavior — worked example

What happens on contree run -- tar -xvpjf archive.tbz2 | head:

  1. contree spawns the op remotely and opens the SSE stream.
  2. The server streams tar's verbose output; contree writes each chunk to sys.stdout, and head prints it.
  3. After ~10 lines, head exits, closing the pipe.
  4. Next sys.stdout.buffer.write(chunk) raises BrokenPipeError.
  5. _stream_events_until_close re-raises unchanged (does not treat this as a network drop; no retry, no Last-Event-Id reconnect).
  6. cmd_run catches BrokenPipeError:
    • DELETE /v1/operations/{op_uuid} — the server kills tar.
    • os.dup2(/dev/null, sys.stdout.fileno()) — silences any lingering writes during interpreter shutdown.
    • raise SystemExit(141).

Why cancel the remote op

This matches what a local tar xvpjf archive.tbz2 | head does: tar's next stdout write raises SIGPIPE, and its default SIGPIPE action terminates the process — extraction stops partway.

What differs from local

Local tar ... | head Contree contree run -- tar ... | head
Producer sees pipe close Yes (kernel-delivered SIGPIPE) No — the pipe break happens on the CLI host; the CLI propagates it by cancelling the op
Producer stops Yes Yes (via DELETE /operations/{uuid})
Partial state Whatever tar fully wrote before dying stays on the local filesystem Op ends in CANCELLED status → no result_image_uuid → session image does not advance; the writable layer is discarded

So a local tar | head leaves you a partial (recoverable) extraction; the contree equivalent leaves nothing durable. That's a consequence of contree's all-or-nothing image commit model, not of the CLI's pipe handling.

When this matters

  • yes | head, find / | head, dmesg | head — cancelling is fine; no meaningful side effect lost.
  • contree run -- tar ... | head, ... | make build | head, ... | cargo build 2>&1 | head — the op is aborted before completing; if you want the work to finish, either drop the pipe or use contree run -d (detach mode) and read the output later via contree show.

The CLI can't tell these two cases apart, so it errs on the side of matching shell/SIGPIPE semantics. Reversing this would require the server to commit partial images for cancelled ops, which is out of scope here.

Docker-compat mode: live-streaming RUN steps in contree build (3rd commit)

contree_cli/docker/kw_run.py used to poll GET /operations/{uuid} in a loop and dump the captured stdout/stderr via logger.info after the op reached a terminal status. That worked but felt very much unlike docker build, which prints RUN output as the container produces it. RunKeyword.spawn now goes through the same SSE machinery contree run uses:

  • _stream_events_until_close(client, op_uuid, DefaultFormatter()) — always writes stdout/stderr live to the terminal, regardless of the top-level formatter used for the final build record.
  • The resulting TerminalSummary is resolved via the same priority chain as cmd_run: completion event → fallback_op from a GET-detected terminal status → safety-net GET.
  • log_streams is still called on the fallback paths so users see the captured output when live streaming didn't happen (older server, SSE broken, etc.).

The local poll() helper and the TERMINAL_STATUSES frozenset are gone from kw_run.py — the streamer imports TERMINAL_OP_STATUSES from cli.run.

Same commit also drops the streamer-level graduated backoff. client.request already retries every SSE reconnect through its own RETRY_DELAYS ladder, so the extra _stream_backoff_sleep was pure duplication. The one thing kept is a 0.5s TIGHT_LOOP_FLOOR when a cycle produced no forward progress — guards against a server that returns immediate empty streams for an executing op without letting client.request re-arm.

ADD <url> URL upload was clobbering to the empty-file UUID (4th commit)

While testing docker-compat mode against docs/examples/build-demo/Dockerfile, RUN python -m zipfile -e /tmp/contree-cli.zip /opt/ kept failing with FileNotFoundError. Investigation showed a much more serious bug — every ADD <url> <dest> was assigning the wrong file UUID to the destination.

Diagnosis

The session cache had two entries with different content shape but identical UUIDs:

local_file:https://.../master.zip     uuid=8fba00dd..., sha=2ac18a8e..., size=675183
local_file:https://.../master.tar.gz  uuid=8fba00dd..., sha=c0b2c6b9..., size=604673

Both point at 8fba00dd-fbbc-5a71-a569-a5219f06b1e3 — which is also the UUID of the empty src/__init__.py in the same fixture. Different downloaded content, different SHAs recorded client-side, same server-side UUID as an empty file. The server is content-addressable, so both uploads were being deduped as zero bytes despite HashingReader recording a proper hash on the way through.

The culprit was fetch_and_upload streaming the urllib response body straight into client.request with a manually-set Content-Length copied from the GET response headers. Something in that streaming path (either the socket-backed source not returning the advertised byte count in one go, or http.client's framing when Content-Length is set explicitly on a file-like body) delivered zero payload bytes to the server, so it deduped every URL upload to the empty file's UUID.

Fix

Buffer the URL body to memory once, then POST it as a plain bytes body. http.client frames Content-Length automatically, no mismatch is possible, SHA and size come from the buffered bytes. Typical ADD payloads (wheels, tarballs, small archives) are megabytes — buffering is fine. The now-unused HashingReader helper and parse_content_length utility are deleted along with their tests.

Verified end-to-end: contree build --no-cache ./docs/examples/build-demo completes; RUN contree --help | head -20 and RUN python /app/hello.py output streams live to the terminal.

Test plan

  • make lint
  • make types
  • uv run pytest (1594 passed, 4 skipped)
  • Manual: contree run -- sleep 5; echo done shows live output and exits cleanly
  • Manual: kill the stream mid-run (drop network), confirm it resumes without duplicating output
  • Manual: contree run --format json -- echo hi returns clean JSON with full stdout captured
  • Manual: contree build --no-cache ./docs/examples/build-demo — RUN steps stream live, ADD URL upload produces the correct file UUID
  • Manual: contree run -- yes | head cancels the op on the server and exits 141

`contree run` now follows /v1/operations/{uuid}/events instead of
polling. Adds a gzip-aware incremental HTTP body wrapper, SSE frame
parser with Last-Event-Id resume across drops, and a TerminalSummary
that lets the run handler skip the final GET when the stream ends
with a completion event.

`operation wait` now caches terminal ops into the session image cache
so subsequent `show` invocations reuse the result. `show` treats
non-terminal cache hits as misses to avoid stale reads.

Client:
- GzipResponse: incremental Content-Encoding: gzip via read1+decompressobj
- iter_sse_events / decode_event_chunk parsers
- cli_version() now returns "editable" for direct-url editable installs
Comment thread contree_cli/cli/run.py Outdated
Comment thread contree_cli/cli/run.py
Comment thread contree_cli/client.py Outdated
)

for attempt in range(attempts):
delay = RETRY_DELAYS[attempt - 1]

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 on attempt 0 (0-1=-1) is unexpected, I suppose

mosquito added 4 commits July 2, 2026 17:35
- Drop the retry-budget bail-out.  Between every failed SSE cycle
  (connect error, mid-stream drop, clean close without completion),
  poll `GET /operations/{uuid}` and, if terminal, park the full op
  on `TerminalSummary.fallback_op` so cmd_run can use it directly.
  Only a `completion` event, a GET-detected terminal status, or
  Ctrl+C ends the loop.

- Catch `BrokenPipeError` explicitly before RETRYABLE_NETWORK_ERRORS
  in the stream consumer and re-raise.  BrokenPipeError is a
  ConnectionError subclass, so the old broad handler would treat a
  closed local pipe (e.g. `contree run ... | head`) as a remote drop
  and retry uselessly.  cmd_run now catches it, cancels the op,
  silences further stdio writes, and exits 141 (128 + SIGPIPE).

Tests cover ApiError-until-terminal, empty stream + terminal GET,
BrokenPipe propagation out of the streamer, and cmd_run's
BrokenPipe cancel-and-exit path.
RETRY_DELAYS[attempt - 1] on attempt=0 wrapped to the tail (10s)
before ever being intended as a retry index. The retry-block guard
now scopes delay to attempts > 0, and the 410/425 branch uses
RETRY_DELAYS[min(attempt, len - 1)] so first-attempt Too Early / Gone
retries sleep 1s instead of 10s.
`contree build` now consumes the same SSE event stream as `contree run`
instead of polling `/operations/{uuid}` and dumping stdout/stderr at the
end. RUN output reaches the user's terminal as the server produces it —
matching `docker build`'s live-log behavior.

The Dockerfile RUN keyword calls `_stream_events_until_close` with a
`DefaultFormatter`, always writing chunks to `sys.stdout`/`sys.stderr`
regardless of the top-level formatter used for the final build record.
When the streamer falls back to a GET (SSE never delivered a completion
event), the captured stdout/stderr from the op's metadata is replayed
via `log_streams` so the user still sees what happened. The `poll()`
helper and local `TERMINAL_STATUSES` constant are gone.

Also drops the streamer-level graduated backoff. `client.request`
already retries every SSE reconnect through its own `RETRY_DELAYS`
ladder, so the extra `_stream_backoff_sleep` was pure duplication.
Only kept is a 0.5s `TIGHT_LOOP_FLOOR` when a cycle produced no
forward progress — guards against a server that returns immediate
empty streams for an executing op.
`fetch_and_upload` was piping the urllib response straight into
`client.request` with a manually-set `Content-Length` from the GET
response headers.  Something in that streaming path (likely the
socket-backed source not returning the advertised byte count in one
shot) made the server dedupe the upload as a 0-byte file — every
`ADD <url>` in a Dockerfile ended up wired to the empty-file UUID,
so `tar -xzf` / `python -m zipfile -e` promptly failed on the
sandbox side.

Reading the whole body first is fine for typical `ADD` payloads
(archives, wheels, tarballs are megabytes, not gigabytes) and
sidesteps the mismatch: sha and Content-Length are derived from the
buffered bytes, and `http.client` auto-frames the request.

The now-unused `HashingReader` helper and `parse_content_length`
utility are gone with their tests.
Comment thread contree_cli/client.py Outdated

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also attempt = -1 problem + we double sleep in case of 410,425

`client.request`'s retry loop indexed `RETRY_DELAYS[attempt - 1]` at
the top and did a second `time.sleep(RETRY_DELAYS[min(attempt, ...)])`
inside the 410/425 branch — that gave every Retry-After response
roughly twice the intended backoff, and the `attempt - 1` form would
wrap to the tail delay if the guard ever fell off.

Replace both with a `retry_generator()` that yields the ladder,
then keeps yielding the tail forever.  Each retry pulls exactly one
value via `next(delays)`, and the caller bounds attempts with a
plain `retries_left` counter — no `attempt - 1`, no double sleep.

Also shortens the ladder to `(0.1, 0.2, 0.5, 1, 2, 5)` so a
transient blip recovers in 100 ms instead of a full second, and the
worst-case wall clock (6 retries) drops from ~42 s to ~8.8 s.

Adds `test_410_sleeps_once_per_failure` to lock in the regression —
the existing `test_first_attempt_410_uses_short_delay` only checked
`delays[0]` so the double sleep had slipped through.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants