diff --git a/README.md b/README.md index 9bdf370..214243d 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,13 @@ To list all available packages, run kup list ``` +To see the history of versions you have previously had installed (marking the active one), add `--history`: + +``` +kup list --history +kup list k --history +``` + Any further functionality is described in the help commands: ``` diff --git a/src/kup/__main__.py b/src/kup/__main__.py index 4bc60d8..b3df746 100644 --- a/src/kup/__main__.py +++ b/src/kup/__main__.py @@ -9,6 +9,7 @@ import textwrap import time from argparse import ArgumentParser, RawDescriptionHelpFormatter, _HelpAction +from dataclasses import replace from typing import TYPE_CHECKING import giturlparse @@ -44,14 +45,19 @@ ) from .package import ( AVAILABLE, + GENERATION_LINK_RE, INSTALLED, ConcretePackage, Follows, GithubPackage, + InstalledVersion, LocalPackage, PackageMetadata, PackageName, PackageVersion, + read_generation_manifests, + save_tags, + tag_cache, ) from .telemetry import emit_event @@ -330,32 +336,193 @@ def _format_duration(seconds: float) -> str: return f'{int(hours)}h {int(mins)}m' +def _github_auth(package: GithubPackage) -> dict[str, str]: + if package.access_token: + return {'Authorization': f'Bearer {package.access_token}'} + token = os.getenv('GH_TOKEN') + if token: + return {'Authorization': f'Bearer {token}'} + return {} + + +def _available_by_base(base: str) -> GithubPackage | None: + for p in available_packages: + if p.package_name.base == base: + return p + return None + + +def profile_generations_dir() -> tuple[str, int] | None: + """Locate the Nix profile generations directory and the active generation number. + + Resolves the first-level ``~/.nix-profile`` symlink -- which transparently handles + both the new-style (``~/.local/state/nix/profiles``) and old-style + (``/nix/var/nix/profiles/per-user/$USER``) locations -- and reads the ``profile`` + symlink in that directory to find the active generation. Returns ``None`` if no + profile or generations can be found. + """ + home = os.getenv('HOME') + if home is None: + return None + nix_profile = os.path.join(home, '.nix-profile') + if not os.path.islink(nix_profile): + return None + target = os.readlink(nix_profile) + if not os.path.isabs(target): + target = os.path.join(os.path.dirname(nix_profile), target) + gens_dir = os.path.dirname(target) + profile_link = os.path.join(gens_dir, 'profile') + if not os.path.islink(profile_link): + return None + match = GENERATION_LINK_RE.fullmatch(os.path.basename(os.readlink(profile_link))) + if match is None: + return None + return gens_dir, int(match.group(1)) + + +def build_history(package_filter: str | None) -> dict[str, list[InstalledVersion]]: + """Reconstruct, per package, the timeline of versions installed over time. + + Walks the Nix profile generations in order and emits a new ``InstalledVersion`` for + a package only when its version differs from the previously emitted one, so that the + result shows version *changes* rather than every unrelated generation bump. The + version that is installed in the active generation is marked ``is_current``. Git tags + are resolved with a single GitHub request per package (plus the on-disk tag cache). + """ + found = profile_generations_dir() + if found is None: + return {} + gens_dir, current_generation = found + + history: dict[str, list[InstalledVersion]] = {} + last_version: dict[str, tuple[str | None, str | None]] = {} + current_version_per_pkg: dict[str, tuple[str | None, str | None]] = {} + + for generation, date, elements in read_generation_manifests(gens_dir): + installed_in_gen: dict[str, tuple[str | None, str | None]] = {} + for element in elements.values(): + attr_path = element.get('attrPath') + if not attr_path: + continue + available = lookup_available_package(attr_path) + if available is None: + continue + base = available.package_name.base + url = element.get('url', '') + original_url = element.get('originalUrl', '') + if url.startswith(available.base_repo_path): + installed_in_gen[base] = (url.removeprefix(f'{available.base_repo_path}/'), None) + elif original_url.startswith('git+file://'): + installed_in_gen[base] = (None, original_url.removeprefix('git+file://')) + + if generation == current_generation: + current_version_per_pkg = dict(installed_in_gen) + + for base, version in installed_in_gen.items(): + if package_filter is not None and base != package_filter: + continue + if last_version.get(base) == version: + continue # unchanged since the previous emitted entry -- collapse + commit, local_path = version + history.setdefault(base, []).append( + InstalledVersion(generation, date, commit, None, local_path, is_current=False) + ) + last_version[base] = version + + return _enrich_tags(_mark_current(history, current_version_per_pkg, current_generation)) + + +def _mark_current( + history: dict[str, list[InstalledVersion]], + current_version_per_pkg: dict[str, tuple[str | None, str | None]], + current_generation: int, +) -> dict[str, list[InstalledVersion]]: + marked: dict[str, list[InstalledVersion]] = {} + for base, versions in history.items(): + current = current_version_per_pkg.get(base) + active_idx = None + if current is not None: + for idx, v in enumerate(versions): + if v.generation <= current_generation and (v.commit, v.local_path) == current: + active_idx = idx + marked[base] = [replace(v, is_current=True) if idx == active_idx else v for idx, v in enumerate(versions)] + return marked + + +def _enrich_tags(history: dict[str, list[InstalledVersion]]) -> dict[str, list[InstalledVersion]]: + enriched: dict[str, list[InstalledVersion]] = {} + for base, versions in history.items(): + sha_to_tag: dict[str, str | None] = {} + if any(v.commit for v in versions): + package = _available_by_base(base) + if package is not None and not package.ssh_git: + tags = requests.get( + f'https://api.github.com/repos/{package.org}/{package.repo}/tags', + headers=_github_auth(package), + ) + if tags.ok: + fetched = {t['commit']['sha']: t['name'] for t in tags.json()} + save_tags(fetched) + sha_to_tag.update(fetched) + enriched[base] = [ + replace(v, tag=sha_to_tag.get(v.commit, tag_cache.get(v.commit)) if v.commit else None) for v in versions + ] + return enriched + + +def _format_version(v: InstalledVersion) -> str: + if v.local_path is not None: + return f'local checkout ({v.local_path})' + if v.commit is None: + return '' + return f'{v.commit[:7]} ({v.tag})' if v.tag else v.commit[:7] + + +def list_history(package_filter: str | None) -> None: + history = build_history(package_filter) + if not history: + target = f" for '[green]{package_filter}[/]'" if package_filter else '' + rich.print(f'❗ [yellow]No installation history found{target}.[/]') + return + for base in sorted(history.keys()): + rich.print(f'\n[bold]{base}[/]') + table_data = [['', 'Generation', 'Date', 'Version']] + [ + highlight_row( + v.is_current, + ['→' if v.is_current else '', str(v.generation), v.date, _format_version(v)], + ) + for v in history[base] + ] + print(SingleTable(table_data).table) + + def list_package( package_name: str, show_inputs: bool, show_status: bool, version: str | None = None, + show_history: bool = False, ) -> None: reload_packages() - if package_name != 'all': - if package_name not in packages.keys(): - rich.print( - f"❗ [red]The package '[green]{package_name}[/]' does not exist.\n" - "[/]Use '[blue]kup list[/]' to see all the available packages." - ) - return + if package_name != 'all' and package_name not in packages.keys(): + rich.print( + f"❗ [red]The package '[green]{package_name}[/]' does not exist.\n" + "[/]Use '[blue]kup list[/]' to see all the available packages." + ) + return + + if show_history: + list_history(package_name if package_name != 'all' else None) + return + if package_name != 'all': listed_package = packages[package_name].concrete(version, []) if version else packages[package_name] if show_inputs or show_status: inputs = get_package_metadata(listed_package) rich.print(package_metadata_tree(inputs, show_status=show_status)) else: - auth = ( - {'Authorization': f'Bearer {listed_package.access_token}'} - if listed_package.access_token - else {'Authorization': f'Bearer {os.getenv("GH_TOKEN")}'} if os.getenv('GH_TOKEN') else {} - ) + auth = _github_auth(listed_package) tags = requests.get( f'https://api.github.com/repos/{listed_package.org}/{listed_package.repo}/tags', headers=auth ) @@ -930,6 +1097,11 @@ def main() -> None: action='store_true', help='show the input dependencies of the selected package and how stale they are compared to the default branch', ) + list.add_argument( + '--history', + action='store_true', + help='show the history of installed versions for the selected package (or all packages)', + ) list.add_argument('-h', '--help', action=_HelpListAction) install = subparser.add_parser( @@ -1021,7 +1193,7 @@ def main() -> None: package_name = PackageName.parse(args.package) if args.command == 'list': - list_package(package_name.base, args.inputs, args.status, args.version) + list_package(package_name.base, args.inputs, args.status, args.version, args.history) elif args.command in ['install', 'update']: install_package(package_name, args.version, args.override) diff --git a/src/kup/list-help.md b/src/kup/list-help.md index de42b1a..4ae29b5 100644 --- a/src/kup/list-help.md +++ b/src/kup/list-help.md @@ -77,3 +77,24 @@ If you just want to build with a specific commit of `llvm-backend`, you can use *Note*: Certain inputs in the tree have a `follows `** instead of the repository and hash. This is because they are linked to the version pointed to by **. If you want to override one of these inputs, it is almost always the case that you want to override the ** input instead. ``kup` will let you proceed if you know what you are doing but issue a warning. --- + +# kup list --history + +Adding the `--history` flag shows the versions you have previously had installed for each package, reconstructed from the Nix profile generations. This is useful to find out which version you had before an upgrade. The currently-active version is marked with a `→`. + +``` +➜ kup list kontrol --history + +kontrol +┌───┬────────────┬────────────┬──────────────────────────┐ +│ │ Generation │ Date │ Version │ +├───┼────────────┼────────────┼──────────────────────────┤ +│ │ 149 │ 2025-12-02 │ 273fe5f │ +│ │ 157 │ 2025-12-12 │ 92ff80d │ +│ → │ 174 │ 2026-01-19 │ a1b2c3d (v1.0.1-a1b2c3d) │ +└───┴────────────┴────────────┴──────────────────────────┘ +``` + +Without a package argument, `kup list --history` prints the timeline for every package that appears in the history. + +--- diff --git a/src/kup/package.py b/src/kup/package.py index 2c2aae0..ffe1276 100644 --- a/src/kup/package.py +++ b/src/kup/package.py @@ -2,7 +2,9 @@ import json import os +import re from dataclasses import dataclass +from datetime import datetime from typing import TYPE_CHECKING import requests @@ -357,3 +359,53 @@ class Follows: def __init__(self, follows: list[str]): self.follows = follows + + +@dataclass(frozen=True) +class InstalledVersion: + generation: int + date: str # YYYY-MM-DD, from the generation symlink mtime + commit: str | None # 40-char sha for github packages, None for local checkouts + tag: str | None # resolved from the tag cache / GitHub + local_path: str | None # set for `git+file://` local checkouts + is_current: bool + + +GENERATION_LINK_RE = re.compile(r'profile-(\d+)-link') + + +def read_generation_manifests(gens_dir: str) -> list[tuple[int, str, dict]]: + """Read every Nix profile generation manifest found in ``gens_dir``. + + Returns ``(generation_number, date, elements)`` tuples sorted by generation + number, where ``elements`` is the manifest's ``elements`` normalized to a dict + keyed by index (matching ``reload_packages``). Generations whose store path has + been garbage-collected (missing ``manifest.json``) are skipped. + + This function only touches the filesystem -- no Nix or network calls -- so it can + be unit-tested against a fake generations tree. + """ + generations: list[tuple[int, str, dict]] = [] + if not os.path.isdir(gens_dir): + return generations + for entry in os.listdir(gens_dir): + match = GENERATION_LINK_RE.fullmatch(entry) + if match is None: + continue + link_path = os.path.join(gens_dir, entry) + manifest_path = os.path.join(link_path, 'manifest.json') + if not os.path.exists(manifest_path): + continue + with open(manifest_path) as manifest_file: + elements = json.loads(manifest_file.read())['elements'] + if type(elements) is list: + elements = dict(enumerate(elements)) + # fix potential inconsistencies between nix profiles (see reload_packages) + for element in elements.values(): + if 'uri' in element: + element['url'] = element['uri'] + if 'originalUri' in element: + element['originalUrl'] = element['originalUri'] + date = datetime.fromtimestamp(os.lstat(link_path).st_mtime).strftime('%Y-%m-%d') + generations.append((int(match.group(1)), date, elements)) + return sorted(generations, key=lambda gen: gen[0]) diff --git a/src/tests/unit/test_unit.py b/src/tests/unit/test_unit.py index 2e6d5ba..404fa79 100644 --- a/src/tests/unit/test_unit.py +++ b/src/tests/unit/test_unit.py @@ -1,5 +1,157 @@ +from __future__ import annotations + +import json +import os +from typing import TYPE_CHECKING + +import kup.__main__ as kup_main from kup.hello import hello +from kup.nix import ARCH +from kup.package import InstalledVersion, read_generation_manifests + +if TYPE_CHECKING: + from pathlib import Path + + from pytest import MonkeyPatch + +SHA_A = 'a' * 40 +SHA_B = 'b' * 40 def test_hello() -> None: assert hello('World') == 'Hello, World!' + + +def _write_generation(gens_dir: Path, generation: int, elements: object) -> None: + link = gens_dir / f'profile-{generation}-link' + link.mkdir() + (link / 'manifest.json').write_text(json.dumps({'version': 3, 'elements': elements})) + + +def _kontrol_element(sha: str) -> dict: + return { + 'active': True, + 'attrPath': f'packages.{ARCH}.kontrol', + 'storePaths': [f'/nix/store/{sha[:8]}-kontrol'], + 'url': f'github:runtimeverification/kontrol/{sha}', + } + + +def test_read_generation_manifests_sorts_and_skips_gc(tmp_path: Path) -> None: + gens_dir = tmp_path / 'profiles' + gens_dir.mkdir() + _write_generation(gens_dir, 3, [_kontrol_element(SHA_B)]) + _write_generation(gens_dir, 1, [_kontrol_element(SHA_A)]) + # a garbage-collected generation: link exists but manifest is gone + (gens_dir / 'profile-2-link').mkdir() + # an unrelated symlink that must be ignored + (gens_dir / 'profile').symlink_to('profile-3-link') + + generations = read_generation_manifests(str(gens_dir)) + + assert [g[0] for g in generations] == [1, 3] + assert all(isinstance(elements, dict) for _, _, elements in generations) + first_element = next(iter(generations[0][2].values())) + assert first_element['url'] == f'github:runtimeverification/kontrol/{SHA_A}' + + +def test_read_generation_manifests_normalizes_uri(tmp_path: Path) -> None: + gens_dir = tmp_path / 'profiles' + gens_dir.mkdir() + _write_generation( + gens_dir, + 1, + [ + { + 'attrPath': f'packages.{ARCH}.kontrol', + 'storePaths': ['/nix/store/x-kontrol'], + 'uri': f'github:runtimeverification/kontrol/{SHA_A}', + 'originalUri': 'git+file:///home/me/kontrol', + } + ], + ) + + [(_, _, elements)] = read_generation_manifests(str(gens_dir)) + element = next(iter(elements.values())) + assert element['url'] == f'github:runtimeverification/kontrol/{SHA_A}' + assert element['originalUrl'] == 'git+file:///home/me/kontrol' + + +def test_read_generation_manifests_missing_dir(tmp_path: Path) -> None: + assert read_generation_manifests(str(tmp_path / 'does-not-exist')) == [] + + +def test_build_history_collapses_and_marks_current(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: + gens_dir = tmp_path / 'profiles' + gens_dir.mkdir() + _write_generation(gens_dir, 1, [_kontrol_element(SHA_A)]) + _write_generation(gens_dir, 2, [_kontrol_element(SHA_A)]) # unchanged -> collapsed + _write_generation(gens_dir, 3, [_kontrol_element(SHA_B)]) + + monkeypatch.setattr(kup_main, 'profile_generations_dir', lambda: (str(gens_dir), 3)) + monkeypatch.setattr(kup_main, '_enrich_tags', lambda history: history) + + history = kup_main.build_history(None) + + versions = history['kontrol'] + assert [(v.generation, v.commit, v.is_current) for v in versions] == [ + (1, SHA_A, False), + (3, SHA_B, True), + ] + + +def test_build_history_local_checkout_and_filter(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: + gens_dir = tmp_path / 'profiles' + gens_dir.mkdir() + _write_generation( + gens_dir, + 1, + [ + _kontrol_element(SHA_A), + { + 'attrPath': f'packages.{ARCH}.kevm', + 'storePaths': ['/nix/store/y-kevm'], + 'originalUrl': 'git+file:///home/me/evm-semantics/master', + }, + ], + ) + + monkeypatch.setattr(kup_main, 'profile_generations_dir', lambda: (str(gens_dir), 1)) + monkeypatch.setattr(kup_main, '_enrich_tags', lambda history: history) + + history = kup_main.build_history('kevm') + + assert set(history.keys()) == {'kevm'} + [version] = history['kevm'] + assert version.commit is None + assert version.local_path == '/home/me/evm-semantics/master' + assert version.is_current is True + + +def test_build_history_no_profile(monkeypatch: MonkeyPatch) -> None: + monkeypatch.setattr(kup_main, 'profile_generations_dir', lambda: None) + assert kup_main.build_history(None) == {} + + +def test_format_version_renders_tag_commit_and_local() -> None: + tagged = InstalledVersion(1, '2026-01-01', SHA_A, 'v1.0.1', None, is_current=True) + untagged = InstalledVersion(2, '2026-01-02', SHA_B, None, None, is_current=False) + local = InstalledVersion(3, '2026-01-03', None, None, '/home/me/kontrol', is_current=False) + + assert kup_main._format_version(tagged) == f'{SHA_A[:7]} (v1.0.1)' + assert kup_main._format_version(untagged) == SHA_B[:7] + assert kup_main._format_version(local) == 'local checkout (/home/me/kontrol)' + + +def test_profile_generations_dir_resolves_symlink(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: + profiles = tmp_path / 'state' / 'nix' / 'profiles' + profiles.mkdir(parents=True) + (profiles / 'profile-7-link').mkdir() + (profiles / 'profile').symlink_to('profile-7-link') + home = tmp_path / 'home' + home.mkdir() + (home / '.nix-profile').symlink_to(profiles / 'profile') + + monkeypatch.setattr(os, 'getenv', lambda key, default=None: str(home) if key == 'HOME' else default) + + assert kup_main.profile_generations_dir() == (str(profiles), 7)