Skip to content

Commit

Permalink
refactor: QC and linting
Browse files Browse the repository at this point in the history
  • Loading branch information
clay-lake committed Oct 23, 2024
1 parent 250e56f commit f35f822
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 28 deletions.
2 changes: 1 addition & 1 deletion craft_parts/executor/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class Executor:
:param ignore_patterns: File patterns to ignore when pulling local sources.
"""

def __init__(
def __init__( # noqa: PLR0913
self,
*,
part_list: list[Part],
Expand Down
85 changes: 58 additions & 27 deletions craft_parts/overlays/chroot.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from collections.abc import Callable, Mapping
from multiprocessing.connection import Connection
from pathlib import Path
from shutil import copytree, rmtree
from shutil import copytree
from typing import Any

from craft_parts.utils import os_utils
Expand Down Expand Up @@ -91,17 +91,17 @@ def _runner(
conn.send((res, None))


def _compare_os_release(host: os_utils.OsRelease, chroot: os_utils.OsRelease):
def _compare_os_release(host: os_utils.OsRelease, chroot: os_utils.OsRelease) -> None:
"""Compare OsRelease objects from host and chroot for compatibility. See _host_compatible_chroot."""
if (host_val := host.id()) != (chroot_val := chroot.id()):
errors.IncompatibleChrootError("id", host_val, chroot_val)
raise errors.IncompatibleChrootError("id", host_val, chroot_val)

if (host_val := host.version_id()) != (chroot_val := chroot.version_id()):
errors.IncompatibleChrootError("version_id", host_val, chroot_val)
raise errors.IncompatibleChrootError("version_id", host_val, chroot_val)


def _host_compatible_chroot(path: Path) -> bool:
"""Raise exception if host and chroot are not the same distribution and release"""
def _host_compatible_chroot(path: Path) -> None:
"""Raise exception if host and chroot are not the same distribution and release."""
# Note: /etc/os-release is symlinked to /usr/lib/os-release
# This could cause an issue if /etc/os-release is removed at any point.
host_os_release = os_utils.OsRelease()
Expand All @@ -111,7 +111,7 @@ def _host_compatible_chroot(path: Path) -> bool:
_compare_os_release(host_os_release, chroot_os_release)


def _setup_chroot(path: Path, use_host_sources: bool) -> None:
def _setup_chroot(path: Path, use_host_sources: bool) -> None: # noqa: FBT001
"""Prepare the chroot environment before executing the target function."""
logger.debug("setup chroot: %r", path)
if sys.platform == "linux":
Expand All @@ -126,7 +126,7 @@ def _setup_chroot(path: Path, use_host_sources: bool) -> None:
logger.debug("chroot setup complete")


def _cleanup_chroot(path: Path, use_host_sources: bool) -> None:
def _cleanup_chroot(path: Path, use_host_sources: bool) -> None: # noqa: FBT001
"""Clean the chroot environment after executing the target function."""
logger.debug("cleanup chroot: %r", path)
if sys.platform == "linux":
Expand All @@ -140,20 +140,27 @@ def _cleanup_chroot(path: Path, use_host_sources: bool) -> None:
logger.debug("chroot cleanup complete")


# TODO: refactor as to not call _Mount.get_abs_path repeatedly
class _Mount:
def __init__(
self,
src: str | Path,
dst: str | Path,
*args,
*args: str,
fstype: str | None = None,
skip_missing: bool = True,
) -> None:
"""Management class for chroot mounts."""

"""Manage setup and clean up of chroot mounts.
:param src: Mount source. This can be a device or path on the file system.
:param dst: Point. Path to the mount point relative to the chroot mounted on.
:param args: Additional args to pass to mount command.
:param fstype: fstype arg to use when calling mount.
:param skip_missing: skip mounts when dst_exists returns False.
"""
self.src = Path(src)
self.dst = Path(dst)
self.args = list(args)
self.args = [*args]
self.skip_missing = skip_missing

if fstype is not None:
Expand All @@ -170,21 +177,26 @@ def _umount(self, chroot: Path, *args: str) -> None:
os_utils.umount(str(abs_dst), *args)

def get_abs_path(self, path: Path, chroot_path: Path) -> Path:
"""Make `chroot_path` relative to host `path`"""
"""Make `chroot_path` relative to host `path`."""
return path / str(chroot_path).lstrip("/")

def dst_exists(self, chroot: Path):
def dst_exists(self, chroot: Path) -> bool:
"""Return True if `self.dst` exists within `chroot`."""
abs_dst = self.get_abs_path(chroot, self.dst)
return abs_dst.is_symlink() or abs_dst.exists()

def remove_dst(self, chroot: Path):
pass
def remove_dst(self, chroot: Path) -> None:
"""Remove `self.dst` if present to prepare mountpoint `self.dst`."""
# This is not required for the base class
# TODO: consider using abc

def create_dst(self, chroot: Path):
def create_dst(self, chroot: Path) -> None:
"""Create mountpoint `self.dst` later used in mount call."""
abs_dst = self.get_abs_path(chroot, self.dst)
abs_dst.parent.mkdir(parents=True, exist_ok=True)

def mount_to(self, chroot: Path, *args: str) -> None:
"""Mount `self.src` to `self.dst` within chroot."""
logger.debug(f"Mounting {self.dst}")
if self.dst_exists(chroot):
self.remove_dst(chroot)
Expand All @@ -198,6 +210,7 @@ def mount_to(self, chroot: Path, *args: str) -> None:
self._mount(self.src, chroot, *self.args, *args)

def unmount_from(self, chroot: Path, *args: str) -> None:
"""Unmount `self.dst` within chroot."""
logger.debug(f"Mounting {self.dst}")
if self.skip_missing and not self.dst_exists(chroot):
abs_dst = self.get_abs_path(chroot, self.dst)
Expand All @@ -217,27 +230,36 @@ def __init__(
*args: str,
skip_missing: bool = True,
) -> None:
"""Manage setup and clean up of `--bind` mount chroot mounts.
This subclass of _Mount contains extra support for creating mount points for
individual files.
:param src: Mount source. This can be a device or path on the file system.
:param dst: Point. Path to the mount point relative to the chroot mounted on.
:param args: Additional args to pass to mount command.
:param skip_missing: skip mounts when dst_exists returns False.
"""
super().__init__(
src, dst, f"--{self.bind_type}", *args, skip_missing=skip_missing
)

def dst_exists(self, chroot: Path):
def dst_exists(self, chroot: Path) -> bool:
abs_dst = self.get_abs_path(chroot, self.dst)

if self.src.is_file():
return abs_dst.is_symlink() or abs_dst.exists() or abs_dst.parent.is_dir()

return abs_dst.is_symlink() or abs_dst.exists()

def create_dst(self, chroot: Path):
def create_dst(self, chroot: Path) -> None:
abs_dst = self.get_abs_path(chroot, self.dst)

if self.src.is_dir():
abs_dst.mkdir(parents=True, exist_ok=True)
elif self.src.is_file():
abs_dst.touch()

def remove_dst(self, chroot: Path):
def remove_dst(self, chroot: Path) -> None:
abs_dst = self.get_abs_path(chroot, self.dst)

if abs_dst.is_symlink() or abs_dst.is_file():
Expand All @@ -253,15 +275,26 @@ def _mount(self, src: Path, chroot: Path, *args: str) -> None:
class _RBindMount(_BindMount):
bind_type = "rbind"

def _umount(self, chroot: Path, *args) -> None:
def _umount(self, chroot: Path, *args: str) -> None:
super()._umount(chroot, "--recursive", "--lazy", *args)


class _TempFSClone(_Mount):
def __init__(self, src: str, dst: str, *args) -> None:
super().__init__(src, dst, *args, fstype="tmpfs", skip_missing=False)
def __init__(
self, src: str, dst: str, *args: str, skip_missing: bool = True
) -> None:
"""Manage setup and clean up of `--bind` mount chroot mounts.
def _mount(self, src: Path, chroot: Path, *args) -> None:
This subclass of _Mount contains extra support for creating mount points for
individual files.
:param src: Mount source. This can be a device or path on the file system.
:param dst: Point. Path to the mount point relative to the chroot mounted on.
:param args: Additional args to pass to mount command.
:param skip_missing: skip mounts when dst_exists returns False.
"""
super().__init__(src, dst, *args, fstype="tmpfs", skip_missing=skip_missing)

def _mount(self, src: Path, chroot: Path, *args: str) -> None:
if src.is_file():
raise NotADirectoryError(f"Path is a file: {src}")
if not src.exists():
Expand Down Expand Up @@ -293,7 +326,7 @@ def _mount(self, src: Path, chroot: Path, *args) -> None:
# Mounts required to import host's Ubuntu Pro apt configuration to chroot
# TODO: parameterize this per linux distribution / package manager
_ubuntu_apt_mounts = [
_TempFSClone("/etc/apt", "/etc/apt"),
_TempFSClone("/etc/apt", "/etc/apt", skip_missing=False),
_BindMount(
"/usr/share/ca-certificates/",
"/usr/share/ca-certificates/",
Expand All @@ -308,13 +341,11 @@ def _mount(self, src: Path, chroot: Path, *args) -> None:

def _setup_chroot_mounts(path: Path, mounts: list[_Mount]) -> None:
"""Linux-specific chroot environment preparation."""

for entry in mounts:
entry.mount_to(path)


def _cleanup_chroot_mounts(path: Path, mounts: list[_Mount]) -> None:
"""Linux-specific chroot environment cleanup."""

for entry in reversed(mounts):
entry.unmount_from(path)

0 comments on commit f35f822

Please sign in to comment.