From f35f8228e037edc912c5ab12085c6420c5f79798 Mon Sep 17 00:00:00 2001 From: Adrian Clay Lake Date: Wed, 23 Oct 2024 20:38:56 +0200 Subject: [PATCH] refactor: QC and linting --- craft_parts/executor/executor.py | 2 +- craft_parts/overlays/chroot.py | 85 ++++++++++++++++++++++---------- 2 files changed, 59 insertions(+), 28 deletions(-) diff --git a/craft_parts/executor/executor.py b/craft_parts/executor/executor.py index aae91e2e..aaa79203 100644 --- a/craft_parts/executor/executor.py +++ b/craft_parts/executor/executor.py @@ -53,7 +53,7 @@ class Executor: :param ignore_patterns: File patterns to ignore when pulling local sources. """ - def __init__( + def __init__( # noqa: PLR0913 self, *, part_list: list[Part], diff --git a/craft_parts/overlays/chroot.py b/craft_parts/overlays/chroot.py index 17c65fa5..cd4c682c 100644 --- a/craft_parts/overlays/chroot.py +++ b/craft_parts/overlays/chroot.py @@ -23,7 +23,7 @@ from collections.abc import Callable, Mapping from multiprocessing.connection import Connection from pathlib import Path -from shutil import copytree, rmtree +from shutil import copytree from typing import Any from craft_parts.utils import os_utils @@ -91,17 +91,17 @@ def _runner( conn.send((res, None)) -def _compare_os_release(host: os_utils.OsRelease, chroot: os_utils.OsRelease): +def _compare_os_release(host: os_utils.OsRelease, chroot: os_utils.OsRelease) -> None: """Compare OsRelease objects from host and chroot for compatibility. See _host_compatible_chroot.""" if (host_val := host.id()) != (chroot_val := chroot.id()): - errors.IncompatibleChrootError("id", host_val, chroot_val) + raise errors.IncompatibleChrootError("id", host_val, chroot_val) if (host_val := host.version_id()) != (chroot_val := chroot.version_id()): - errors.IncompatibleChrootError("version_id", host_val, chroot_val) + raise errors.IncompatibleChrootError("version_id", host_val, chroot_val) -def _host_compatible_chroot(path: Path) -> bool: - """Raise exception if host and chroot are not the same distribution and release""" +def _host_compatible_chroot(path: Path) -> None: + """Raise exception if host and chroot are not the same distribution and release.""" # Note: /etc/os-release is symlinked to /usr/lib/os-release # This could cause an issue if /etc/os-release is removed at any point. host_os_release = os_utils.OsRelease() @@ -111,7 +111,7 @@ def _host_compatible_chroot(path: Path) -> bool: _compare_os_release(host_os_release, chroot_os_release) -def _setup_chroot(path: Path, use_host_sources: bool) -> None: +def _setup_chroot(path: Path, use_host_sources: bool) -> None: # noqa: FBT001 """Prepare the chroot environment before executing the target function.""" logger.debug("setup chroot: %r", path) if sys.platform == "linux": @@ -126,7 +126,7 @@ def _setup_chroot(path: Path, use_host_sources: bool) -> None: logger.debug("chroot setup complete") -def _cleanup_chroot(path: Path, use_host_sources: bool) -> None: +def _cleanup_chroot(path: Path, use_host_sources: bool) -> None: # noqa: FBT001 """Clean the chroot environment after executing the target function.""" logger.debug("cleanup chroot: %r", path) if sys.platform == "linux": @@ -140,20 +140,27 @@ def _cleanup_chroot(path: Path, use_host_sources: bool) -> None: logger.debug("chroot cleanup complete") +# TODO: refactor as to not call _Mount.get_abs_path repeatedly class _Mount: def __init__( self, src: str | Path, dst: str | Path, - *args, + *args: str, fstype: str | None = None, skip_missing: bool = True, ) -> None: - """Management class for chroot mounts.""" - + """Manage setup and clean up of chroot mounts. + + :param src: Mount source. This can be a device or path on the file system. + :param dst: Point. Path to the mount point relative to the chroot mounted on. + :param args: Additional args to pass to mount command. + :param fstype: fstype arg to use when calling mount. + :param skip_missing: skip mounts when dst_exists returns False. + """ self.src = Path(src) self.dst = Path(dst) - self.args = list(args) + self.args = [*args] self.skip_missing = skip_missing if fstype is not None: @@ -170,21 +177,26 @@ def _umount(self, chroot: Path, *args: str) -> None: os_utils.umount(str(abs_dst), *args) def get_abs_path(self, path: Path, chroot_path: Path) -> Path: - """Make `chroot_path` relative to host `path`""" + """Make `chroot_path` relative to host `path`.""" return path / str(chroot_path).lstrip("/") - def dst_exists(self, chroot: Path): + def dst_exists(self, chroot: Path) -> bool: + """Return True if `self.dst` exists within `chroot`.""" abs_dst = self.get_abs_path(chroot, self.dst) return abs_dst.is_symlink() or abs_dst.exists() - def remove_dst(self, chroot: Path): - pass + def remove_dst(self, chroot: Path) -> None: + """Remove `self.dst` if present to prepare mountpoint `self.dst`.""" + # This is not required for the base class + # TODO: consider using abc - def create_dst(self, chroot: Path): + def create_dst(self, chroot: Path) -> None: + """Create mountpoint `self.dst` later used in mount call.""" abs_dst = self.get_abs_path(chroot, self.dst) abs_dst.parent.mkdir(parents=True, exist_ok=True) def mount_to(self, chroot: Path, *args: str) -> None: + """Mount `self.src` to `self.dst` within chroot.""" logger.debug(f"Mounting {self.dst}") if self.dst_exists(chroot): self.remove_dst(chroot) @@ -198,6 +210,7 @@ def mount_to(self, chroot: Path, *args: str) -> None: self._mount(self.src, chroot, *self.args, *args) def unmount_from(self, chroot: Path, *args: str) -> None: + """Unmount `self.dst` within chroot.""" logger.debug(f"Mounting {self.dst}") if self.skip_missing and not self.dst_exists(chroot): abs_dst = self.get_abs_path(chroot, self.dst) @@ -217,11 +230,20 @@ def __init__( *args: str, skip_missing: bool = True, ) -> None: + """Manage setup and clean up of `--bind` mount chroot mounts. + + This subclass of _Mount contains extra support for creating mount points for + individual files. + :param src: Mount source. This can be a device or path on the file system. + :param dst: Point. Path to the mount point relative to the chroot mounted on. + :param args: Additional args to pass to mount command. + :param skip_missing: skip mounts when dst_exists returns False. + """ super().__init__( src, dst, f"--{self.bind_type}", *args, skip_missing=skip_missing ) - def dst_exists(self, chroot: Path): + def dst_exists(self, chroot: Path) -> bool: abs_dst = self.get_abs_path(chroot, self.dst) if self.src.is_file(): @@ -229,7 +251,7 @@ def dst_exists(self, chroot: Path): return abs_dst.is_symlink() or abs_dst.exists() - def create_dst(self, chroot: Path): + def create_dst(self, chroot: Path) -> None: abs_dst = self.get_abs_path(chroot, self.dst) if self.src.is_dir(): @@ -237,7 +259,7 @@ def create_dst(self, chroot: Path): elif self.src.is_file(): abs_dst.touch() - def remove_dst(self, chroot: Path): + def remove_dst(self, chroot: Path) -> None: abs_dst = self.get_abs_path(chroot, self.dst) if abs_dst.is_symlink() or abs_dst.is_file(): @@ -253,15 +275,26 @@ def _mount(self, src: Path, chroot: Path, *args: str) -> None: class _RBindMount(_BindMount): bind_type = "rbind" - def _umount(self, chroot: Path, *args) -> None: + def _umount(self, chroot: Path, *args: str) -> None: super()._umount(chroot, "--recursive", "--lazy", *args) class _TempFSClone(_Mount): - def __init__(self, src: str, dst: str, *args) -> None: - super().__init__(src, dst, *args, fstype="tmpfs", skip_missing=False) + def __init__( + self, src: str, dst: str, *args: str, skip_missing: bool = True + ) -> None: + """Manage setup and clean up of `--bind` mount chroot mounts. - def _mount(self, src: Path, chroot: Path, *args) -> None: + This subclass of _Mount contains extra support for creating mount points for + individual files. + :param src: Mount source. This can be a device or path on the file system. + :param dst: Point. Path to the mount point relative to the chroot mounted on. + :param args: Additional args to pass to mount command. + :param skip_missing: skip mounts when dst_exists returns False. + """ + super().__init__(src, dst, *args, fstype="tmpfs", skip_missing=skip_missing) + + def _mount(self, src: Path, chroot: Path, *args: str) -> None: if src.is_file(): raise NotADirectoryError(f"Path is a file: {src}") if not src.exists(): @@ -293,7 +326,7 @@ def _mount(self, src: Path, chroot: Path, *args) -> None: # Mounts required to import host's Ubuntu Pro apt configuration to chroot # TODO: parameterize this per linux distribution / package manager _ubuntu_apt_mounts = [ - _TempFSClone("/etc/apt", "/etc/apt"), + _TempFSClone("/etc/apt", "/etc/apt", skip_missing=False), _BindMount( "/usr/share/ca-certificates/", "/usr/share/ca-certificates/", @@ -308,13 +341,11 @@ def _mount(self, src: Path, chroot: Path, *args) -> None: def _setup_chroot_mounts(path: Path, mounts: list[_Mount]) -> None: """Linux-specific chroot environment preparation.""" - for entry in mounts: entry.mount_to(path) def _cleanup_chroot_mounts(path: Path, mounts: list[_Mount]) -> None: """Linux-specific chroot environment cleanup.""" - for entry in reversed(mounts): entry.unmount_from(path)