From 82d8b6de597fbe3a7f6496f8837089b2e4f63739 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 22 Jun 2026 14:21:15 +0530 Subject: [PATCH 1/4] feat(odin): Implement bounded reader --- .../unstable/core/_bounded_reader.py | 46 +++++++++++ tests/test_unstable/test_bounded_reader.py | 82 +++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 cognite/extractorutils/unstable/core/_bounded_reader.py create mode 100644 tests/test_unstable/test_bounded_reader.py diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py new file mode 100644 index 00000000..1241657d --- /dev/null +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -0,0 +1,46 @@ +"""Bounded binary reader for point-in-time log file uploads.""" + +from typing import BinaryIO + + +class BoundedReader: + """ + Wraps a binary file handle and limits reads to a byte count captured at snapshot time. + + Implements ``__len__`` so ``requests.utils.super_len()`` bypasses ``os.fstat()`` + and declares the correct HTTP ``Content-Length``. This is critical when uploading + the active log file (``file.log``) while the logger is still appending to it: + without the cap, the upload reads past the declared length and httpx raises + ``LocalProtocolError: "Too much data for declared Content-Length"``. + + Usage:: + + snapshot_size = os.path.getsize(log_path) + with BoundedReader(open(log_path, "rb"), snapshot_size) as reader: + upload_queue.add_io_to_upload_queue(file_meta, lambda: reader, ...) + """ + + def __init__(self, f: BinaryIO, max_bytes: int) -> None: + self._f = f + self._size = max_bytes + self._remaining = max_bytes + + def __len__(self) -> int: + # super_len() checks __len__ before os.fstat — returning the fixed snapshot + # size here ensures Content-Length is declared at the moment of the snapshot, + # not at the moment the upload resolves the inode size (which may have grown). + return self._size + + def read(self, size: int = -1) -> bytes: + if self._remaining <= 0: + return b"" + to_read = self._remaining if size < 0 else min(size, self._remaining) + data = self._f.read(to_read) + self._remaining -= len(data) + return data + + def __enter__(self) -> "BoundedReader": + return self + + def __exit__(self, *_: object) -> None: + self._f.close() diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py new file mode 100644 index 00000000..2e2844c3 --- /dev/null +++ b/tests/test_unstable/test_bounded_reader.py @@ -0,0 +1,82 @@ +from pathlib import Path + +import pytest +from requests.utils import super_len + +from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader + + +def _make_file(tmp_path: Path, content: bytes) -> Path: + p = tmp_path / "f.log" + p.write_bytes(content) + return p + + +def test_len_returns_snapshot_size_and_is_stable_after_reads(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + assert len(reader) == 5 + reader.read(3) + assert len(reader) == 5 # must not decrement with reads + + +@pytest.mark.parametrize( + "snapshot,read_arg,expected", + [ + (5, -1, b"hello"), # read() with no arg reads up to snapshot + (5, 3, b"hel"), # read(n) where n < remaining + (5, 10, b"hello"), # read(n) where n > snapshot — capped at snapshot + ], + ids=["read_all", "read_partial", "read_exceeds_snapshot"], +) +def test_read_respects_snapshot_bound(tmp_path: Path, snapshot: int, read_arg: int, expected: bytes) -> None: + path = _make_file(tmp_path, b"hello world") # 11 bytes — always larger than snapshot + with open(path, "rb") as f: + assert BoundedReader(f, snapshot).read(read_arg) == expected + + +def test_read_returns_empty_after_exhaustion(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + reader.read() + assert reader.read() == b"" + assert reader.read(100) == b"" + + +def test_zero_snapshot_returns_empty_immediately(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + with open(path, "rb") as f: + reader = BoundedReader(f, 0) + assert len(reader) == 0 + assert reader.read() == b"" + + +def test_context_manager_closes_underlying_file(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + f = open(path, "rb") # noqa: SIM115 + with BoundedReader(f, 4): + pass + assert f.closed + + +def test_super_len_reads_len_not_fstat(tmp_path: Path) -> None: + # Verify that requests.utils.super_len() uses __len__ (snapshot) rather than + # os.fstat().st_size (live inode size). The file is intentionally larger than + # the declared snapshot to confirm fstat is not consulted. + path = _make_file(tmp_path, b"hello world extended") # 20 bytes on disk + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + assert super_len(reader) == 5 + + +def test_midnight_rotation_race_file_shorter_than_snapshot(tmp_path: Path) -> None: + # If TimedRotatingFileHandler rotates between snapshot and file open, the new + # file.log is empty. BoundedReader returns b"" — the upload layer sees "too little + # data" against the declared Content-Length. This is acceptable; the caller retries. + path = _make_file(tmp_path, b"") + with open(path, "rb") as f: + reader = BoundedReader(f, 1_000) + assert len(reader) == 1_000 # snapshot still declared + assert reader.read() == b"" # but file is empty From a20c8c92c520d32bf73faf80388b17ca6021205a Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 22 Jun 2026 18:46:20 +0530 Subject: [PATCH 2/4] refactor(odin): Address gemini comments --- .../unstable/core/_bounded_reader.py | 12 ++++++++++- tests/test_unstable/test_bounded_reader.py | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py index 1241657d..54444945 100644 --- a/cognite/extractorutils/unstable/core/_bounded_reader.py +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -31,6 +31,16 @@ def __len__(self) -> int: # not at the moment the upload resolves the inode size (which may have grown). return self._size + def tell(self) -> int: + return self._size - self._remaining + + @property + def closed(self) -> bool: + return self._f.closed + + def close(self) -> None: + self._f.close() + def read(self, size: int = -1) -> bytes: if self._remaining <= 0: return b"" @@ -43,4 +53,4 @@ def __enter__(self) -> "BoundedReader": return self def __exit__(self, *_: object) -> None: - self._f.close() + self.close() diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py index 2e2844c3..0b608d4b 100644 --- a/tests/test_unstable/test_bounded_reader.py +++ b/tests/test_unstable/test_bounded_reader.py @@ -80,3 +80,24 @@ def test_midnight_rotation_race_file_shorter_than_snapshot(tmp_path: Path) -> No reader = BoundedReader(f, 1_000) assert len(reader) == 1_000 # snapshot still declared assert reader.read() == b"" # but file is empty + + +def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 8) + assert reader.tell() == 0 + reader.read(3) + assert reader.tell() == 3 + reader.read(5) + assert reader.tell() == 8 + + +def test_close_and_closed_property(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + f = open(path, "rb") # noqa: SIM115 + reader = BoundedReader(f, 4) + assert not reader.closed + reader.close() + assert reader.closed + assert f.closed From 1019b858599997e55d3546e3febf6a71330cd003 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 26 Jun 2026 11:45:58 +0530 Subject: [PATCH 3/4] refactor(odin): Rename parameter --- .../extractorutils/unstable/core/_bounded_reader.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py index 54444945..b17a0d64 100644 --- a/cognite/extractorutils/unstable/core/_bounded_reader.py +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -20,8 +20,8 @@ class BoundedReader: upload_queue.add_io_to_upload_queue(file_meta, lambda: reader, ...) """ - def __init__(self, f: BinaryIO, max_bytes: int) -> None: - self._f = f + def __init__(self, stream: BinaryIO, max_bytes: int) -> None: + self._stream = stream self._size = max_bytes self._remaining = max_bytes @@ -36,16 +36,16 @@ def tell(self) -> int: @property def closed(self) -> bool: - return self._f.closed + return self._stream.closed def close(self) -> None: - self._f.close() + self._stream.close() def read(self, size: int = -1) -> bytes: if self._remaining <= 0: return b"" to_read = self._remaining if size < 0 else min(size, self._remaining) - data = self._f.read(to_read) + data = self._stream.read(to_read) self._remaining -= len(data) return data From f9039393465226e11346a6223f0f32a1f35545a2 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 1 Jul 2026 15:45:09 +0530 Subject: [PATCH 4/4] test(odin): Added tests --- tests/test_unstable/test_bounded_reader.py | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py index 0b608d4b..286cce17 100644 --- a/tests/test_unstable/test_bounded_reader.py +++ b/tests/test_unstable/test_bounded_reader.py @@ -93,6 +93,37 @@ def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: assert reader.tell() == 8 +def test_stream_shorter_than_snapshot_partial(tmp_path: Path) -> None: + # File has 5 bytes but snapshot declares 10 (e.g. file was truncated after snapshot). + # _remaining decrements by len(data) not to_read, so tell() reflects actual bytes read. + path = _make_file(tmp_path, b"hello") + with open(path, "rb") as f: + reader = BoundedReader(f, 10) + assert len(reader) == 10 # snapshot still declared + data = reader.read(10) + assert data == b"hello" # only 5 bytes available + assert reader.tell() == 5 # tracks actual bytes, not requested + assert reader.read() == b"" + + +def test_read_multiple_calls_clamps_final_chunk(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 7) + assert reader.read(3) == b"hel" + assert reader.read(3) == b"lo " + assert reader.read(3) == b"w" # only 1 byte left — clamped by min(size, remaining) + assert reader.read(3) == b"" + + +def test_read_all_after_partial_consumption(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello") + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + assert reader.read(3) == b"hel" + assert reader.read(-1) == b"lo" # read() after partial — returns only remaining 2 bytes + + def test_close_and_closed_property(tmp_path: Path) -> None: path = _make_file(tmp_path, b"data") f = open(path, "rb") # noqa: SIM115