diff --git a/cognite/extractorutils/unstable/core/_log_upload_action.py b/cognite/extractorutils/unstable/core/_log_upload_action.py index 9d6b20e1..ed7407f2 100644 --- a/cognite/extractorutils/unstable/core/_log_upload_action.py +++ b/cognite/extractorutils/unstable/core/_log_upload_action.py @@ -1,8 +1,12 @@ """Built-in ``fetch_logs`` action: streams rotated log files to CDF Files.""" import logging -from datetime import date +from dataclasses import dataclass +from datetime import date, timedelta, timezone +from datetime import datetime as dt +from pathlib import Path +from cognite.extractorutils.unstable.configuration.models import ExtractorConfig, LogFileHandlerConfig from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError _logger = logging.getLogger(__name__) @@ -15,6 +19,19 @@ ) +@dataclass(frozen=True) +class LogFileCandidate: + """A log file resolved for a given date that exists and has content.""" + + log_date: date + path: Path + is_current: bool # True when path is the live (unrotated) file.log + + +def _today_utc() -> date: + return dt.now(tz=timezone.utc).date() + + def _parse_date(raw: str, field: str) -> date: try: return date.fromisoformat(raw) @@ -25,6 +42,64 @@ def _parse_date(raw: str, field: str) -> date: ) from None +def _resolve_log_file_path(config: ExtractorConfig) -> Path | None: + """Return the base log file path from the first file handler in config, or None.""" + for handler in config.log_handlers: + if isinstance(handler, LogFileHandlerConfig): + return handler.path + return None + + +def _build_candidate_files( + base_path: Path, + start_date: date, + end_date: date, + today: date, +) -> tuple[list[LogFileCandidate], list[date]]: + """ + Enumerate log files for [start_date, end_date] and partition into candidates vs skipped. + + Rotated files follow the naming convention ``.YYYY-MM-DD``. + The live file (``base_path``) is used for ``today``; all other dates use the rotated name. + + A file is skipped when it does not exist, is empty, or is inaccessible (any ``OSError``). + This covers the brief rotation race window: if the action is dispatched right after midnight + before ``TimedRotatingFileHandler`` has renamed ``file.log`` to ``file.log.YYYY-MM-DD``, + the rotated file will not be found and that date will appear in ``skipped``. Retrying after + rotation completes (within seconds) will pick it up. + + Returns: + (candidates, skipped): candidates are files that exist and have content; + skipped contains dates for which no usable file was found. + """ + candidates: list[LogFileCandidate] = [] + skipped: list[date] = [] + + current = start_date + while current <= end_date: + if current == today: + path = base_path + is_current = True + else: + path = base_path.parent / f"{base_path.name}.{current.isoformat()}" + is_current = False + + try: + size = path.stat().st_size + except OSError as e: + _logger.warning("fetch_logs: skipping %s for %s — %s", path, current, e) + skipped.append(current) + else: + if size > 0: + candidates.append(LogFileCandidate(log_date=current, path=path, is_current=is_current)) + else: + skipped.append(current) + + current += timedelta(days=1) + + return candidates, skipped + + def fetch_logs_action(ctx: ActionContext) -> None: """Validate parameters and upload rotated log files for the requested date range to CDF Files.""" params = ctx.call_metadata or {} @@ -46,6 +121,14 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="invalid_date_range", ) + today = _today_utc() + + if end_date > today: + raise ActionError( + f"end_date ({end_date}) cannot be in the future (today is {today})", + error_type="invalid_date_range", + ) + num_days = (end_date - start_date).days + 1 if num_days > MAX_DATE_RANGE_DAYS: raise ActionError( @@ -54,4 +137,18 @@ def fetch_logs_action(ctx: ActionContext) -> None: error_type="invalid_date_range", ) - _logger.info("fetch_logs: uploading logs for %s to %s (%d day(s))", start_date, end_date, num_days) + log_file_path = _resolve_log_file_path(ctx.application_config) + if log_file_path is None: + raise ActionError( + "No file log handler configured; add a 'file' type log handler to enable log uploads", + error_type="no_file_handler_configured", + ) + + candidates, skipped = _build_candidate_files(log_file_path, start_date, end_date, today) + _logger.info( + "fetch_logs: %d candidate file(s) for %s to %s; %d date(s) skipped", + len(candidates), + start_date, + end_date, + len(skipped), + ) diff --git a/cognite/extractorutils/unstable/core/actions.py b/cognite/extractorutils/unstable/core/actions.py index 0193dc67..dd1c6f1a 100644 --- a/cognite/extractorutils/unstable/core/actions.py +++ b/cognite/extractorutils/unstable/core/actions.py @@ -4,8 +4,9 @@ import logging from collections.abc import Callable -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Generic +from cognite.extractorutils.unstable.configuration.models import ConfigType from cognite.extractorutils.unstable.core.errors import Error, ErrorLevel from cognite.extractorutils.unstable.core.logger import CogniteLogger @@ -15,7 +16,7 @@ __all__ = ["ActionContext", "ActionError", "ActionTarget", "CustomAction"] -class ActionContext(CogniteLogger): +class ActionContext(Generic[ConfigType], CogniteLogger): """ Context for a custom action invocation. @@ -28,7 +29,7 @@ class ActionContext(CogniteLogger): def __init__( self, action: "CustomAction", - extractor: "Extractor", + extractor: "Extractor[ConfigType]", external_id: str, call_metadata: dict[str, str] | None = None, ) -> None: @@ -40,6 +41,11 @@ def __init__( self._logger = logging.getLogger(f"{self._extractor.EXTERNAL_ID}.action.{self._action.name.replace(' ', '')}") + @property + def application_config(self) -> ConfigType: + """The extractor's application configuration.""" + return self._extractor.application_config + def _new_error( self, level: ErrorLevel, diff --git a/tests/test_unstable/test_log_upload_action.py b/tests/test_unstable/test_log_upload_action.py index 1fc00915..505b541b 100644 --- a/tests/test_unstable/test_log_upload_action.py +++ b/tests/test_unstable/test_log_upload_action.py @@ -1,22 +1,36 @@ from datetime import date, timedelta +from pathlib import Path from unittest.mock import MagicMock import pytest +from cognite.extractorutils.unstable.configuration.models import ( + LogFileHandlerConfig, + LogLevel, +) from cognite.extractorutils.unstable.core._dto import Action, ActionStatus, ActionUpdate -from cognite.extractorutils.unstable.core._log_upload_action import MAX_DATE_RANGE_DAYS +from cognite.extractorutils.unstable.core._log_upload_action import ( + MAX_DATE_RANGE_DAYS, + _build_candidate_files, + _resolve_log_file_path, +) from cognite.extractorutils.unstable.core.actions import ActionContext, ActionError, CustomAction from cognite.extractorutils.unstable.core.base import FullConfig from .conftest import TestConfig, TestExtractor +_PAST_TODAY = date(2026, 6, 19) -def _make_extractor() -> TestExtractor: + +def _make_extractor(log_path: Path | None = None) -> TestExtractor: conn = MagicMock() conn.integration.external_id = "test-integration" + config_kwargs: dict = {"parameter_one": 1, "parameter_two": "a"} + if log_path is not None: + config_kwargs["log_handlers"] = [LogFileHandlerConfig(type="file", path=log_path, level=LogLevel.INFO)] full_config = FullConfig( connection_config=conn, - application_config=TestConfig(parameter_one=1, parameter_two="a"), + application_config=TestConfig(**config_kwargs), current_config_revision=1, ) return TestExtractor(full_config, MagicMock()) @@ -111,8 +125,9 @@ def test_parse_date_non_string_type_raises_action_error() -> None: }, str(MAX_DATE_RANGE_DAYS), ), + ({"start_date": "2020-01-01", "end_date": "2099-01-01"}, "future"), ], - ids=["end_before_start", "exceeds_max_days"], + ids=["end_before_start", "exceeds_max_days", "end_date_in_future"], ) def test_invalid_date_range_reports_invalid_date_range( call_metadata: dict[str, str], message_contains: str | None @@ -125,22 +140,6 @@ def test_invalid_date_range_reports_invalid_date_range( assert message_contains in (failed.result_message or "") -@pytest.mark.parametrize( - "start_str,end_str", - [ - ("2026-06-10", "2026-06-10"), - ("2026-06-01", str(date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1))), - ], - ids=["single_day", "exact_max_days"], -) -def test_valid_date_range_succeeds(start_str: str, end_str: str) -> None: - extractor = _make_extractor() - updates = _dispatch(extractor, {"start_date": start_str, "end_date": end_str}) - statuses = [u.status for u in updates] - assert ActionStatus.succeeded in statuses - assert ActionStatus.failed not in statuses - - def test_action_error_details_included_in_result_metadata() -> None: extractor = _make_extractor() @@ -153,3 +152,148 @@ def raise_with_details(ctx: ActionContext) -> None: failed = _failed_update(_queued_updates(extractor)) assert failed.result_metadata == {"error_type": "unexpected_error", "error_detail": "inner detail"} assert failed.result_message == "boom" + + +def test_resolve_log_file_path_returns_none_for_console_only_config() -> None: + config = TestConfig(parameter_one=1, parameter_two="a") + assert _resolve_log_file_path(config) is None + + +@pytest.mark.parametrize( + "handler_names,expected_name", + [ + (["extractor.log"], "extractor.log"), + (["first.log", "second.log"], "first.log"), + ], + ids=["single_handler", "multiple_handlers_returns_first"], +) +def test_resolve_log_file_path_with_file_handlers(tmp_path: Path, handler_names: list[str], expected_name: str) -> None: + config = TestConfig( + parameter_one=1, + parameter_two="a", + log_handlers=[LogFileHandlerConfig(type="file", path=tmp_path / n, level=LogLevel.INFO) for n in handler_names], + ) + assert _resolve_log_file_path(config) == tmp_path / expected_name + + +def test_existing_rotated_files_become_candidates(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + start = date(2026, 6, 1) + end = date(2026, 6, 3) + for d in [start, start + timedelta(days=1), end]: + (tmp_path / f"extractor.log.{d.isoformat()}").write_bytes(b"data") + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == 3 + assert skipped == [] + assert all(not c.is_current for c in candidates) + + +@pytest.mark.parametrize( + "create_file", + [False, True], + ids=["missing", "empty"], +) +def test_unusable_file_goes_to_skipped(tmp_path: Path, create_file: bool) -> None: + # Covers missing files (including the rotation race window) and 0-byte files. + base = tmp_path / "extractor.log" + d = date(2026, 6, 1) + if create_file: + (tmp_path / f"extractor.log.{d.isoformat()}").touch() # 0 bytes + candidates, skipped = _build_candidate_files(base, d, d, _PAST_TODAY) + assert candidates == [] + assert skipped == [d] + + +def test_today_uses_live_file_not_rotated_name(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + base.write_bytes(b"today data") + candidates, skipped = _build_candidate_files(base, _PAST_TODAY, _PAST_TODAY, _PAST_TODAY) + assert len(candidates) == 1 + assert candidates[0].path == base + assert candidates[0].is_current is True + assert skipped == [] + + +def test_mixed_range_partitions_correctly(tmp_path: Path) -> None: + base = tmp_path / "extractor.log" + start = date(2026, 6, 1) + end = date(2026, 6, 3) + (tmp_path / "extractor.log.2026-06-01").write_bytes(b"data") + (tmp_path / "extractor.log.2026-06-03").write_bytes(b"data") + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == 2 + assert skipped == [date(2026, 6, 2)] + + +@pytest.mark.parametrize( + "start,end", + [ + (_PAST_TODAY, _PAST_TODAY), + (date(2026, 6, 1), date(2026, 6, 1) + timedelta(days=MAX_DATE_RANGE_DAYS - 1)), + ], + ids=["single_today", "exact_max_historical"], +) +def test_boundary_ranges_produce_correct_candidate_count(tmp_path: Path, start: date, end: date) -> None: + base = tmp_path / "extractor.log" + expected = (end - start).days + 1 + if start == _PAST_TODAY: + base.write_bytes(b"data") + else: + current = start + while current <= end: + (tmp_path / f"extractor.log.{current.isoformat()}").write_bytes(b"data") + current += timedelta(days=1) + candidates, skipped = _build_candidate_files(base, start, end, _PAST_TODAY) + assert len(candidates) == expected + assert skipped == [] + + +def test_no_file_handler_reports_no_file_handler_configured() -> None: + extractor = _make_extractor() + updates = _dispatch(extractor, {"start_date": "2026-06-01", "end_date": "2026-06-07"}) + failed = _failed_update(updates) + assert failed.result_metadata == {"error_type": "no_file_handler_configured"} + + +@pytest.mark.parametrize( + "create_log_file", + [True, False], + ids=["files_present", "all_files_missing"], +) +def test_fetch_logs_action_with_file_handler_succeeds(tmp_path: Path, create_log_file: bool) -> None: + log_path = tmp_path / "extractor.log" + if create_log_file: + (tmp_path / "extractor.log.2026-06-10").write_bytes(b"log data") + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": "2026-06-10", "end_date": "2026-06-10"}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_valid_exact_max_days_range_succeeds_at_dispatch(tmp_path: Path) -> None: + # Exactly MAX_DATE_RANGE_DAYS is the boundary — one more would be rejected. + start = date(2026, 6, 1) + end = start + timedelta(days=MAX_DATE_RANGE_DAYS - 1) + log_path = tmp_path / "extractor.log" + extractor = _make_extractor(log_path=log_path) + updates = _dispatch(extractor, {"start_date": str(start), "end_date": str(end)}) + statuses = [u.status for u in updates] + assert ActionStatus.succeeded in statuses + assert ActionStatus.failed not in statuses + + +def test_range_spanning_rotated_and_live_file(tmp_path: Path) -> None: + # Covers the common case: start_date = yesterday (rotated file), end_date = today (live file). + base = tmp_path / "extractor.log" + yesterday = _PAST_TODAY - timedelta(days=1) + base.write_bytes(b"today data") + (tmp_path / f"extractor.log.{yesterday.isoformat()}").write_bytes(b"yesterday data") + candidates, skipped = _build_candidate_files(base, yesterday, _PAST_TODAY, _PAST_TODAY) + assert skipped == [] + assert len(candidates) == 2 + rotated = next(c for c in candidates if c.log_date == yesterday) + live = next(c for c in candidates if c.log_date == _PAST_TODAY) + assert rotated.is_current is False + assert live.is_current is True + assert live.path == base