Skip to content

feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285

Merged
thodson-usgs merged 3 commits into
DOI-USGS:mainfrom
thodson-usgs:httpx-migration
May 28, 2026
Merged

feat(waterdata): Add async parallel chunker over httpx.AsyncClient#285
thodson-usgs merged 3 commits into
DOI-USGS:mainfrom
thodson-usgs:httpx-migration

Conversation

@thodson-usgs
Copy link
Copy Markdown
Collaborator

@thodson-usgs thodson-usgs commented May 21, 2026

Summary

Adds an async-only parallel chunker to the Water Data getters. When a multi-value query (many monitoring locations, many parameter codes, large CQL2 filter) would exceed the server's ~8 KB URL/body limit, the call is split into multiple sub-requests and run concurrently under one shared httpx.AsyncClient with the connection pool sized from API_USGS_CONCURRENT (default 16).

Mid-stream transients (429 / 5xx / connect/read timeout / oversize follow-up URL) escalate to a resumable ChunkInterrupted carrying .call; the user calls exc.call.resume() once the underlying condition clears and only the still-pending sub-requests are re-issued.

Rebased onto current main after the merge of #289 (httpx), #283 (auto-chunking), #290 (notebook migration), #291 (vignette ports), #292 (compact CQL2 JSON), and #293 (ruff-only pre-commit + AGENTS.md + housekeeping).

Commit structure (2 commits)

feat(waterdata): async parallel chunker over httpx.AsyncClient All chunker / retry / resume / progress / finalize / deps work, squashed.
chore(pre-commit): add nbstripout to strip notebook outputs on commit Stacks on #293's ruff-only config; adds just the kynan/nbstripout hook (the one piece #293 didn't cover) and strips outputs from the 15 checked-in waterdata notebooks once.

Benchmark

