From 9d34837f473a24c184b1113a09ea7353dd4bf29e Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 13:58:11 +0530 Subject: [PATCH 01/16] feat(odin): Register fetch logs action and validate input dates --- .../unstable/core/_log_upload_action.py | 54 +++++++ .../extractorutils/unstable/core/actions.py | 19 ++- cognite/extractorutils/unstable/core/base.py | 23 ++- tests/test_unstable/test_action_dispatch.py | 16 +- .../test_unstable/test_action_registration.py | 35 +++-- tests/test_unstable/test_log_upload_action.py | 145 ++++++++++++++++++ 6 files changed, 275 insertions(+), 17 deletions(-) create mode 100644 cognite/extractorutils/unstable/core/_log_upload_action.py create mode 100644 tests/test_unstable/test_log_upload_action.py diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py new file mode 100644 index 00000000..9806ec4d --- /dev/null +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -0,0 +1,54 @@ +"""Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" + +from datetime import date + +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError + +MAX_DATE_RANGE_DAYS = 7 +"""Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" + +_FETCH_LOGS_DESCRIPTION = ( + "Upload rotated log files to CDF Files for a given date range. " + f"At most {MAX_DATE_RANGE_DAYS} days per invocation." +) + + +def _parse_date(raw: str, field: str) -> date: + try: + return date.fromisoformat(raw) + except ValueError: + raise ActionError( + f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)", + error_type="invalid_parameter", + ) from None + + +def fetch_logs_action(ctx: ActionContext) -> None: + """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" + params = ctx.call_metadata or {} + + start_date_raw = params.get("start_date") + end_date_raw = params.get("end_date") + + if start_date_raw is None: + raise ActionError("Missing required parameter: start_date", error_type="missing_parameter") + if end_date_raw is None: + raise ActionError("Missing required parameter: end_date", error_type="missing_parameter") + + start_date = _parse_date(start_date_raw, "start_date") + end_date = _parse_date(end_date_raw, "end_date") + + if end_date < start_date: + raise ActionError( + f"end_date ({end_date}) must be on or after start_date ({start_date})", + error_type="invalid_date_range", + ) + + num_days = (end_date - start_date).days + 1 + if num_days > MAX_DATE_RANGE_DAYS: + raise ActionError( + f"Date range of {num_days} days exceeds the maximum of {MAX_DATE_RANGE_DAYS} days; " + "use multiple invocations for longer spans", + error_type="invalid_date_range", + ) + diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 480f7c11..0193dc67 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -12,7 +12,7 @@ if TYPE_CHECKING: from cognite.extractorutils.unstable.core.base import Extractor -__all__ = ["ActionContext", "ActionTarget", "CustomAction"] +__all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"] class ActionContext(CogniteLogger): @@ -56,6 +56,23 @@ def _new_error( ) +class ActionError(Exception): + """Deliberate action failure with structured metadata for Odin result reporting.""" + + def __init__(self, message: str, *, error_type: str, details: str | None = None) -> None: + super().__init__(message) + self.error_type = error_type + self.details = details + + @property + def result_metadata(self) -> dict[str, str]: + """Structured metadata dict for the action update.""" + meta: dict[str, str] = {"error_type": self.error_type} + if self.details is not None: + meta["error_detail"] = self.details + return meta + + ActionTarget = Callable[["ActionContext"], None] diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index a8549b09..0fff2d52 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -98,8 +98,9 @@ def my_task_function(self, task_context: TaskContext) -> None: from cognite.extractorutils.unstable.core._dto import ( Task as DtoTask, ) +from cognite.extractorutils.unstable.core._log_upload_action import _FETCH_LOGS_DESCRIPTION, fetch_logs_action from cognite.extractorutils.unstable.core._messaging import RuntimeMessage -from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.checkin_worker import CheckinWorker from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.logger import CogniteLogger, RobustFileHandler @@ -205,6 +206,7 @@ def __init__(self, config: FullConfig[ConfigType], checkin_worker: CheckinWorker ) self.__init_tasks__() + self._register_builtin_actions() self.__init_actions__() def _setup_cancellation_watcher(self, cancel_event: MpEvent) -> None: @@ -346,6 +348,16 @@ def __init_tasks__(self) -> None: """ pass + def _register_builtin_actions(self) -> None: + """Register framework-level actions available on every extractor.""" + self.add_action( + CustomAction( + name="fetch_logs", + target=fetch_logs_action, + description=_FETCH_LOGS_DESCRIPTION, + ) + ) + def __init_actions__(self) -> None: """ This method should be overridden by subclasses to register custom actions. @@ -667,6 +679,15 @@ def _handle_custom_action(self, action: Action) -> None: self._checkin_worker.queue_action_update( ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded) ) + except ActionError as e: + self._checkin_worker.queue_action_update( + ActionUpdate( + external_id=action.external_id, + status=ActionStatus.failed, + result_message=str(e), + result_metadata=e.result_metadata, + ) + ) except Exception as e: self._checkin_worker.queue_action_update( ActionUpdate( diff --git a/tests/test_unstable/test_action_dispatch.py b/tests/test_unstable/test_action_dispatch.py index e885d617..7b61a1ed 100644 --- a/tests/test_unstable/test_action_dispatch.py +++ b/tests/test_unstable/test_action_dispatch.py @@ -6,7 +6,7 @@ import pytest from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate -from cognite.extractorutils.unstable.core.actions import ActionContext, CustomAction +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.base import FullConfig from cognite.extractorutils.unstable.core.tasks import ScheduledTask, TaskContext @@ -170,6 +170,20 @@ def target(ctx: ActionContext) -> None: assert expected_message in (updates[-1].result_message or "") +def test_action_error_sets_result_metadata_and_keeps_failed_status() -> None: + def target(ctx: ActionContext) -> None: + raise ActionError("bad input", error_type="invalid_parameter") + + extractor = _make_extractor() + extractor.add_action(CustomAction(name="strict", target=target)) + extractor._dispatch_single_action(_make_action("act-err", "strict")) + + updates = _queued_updates(extractor) + failed = next(u for u in updates if u.status == ActionStatus.failed) + assert failed.result_metadata == {"error_type": "invalid_parameter"} + assert failed.result_message == "bad input" + + def test_custom_action_receives_call_metadata_in_context() -> None: received_metadata: list[dict | None] = [] diff --git a/tests/test_unstable/test_action_registration.py b/tests/test_unstable/test_action_registration.py index 6d3142d3..44fa50be 100644 --- a/tests/test_unstable/test_action_registration.py +++ b/tests/test_unstable/test_action_registration.py @@ -39,21 +39,25 @@ def _startup_request(extractor: Extractor) -> StartupRequest: ), ], ) -def test_available_actions_none_without_scheduled_tasks(extra_tasks: list) -> None: +def test_available_actions_without_scheduled_tasks_only_has_builtins(extra_tasks: list) -> None: extractor = _make_extractor() for task in extra_tasks: extractor.add_task(task) - assert _startup_request(extractor).available_actions is None + req = _startup_request(extractor) + assert req.available_actions is not None + # Only built-in actions present — no start/stop actions from scheduled tasks + task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom} + assert task_action_names == set() -def test_two_scheduled_tasks_produce_four_available_actions() -> None: +def test_two_scheduled_tasks_produce_four_task_start_stop_actions() -> None: extractor = _make_extractor() extractor.add_task(ScheduledTask.from_interval(interval="1h", name="alpha", target=lambda _: None)) extractor.add_task(ScheduledTask.from_interval(interval="2h", name="beta", target=lambda _: None)) req = _startup_request(extractor) assert req.available_actions is not None - assert len(req.available_actions) == 4 - assert {a.name for a in req.available_actions} == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"} + task_action_names = {a.name for a in req.available_actions if a.type != ActionType.custom} + assert task_action_names == {"Start alpha", "Stop alpha", "Start beta", "Stop beta"} @pytest.mark.parametrize( @@ -79,19 +83,22 @@ def test_scheduled_and_custom_actions_combined_ordering() -> None: extractor.add_action(CustomAction(name="flush", target=lambda _: None)) req = _startup_request(extractor) assert req.available_actions is not None - assert len(req.available_actions) == 3 names = [a.name for a in req.available_actions] - assert names == ["Start sync", "Stop sync", "flush"] + # Built-in actions precede scheduled-task start/stop, which precede user custom actions + assert names.index("Start sync") < names.index("flush") + assert names.index("Stop sync") < names.index("flush") + assert "fetch_logs" in names def test_custom_action_appears_with_correct_type_and_description() -> None: extractor = _make_extractor() extractor.add_action(CustomAction(name="flush cache", target=lambda _: None, description="Clears state")) actions = _startup_request(extractor).available_actions - assert actions is not None and len(actions) == 1 - assert actions[0].name == "flush cache" - assert actions[0].type == ActionType.custom - assert actions[0].description == "Clears state" + assert actions is not None + by_name = {a.name: a for a in actions} + assert "flush cache" in by_name + assert by_name["flush cache"].type == ActionType.custom + assert by_name["flush cache"].description == "Clears state" def test_init_actions_hook_called_after_init_tasks_and_can_register_actions() -> None: @@ -107,15 +114,15 @@ def __init_actions__(self) -> None: extractor = _make_extractor(_Ext) assert call_order == ["tasks", "actions"] - assert len(extractor._custom_actions) == 1 - assert extractor._custom_actions[0].name == "ping" + assert any(a.name == "ping" for a in extractor._custom_actions) def test_multiple_add_action_calls_accumulate_in_registration_order() -> None: extractor = _make_extractor() for name in ("a1", "a2", "a3"): extractor.add_action(CustomAction(name=name, target=lambda _: None)) - assert [a.name for a in extractor._custom_actions] == ["a1", "a2", "a3"] + names = [a.name for a in extractor._custom_actions] + assert names.index("a1") < names.index("a2") < names.index("a3") def test_add_action_raises_on_duplicate_name() -> None: diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py new file mode 100644 index 00000000..3648e476 --- /dev/null +++ b/tests/test_unstable/test_log_upload_action.py @@ -0,0 +1,145 @@ +from datetime import date, timedelta +from unittest.mock import MagicMock + +import pytest + +from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate +from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS +from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction +from cognite.extractorutils.unstable.core.base import FullConfig + +from .conftest import TestConfig, TestExtractor + + +def _make_extractor() -> TestExtractor: + conn = MagicMock() + conn.integration.external_id = "test-integration" + full_config = FullConfig( + connection_config=conn, + application_config=TestConfig(parameter_one=1, parameter_two="a"), + current_config_revision=1, + ) + return TestExtractor(full_config, MagicMock()) + + +def _queued_updates(extractor: TestExtractor) -> list[ActionUpdate]: + return [c[0][0] for c in extractor._checkin_worker.queue_action_update.call_args_list] + + +def _dispatch(extractor: TestExtractor, call_metadata: dict[str, str] | None) -> list[ActionUpdate]: + action = Action( + external_id="act-1", + action_name="fetch_logs", + status=ActionStatus.pending, + call_metadata=call_metadata, + ) + extractor._dispatch_single_action(action) + return _queued_updates(extractor) + + +def _failed_update(updates: list[ActionUpdate]) -> ActionUpdate: + return next(u for u in updates if u.status == ActionStatus.failed) + + +def test_fetch_logs_registered_as_builtin_with_description() -> None: + extractor = _make_extractor() + action = next((a for a in extractor._custom_actions if a.name == "fetch_logs"), None) + assert action is not None + assert action.description + + +def test_registering_fetch_logs_as_user_action_raises() -> None: + extractor = _make_extractor() + with pytest.raises(ValueError, match="fetch_logs"): + extractor.add_action(CustomAction(name="fetch_logs", target=lambda ctx: None)) + + +@pytest.mark.parametrize( + "call_metadata,missing_field", + [ + (None, "start_date"), + ({"end_date": "2026-06-10"}, "start_date"), + ({"start_date": "2026-06-10"}, "end_date"), + ], + ids=["no_metadata", "missing_start_date", "missing_end_date"], +) +def test_missing_required_date_reports_missing_parameter( + call_metadata: dict[str, str] | None, missing_field: str +) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "missing_parameter"} + assert missing_field in (failed.result_message or "") + + +@pytest.mark.parametrize( + "call_metadata,bad_field", + [ + ({"start_date": "not-a-date", "end_date": "2026-06-10"}, "start_date"), + ({"start_date": "2026-06-10", "end_date": "2026/06/11"}, "end_date"), + ({"start_date": "2026-13-01", "end_date": "2026-06-11"}, "start_date"), + ], + ids=["invalid_start", "slash_end", "out_of_range_month"], +) +def test_non_iso_date_reports_invalid_parameter(call_metadata: dict[str, str], bad_field: str) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "invalid_parameter"} + assert bad_field in (failed.result_message or "") + + +@pytest.mark.parametrize( + "call_metadata,message_contains", + [ + ({"start_date": "2026-06-10", "end_date": "2026-06-09"}, None), + ( + { + "start_date": str(date(2026, 6, 1)), + "end_date": str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS)), + }, + str(MAX_DATE_RANGE_DAYS), + ), + ], + ids=["end_before_start", "exceeds_max_days"], +) +def test_invalid_date_range_reports_invalid_date_range( + call_metadata: dict[str, str], message_contains: str | None +) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, call_metadata) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "invalid_date_range"} + if message_contains: + assert message_contains in (failed.result_message or "") + + +@pytest.mark.parametrize( + "start_str,end_str", + [ + ("2026-06-10", "2026-06-10"), + ("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))), + ], + ids=["single_day", "exact_max_days"], +) +def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_action_error_details_included_in_result_metadata() -> None: + extractor = _make_extractor() + + def raise_with_details(ctx: ActionContext) -> None: + raise ActionError("boom", error_type="unexpected_error", details="inner detail") + + extractor.add_action(CustomAction(name="boom-action", target=raise_with_details)) + action = Action(external_id="act-detail", action_name="boom-action", status=ActionStatus.pending) + extractor._dispatch_single_action(action) + failed = _failed_update(_queued_updates(extractor)) + assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"} + assert failed.result_message == "boom" From a480f65139b95a320aa2fe4f5a46e0d360cee4d0 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 14:16:33 +0530 Subject: [PATCH 02/16] fix(odin): Fix lint issue and added type error --- .../extractorutils/unstable/core/_log_upload_action.py | 6 ++---- tests/test_unstable/test_log_upload_action.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 9806ec4d..f395627e 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -8,15 +8,14 @@ """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" _FETCH_LOGS_DESCRIPTION = ( - "Upload rotated log files to CDF Files for a given date range. " - f"At most {MAX_DATE_RANGE_DAYS} days per invocation." + f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." ) def _parse_date(raw: str, field: str) -> date: try: return date.fromisoformat(raw) - except ValueError: + except (ValueError, TypeError): raise ActionError( f"Invalid {field} '{raw}': expected ISO 8601 date (YYYY-MM-DD)", error_type="invalid_parameter", @@ -51,4 +50,3 @@ def fetch_logs_action(ctx: ActionContext) -> None: "use multiple invocations for longer spans", error_type="invalid_date_range", ) - diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 3648e476..1fc00915 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -90,6 +90,16 @@ def test_non_iso_date_reports_invalid_parameter(call_metadata: dict[str, str], b assert bad_field in (failed.result_message or "") +def test_parse_date_non_string_type_raises_action_error() -> None: + # Pydantic guards dict[str, str] at the DTO boundary, but _parse_date may be called + # directly, so TypeError from date.fromisoformat must also surface as ActionError. + from cognite.extractorutils.unstable.core._log_upload_action import _parse_date + + with pytest.raises(ActionError) as exc_info: + _parse_date(20260610, "start_date") # type: ignore[arg-type] + assert exc_info.value.error_type == "invalid_parameter" + + @pytest.mark.parametrize( "call_metadata,message_contains", [ From 8670c5edd044ab35496f42a72c6ba6fdefca9d0f Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 15:14:28 +0530 Subject: [PATCH 03/16] feat(odin): Build candidate log file list --- .../unstable/core/_log_upload_action.py | 86 +++++++++- tests/test_unstable/test_log_upload_action.py | 153 +++++++++++++++--- 2 files changed, 219 insertions(+), 20 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index f395627e..001ab9ca 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,7 +1,11 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" -from datetime import date +from dataclasses import dataclass +from datetime import date, timedelta, timezone +from datetime import datetime as dt +from pathlib import Path +from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError MAX_DATE_RANGE_DAYS = 7 @@ -12,6 +16,19 @@ ) +@dataclass(frozen=True) +class LogFileCandidate: + """A log file resolved for a given date that exists and has content.""" + + date: date + path: Path + is_current: bool # True when path is the live (unrotated) file.log + + +def _today_utc() -> date: + return dt.now(tz=timezone.utc).date() + + def _parse_date(raw: str, field: str) -> date: try: return date.fromisoformat(raw) @@ -22,6 +39,63 @@ def _parse_date(raw: str, field: str) -> date: ) from None +def _resolve_log_file_path(config: ExtractorConfig) -> Path | None: + """Return the base log file path from the first file handler in config, or None.""" + for handler in config.log_handlers: + if isinstance(handler, LogFileHandlerConfig): + return handler.path + return None + + +def _build_candidate_files( + base_path: Path, + start_date: date, + end_date: date, + today: date, +) -> tuple[list[LogFileCandidate], list[date]]: + """ + Enumerate log files for [start_date, end_date] and partition into candidates vs skipped. + + Rotated files follow the naming convention ``.YYYY-MM-DD``. + The live file (``base_path``) is used for ``today``; all other dates use the rotated name. + + A file is skipped when it does not exist or is empty. This covers the brief rotation + race window: if the action is dispatched right after midnight before ``TimedRotatingFileHandler`` + has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, the rotated file will not be found and + that date will appear in ``skipped``. Retrying after rotation completes (within seconds) + will pick it up. + + Returns: + (candidates, skipped): candidates are files that exist and have content; + skipped contains dates for which no usable file was found. + """ + candidates: list[LogFileCandidate] = [] + skipped: list[date] = [] + + current = start_date + while current <= end_date: + if current == today: + path = base_path + is_current = True + else: + path = base_path.parent / f"{base_path.name}.{current.isoformat()}" + is_current = False + + try: + size = path.stat().st_size + except FileNotFoundError: + skipped.append(current) + else: + if size > 0: + candidates.append(LogFileCandidate(date=current, path=path, is_current=is_current)) + else: + skipped.append(current) + + current += timedelta(days=1) + + return candidates, skipped + + def fetch_logs_action(ctx: ActionContext) -> None: """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" params = ctx.call_metadata or {} @@ -50,3 +124,13 @@ def fetch_logs_action(ctx: ActionContext) -> None: "use multiple invocations for longer spans", error_type="invalid_date_range", ) + + log_file_path = _resolve_log_file_path(ctx._extractor.application_config) + if log_file_path is None: + raise ActionError( + "No file log handler configured; add a 'file' type log handler to enable log uploads", + error_type="no_file_handler_configured", + ) + + today = _today_utc() + _candidates, _skipped = _build_candidate_files(log_file_path, start_date, end_date, today) diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 1fc00915..e60e8143 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -1,22 +1,36 @@ from datetime import date, timedelta +from pathlib import Path from unittest.mock import MagicMock import pytest +from cognite.extractorutils.unstable.configuration.models import ( + LogFileHandlerConfig, + LogLevel, +) from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate -from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS +from cognite.extractorutils.unstable.core._log_upload_action import ( + MAX_DATE_RANGE_DAYS, + _build_candidate_files, + _resolve_log_file_path, +) from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.base import FullConfig from .conftest import TestConfig, TestExtractor +_PAST_TODAY = date(2026, 6, 19) -def _make_extractor() -> TestExtractor: + +def _make_extractor(log_path: Path | None = None) -> TestExtractor: conn = MagicMock() conn.integration.external_id = "test-integration" + config_kwargs: dict = {"parameter_one": 1, "parameter_two": "a"} + if log_path is not None: + config_kwargs["log_handlers"] = [LogFileHandlerConfig(type="file", path=log_path, level=LogLevel.INFO)] full_config = FullConfig( connection_config=conn, - application_config=TestConfig(parameter_one=1, parameter_two="a"), + application_config=TestConfig(**config_kwargs), current_config_revision=1, ) return TestExtractor(full_config, MagicMock()) @@ -125,22 +139,6 @@ def test_invalid_date_range_reports_invalid_date_range( assert message_contains in (failed.result_message or "") -@pytest.mark.parametrize( - "start_str,end_str", - [ - ("2026-06-10", "2026-06-10"), - ("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))), - ], - ids=["single_day", "exact_max_days"], -) -def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None: - extractor = _make_extractor() - updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str}) - statuses = [u.status for u in updates] - assert ActionStatus.succeeded in statuses - assert ActionStatus.failed not in statuses - - def test_action_error_details_included_in_result_metadata() -> None: extractor = _make_extractor() @@ -153,3 +151,120 @@ def raise_with_details(ctx: ActionContext) -> None: failed = _failed_update(_queued_updates(extractor)) assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"} assert failed.result_message == "boom" + + +def test_resolve_log_file_path_returns_none_for_console_only_config() -> None: + config = TestConfig(parameter_one=1, parameter_two="a") + assert _resolve_log_file_path(config) is None + + +@pytest.mark.parametrize( + "handler_names,expected_name", + [ + (["extractor.log"], "extractor.log"), + (["first.log", "second.log"], "first.log"), + ], + ids=["single_handler", "multiple_handlers_returns_first"], +) +def test_resolve_log_file_path_with_file_handlers(tmp_path: Path, handler_names: list[str], expected_name: str) -> None: + config = TestConfig( + parameter_one=1, + parameter_two="a", + log_handlers=[LogFileHandlerConfig(type="file", path=tmp_path / n, level=LogLevel.INFO) for n in handler_names], + ) + assert _resolve_log_file_path(config) == tmp_path / expected_name + + +def test_existing_rotated_files_become_candidates(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + start = date(2026, 6, 1) + end = date(2026, 6, 3) + for d in [start, start + timedelta(days=1), end]: + (tmp_path / f"extractor.log.{d.isoformat()}").write_bytes(b"data") + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == 3 + assert skipped == [] + assert all(not c.is_current for c in candidates) + + +@pytest.mark.parametrize( + "create_file", + [False, True], + ids=["missing", "empty"], +) +def test_unusable_file_goes_to_skipped(tmp_path: Path, create_file: bool) -> None: + # Covers missing files (including the rotation race window) and 0-byte files. + base = tmp_path / "extractor.log" + d = date(2026, 6, 1) + if create_file: + (tmp_path / f"extractor.log.{d.isoformat()}").touch() # 0 bytes + candidates, skipped = _build_candidate_files(base, d, d, _PAST_TODAY) + assert candidates == [] + assert skipped == [d] + + +def test_today_uses_live_file_not_rotated_name(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + base.write_bytes(b"today data") + candidates, skipped = _build_candidate_files(base, _PAST_TODAY, _PAST_TODAY, _PAST_TODAY) + assert len(candidates) == 1 + assert candidates[0].path == base + assert candidates[0].is_current is True + assert skipped == [] + + +def test_mixed_range_partitions_correctly(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + start = date(2026, 6, 1) + end = date(2026, 6, 3) + (tmp_path / "extractor.log.2026-06-01").write_bytes(b"data") + (tmp_path / "extractor.log.2026-06-03").write_bytes(b"data") + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == 2 + assert skipped == [date(2026, 6, 2)] + + +@pytest.mark.parametrize( + "start,end", + [ + (_PAST_TODAY, _PAST_TODAY), + (date(2026, 6, 1), date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1)), + ], + ids=["single_today", "exact_max_historical"], +) +def test_boundary_ranges_produce_correct_candidate_count(tmp_path: Path, start: date, end: date) -> None: + base = tmp_path / "extractor.log" + expected = (end - start).days + 1 + if start == _PAST_TODAY: + base.write_bytes(b"data") + else: + current = start + while current <= end: + (tmp_path / f"extractor.log.{current.isoformat()}").write_bytes(b"data") + current += timedelta(days=1) + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == expected + assert skipped == [] + + +def test_no_file_handler_reports_no_file_handler_configured() -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, {"start_date": "2026-06-01", "end_date": "2026-06-07"}) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "no_file_handler_configured"} + + +@pytest.mark.parametrize( + "create_log_file", + [True, False], + ids=["files_present", "all_files_missing"], +) +def test_fetch_logs_action_with_file_handler_succeeds(tmp_path: Path, create_log_file: bool) -> None: + log_path = tmp_path / "extractor.log" + if create_log_file: + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"log data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses From 471e5e15b1c05cb5371def55b7014d1ee584ca58 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 19 Jun 2026 16:16:00 +0530 Subject: [PATCH 04/16] refactor(odin): Fix gemini comments --- .../unstable/core/_log_upload_action.py | 21 ++++++++++++------- tests/test_unstable/test_log_upload_action.py | 3 ++- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 001ab9ca..b0e13f4b 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -59,11 +59,11 @@ def _build_candidate_files( Rotated files follow the naming convention ``.YYYY-MM-DD``. The live file (``base_path``) is used for ``today``; all other dates use the rotated name. - A file is skipped when it does not exist or is empty. This covers the brief rotation - race window: if the action is dispatched right after midnight before ``TimedRotatingFileHandler`` - has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, the rotated file will not be found and - that date will appear in ``skipped``. Retrying after rotation completes (within seconds) - will pick it up. + A file is skipped when it does not exist, is empty, or is inaccessible (any ``OSError``). + This covers the brief rotation race window: if the action is dispatched right after midnight + before ``TimedRotatingFileHandler`` has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, + the rotated file will not be found and that date will appear in ``skipped``. Retrying after + rotation completes (within seconds) will pick it up. Returns: (candidates, skipped): candidates are files that exist and have content; @@ -83,7 +83,7 @@ def _build_candidate_files( try: size = path.stat().st_size - except FileNotFoundError: + except OSError: skipped.append(current) else: if size > 0: @@ -117,6 +117,14 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="invalid_date_range", ) + today = _today_utc() + + if end_date > today: + raise ActionError( + f"end_date ({end_date}) cannot be in the future (today is {today})", + error_type="invalid_date_range", + ) + num_days = (end_date - start_date).days + 1 if num_days > MAX_DATE_RANGE_DAYS: raise ActionError( @@ -132,5 +140,4 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="no_file_handler_configured", ) - today = _today_utc() _candidates, _skipped = _build_candidate_files(log_file_path, start_date, end_date, today) diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index e60e8143..ac21d243 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -125,8 +125,9 @@ def test_parse_date_non_string_type_raises_action_error() -> None: }, str(MAX_DATE_RANGE_DAYS), ), + ({"start_date": "2020-01-01", "end_date": "2099-01-01"}, "future"), ], - ids=["end_before_start", "exceeds_max_days"], + ids=["end_before_start", "exceeds_max_days", "end_date_in_future"], ) def test_invalid_date_range_reports_invalid_date_range( call_metadata: dict[str, str], message_contains: str | None From 82d8b6de597fbe3a7f6496f8837089b2e4f63739 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 22 Jun 2026 14:21:15 +0530 Subject: [PATCH 05/16] feat(odin): Implement bounded reader --- .../unstable/core/_bounded_reader.py | 46 +++++++++++ tests/test_unstable/test_bounded_reader.py | 82 +++++++++++++++++++ 2 files changed, 128 insertions(+) create mode 100644 cognite/extractorutils/unstable/core/_bounded_reader.py create mode 100644 tests/test_unstable/test_bounded_reader.py diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py new file mode 100644 index 00000000..1241657d --- /dev/null +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -0,0 +1,46 @@ +"""Bounded binary reader for point-in-time log file uploads.""" + +from typing import BinaryIO + + +class BoundedReader: + """ + Wraps a binary file handle and limits reads to a byte count captured at snapshot time. + + Implements ``__len__`` so ``requests.utils.super_len()`` bypasses ``os.fstat()`` + and declares the correct HTTP ``Content-Length``. This is critical when uploading + the active log file (``file.log``) while the logger is still appending to it: + without the cap, the upload reads past the declared length and httpx raises + ``LocalProtocolError: "Too much data for declared Content-Length"``. + + Usage:: + + snapshot_size = os.path.getsize(log_path) + with BoundedReader(open(log_path, "rb"), snapshot_size) as reader: + upload_queue.add_io_to_upload_queue(file_meta, lambda: reader, ...) + """ + + def __init__(self, f: BinaryIO, max_bytes: int) -> None: + self._f = f + self._size = max_bytes + self._remaining = max_bytes + + def __len__(self) -> int: + # super_len() checks __len__ before os.fstat — returning the fixed snapshot + # size here ensures Content-Length is declared at the moment of the snapshot, + # not at the moment the upload resolves the inode size (which may have grown). + return self._size + + def read(self, size: int = -1) -> bytes: + if self._remaining <= 0: + return b"" + to_read = self._remaining if size < 0 else min(size, self._remaining) + data = self._f.read(to_read) + self._remaining -= len(data) + return data + + def __enter__(self) -> "BoundedReader": + return self + + def __exit__(self, *_: object) -> None: + self._f.close() diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py new file mode 100644 index 00000000..2e2844c3 --- /dev/null +++ b/tests/test_unstable/test_bounded_reader.py @@ -0,0 +1,82 @@ +from pathlib import Path + +import pytest +from requests.utils import super_len + +from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader + + +def _make_file(tmp_path: Path, content: bytes) -> Path: + p = tmp_path / "f.log" + p.write_bytes(content) + return p + + +def test_len_returns_snapshot_size_and_is_stable_after_reads(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + assert len(reader) == 5 + reader.read(3) + assert len(reader) == 5 # must not decrement with reads + + +@pytest.mark.parametrize( + "snapshot,read_arg,expected", + [ + (5, -1, b"hello"), # read() with no arg reads up to snapshot + (5, 3, b"hel"), # read(n) where n < remaining + (5, 10, b"hello"), # read(n) where n > snapshot — capped at snapshot + ], + ids=["read_all", "read_partial", "read_exceeds_snapshot"], +) +def test_read_respects_snapshot_bound(tmp_path: Path, snapshot: int, read_arg: int, expected: bytes) -> None: + path = _make_file(tmp_path, b"hello world") # 11 bytes — always larger than snapshot + with open(path, "rb") as f: + assert BoundedReader(f, snapshot).read(read_arg) == expected + + +def test_read_returns_empty_after_exhaustion(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + reader.read() + assert reader.read() == b"" + assert reader.read(100) == b"" + + +def test_zero_snapshot_returns_empty_immediately(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + with open(path, "rb") as f: + reader = BoundedReader(f, 0) + assert len(reader) == 0 + assert reader.read() == b"" + + +def test_context_manager_closes_underlying_file(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + f = open(path, "rb") # noqa: SIM115 + with BoundedReader(f, 4): + pass + assert f.closed + + +def test_super_len_reads_len_not_fstat(tmp_path: Path) -> None: + # Verify that requests.utils.super_len() uses __len__ (snapshot) rather than + # os.fstat().st_size (live inode size). The file is intentionally larger than + # the declared snapshot to confirm fstat is not consulted. + path = _make_file(tmp_path, b"hello world extended") # 20 bytes on disk + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + assert super_len(reader) == 5 + + +def test_midnight_rotation_race_file_shorter_than_snapshot(tmp_path: Path) -> None: + # If TimedRotatingFileHandler rotates between snapshot and file open, the new + # file.log is empty. BoundedReader returns b"" — the upload layer sees "too little + # data" against the declared Content-Length. This is acceptable; the caller retries. + path = _make_file(tmp_path, b"") + with open(path, "rb") as f: + reader = BoundedReader(f, 1_000) + assert len(reader) == 1_000 # snapshot still declared + assert reader.read() == b"" # but file is empty From a20c8c92c520d32bf73faf80388b17ca6021205a Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 22 Jun 2026 18:46:20 +0530 Subject: [PATCH 06/16] refactor(odin): Address gemini comments --- .../unstable/core/_bounded_reader.py | 12 ++++++++++- tests/test_unstable/test_bounded_reader.py | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py index 1241657d..54444945 100644 --- a/cognite/extractorutils/unstable/core/_bounded_reader.py +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -31,6 +31,16 @@ def __len__(self) -> int: # not at the moment the upload resolves the inode size (which may have grown). return self._size + def tell(self) -> int: + return self._size - self._remaining + + @property + def closed(self) -> bool: + return self._f.closed + + def close(self) -> None: + self._f.close() + def read(self, size: int = -1) -> bytes: if self._remaining <= 0: return b"" @@ -43,4 +53,4 @@ def __enter__(self) -> "BoundedReader": return self def __exit__(self, *_: object) -> None: - self._f.close() + self.close() diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py index 2e2844c3..0b608d4b 100644 --- a/tests/test_unstable/test_bounded_reader.py +++ b/tests/test_unstable/test_bounded_reader.py @@ -80,3 +80,24 @@ def test_midnight_rotation_race_file_shorter_than_snapshot(tmp_path: Path) -> No reader = BoundedReader(f, 1_000) assert len(reader) == 1_000 # snapshot still declared assert reader.read() == b"" # but file is empty + + +def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 8) + assert reader.tell() == 0 + reader.read(3) + assert reader.tell() == 3 + reader.read(5) + assert reader.tell() == 8 + + +def test_close_and_closed_property(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + f = open(path, "rb") # noqa: SIM115 + reader = BoundedReader(f, 4) + assert not reader.closed + reader.close() + assert reader.closed + assert f.closed From acbecd5ab3e6290e227b65cf83c5902ccab217a4 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 24 Jun 2026 15:03:44 +0530 Subject: [PATCH 07/16] refactor(odin): Add logger and validate actions order --- cognite/extractorutils/unstable/core/_log_upload_action.py | 5 +++++ tests/test_unstable/test_action_registration.py | 5 +++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index f395627e..9d6b20e1 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,9 +1,12 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" +import logging from datetime import date from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError +_logger = logging.getLogger(__name__) + MAX_DATE_RANGE_DAYS = 7 """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" @@ -50,3 +53,5 @@ def fetch_logs_action(ctx: ActionContext) -> None: "use multiple invocations for longer spans", error_type="invalid_date_range", ) + + _logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days) diff --git a/tests/test_unstable/test_action_registration.py b/tests/test_unstable/test_action_registration.py index 44fa50be..e8223f09 100644 --- a/tests/test_unstable/test_action_registration.py +++ b/tests/test_unstable/test_action_registration.py @@ -84,10 +84,11 @@ def test_scheduled_and_custom_actions_combined_ordering() -> None: req = _startup_request(extractor) assert req.available_actions is not None names = [a.name for a in req.available_actions] - # Built-in actions precede scheduled-task start/stop, which precede user custom actions + # Ordering: scheduled-task start/stop actions, then _custom_actions in registration order + # (_custom_actions = built-ins registered first, then user-registered actions) assert names.index("Start sync") < names.index("flush") assert names.index("Stop sync") < names.index("flush") - assert "fetch_logs" in names + assert names.index("fetch_logs") < names.index("flush") def test_custom_action_appears_with_correct_type_and_description() -> None: From 90a2cbd74a24b938f787799a0dabd27b33b8417f Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 24 Jun 2026 15:23:35 +0530 Subject: [PATCH 08/16] refactor(odin): Added logging and renaming fields --- .../unstable/core/_log_upload_action.py | 19 ++++++++++--- tests/test_unstable/test_log_upload_action.py | 28 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index b0e13f4b..c027e00c 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,5 +1,6 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" +import logging from dataclasses import dataclass from datetime import date, timedelta, timezone from datetime import datetime as dt @@ -8,6 +9,8 @@ from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError +_logger = logging.getLogger(__name__) + MAX_DATE_RANGE_DAYS = 7 """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" @@ -20,7 +23,7 @@ class LogFileCandidate: """A log file resolved for a given date that exists and has content.""" - date: date + log_date: date path: Path is_current: bool # True when path is the live (unrotated) file.log @@ -83,11 +86,12 @@ def _build_candidate_files( try: size = path.stat().st_size - except OSError: + except OSError as e: + _logger.warning("fetch_logs: skipping %s for %s — %s", path, current, e) skipped.append(current) else: if size > 0: - candidates.append(LogFileCandidate(date=current, path=path, is_current=is_current)) + candidates.append(LogFileCandidate(log_date=current, path=path, is_current=is_current)) else: skipped.append(current) @@ -140,4 +144,11 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="no_file_handler_configured", ) - _candidates, _skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + _logger.info( + "fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped", + len(candidates), + start_date, + end_date, + len(skipped), + ) diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index ac21d243..505b541b 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -269,3 +269,31 @@ def test_fetch_logs_action_with_file_handler_succeeds(tmp_path: Path, create_log statuses = [u.status for u in updates] assert ActionStatus.succeeded in statuses assert ActionStatus.failed not in statuses + + +def test_valid_exact_max_days_range_succeeds_at_dispatch(tmp_path: Path) -> None: + # Exactly MAX_DATE_RANGE_DAYS is the boundary — one more would be rejected. + start = date(2026, 6, 1) + end = start + timedelta(days=MAX_DATE_RANGE_DAYS - 1) + log_path = tmp_path / "extractor.log" + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": str(start), "end_date": str(end)}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_range_spanning_rotated_and_live_file(tmp_path: Path) -> None: + # Covers the common case: start_date = yesterday (rotated file), end_date = today (live file). + base = tmp_path / "extractor.log" + yesterday = _PAST_TODAY - timedelta(days=1) + base.write_bytes(b"today data") + (tmp_path / f"extractor.log.{yesterday.isoformat()}").write_bytes(b"yesterday data") + candidates, skipped = _build_candidate_files(base, yesterday, _PAST_TODAY, _PAST_TODAY) + assert skipped == [] + assert len(candidates) == 2 + rotated = next(c for c in candidates if c.log_date == yesterday) + live = next(c for c in candidates if c.log_date == _PAST_TODAY) + assert rotated.is_current is False + assert live.is_current is True + assert live.path == base From 97a064e147bfa2e41f3ee51d8d583f5dc6be65cb Mon Sep 17 00:00:00 2001 From: vikramlc Date: Thu, 25 Jun 2026 23:00:40 +0530 Subject: [PATCH 09/16] feat(odin): Add support for streaming log files to cdf files --- .../unstable/core/_log_upload_action.py | 159 +++++++++++++++++- .../extractorutils/unstable/core/actions.py | 7 + cognite/extractorutils/unstable/core/base.py | 7 +- tests/test_unstable/test_action_dispatch.py | 15 ++ tests/test_unstable/test_log_upload_action.py | 158 +++++++++++++++++ 5 files changed, 343 insertions(+), 3 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index c027e00c..b69fa901 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,12 +1,18 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" +import json import logging +from concurrent.futures import Future, ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import date, timedelta, timezone from datetime import datetime as dt from pathlib import Path +from typing import BinaryIO + +from cognite.client import CogniteClient from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig +from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError _logger = logging.getLogger(__name__) @@ -14,6 +20,9 @@ MAX_DATE_RANGE_DAYS = 7 """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" +MAX_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024 # 4 GiB — CDF single-request upload limit +DEFAULT_CONCURRENT_UPLOADS = 1 # Sequential by default; safe on constrained networks + _FETCH_LOGS_DESCRIPTION = ( f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." ) @@ -28,6 +37,15 @@ class LogFileCandidate: is_current: bool # True when path is the live (unrotated) file.log +@dataclass +class _FileUploadResult: + log_date: date + file_external_id: str + status: str # "uploaded" | "skipped_too_large" | "failed" + size_bytes: int = 0 + error: str | None = None + + def _today_utc() -> date: return dt.now(tz=timezone.utc).date() @@ -100,6 +118,68 @@ def _build_candidate_files( return candidates, skipped +def _file_external_id(integration_external_id: str, log_date: date) -> str: + return f"extractor-logs-{integration_external_id}-{log_date.isoformat()}" + + +def _upload_candidate( + candidate: LogFileCandidate, + integration_external_id: str, + cdf_client: CogniteClient, + snapshot_size: int | None, +) -> _FileUploadResult: + """Upload one candidate log file to CDF Files. Returns a result regardless of success or failure.""" + external_id = _file_external_id(integration_external_id, candidate.log_date) + + try: + actual_size = snapshot_size if snapshot_size is not None else candidate.path.stat().st_size + except OSError as e: + return _FileUploadResult( + log_date=candidate.log_date, file_external_id=external_id, status="failed", error=str(e) + ) + + if actual_size > MAX_FILE_SIZE_BYTES: + _logger.warning( + "fetch_logs: skipping %s (%d bytes) — exceeds MAX_FILE_SIZE_BYTES (%d bytes)", + candidate.path.name, + actual_size, + MAX_FILE_SIZE_BYTES, + ) + return _FileUploadResult( + log_date=candidate.log_date, + file_external_id=external_id, + status="skipped_too_large", + size_bytes=actual_size, + ) + + try: + f = open(candidate.path, "rb") # noqa: SIM115 + reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f # type: ignore[assignment] + with reader: + cdf_client.files.upload_bytes( + content=reader, + name=f"{external_id}.log", + external_id=external_id, + mime_type="text/plain", + overwrite=True, + ) + _logger.info("fetch_logs: uploaded %s (%d bytes)", external_id, actual_size) + return _FileUploadResult( + log_date=candidate.log_date, + file_external_id=external_id, + status="uploaded", + size_bytes=actual_size, + ) + except Exception as e: + _logger.error("fetch_logs: failed to upload %s — %s", external_id, e) + return _FileUploadResult( + log_date=candidate.log_date, + file_external_id=external_id, + status="failed", + error=str(e), + ) + + def fetch_logs_action(ctx: ActionContext) -> None: """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" params = ctx.call_metadata or {} @@ -144,11 +224,86 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="no_file_handler_configured", ) - candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + candidates, skipped_dates = _build_candidate_files(log_file_path, start_date, end_date, today) _logger.info( "fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped", len(candidates), start_date, end_date, - len(skipped), + len(skipped_dates), + ) + + # Snapshot the current-day file size BEFORE spawning upload threads. + # This gives a fixed read ceiling for BoundedReader — bytes written after this + # point are excluded from the upload, preventing Content-Length mismatches. + snapshot_size: int | None = None + current_candidate = next((c for c in candidates if c.is_current), None) + if current_candidate is not None: + try: + snapshot_size = current_candidate.path.stat().st_size + _logger.info( + "fetch_logs: current-day snapshot %d bytes (%s)", + snapshot_size, + current_candidate.path.name, + ) + except OSError as e: + _logger.warning("fetch_logs: could not snapshot current-day file size — %s", e) + + integration_external_id = ctx._extractor.connection_config.integration.external_id + cdf_client = ctx._extractor.cognite_client + + upload_results: list[_FileUploadResult] = [] + with ThreadPoolExecutor(max_workers=DEFAULT_CONCURRENT_UPLOADS) as pool: + futures: dict[Future[_FileUploadResult], LogFileCandidate] = { + pool.submit( + _upload_candidate, + candidate, + integration_external_id, + cdf_client, + snapshot_size if candidate.is_current else None, + ): candidate + for candidate in candidates + } + upload_results.extend(future.result() for future in as_completed(futures)) + + upload_results.sort(key=lambda r: r.log_date) + + uploaded = [r for r in upload_results if r.status == "uploaded"] + too_large = [r for r in upload_results if r.status == "skipped_too_large"] + failed = [r for r in upload_results if r.status == "failed"] + + # Per-file entries: upload results (sorted by date) + missing dates (skipped by candidate builder) + files_list: list[dict[str, str]] = [] + for r in upload_results: + entry: dict[str, str] = { + "date": str(r.log_date), + "file_external_id": r.file_external_id, + "status": r.status, + } + if r.size_bytes: + entry["size_bytes"] = str(r.size_bytes) + if r.error: + entry["error"] = r.error + files_list.append(entry) + files_list.extend( + { + "date": str(d), + "file_external_id": _file_external_id(integration_external_id, d), + "status": "skipped", + } + for d in sorted(skipped_dates) + ) + files_list.sort(key=lambda e: e["date"]) + + total_skipped = len(skipped_dates) + len(too_large) + + ctx.set_result( + f"{len(uploaded)} of {num_days} log files uploaded to CDF Files", + metadata={ + "total_files": str(num_days), + "uploaded_files": str(len(uploaded)), + "skipped_files": str(total_skipped), + "failed_files": str(len(failed)), + "files": json.dumps(files_list), + }, ) diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 0193dc67..c69cc8a8 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -37,6 +37,8 @@ def __init__( self._extractor = extractor self.external_id = external_id self.call_metadata = call_metadata + self._result_message: str | None = None + self._result_metadata: dict[str, str] | None = None self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.action.{self._action.name.replace(' ', '')}") @@ -55,6 +57,11 @@ def _new_error( task_name=task_name if task_name is not None else self._action.name, ) + def set_result(self, message: str, *, metadata: dict[str, str] | None = None) -> None: + """Record the result for a successful action completion.""" + self._result_message = message + self._result_metadata = metadata + class ActionError(Exception): """Deliberate action failure with structured metadata for Odin result reporting.""" diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index 0fff2d52..68bbd4f7 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -677,7 +677,12 @@ def _handle_custom_action(self, action: Action) -> None: try: custom.target(ctx) self._checkin_worker.queue_action_update( - ActionUpdate(external_id=action.external_id, status=ActionStatus.succeeded) + ActionUpdate( + external_id=action.external_id, + status=ActionStatus.succeeded, + result_message=ctx._result_message, + result_metadata=ctx._result_metadata, + ) ) except ActionError as e: self._checkin_worker.queue_action_update( diff --git a/tests/test_unstable/test_action_dispatch.py b/tests/test_unstable/test_action_dispatch.py index 7b61a1ed..83a91a20 100644 --- a/tests/test_unstable/test_action_dispatch.py +++ b/tests/test_unstable/test_action_dispatch.py @@ -261,3 +261,18 @@ def test_start_registers_handle_actions_as_dispatcher() -> None: registered = extractor._checkin_worker.set_action_dispatcher.call_args[0][0] assert registered.__func__.__name__ == "_handle_actions" assert registered.__self__ is extractor + + +def test_set_result_propagates_to_succeeded_action_update() -> None: + extractor = _make_extractor() + + def action_with_result(ctx: ActionContext) -> None: + ctx.set_result("3 files uploaded", metadata={"total_files": "3", "uploaded_files": "3"}) + + extractor.add_action(CustomAction(name="upload", target=action_with_result)) + action = Action(external_id="act-r", action_name="upload", status=ActionStatus.pending) + extractor._dispatch_single_action(action) + updates = [c[0][0] for c in extractor._checkin_worker.queue_action_update.call_args_list] + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_message == "3 files uploaded" + assert succeeded.result_metadata == {"total_files": "3", "uploaded_files": "3"} diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 505b541b..1a3ae867 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -1,3 +1,4 @@ +import json from datetime import date, timedelta from pathlib import Path from unittest.mock import MagicMock @@ -12,6 +13,7 @@ from cognite.extractorutils.unstable.core._log_upload_action import ( MAX_DATE_RANGE_DAYS, _build_candidate_files, + _FileUploadResult, _resolve_log_file_path, ) from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction @@ -297,3 +299,159 @@ def test_range_spanning_rotated_and_live_file(tmp_path: Path) -> None: assert rotated.is_current is False assert live.is_current is True assert live.path == base + + +def test_file_external_id_naming_convention() -> None: + from cognite.extractorutils.unstable.core._log_upload_action import _file_external_id + + assert _file_external_id("my-extractor", date(2026, 6, 1)) == "extractor-logs-my-extractor-2026-06-01" + + +def test_upload_candidate_calls_cdf_upload_for_rotated_file(tmp_path: Path) -> None: + from unittest.mock import MagicMock + + from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate + + path = tmp_path / "extractor.log.2026-06-01" + path.write_bytes(b"log line\n" * 100) + candidate = LogFileCandidate(log_date=date(2026, 6, 1), path=path, is_current=False) + mock_client = MagicMock() + + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=None) + + assert result.status == "uploaded" + assert result.size_bytes == path.stat().st_size + assert result.file_external_id == "extractor-logs-my-extractor-2026-06-01" + mock_client.files.upload_bytes.assert_called_once() + _, kwargs = mock_client.files.upload_bytes.call_args + assert kwargs["external_id"] == "extractor-logs-my-extractor-2026-06-01" + assert kwargs["overwrite"] is True + + +def test_upload_candidate_current_day_uses_bounded_reader(tmp_path: Path) -> None: + from unittest.mock import MagicMock + + from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader + from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate + + path = tmp_path / "extractor.log" + path.write_bytes(b"x" * 500) + candidate = LogFileCandidate(log_date=_PAST_TODAY, path=path, is_current=True) + mock_client = MagicMock() + + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=300) + + assert result.status == "uploaded" + assert result.size_bytes == 300 + _, kwargs = mock_client.files.upload_bytes.call_args + assert isinstance(kwargs["content"], BoundedReader) + assert len(kwargs["content"]) == 300 + + +def test_upload_candidate_exceeds_max_size_returns_skipped_too_large(tmp_path: Path) -> None: + from unittest.mock import MagicMock, patch + + from cognite.extractorutils.unstable.core._log_upload_action import ( + MAX_FILE_SIZE_BYTES, + LogFileCandidate, + _upload_candidate, + ) + + path = tmp_path / "huge.log" + path.write_bytes(b"x") # actual bytes don't matter — we mock stat + candidate = LogFileCandidate(log_date=date(2026, 6, 1), path=path, is_current=False) + mock_client = MagicMock() + + oversized = MAX_FILE_SIZE_BYTES + 1 + mock_stat_result = MagicMock() + mock_stat_result.st_size = oversized + with patch("pathlib.Path.stat", return_value=mock_stat_result): + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=None) + + assert result.status == "skipped_too_large" + assert result.size_bytes == oversized + mock_client.files.upload_bytes.assert_not_called() + + +def test_upload_candidate_cdf_error_returns_failed(tmp_path: Path) -> None: + from unittest.mock import MagicMock + + from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate + + path = tmp_path / "extractor.log.2026-06-01" + path.write_bytes(b"data") + candidate = LogFileCandidate(log_date=date(2026, 6, 1), path=path, is_current=False) + mock_client = MagicMock() + mock_client.files.upload_bytes.side_effect = RuntimeError("CDF unavailable") + + result = _upload_candidate(candidate, "my-extractor", mock_client, snapshot_size=None) + + assert result.status == "failed" + assert "CDF unavailable" in (result.error or "") + + +def test_fetch_logs_action_upload_sets_result_metadata(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"log line\n" * 50) + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_message is not None + assert "1 of 1" in succeeded.result_message + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["uploaded_files"] == "1" + assert succeeded.result_metadata["failed_files"] == "0" + assert succeeded.result_metadata["total_files"] == "1" + files = json.loads(succeeded.result_metadata["files"]) + assert len(files) == 1 + assert files[0]["status"] == "uploaded" + assert files[0]["date"] == "2026-06-10" + + +def test_fetch_logs_action_missing_files_reported_in_metadata(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + # Only 2026-06-11 exists; 2026-06-10 is missing + (tmp_path / "extractor.log.2026-06-11").write_bytes(b"data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-11"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["uploaded_files"] == "1" + assert succeeded.result_metadata["skipped_files"] == "1" + assert succeeded.result_metadata["total_files"] == "2" + files = json.loads(succeeded.result_metadata["files"]) + by_date = {f["date"]: f for f in files} + assert by_date["2026-06-10"]["status"] == "skipped" + assert by_date["2026-06-11"]["status"] == "uploaded" + + +def test_fetch_logs_action_all_files_missing_still_succeeds(tmp_path: Path) -> None: + log_path = tmp_path / "extractor.log" + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["uploaded_files"] == "0" + assert succeeded.result_metadata["skipped_files"] == "1" + + +def test_fetch_logs_action_upload_failure_still_succeeds(tmp_path: Path) -> None: + from unittest.mock import patch + + log_path = tmp_path / "extractor.log" + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"data") + extractor = _make_extractor(log_path=log_path) + with patch( + "cognite.extractorutils.unstable.core._log_upload_action._upload_candidate", + return_value=_FileUploadResult( + log_date=date(2026, 6, 10), + file_external_id="extractor-logs-test-integration-2026-06-10", + status="failed", + error="upload boom", + ), + ): + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + succeeded = next(u for u in updates if u.status == ActionStatus.succeeded) + assert succeeded.result_metadata is not None + assert succeeded.result_metadata["failed_files"] == "1" + assert succeeded.result_metadata["uploaded_files"] == "0" From 1019b858599997e55d3546e3febf6a71330cd003 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Fri, 26 Jun 2026 11:45:58 +0530 Subject: [PATCH 10/16] refactor(odin): Rename parameter --- .../extractorutils/unstable/core/_bounded_reader.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py index 54444945..b17a0d64 100644 --- a/cognite/extractorutils/unstable/core/_bounded_reader.py +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -20,8 +20,8 @@ class BoundedReader: upload_queue.add_io_to_upload_queue(file_meta, lambda: reader, ...) """ - def __init__(self, f: BinaryIO, max_bytes: int) -> None: - self._f = f + def __init__(self, stream: BinaryIO, max_bytes: int) -> None: + self._stream = stream self._size = max_bytes self._remaining = max_bytes @@ -36,16 +36,16 @@ def tell(self) -> int: @property def closed(self) -> bool: - return self._f.closed + return self._stream.closed def close(self) -> None: - self._f.close() + self._stream.close() def read(self, size: int = -1) -> bytes: if self._remaining <= 0: return b"" to_read = self._remaining if size < 0 else min(size, self._remaining) - data = self._f.read(to_read) + data = self._stream.read(to_read) self._remaining -= len(data) return data From 556aee19dbc3f49e8f1816a821be4b3cd2ecd038 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 29 Jun 2026 14:34:03 +0530 Subject: [PATCH 11/16] refactor(odin): Remove duplicate code snippets --- cognite/extractorutils/unstable/core/actions.py | 17 ----------------- cognite/extractorutils/unstable/core/base.py | 9 --------- 2 files changed, 26 deletions(-) diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 0d008542..cf0084fc 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -86,23 +86,6 @@ def result_metadata(self) -> dict[str, str]: return meta -class ActionError(Exception): - """Deliberate action failure with structured metadata for Odin result reporting.""" - - def __init__(self, message: str, *, error_type: str, details: str | None = None) -> None: - super().__init__(message) - self.error_type = error_type - self.details = details - - @property - def result_metadata(self) -> dict[str, str]: - """Structured metadata dict for the action update.""" - meta: dict[str, str] = {"error_type": self.error_type} - if self.details is not None: - meta["error_detail"] = self.details - return meta - - ActionTarget = Callable[["ActionContext"], None] diff --git a/cognite/extractorutils/unstable/core/base.py b/cognite/extractorutils/unstable/core/base.py index fe00a9da..68bbd4f7 100644 --- a/cognite/extractorutils/unstable/core/base.py +++ b/cognite/extractorutils/unstable/core/base.py @@ -693,15 +693,6 @@ def _handle_custom_action(self, action: Action) -> None: result_metadata=e.result_metadata, ) ) - except ActionError as e: - self._checkin_worker.queue_action_update( - ActionUpdate( - external_id=action.external_id, - status=ActionStatus.failed, - result_message=str(e), - result_metadata=e.result_metadata, - ) - ) except Exception as e: self._checkin_worker.queue_action_update( ActionUpdate( From 0ab05e9f31ecbf163d6379d932b7aeebb46d990a Mon Sep 17 00:00:00 2001 From: vikramlc Date: Mon, 29 Jun 2026 14:56:42 +0530 Subject: [PATCH 12/16] refactor(odin): Add with open for optimisation --- cognite/extractorutils/unstable/core/_log_upload_action.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 407388ef..5958a04d 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -153,9 +153,8 @@ def _upload_candidate( ) try: - f = open(candidate.path, "rb") # noqa: SIM115 - reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f # type: ignore[assignment] - with reader: + with open(candidate.path, "rb") as f: + reader: BinaryIO = BoundedReader(f, snapshot_size) if snapshot_size is not None else f # type: ignore[assignment] cdf_client.files.upload_bytes( content=reader, name=f"{external_id}.log", From 3d85fbfc8cbdacfabb6250b99702928b20a494d1 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Tue, 30 Jun 2026 14:04:14 +0530 Subject: [PATCH 13/16] refactor(odin): Remove thread pooling and refactor code --- .../unstable/core/_log_upload_action.py | 48 ++++++++----------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 5958a04d..08a8f160 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -2,12 +2,12 @@ import json import logging -from concurrent.futures import Future, ThreadPoolExecutor, as_completed +from collections import Counter from dataclasses import dataclass from datetime import date, timedelta, timezone from datetime import datetime as dt from pathlib import Path -from typing import BinaryIO +from typing import BinaryIO, Literal from cognite.client import CogniteClient @@ -41,7 +41,7 @@ class LogFileCandidate: class _FileUploadResult: log_date: date file_external_id: str - status: str # "uploaded" | "skipped_too_large" | "failed" + status: Literal["uploaded", "skipped_too_large", "failed"] size_bytes: int = 0 error: str | None = None @@ -251,25 +251,17 @@ def fetch_logs_action(ctx: ActionContext) -> None: integration_external_id = ctx._extractor.connection_config.integration.external_id cdf_client = ctx._extractor.cognite_client - upload_results: list[_FileUploadResult] = [] - with ThreadPoolExecutor(max_workers=DEFAULT_CONCURRENT_UPLOADS) as pool: - futures: dict[Future[_FileUploadResult], LogFileCandidate] = { - pool.submit( - _upload_candidate, - candidate, - integration_external_id, - cdf_client, - snapshot_size if candidate.is_current else None, - ): candidate - for candidate in candidates - } - upload_results.extend(future.result() for future in as_completed(futures)) - - upload_results.sort(key=lambda r: r.log_date) + upload_results: list[_FileUploadResult] = [ + _upload_candidate( + candidate, + integration_external_id, + cdf_client, + snapshot_size if candidate.is_current else None, + ) + for candidate in candidates + ] - uploaded = [r for r in upload_results if r.status == "uploaded"] - too_large = [r for r in upload_results if r.status == "skipped_too_large"] - failed = [r for r in upload_results if r.status == "failed"] + counts = Counter(r.status for r in upload_results) # Per-file entries: upload results (sorted by date) + missing dates (skipped by candidate builder) files_list: list[dict[str, str]] = [] @@ -286,23 +278,23 @@ def fetch_logs_action(ctx: ActionContext) -> None: files_list.append(entry) files_list.extend( { - "date": str(d), - "file_external_id": _file_external_id(integration_external_id, d), + "date": str(skipped_date), + "file_external_id": _file_external_id(integration_external_id, skipped_date), "status": "skipped", } - for d in sorted(skipped_dates) + for skipped_date in sorted(skipped_dates) ) files_list.sort(key=lambda e: e["date"]) - total_skipped = len(skipped_dates) + len(too_large) + total_skipped = len(skipped_dates) + counts["skipped_too_large"] ctx.set_result( - f"{len(uploaded)} of {num_days} log files uploaded to CDF Files", + f"{counts['uploaded']} of {num_days} log files uploaded to CDF Files", metadata={ "total_files": str(num_days), - "uploaded_files": str(len(uploaded)), + "uploaded_files": str(counts["uploaded"]), "skipped_files": str(total_skipped), - "failed_files": str(len(failed)), + "failed_files": str(counts["failed"]), "files": json.dumps(files_list), }, ) From c3084092ceb0e542168df0f0e77e1b57d3e9b1e1 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Tue, 30 Jun 2026 14:07:04 +0530 Subject: [PATCH 14/16] refactor(odin): Remove unnecessary code --- cognite/extractorutils/unstable/core/_log_upload_action.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 08a8f160..9be8a7c6 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -21,7 +21,6 @@ """Maximum number of calendar days a single ``fetch_logs`` invocation may cover.""" MAX_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024 # 4 GiB — CDF single-request upload limit -DEFAULT_CONCURRENT_UPLOADS = 1 # Sequential by default; safe on constrained networks _FETCH_LOGS_DESCRIPTION = ( f"Upload rotated log files to CDF Files for a given date range. At most {MAX_DATE_RANGE_DAYS} days per invocation." From f9039393465226e11346a6223f0f32a1f35545a2 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 1 Jul 2026 15:45:09 +0530 Subject: [PATCH 15/16] test(odin): Added tests --- tests/test_unstable/test_bounded_reader.py | 31 ++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py index 0b608d4b..286cce17 100644 --- a/tests/test_unstable/test_bounded_reader.py +++ b/tests/test_unstable/test_bounded_reader.py @@ -93,6 +93,37 @@ def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: assert reader.tell() == 8 +def test_stream_shorter_than_snapshot_partial(tmp_path: Path) -> None: + # File has 5 bytes but snapshot declares 10 (e.g. file was truncated after snapshot). + # _remaining decrements by len(data) not to_read, so tell() reflects actual bytes read. + path = _make_file(tmp_path, b"hello") + with open(path, "rb") as f: + reader = BoundedReader(f, 10) + assert len(reader) == 10 # snapshot still declared + data = reader.read(10) + assert data == b"hello" # only 5 bytes available + assert reader.tell() == 5 # tracks actual bytes, not requested + assert reader.read() == b"" + + +def test_read_multiple_calls_clamps_final_chunk(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 7) + assert reader.read(3) == b"hel" + assert reader.read(3) == b"lo " + assert reader.read(3) == b"w" # only 1 byte left — clamped by min(size, remaining) + assert reader.read(3) == b"" + + +def test_read_all_after_partial_consumption(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello") + with open(path, "rb") as f: + reader = BoundedReader(f, 5) + assert reader.read(3) == b"hel" + assert reader.read(-1) == b"lo" # read() after partial — returns only remaining 2 bytes + + def test_close_and_closed_property(tmp_path: Path) -> None: path = _make_file(tmp_path, b"data") f = open(path, "rb") # noqa: SIM115 From 71646683e037a386f45fe77d9c4662f2171e2687 Mon Sep 17 00:00:00 2001 From: vikramlc Date: Wed, 1 Jul 2026 18:58:22 +0530 Subject: [PATCH 16/16] fix(odin): Add seek in bounded reader and add tests --- .../unstable/core/_bounded_reader.py | 8 ++++++ .../unstable/core/_log_upload_action.py | 2 +- tests/test_unstable/test_bounded_reader.py | 27 +++++++++++++++++++ tests/test_unstable/test_log_upload_action.py | 8 +----- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/cognite/extractorutils/unstable/core/_bounded_reader.py b/cognite/extractorutils/unstable/core/_bounded_reader.py index b17a0d64..45a67c43 100644 --- a/cognite/extractorutils/unstable/core/_bounded_reader.py +++ b/cognite/extractorutils/unstable/core/_bounded_reader.py @@ -34,6 +34,14 @@ def __len__(self) -> int: def tell(self) -> int: return self._size - self._remaining + def seek(self, offset: int, whence: int = 0) -> int: + pos = self._stream.seek(offset, whence) + self._remaining = max(0, self._size - pos) + return pos + + def seekable(self) -> bool: + return self._stream.seekable() + @property def closed(self) -> bool: return self._stream.closed diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 9be8a7c6..15c61607 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -231,7 +231,7 @@ def fetch_logs_action(ctx: ActionContext) -> None: len(skipped_dates), ) - # Snapshot the current-day file size BEFORE spawning upload threads. + # Snapshot the current-day file size BEFORE starting uploads. # This gives a fixed read ceiling for BoundedReader — bytes written after this # point are excluded from the upload, preventing Content-Length mismatches. snapshot_size: int | None = None diff --git a/tests/test_unstable/test_bounded_reader.py b/tests/test_unstable/test_bounded_reader.py index 0b608d4b..84c4a5df 100644 --- a/tests/test_unstable/test_bounded_reader.py +++ b/tests/test_unstable/test_bounded_reader.py @@ -93,6 +93,33 @@ def test_tell_tracks_bytes_consumed(tmp_path: Path) -> None: assert reader.tell() == 8 +def test_seek_rewinds_for_retry(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 8) + assert reader.read(5) == b"hello" + assert reader.seek(0) == 0 + assert reader.tell() == 0 + assert reader.read(5) == b"hello" + + +def test_seek_relative(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"hello world") + with open(path, "rb") as f: + reader = BoundedReader(f, 8) + reader.read(3) + assert reader.seek(2, 1) == 5 + assert reader.tell() == 5 + assert reader.read(3) == b" wo" # bytes 5-7 of b"hello world" within snapshot of 8 + + +def test_seekable_mirrors_underlying_stream(tmp_path: Path) -> None: + path = _make_file(tmp_path, b"data") + with open(path, "rb") as f: + reader = BoundedReader(f, 4) + assert reader.seekable() == f.seekable() + + def test_close_and_closed_property(tmp_path: Path) -> None: path = _make_file(tmp_path, b"data") f = open(path, "rb") # noqa: SIM115 diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 1a3ae867..65c3ec2c 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -308,8 +308,6 @@ def test_file_external_id_naming_convention() -> None: def test_upload_candidate_calls_cdf_upload_for_rotated_file(tmp_path: Path) -> None: - from unittest.mock import MagicMock - from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate path = tmp_path / "extractor.log.2026-06-01" @@ -329,8 +327,6 @@ def test_upload_candidate_calls_cdf_upload_for_rotated_file(tmp_path: Path) -> N def test_upload_candidate_current_day_uses_bounded_reader(tmp_path: Path) -> None: - from unittest.mock import MagicMock - from cognite.extractorutils.unstable.core._bounded_reader import BoundedReader from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate @@ -349,7 +345,7 @@ def test_upload_candidate_current_day_uses_bounded_reader(tmp_path: Path) -> Non def test_upload_candidate_exceeds_max_size_returns_skipped_too_large(tmp_path: Path) -> None: - from unittest.mock import MagicMock, patch + from unittest.mock import patch from cognite.extractorutils.unstable.core._log_upload_action import ( MAX_FILE_SIZE_BYTES, @@ -374,8 +370,6 @@ def test_upload_candidate_exceeds_max_size_returns_skipped_too_large(tmp_path: P def test_upload_candidate_cdf_error_returns_failed(tmp_path: Path) -> None: - from unittest.mock import MagicMock - from cognite.extractorutils.unstable.core._log_upload_action import LogFileCandidate, _upload_candidate path = tmp_path / "extractor.log.2026-06-01"