diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py index b17a0d64..45a67c43 100644 --- a/cognite/extractorutils/unstable/core/_bounded_reader.py +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -34,6 +34,14 @@ def __len__(self) -> int: def tell(self) -> int: return self._size - self._remaining + def seek(self, offset: int, whence: int = 0) -> int: + pos = self._stream.seek(offset, whence) + self._remaining = max(0, self._size - pos) + return pos + + def seekable(self) -> bool: + return self._stream.seekable() + @property def closed(self) -> bool: return self._stream.closed diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index ed7407f2..15c61607 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,12 +1,18 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" +import json import logging +from collections import Counter from dataclasses import dataclass from datetime import date, timedelta, timezone from datetime import datetime as dt from pathlib import Path +from typing import BinaryIO, Literal + +from cognite.client import CogniteClient from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig +from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError _logger = logging.getLogger(__name__) @@ -14,6 +20,8 @@ MAX_DATE_RANGE_DAYS = 7 """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" +MAX_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024 # 4 GiB — CDF single-request upload limit + _FETCH_LOGS_DESCRIPTION = ( f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." ) @@ -28,6 +36,15 @@ class LogFileCandidate: is_current: bool # True when path is the live (unrotated) file.log +@dataclass +class _FileUploadResult: + log_date: date + file_external_id: str + status: Literal["uploaded", "skipped_too_large", "failed"] + size_bytes: int = 0 + error: str | None = None + + def _today_utc() -> date: return dt.now(tz=timezone.utc).date() @@ -100,6 +117,67 @@ def _build_candidate_files( return candidates, skipped +def _file_external_id(integration_external_id: str, log_date: date) -> str: + return f"extractor-logs-{integration_external_id}-{log_date.isoformat()}" + + +def _upload_candidate( + candidate: LogFileCandidate, + integration_external_id: str, + cdf_client: CogniteClient, + snapshot_size: int | None, +) -> _FileUploadResult: + """Upload one candidate log file to CDF Files. Returns a result regardless of success or failure.""" + external_id = _file_external_id(integration_external_id, candidate.log_date) + + try: + actual_size = snapshot_size if snapshot_size is not None else candidate.path.stat().st_size + except OSError as e: + return _FileUploadResult( + log_date=candidate.log_date, file_external_id=external_id, status="failed", error=str(e) + ) + + if actual_size > MAX_FILE_SIZE_BYTES: + _logger.warning( + "fetch_logs: skipping %s (%d bytes) — exceeds MAX_FILE_SIZE_BYTES (%d bytes)", + candidate.path.name, + actual_size, + MAX_FILE_SIZE_BYTES, + ) + return _FileUploadResult( + log_date=candidate.log_date, + file_external_id=external_id, + status="skipped_too_large", + size_bytes=actual_size, + ) + + try: + with open(candidate.path, "rb") as f: + reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f # type: ignore[assignment] + cdf_client.files.upload_bytes( + content=reader, + name=f"{external_id}.log", + external_id=external_id, + mime_type="text/plain", + overwrite=True, + ) + _logger.info("fetch_logs: uploaded %s (%d bytes)", external_id, actual_size) + return _FileUploadResult( + log_date=candidate.log_date, + file_external_id=external_id, + status="uploaded", + size_bytes=actual_size, + ) + except Exception as e: + _logger.error("fetch_logs: failed to upload %s — %s", external_id, e) + return _FileUploadResult( + log_date=candidate.log_date, + file_external_id=external_id, + status="failed", + error=str(e), + ) + + def fetch_logs_action(ctx: ActionContext) -> None: """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" params = ctx.call_metadata or {} @@ -144,11 +222,78 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="no_file_handler_configured", ) - candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + candidates, skipped_dates = _build_candidate_files(log_file_path, start_date, end_date, today) _logger.info( "fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped", len(candidates), start_date, end_date, - len(skipped), + len(skipped_dates), + ) + + # Snapshot the current-day file size BEFORE starting uploads. + # This gives a fixed read ceiling for BoundedReader — bytes written after this + # point are excluded from the upload, preventing Content-Length mismatches. + snapshot_size: int | None = None + current_candidate = next((c for c in candidates if c.is_current), None) + if current_candidate is not None: + try: + snapshot_size = current_candidate.path.stat().st_size + _logger.info( + "fetch_logs: current-day snapshot %d bytes (%s)", + snapshot_size, + current_candidate.path.name, + ) + except OSError as e: + _logger.warning("fetch_logs: could not snapshot current-day file size — %s", e) + + integration_external_id = ctx._extractor.connection_config.integration.external_id + cdf_client = ctx._extractor.cognite_client + + upload_results: list[_FileUploadResult] = [ + _upload_candidate( + candidate, + integration_external_id, + cdf_client, + snapshot_size if candidate.is_current else None, + ) + for candidate in candidates + ] + + counts = Counter(r.status for r in upload_results) + + # Per-file entries: upload results (sorted by date) + missing dates (skipped by candidate builder) + files_list: list[dict[str, str]] = [] + for r in upload_results: + entry: dict[str, str] = { + "date": str(r.log_date), + "file_external_id": r.file_external_id, + "status": r.status, + } + if r.size_bytes: + entry["size_bytes"] = str(r.size_bytes) + if r.error: + entry["error"] = r.error + files_list.append(entry) + files_list.extend( + { + "date": str(skipped_date), + "file_external_id": _file_external_id(integration_external_id, skipped_date), + "status": "skipped", + } + for skipped_date in sorted(skipped_dates) + ) + files_list.sort(key=lambda e: e["date"]) + + total_skipped = len(skipped_dates) + counts["skipped_too_large"] + + ctx.set_result( + f"{counts['uploaded']} of {num_days} log files uploaded to CDF Files", + metadata={ + "total_files": str(num_days), + "uploaded_files": str(counts["uploaded"]), + "skipped_files": str(total_skipped), + "failed_files": str(counts["failed"]), + "files": json.dumps(files_list), + }, ) diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index dd1c6f1a..cf0084fc 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -38,6 +38,8 @@ def __init__( self._extractor = extractor self.external_id = external_id self.call_metadata = call_metadata + self._result_message: str | None = None + self._result_metadata: dict[str, str] | None = None self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.action.{self._action.name.replace(' ', '')}") @@ -61,6 +63,11 @@ def _new_error( task_name=task_name if task_name is not None else self._action.name, ) + def set_result(self, message: str, *, metadata: dict[str, str] | None = None) -> None: + """Record the result for a successful action completion.""" + self._result_message = message + self._result_metadata = metadata + class ActionError(Exception): """Deliberate action failure with structured metadata for Odin result reporting.""" diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 0fff2d52..68bbd4f7 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -677,7 +677,12 @@ def _handle_custom_action(self, action: Action) -> None: try: custom.target(ctx) self._checkin_worker.queue_action_update( - ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded) + ActionUpdate( + external_id=action.external_id, + status=ActionStatus.succeeded, + result_message=ctx._result_message, + result_metadata=ctx._result_metadata, + ) ) except ActionError as e: self._checkin_worker.queue_action_update( diff --git a/tests/test_unstable/test_action_dispatch.py b/tests/test_unstable/test_action_dispatch.py index 7b61a1ed..83a91a20 100644 --- a/tests/test_unstable/test_action_dispatch.py +++ b/tests/test_unstable/test_action_dispatch.py @@ -261,3 +261,18 @@ def test_start_registers_handle_actions_as_dispatcher() -> None: registered = extractor._checkin_worker.set_action_dispatcher.call_args[0][0] assert registered.__func__.__name__ == "_handle_actions" assert registered.__self__ is extractor + + +def test_set_result_propagates_to_succeeded_action_update() -> None: + extractor = _make_extractor() + + def action_with_result(ctx: ActionContext) -> None: + ctx.set_result("3 files uploaded", metadata={"total_files": "3", "uploaded_files": "3"}) + + extractor.add_action(CustomAction(name="upload", target=action_with_result)) + action = Action(external_id="act-r", action_name="upload", status=ActionStatus.pending) + extractor._dispatch_single_action(action) + updates = [c[0][0] for c in extractor._checkin_worker.queue_action_update.call_args_list] + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_message == "3 files uploaded" + assert succeeded.result_metadata == {"total_files": "3", "uploaded_files": "3"} diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py index 286cce17..bdcb7e4f 100644 --- a/tests/test_unstable/test_bounded_reader.py +++ b/tests/test_unstable/test_bounded_reader.py @@ -93,6 +93,33 @@ def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: assert reader.tell() == 8 +def test_seek_rewinds_for_retry(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 8) + assert reader.read(5) == b"hello" + assert reader.seek(0) == 0 + assert reader.tell() == 0 + assert reader.read(5) == b"hello" + + +def test_seek_relative(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 8) + reader.read(3) + assert reader.seek(2, 1) == 5 + assert reader.tell() == 5 + assert reader.read(3) == b" wo" # bytes 5-7 of b"hello world" within snapshot of 8 + + +def test_seekable_mirrors_underlying_stream(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + with open(path, "rb") as f: + reader = BoundedReader(f, 4) + assert reader.seekable() == f.seekable() + + def test_stream_shorter_than_snapshot_partial(tmp_path: Path) -> None: # File has 5 bytes but snapshot declares 10 (e.g. file was truncated after snapshot). # _remaining decrements by len(data) not to_read, so tell() reflects actual bytes read. diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 505b541b..65c3ec2c 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -1,3 +1,4 @@ +import json from datetime import date, timedelta from pathlib import Path from unittest.mock import MagicMock @@ -12,6 +13,7 @@ from cognite.extractorutils.unstable.core._log_upload_action import ( MAX_DATE_RANGE_DAYS, _build_candidate_files, + _FileUploadResult, _resolve_log_file_path, ) from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction @@ -297,3 +299,153 @@ def test_range_spanning_rotated_and_live_file(tmp_path: Path) -> None: assert rotated.is_current is False assert live.is_current is True assert live.path == base + + +def test_file_external_id_naming_convention() -> None: + from cognite.extractorutils.unstable.core._log_upload_action import _file_external_id + + assert _file_external_id("my-extractor", date(2026, 6, 1)) == "extractor-logs-my-extractor-2026-06-01" + + +def test_upload_candidate_calls_cdf_upload_for_rotated_file(tmp_path: Path) -> None: + from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate + + path = tmp_path / "extractor.log.2026-06-01" + path.write_bytes(b"log line\n" * 100) + candidate = LogFileCandidate(log_date=date(2026, 6, 1), path=path, is_current=False) + mock_client = MagicMock() + + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=None) + + assert result.status == "uploaded" + assert result.size_bytes == path.stat().st_size + assert result.file_external_id == "extractor-logs-my-extractor-2026-06-01" + mock_client.files.upload_bytes.assert_called_once() + _, kwargs = mock_client.files.upload_bytes.call_args + assert kwargs["external_id"] == "extractor-logs-my-extractor-2026-06-01" + assert kwargs["overwrite"] is True + + +def test_upload_candidate_current_day_uses_bounded_reader(tmp_path: Path) -> None: + from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader + from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate + + path = tmp_path / "extractor.log" + path.write_bytes(b"x" * 500) + candidate = LogFileCandidate(log_date=_PAST_TODAY, path=path, is_current=True) + mock_client = MagicMock() + + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=300) + + assert result.status == "uploaded" + assert result.size_bytes == 300 + _, kwargs = mock_client.files.upload_bytes.call_args + assert isinstance(kwargs["content"], BoundedReader) + assert len(kwargs["content"]) == 300 + + +def test_upload_candidate_exceeds_max_size_returns_skipped_too_large(tmp_path: Path) -> None: + from unittest.mock import patch + + from cognite.extractorutils.unstable.core._log_upload_action import ( + MAX_FILE_SIZE_BYTES, + LogFileCandidate, + _upload_candidate, + ) + + path = tmp_path / "huge.log" + path.write_bytes(b"x") # actual bytes don't matter — we mock stat + candidate = LogFileCandidate(log_date=date(2026, 6, 1), path=path, is_current=False) + mock_client = MagicMock() + + oversized = MAX_FILE_SIZE_BYTES + 1 + mock_stat_result = MagicMock() + mock_stat_result.st_size = oversized + with patch("pathlib.Path.stat", return_value=mock_stat_result): + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=None) + + assert result.status == "skipped_too_large" + assert result.size_bytes == oversized + mock_client.files.upload_bytes.assert_not_called() + + +def test_upload_candidate_cdf_error_returns_failed(tmp_path: Path) -> None: + from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate + + path = tmp_path / "extractor.log.2026-06-01" + path.write_bytes(b"data") + candidate = LogFileCandidate(log_date=date(2026, 6, 1), path=path, is_current=False) + mock_client = MagicMock() + mock_client.files.upload_bytes.side_effect = RuntimeError("CDF unavailable") + + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=None) + + assert result.status == "failed" + assert "CDF unavailable" in (result.error or "") + + +def test_fetch_logs_action_upload_sets_result_metadata(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"log line\n" * 50) + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_message is not None + assert "1 of 1" in succeeded.result_message + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["uploaded_files"] == "1" + assert succeeded.result_metadata["failed_files"] == "0" + assert succeeded.result_metadata["total_files"] == "1" + files = json.loads(succeeded.result_metadata["files"]) + assert len(files) == 1 + assert files[0]["status"] == "uploaded" + assert files[0]["date"] == "2026-06-10" + + +def test_fetch_logs_action_missing_files_reported_in_metadata(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + # Only 2026-06-11 exists; 2026-06-10 is missing + (tmp_path / "extractor.log.2026-06-11").write_bytes(b"data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-11"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["uploaded_files"] == "1" + assert succeeded.result_metadata["skipped_files"] == "1" + assert succeeded.result_metadata["total_files"] == "2" + files = json.loads(succeeded.result_metadata["files"]) + by_date = {f["date"]: f for f in files} + assert by_date["2026-06-10"]["status"] == "skipped" + assert by_date["2026-06-11"]["status"] == "uploaded" + + +def test_fetch_logs_action_all_files_missing_still_succeeds(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["uploaded_files"] == "0" + assert succeeded.result_metadata["skipped_files"] == "1" + + +def test_fetch_logs_action_upload_failure_still_succeeds(tmp_path: Path) -> None: + from unittest.mock import patch + + log_path = tmp_path / "extractor.log" + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"data") + extractor = _make_extractor(log_path=log_path) + with patch( + "cognite.extractorutils.unstable.core._log_upload_action._upload_candidate", + return_value=_FileUploadResult( + log_date=date(2026, 6, 10), + file_external_id="extractor-logs-test-integration-2026-06-10", + status="failed", + error="upload boom", + ), + ): + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["failed_files"] == "1" + assert succeeded.result_metadata["uploaded_files"] == "0"