~6.7× speedup on a 6-state get_daily discharge pull over 17,298 stream sites (chunked into 64 sub-requests): parallel @ API_USGS_CONCURRENT=163.11 s; serial @ =120.94 s (~1,920 rows each, distinct date windows per side so the server cache can't bias either run; both authenticated via API_USGS_PAT).

API_USGS_CONCURRENT

Value Behavior
unset / blank parallel, cap = 16 (default)
≥ 2 parallel, connection-pool capped at that value
1 single-connection gather (one request at a time)
unbounded no per-call cap — caller owns the burst risk
0, negative, malformed ValueError at call time

Concurrency is bounded by the connection pool (httpx.Limits(max_connections=N, max_keepalive_connections=N)), not a semaphore. The same pool is shared across every sub-request of a single chunked call via the _chunked_client ContextVar so the paginated helpers (_walk_pages) reuse it.

Retries (API_USGS_RETRIES)

Each sub-request is retried on a transient failure with exponential backoff + full jitter, honoring a server Retry-After. A long Retry-After (multi-minute quota window) escalates immediately to the resumable interruption rather than blocking the call.

Value Behavior
unset / blank retry, cap = 4 (default)
N ≥ 1 up to N retries per sub-request
0 no retry — first transient surfaces as a resumable ChunkInterrupted
negative, malformed ValueError at call time
  • RetryPolicy — frozen value object (from_env / should_retry / backoff). Full jitter de-correlates concurrent retries so a throttle event doesn't make them re-burst in lockstep.
  • _retryable is narrower than _classify_chunk_error: it retries the typed RateLimited / ServiceUnavailable / httpx.TransportError but not httpx.InvalidURL (a too-long cursor won't fix on retry).
  • Retries fire inside the pool slot, so a backing-off chunk holds its connection — effective concurrency shrinks gracefully under throttling.

Architecture

ChunkPlan — extracts every chunkable axis (multi-value list params + the cql-text filter along its top-level OR clauses), greedy-halves the biggest axis until each sub-request URL fits the byte budget, and iterates the cartesian product. Single-request queries get a one-step plan that skips chunking entirely.

ChunkedCall — drives the plan over an async def fetch(args):

  • _run gathers every pending sub-request under one shared httpx.AsyncClient.
  • resume() is the sync facade — runs _run through an anyio blocking portal so it works whether or not the caller is already in an event loop (Jupyter / IPython / async apps).
  • partial_frame / partial_response snapshot completed sub-requests as raw (frame, response) — safe to inspect from inside a ChunkInterrupted handler without triggering finalize (which for OGC getters includes a schema fetch on an empty frame).
  • finalize hook (dependency-injected callable) lets the OGC getters inject utils._finalize_ogc (type coercion, column arrangement, BaseMetadata, max_rows truncation) without coupling the chunker to OGC concerns. A successful first call and a resumed call yield the same shape.

Failure precedence in the gather (in order):

  1. Cancellation signals (CancelledError, KeyboardInterrupt, SystemExit) propagate unmodified — wrapping them would consume the user's stop signal.
  2. Non-transient failures (real bugs unrecognized by _classify_chunk_error) surface raw, so they aren't masked behind a resumable handle for a transient sibling that landed later.
  3. Recognized transients raise the first one as a resumable ChunkInterrupted subclass (QuotaExhausted for 429, ServiceInterrupted for the rest).

Deps

  • geopandas>=0.10 + mapclassify added to [doc] extras so WaterData_demo.ipynb's .set_crs().explore() cell executes (the plain-pandas frame lacks .set_crs).

Validation

Result
Offline test suite 298 passed, ruff clean
Live API regression suite (tests/waterdata_test.py) 63 passed
README examples All 4 byte-identical to main (matching row counts, columns, SHA-256 of head rows — across single-site, multi-site, POST/CQL2 1,360-row, and 52,853-row paginated continuous queries)
Notebook execution (8 from demos/) 7/7 OK on both branches
Cell-by-cell notebook diff All data cells match between branches

Out of scope (intentional)

  • Adaptive concurrency (AIMD) — a client-side token-bucket controller that converges to the gateway's steady-state rate would reduce retries further. The pool-held-during-backoff behavior is a partial version; a full controller is a separate change.
  • asyncio.TaskGroup (3.11+) — would replace gather once the 3.9 floor is dropped, though the partial-completion contract fights TaskGroup's cancel-on-first-failure default.
  • Pre-emptive quota gating — an earlier revision raised a RequestExceedsQuota before a burst the x-ratelimit-remaining header said couldn't fit; dropped as not worth the complexity. The reactive retry-then-escalate flow is the chosen alternative.
  • Layered config refactor — gathering the four runtime knobs behind a WaterDataConfig dataclass with file + env + Python-override precedence is in progress as a follow-up (Allow user to work in timezones besides UTC #3 on the fork).

🤖 Generated with Claude Code

@thodson-usgs thodson-usgs force-pushed the httpx-migration branch 18 times, most recently from c618ea8 to 06d43aa Compare May 24, 2026 14:34
@thodson-usgs thodson-usgs force-pushed the httpx-migration branch 9 times, most recently from 57ff5bd to 6d1d5db Compare May 25, 2026 15:38
@thodson-usgs thodson-usgs changed the title feat(waterdata): Migrate to httpx and add async parallel chunker feat(waterdata): Add async parallel chunker over httpx.AsyncClient May 25, 2026
@thodson-usgs thodson-usgs added the enhancement New feature or request label May 25, 2026
@thodson-usgs thodson-usgs force-pushed the httpx-migration branch 3 times, most recently from 2ec3f51 to d2bf71f Compare May 26, 2026 21:25
Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment thread dataretrieval/waterdata/chunking.py
Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment thread dataretrieval/waterdata/chunking.py Outdated
Copy link
Copy Markdown
Collaborator Author

@thodson-usgs thodson-usgs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please address my comments.

Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment on lines +381 to +385
def _passthrough_result(
frame: pd.DataFrame, response: httpx.Response
) -> tuple[pd.DataFrame, Any]:
"""Default :data:`_Finalize`: return the raw combined pair unchanged."""
return frame, response
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function useful? Does it result in cleaner more readable code?
Furthermore, is this always applied to the Finalize callable in practice? If so do we really need Finalize as a parameter?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked the actual flow before changing this — finalize is never the default in production. Every OGC getter goes through utils.get_ogc_data, which builds a functools.partial(_finalize_ogc, properties=..., output_id=..., service=..., max_rows=...) per call and passes it as finalize= to _fetch_once (utils.py:1387–1396). So _passthrough_result is the fallback for direct chunker use (tests, future non-OGC callers), not the common path.

The injection is what keeps chunking.py from importing OGC-specific post-processing. Without it, the chunker would either need to import _finalize_ogc directly (couples chunking → utils) or resume() would have to return the raw shape — and callers catching ChunkInterrupted further up the stack would then have to know to apply OGC post-processing themselves (breaks the "resume() returns the same shape a successful first call would have" contract).

I'd push back on removing it. Happy to swap the named _passthrough_result for a lambda f, r: (f, r) if that small abstraction feels like overkill — but the parameter itself earns its place.

Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment on lines +1103 to +1128
def _retryable(exc: BaseException) -> tuple[bool, float | None]:
"""
Decide whether ``exc`` is a transient worth an automatic retry.

Only the *top-level* exception is inspected — unlike
:func:`_classify_chunk_error`, which walks the ``__cause__`` chain.
The distinction matters because ``_paginate`` raises an
initial-request transient (429 / 5xx / :class:`httpx.TransportError`)
*raw*, but wraps a mid-pagination failure as a ``RuntimeError``. So a
raw transient means a sub-request that made no progress and is cheap to
re-issue, whereas a mid-pagination failure is left to escalate to a
resumable :class:`ChunkInterrupted` rather than re-walked from page 1
(which would re-spend the quota just exhausted). ``httpx.InvalidURL``
is never retried — a too-long cursor won't fix on a retry.

Returns
-------
tuple[bool, float or None]
``(retryable, retry_after)`` — the server ``Retry-After`` hint
(seconds) when the transient carried one, else ``None``.
"""
if isinstance(exc, (RateLimited, ServiceUnavailable)):
return True, exc.retry_after
if isinstance(exc, httpx.TransportError):
return True, None
return False, None
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a trivial function and it's called in exactly one place. If you agree, then I think we can consolidate this.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declining this one — _retryable is directly unit-tested (tests/waterdata_chunking_test.py:1571–1591, 7 assertions covering the RateLimited / ServiceUnavailable / ReadTimeout / ConnectError / InvalidURL / RuntimeError cases and the __cause__-walking semantics). Inlining it into _retry_delay would either delete that coverage or force contorted "call _retry_delay and assert it returned None" assertions that lose the per-case specificity.

Keeping _retryable as a named, separately-testable concept earns the indirection. Open to revisiting if you have a different test pattern in mind.

Comment on lines +1029 to +1031
finalize : Callable, optional
Transform applied to the combined ``(frame, response)`` (see
:data:`_Finalize`). Defaults to :func:`_passthrough_result`.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my earlier point. If finalize always defaults to _passthrough_result in our code, then maybe we don't need it as a parameter.

Copy link
Copy Markdown
Collaborator Author

@thodson-usgs thodson-usgs May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my reply on the _passthrough_result threadfinalize is never actually defaulted in production; every OGC getter overrides it with a functools.partial(_finalize_ogc, ...). The default exists as a fallback, not the common path.

Comment on lines +1378 to +1382
finalize : Callable
Transform applied to the combined result (see :data:`_Finalize`) at
the terminal :meth:`_run` return, so a completed call yields the
caller's finished shape. The ``partial_*`` accessors deliberately
skip it and stay raw.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same point as other finalize.

Copy link
Copy Markdown
Collaborator Author

@thodson-usgs thodson-usgs May 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other finalize threads — see the main reply. finalize is always overridden in production by the OGC getters, so the parameter earns its place.

Comment thread dataretrieval/waterdata/chunking.py Outdated
Comment thread dataretrieval/waterdata/chunking.py Outdated
thodson-usgs and others added 2 commits May 28, 2026 14:06
Multi-value Water Data queries (many monitoring locations, many parameter
codes, large CQL2 filters) can exceed the server's ~8 KB URL/body limit
and need to be split into multiple sub-requests. This adds an
``async``-only chunker that:

* Plans the fan-out: every multi-value list parameter and the cql-text
  ``filter`` (along its top-level ``OR`` clauses) is modeled as a chunkable
  axis; ``ChunkPlan`` greedy-halves the biggest axis until every
  sub-request URL fits the byte budget, then iterates the cartesian product.
* Dispatches concurrently: ``ChunkedCall._run`` gathers every pending
  sub-request through one shared ``httpx.AsyncClient`` with the connection
  pool sized from ``API_USGS_CONCURRENT`` (default 16; ``unbounded``
  removes the cap). A single ``anyio`` blocking portal lets the sync
  facade work from inside event loops (Jupyter, async apps).
* Survives transients: typed ``RateLimited`` (429) and
  ``ServiceUnavailable`` (5xx) trigger bounded retry-with-backoff
  (``API_USGS_RETRIES`` default 4; full jitter; honors ``Retry-After``
  up to a 60 s cap). Anything still failing escalates to a resumable
  ``ChunkInterrupted`` subclass carrying ``.call`` — call ``.call.resume()``
  once the underlying condition clears; only the still-pending
  sub-requests are re-issued.
* Combines and finalizes: the OGC getters inject ``utils._finalize_ogc``
  (type coercion, column arrangement, ``max_rows`` truncation,
  ``BaseMetadata``) through the chunker's ``finalize`` hook so a
  successful first call and a resumed call yield the same shape.

Surfaces and integration:

* ``dataretrieval/waterdata/chunking.py`` (new module): ``RetryPolicy``,
  ``ChunkPlan``, ``ChunkedCall``, ``ChunkInterrupted`` /
  ``QuotaExhausted`` / ``ServiceInterrupted``, ``multi_value_chunked``
  decorator.
* ``utils._fetch_once`` is the decorated async fetcher; pagination
  helpers ``_paginate`` / ``_walk_pages`` are async and share the
  chunker's client via a ``ContextVar``.
* ``api.get_reference_table(... max_rows=...)``: new preview cap.
* ``_progress``: per-call status line (chunk count, pages, rows,
  rate-limit remaining); ``API_USGS_PROGRESS`` opt-in/off.

Deps: add ``geopandas>=0.10`` + ``mapclassify`` to ``[doc]`` extras so
``WaterData_demo.ipynb``'s ``.set_crs().explore()`` cell executes (the
plain-pandas frame lacks ``.set_crs``).

Tests: full async chunker suite (planning, retry taxonomy, resume,
client-sharing, progress reporter, finalize injection) + live-API
regression tests covering every public getter. 298 offline + 63 live
tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Stacks on DOI-USGS#293 (which switched pre-commit to ruff-only and cleaned the
text-file housekeeping). The one piece DOI-USGS#293 didn't cover is the
checked-in notebook outputs — they balloon the demo notebook diffs on
every re-run with timestamps, quota counters, and HTML/plot blobs, even
when the source cells are unchanged.

Adds the kynan/nbstripout pre-commit hook (rev 0.8.1) and runs it once
across the existing tree, stripping cell outputs + execution_count from
the 15 checked-in waterdata notebooks. Demos still execute cleanly
locally; commits now diff the source rather than the rendered run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…live

The notebook had its ``peak_trend_analysis`` invocation commented out
with a "few minutes to run" warning, and the last cell loaded a
pre-computed ``datasets/peak_discharge_trends.csv`` instead. The
relative path broke whenever the notebook was executed from any cwd
other than ``demos/`` (e.g. an nbconvert run against a staged copy).

With the async chunker, the analysis runs end-to-end in roughly two
seconds for a single state's worth of stream gages, so the cached CSV
no longer earns its keep:

- Un-commented the live ``peak_trend_analysis`` call in cell 11 and
  reduced its scope from three states to Rhode Island. Rhode Island has
  ~350 stream gages — enough to actually exercise the chunker, but
  small enough that the doc build (which executes notebooks live under
  nbsphinx, unauthenticated) stays well within rate limits.
- Deleted the ``pd.read_csv("datasets/peak_discharge_trends.csv")`` cell
  (no longer needed; ``final_df`` is produced live).
- Deleted ``demos/datasets/peak_discharge_trends.csv`` (535 KB tracked
  cache) and the now-empty ``demos/datasets/`` directory.

The notebook now has no relative-path data dependency — it executes
cleanly from any cwd, including under nbsphinx in CI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@thodson-usgs thodson-usgs marked this pull request as ready for review May 28, 2026 22:26
@thodson-usgs thodson-usgs merged commit ad4e980 into DOI-USGS:main May 28, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant