From 706872ec507a8fad57787dd8a60b2212dec23271 Mon Sep 17 00:00:00 2001 From: Tiago Nobrega Date: Fri, 14 Jul 2023 16:36:13 -0300 Subject: [PATCH] Revert "repo: migrate to craft-archives (#4037)" This reverts commit ce23bc7d196f67d898dcd7735dc58eb60d558a6b. --- .../internal/meta/package_repository.py | 404 +++++++++++++++++ snapcraft_legacy/internal/meta/snap.py | 3 +- .../internal/project_loader/_config.py | 4 +- .../internal/repo/apt_key_manager.py | 226 ++++++++++ snapcraft_legacy/internal/repo/apt_ppa.py | 50 +++ .../internal/repo/apt_sources_manager.py | 248 +++++++++++ snapcraft_legacy/plugins/v1/_plugin.py | 3 +- snapcraft_legacy/plugins/v1/catkin.py | 7 +- snapcraft_legacy/plugins/v1/colcon.py | 7 +- .../unit/meta/test_package_repository.py | 411 ++++++++++++++++++ .../legacy/unit/repo/test_apt_key_manager.py | 339 +++++++++++++++ tests/legacy/unit/repo/test_apt_ppa.py | 62 +++ .../unit/repo/test_apt_sources_manager.py | 269 ++++++++++++ 13 files changed, 2019 insertions(+), 14 deletions(-) create mode 100644 snapcraft_legacy/internal/meta/package_repository.py create mode 100644 snapcraft_legacy/internal/repo/apt_key_manager.py create mode 100644 snapcraft_legacy/internal/repo/apt_ppa.py create mode 100644 snapcraft_legacy/internal/repo/apt_sources_manager.py create mode 100644 tests/legacy/unit/meta/test_package_repository.py create mode 100644 tests/legacy/unit/repo/test_apt_key_manager.py create mode 100644 tests/legacy/unit/repo/test_apt_ppa.py create mode 100644 tests/legacy/unit/repo/test_apt_sources_manager.py diff --git a/snapcraft_legacy/internal/meta/package_repository.py b/snapcraft_legacy/internal/meta/package_repository.py new file mode 100644 index 0000000000..b5cbe3126f --- /dev/null +++ b/snapcraft_legacy/internal/meta/package_repository.py @@ -0,0 +1,404 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2019 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import abc +import logging +import re +from copy import deepcopy +from typing import Any, Dict, List, Optional + +from . import errors + +logger = logging.getLogger(__name__) + + +class PackageRepository(abc.ABC): + @abc.abstractmethod + def marshal(self) -> Dict[str, Any]: + ... + + @classmethod + def unmarshal(cls, data: Dict[str, str]) -> "PackageRepository": + if not isinstance(data, dict): + raise errors.PackageRepositoryValidationError( + url=str(data), + brief=f"Invalid package repository object {data!r}.", + details="Package repository must be a valid dictionary object.", + resolution="Verify repository configuration and ensure that the correct syntax is used.", + ) + + if "ppa" in data: + return PackageRepositoryAptPpa.unmarshal(data) + + return PackageRepositoryApt.unmarshal(data) + + @classmethod + def unmarshal_package_repositories(cls, data: Any) -> List["PackageRepository"]: + repositories = list() + + if data is not None: + if not isinstance(data, list): + raise errors.PackageRepositoryValidationError( + url=str(data), + brief=f"Invalid package-repositories list object {data!r}.", + details="Package repositories must be a list of objects.", + resolution="Verify 'package-repositories' configuration and ensure that the correct syntax is used.", + ) + + for repository in data: + package_repo = cls.unmarshal(repository) + repositories.append(package_repo) + + return repositories + + +class PackageRepositoryAptPpa(PackageRepository): + def __init__(self, *, ppa: str) -> None: + self.type = "apt" + self.ppa = ppa + + self.validate() + + def marshal(self) -> Dict[str, Any]: + data = dict(type="apt") + data["ppa"] = self.ppa + return data + + def validate(self) -> None: + if not self.ppa: + raise errors.PackageRepositoryValidationError( + url=self.ppa, + brief=f"Invalid PPA {self.ppa!r}.", + details="PPAs must be non-empty strings.", + resolution="Verify repository configuration and ensure that 'ppa' is correctly specified.", + ) + + @classmethod + def unmarshal(cls, data: Dict[str, str]) -> "PackageRepositoryAptPpa": + if not isinstance(data, dict): + raise errors.PackageRepositoryValidationError( + url=str(data), + brief=f"Invalid package repository object {data!r}.", + details="Package repository must be a valid dictionary object.", + resolution="Verify repository configuration and ensure that the correct syntax is used.", + ) + + data_copy = deepcopy(data) + + ppa = data_copy.pop("ppa", "") + repo_type = data_copy.pop("type", None) + + if repo_type != "apt": + raise errors.PackageRepositoryValidationError( + url=ppa, + brief=f"Unsupported type {repo_type!r}.", + details="The only currently supported type is 'apt'.", + resolution="Verify repository configuration and ensure that 'type' is correctly specified.", + ) + + if not isinstance(ppa, str): + raise errors.PackageRepositoryValidationError( + url=ppa, + brief=f"Invalid PPA {ppa!r}.", + details="PPA must be a valid string.", + resolution="Verify repository configuration and ensure that 'ppa' is correctly specified.", + ) + + if data_copy: + keys = ", ".join([repr(k) for k in data_copy.keys()]) + raise errors.PackageRepositoryValidationError( + url=ppa, + brief=f"Found unsupported package repository properties {keys}.", + resolution="Verify repository configuration and ensure that it is correct.", + ) + + return cls(ppa=ppa) + + +class PackageRepositoryApt(PackageRepository): + def __init__( + self, + *, + architectures: Optional[List[str]] = None, + components: Optional[List[str]] = None, + formats: Optional[List[str]] = None, + key_id: str, + key_server: Optional[str] = None, + name: Optional[str] = None, + path: Optional[str] = None, + suites: Optional[List[str]] = None, + url: str, + ) -> None: + self.type = "apt" + self.architectures = architectures + self.components = components + self.formats = formats + self.key_id = key_id + self.key_server = key_server + + if name is None: + # Default name is URL, stripping non-alphanumeric characters. + self.name: str = re.sub(r"\W+", "_", url) + else: + self.name = name + + self.path = path + self.suites = suites + self.url = url + + self.validate() + + def marshal(self) -> Dict[str, Any]: + data: Dict[str, Any] = {"type": "apt"} + + if self.architectures: + data["architectures"] = self.architectures + + if self.components: + data["components"] = self.components + + if self.formats: + data["formats"] = self.formats + + data["key-id"] = self.key_id + + if self.key_server: + data["key-server"] = self.key_server + + data["name"] = self.name + + if self.path: + data["path"] = self.path + + if self.suites: + data["suites"] = self.suites + + data["url"] = self.url + + return data + + def validate(self) -> None: # noqa: C901 + if self.formats is not None: + for repo_format in self.formats: + if repo_format not in ["deb", "deb-src"]: + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Invalid format {repo_format!r}.", + details="Valid formats include: deb and deb-src.", + resolution="Verify the repository configuration and ensure that 'formats' is correctly specified.", + ) + + if not self.key_id or not re.match(r"^[0-9A-F]{40}$", self.key_id): + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Invalid key identifier {self.key_id!r}.", + details="Key IDs must be 40 upper-case hex characters.", + resolution="Verify the repository configuration and ensure that 'key-id' is correctly specified.", + ) + + if not self.url: + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Invalid URL {self.url!r}.", + details="URLs must be non-empty strings.", + resolution="Verify the repository configuration and ensure that 'url' is correctly specified.", + ) + + if self.suites: + for suite in self.suites: + if suite.endswith("/"): + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Invalid suite {suite!r}.", + details="Suites must not end with a '/'.", + resolution="Verify the repository configuration and remove the trailing '/ from suites or use the 'path' property to define a path.", + ) + + if self.path is not None and self.path == "": + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Invalid path {self.path!r}.", + details="Paths must be non-empty strings.", + resolution="Verify the repository configuration and ensure that 'path' is a non-empty string such as '/'.", + ) + + if self.path and self.components: + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Components {self.components!r} cannot be combined with path {self.path!r}.", + details="Path and components are incomptiable options.", + resolution="Verify the repository configuration and remove 'path' or 'components'.", + ) + + if self.path and self.suites: + raise errors.PackageRepositoryValidationError( + url=self.url, + brief=f"Suites {self.suites!r} cannot be combined with path {self.path!r}.", + details="Path and suites are incomptiable options.", + resolution="Verify the repository configuration and remove 'path' or 'suites'.", + ) + + if self.suites and not self.components: + raise errors.PackageRepositoryValidationError( + url=self.url, + brief="No components specified.", + details="Components are required when using suites.", + resolution="Verify the repository configuration and ensure that 'components' is correctly specified.", + ) + + if self.components and not self.suites: + raise errors.PackageRepositoryValidationError( + url=self.url, + brief="No suites specified.", + details="Suites are required when using components.", + resolution="Verify the repository configuration and ensure that 'suites' is correctly specified.", + ) + + @classmethod # noqa: C901 + def unmarshal(cls, data: Dict[str, Any]) -> "PackageRepositoryApt": # noqa: C901 + if not isinstance(data, dict): + raise errors.PackageRepositoryValidationError( + url=str(data), + brief=f"Invalid package repository object {data!r}.", + details="Package repository must be a valid dictionary object.", + resolution="Verify repository configuration and ensure that the correct syntax is used.", + ) + + data_copy = deepcopy(data) + + architectures = data_copy.pop("architectures", None) + components = data_copy.pop("components", None) + formats = data_copy.pop("formats", None) + key_id = data_copy.pop("key-id", None) + key_server = data_copy.pop("key-server", None) + name = data_copy.pop("name", None) + path = data_copy.pop("path", None) + suites = data_copy.pop("suites", None) + url = data_copy.pop("url", "") + repo_type = data_copy.pop("type", None) + + if repo_type != "apt": + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Unsupported type {repo_type!r}.", + details="The only currently supported type is 'apt'.", + resolution="Verify repository configuration and ensure that 'type' is correctly specified.", + ) + + if architectures is not None and ( + not isinstance(architectures, list) + or not all(isinstance(x, str) for x in architectures) + ): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid architectures {architectures!r}.", + details="Architectures must be a list of valid architecture strings.", + resolution="Verify repository configuration and ensure that 'architectures' is correctly specified.", + ) + + if components is not None and ( + not isinstance(components, list) + or not all(isinstance(x, str) for x in components) + or not components + ): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid components {components!r}.", + details="Components must be a list of strings.", + resolution="Verify repository configuration and ensure that 'components' is correctly specified.", + ) + + if formats is not None and ( + not isinstance(formats, list) + or not all(isinstance(x, str) for x in formats) + ): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid formats {formats!r}.", + details="Formats must be a list of strings.", + resolution="Verify repository configuration and ensure that 'formats' is correctly specified.", + ) + + if not isinstance(key_id, str): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid key identifier {key_id!r}.", + details="Key identifiers must be a valid string.", + resolution="Verify repository configuration and ensure that 'key-id' is correctly specified.", + ) + + if key_server is not None and not isinstance(key_server, str): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid key server {key_server!r}.", + details="Key servers must be a valid string.", + resolution="Verify repository configuration and ensure that 'key-server' is correctly specified.", + ) + + if name is not None and not isinstance(name, str): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid name {name!r}.", + details="Names must be a valid string.", + resolution="Verify repository configuration and ensure that 'name' is correctly specified.", + ) + + if path is not None and not isinstance(path, str): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid path {path!r}.", + details="Paths must be a valid string.", + resolution="Verify repository configuration and ensure that 'path' is correctly specified.", + ) + + if suites is not None and ( + not isinstance(suites, list) + or not all(isinstance(x, str) for x in suites) + or not suites + ): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid suites {suites!r}.", + details="Suites must be a list of strings.", + resolution="Verify repository configuration and ensure that 'suites' is correctly specified.", + ) + + if not isinstance(url, str): + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Invalid URL {url!r}.", + details="URLs must be a valid string.", + resolution="Verify repository configuration and ensure that 'url' is correctly specified.", + ) + + if data_copy: + keys = ", ".join([repr(k) for k in data_copy.keys()]) + raise errors.PackageRepositoryValidationError( + url=url, + brief=f"Found unsupported package repository properties {keys}.", + resolution="Verify repository configuration and ensure it is correct.", + ) + + return cls( + architectures=architectures, + components=components, + formats=formats, + key_id=key_id, + key_server=key_server, + name=name, + suites=suites, + url=url, + ) diff --git a/snapcraft_legacy/internal/meta/snap.py b/snapcraft_legacy/internal/meta/snap.py index cd6eba60b7..b355d10153 100644 --- a/snapcraft_legacy/internal/meta/snap.py +++ b/snapcraft_legacy/internal/meta/snap.py @@ -20,13 +20,12 @@ from copy import deepcopy from typing import Any, Dict, List, Optional, Sequence, Set -from craft_archives.repo.package_repository import PackageRepository - from snapcraft_legacy import yaml_utils from snapcraft_legacy.internal import common from snapcraft_legacy.internal.meta import errors from snapcraft_legacy.internal.meta.application import Application from snapcraft_legacy.internal.meta.hooks import Hook +from snapcraft_legacy.internal.meta.package_repository import PackageRepository from snapcraft_legacy.internal.meta.plugs import ContentPlug, Plug from snapcraft_legacy.internal.meta.slots import ContentSlot, Slot from snapcraft_legacy.internal.meta.system_user import SystemUser diff --git a/snapcraft_legacy/internal/project_loader/_config.py b/snapcraft_legacy/internal/project_loader/_config.py index 2c859fefb3..a8a7cbdbb5 100644 --- a/snapcraft_legacy/internal/project_loader/_config.py +++ b/snapcraft_legacy/internal/project_loader/_config.py @@ -23,15 +23,15 @@ from typing import List, Set import jsonschema -from craft_archives.repo import apt_key_manager, apt_sources_manager -from craft_archives.repo.package_repository import PackageRepository from snapcraft_legacy import formatting_utils, plugins, project from snapcraft_legacy.internal import deprecations, repo, states, steps +from snapcraft_legacy.internal.meta.package_repository import PackageRepository from snapcraft_legacy.internal.meta.snap import Snap from snapcraft_legacy.internal.pluginhandler._part_environment import ( get_snapcraft_global_environment, ) +from snapcraft_legacy.internal.repo import apt_key_manager, apt_sources_manager from snapcraft_legacy.project._schema import Validator from . import errors, grammar_processing, replace_attr diff --git a/snapcraft_legacy/internal/repo/apt_key_manager.py b/snapcraft_legacy/internal/repo/apt_key_manager.py new file mode 100644 index 0000000000..0ea6f82a54 --- /dev/null +++ b/snapcraft_legacy/internal/repo/apt_key_manager.py @@ -0,0 +1,226 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2015-2020 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging +import pathlib +import subprocess +import tempfile +from typing import List, Optional + +import gnupg + +from snapcraft_legacy.internal.meta import package_repository + +from . import apt_ppa, errors + +logger = logging.getLogger(__name__) + + +class AptKeyManager: + def __init__( + self, + *, + gpg_keyring: pathlib.Path = pathlib.Path( + "/etc/apt/trusted.gpg.d/snapcraft.gpg" + ), + key_assets: pathlib.Path, + ) -> None: + self._gpg_keyring = gpg_keyring + self._key_assets = key_assets + + def find_asset_with_key_id(self, *, key_id: str) -> Optional[pathlib.Path]: + """Find snap key asset matching key_id. + + The key asset much be named with the last 8 characters of the key + identifier, in upper case. + + :param key_id: Key ID to search for. + + :returns: Path of key asset if match found, otherwise None. + """ + key_file = key_id[-8:].upper() + ".asc" + key_path = self._key_assets / key_file + + if key_path.exists(): + return key_path + + return None + + def get_key_fingerprints(self, *, key: str) -> List[str]: + """List fingerprints found in specified key. + + Do this by importing the key into a temporary keyring, + then querying the keyring for fingerprints. + + :param key: Key data (string) to parse. + + :returns: List of key fingerprints/IDs. + """ + with tempfile.NamedTemporaryFile(suffix="keyring") as temp_file: + return ( + gnupg.GPG(keyring=temp_file.name).import_keys(key_data=key).fingerprints + ) + + def is_key_installed(self, *, key_id: str) -> bool: + """Check if specified key_id is installed. + + Check if key is installed by attempting to export the key. + Unfortunately, apt-key does not exit with error and + we have to do our best to parse the output. + + :param key_id: Key ID to check for. + + :returns: True if key is installed. + """ + try: + proc = subprocess.run( + ["sudo", "apt-key", "export", key_id], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True, + ) + except subprocess.CalledProcessError as error: + # Export shouldn't exit with failure based on testing, + # but assume the key is not installed and log a warning. + logger.warning(f"Unexpected apt-key failure: {error.output}") + return False + + apt_key_output = proc.stdout.decode() + + if "BEGIN PGP PUBLIC KEY BLOCK" in apt_key_output: + return True + + if "nothing exported" in apt_key_output: + return False + + # The two strings above have worked in testing, but if neither is + # present for whatever reason, assume the key is not installed + # and log a warning. + logger.warning(f"Unexpected apt-key output: {apt_key_output}") + return False + + def install_key(self, *, key: str) -> None: + """Install given key. + + :param key: Key to install. + + :raises: AptGPGKeyInstallError if unable to install key. + """ + cmd = [ + "sudo", + "apt-key", + "--keyring", + str(self._gpg_keyring), + "add", + "-", + ] + + try: + logger.debug(f"Executing: {cmd!r}") + env = dict() + env["LANG"] = "C.UTF-8" + subprocess.run( + cmd, + input=key.encode(), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True, + env=env, + ) + except subprocess.CalledProcessError as error: + raise errors.AptGPGKeyInstallError(output=error.output.decode(), key=key) + + logger.debug(f"Installed apt repository key:\n{key}") + + def install_key_from_keyserver( + self, *, key_id: str, key_server: str = "keyserver.ubuntu.com" + ) -> None: + """Install key from specified key server. + + :param key_id: Key ID to install. + :param key_server: Key server to query. + + :raises: AptGPGKeyInstallError if unable to install key. + """ + env = dict() + env["LANG"] = "C.UTF-8" + + cmd = [ + "sudo", + "apt-key", + "--keyring", + str(self._gpg_keyring), + "adv", + "--keyserver", + key_server, + "--recv-keys", + key_id, + ] + + try: + logger.debug(f"Executing: {cmd!r}") + subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + check=True, + env=env, + ) + except subprocess.CalledProcessError as error: + raise errors.AptGPGKeyInstallError( + output=error.output.decode(), key_id=key_id, key_server=key_server + ) + + def install_package_repository_key( + self, *, package_repo: package_repository.PackageRepository + ) -> bool: + """Install required key for specified package repository. + + For both PPA and other Apt package repositories: + 1) If key is already installed, return False. + 2) Install key from local asset, if available. + 3) Install key from key server, if available. An unspecified + keyserver will default to using keyserver.ubuntu.com. + + :param package_repo: Apt PackageRepository configuration. + + :returns: True if key configuration was changed. False if + key already installed. + + :raises: AptGPGKeyInstallError if unable to install key. + """ + key_server: Optional[str] = None + if isinstance(package_repo, package_repository.PackageRepositoryAptPpa): + key_id = apt_ppa.get_launchpad_ppa_key_id(ppa=package_repo.ppa) + elif isinstance(package_repo, package_repository.PackageRepositoryApt): + key_id = package_repo.key_id + key_server = package_repo.key_server + else: + raise RuntimeError(f"unhandled package repo type: {package_repo!r}") + + # Already installed, nothing to do. + if self.is_key_installed(key_id=key_id): + return False + + key_path = self.find_asset_with_key_id(key_id=key_id) + if key_path is not None: + self.install_key(key=key_path.read_text()) + else: + if key_server is None: + key_server = "keyserver.ubuntu.com" + self.install_key_from_keyserver(key_id=key_id, key_server=key_server) + + return True diff --git a/snapcraft_legacy/internal/repo/apt_ppa.py b/snapcraft_legacy/internal/repo/apt_ppa.py new file mode 100644 index 0000000000..f4269d1d04 --- /dev/null +++ b/snapcraft_legacy/internal/repo/apt_ppa.py @@ -0,0 +1,50 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2020 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging +from typing import Tuple + +import lazr.restfulclient.errors +from launchpadlib.launchpad import Launchpad + +from . import errors + +logger = logging.getLogger(__name__) + + +def split_ppa_parts(*, ppa: str) -> Tuple[str, str]: + ppa_split = ppa.split("/") + if len(ppa_split) != 2: + raise errors.AptPPAInstallError(ppa=ppa, reason="invalid PPA format") + return ppa_split[0], ppa_split[1] + + +def get_launchpad_ppa_key_id(*, ppa: str) -> str: + """Query Launchpad for PPA's key ID.""" + owner, name = split_ppa_parts(ppa=ppa) + launchpad = Launchpad.login_anonymously("snapcraft", "production") + launchpad_url = f"~{owner}/+archive/{name}" + + logger.debug(f"Loading launchpad url: {launchpad_url}") + try: + key_id = launchpad.load(launchpad_url).signing_key_fingerprint + except lazr.restfulclient.errors.NotFound as error: + raise errors.AptPPAInstallError( + ppa=ppa, reason="not found on launchpad" + ) from error + + logger.debug(f"Retrieved launchpad PPA key ID: {key_id}") + return key_id diff --git a/snapcraft_legacy/internal/repo/apt_sources_manager.py b/snapcraft_legacy/internal/repo/apt_sources_manager.py new file mode 100644 index 0000000000..8d979c37c6 --- /dev/null +++ b/snapcraft_legacy/internal/repo/apt_sources_manager.py @@ -0,0 +1,248 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2015-2021 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +"""Manage the host's apt source repository configuration.""" + +import io +import logging +import os +import pathlib +import re +import subprocess +import tempfile +from typing import List, Optional + +from snapcraft_legacy.internal import os_release +from snapcraft_legacy.internal.meta import package_repository +from snapcraft_legacy.project._project_options import ProjectOptions + +from . import apt_ppa + +logger = logging.getLogger(__name__) + + +def _construct_deb822_source( + *, + architectures: Optional[List[str]] = None, + components: Optional[List[str]] = None, + formats: Optional[List[str]] = None, + suites: List[str], + url: str, +) -> str: + """Construct deb-822 formatted sources.list config string.""" + with io.StringIO() as deb822: + if formats: + type_text = " ".join(formats) + else: + type_text = "deb" + + print(f"Types: {type_text}", file=deb822) + + print(f"URIs: {url}", file=deb822) + + suites_text = " ".join(suites) + print(f"Suites: {suites_text}", file=deb822) + + if components: + components_text = " ".join(components) + print(f"Components: {components_text}", file=deb822) + + if architectures: + arch_text = " ".join(architectures) + else: + arch_text = _get_host_arch() + + print(f"Architectures: {arch_text}", file=deb822) + + return deb822.getvalue() + + +def _get_host_arch() -> str: + return ProjectOptions().deb_arch + + +def _sudo_write_file(*, dst_path: pathlib.Path, content: bytes) -> None: + """Workaround for writing privileged files in destructive mode.""" + try: + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(content) + temp_file.flush() + f_name = temp_file.name + + try: + command = [ + "sudo", + "install", + "--owner=root", + "--group=root", + "--mode=0644", + f_name, + str(dst_path), + ] + subprocess.run(command, check=True) + except subprocess.CalledProcessError as error: + raise RuntimeError( + f"Failed to install repository config with: {command!r}" + ) from error + finally: + os.unlink(f_name) + + +class AptSourcesManager: + """Manage apt source configuration in /etc/apt/sources.list.d. + + :param sources_list_d: Path to sources.list.d directory. + """ + + # pylint: disable=too-few-public-methods + def __init__( + self, + *, + sources_list_d: pathlib.Path = pathlib.Path("/etc/apt/sources.list.d"), + ) -> None: + self._sources_list_d = sources_list_d + + def _install_sources( + self, + *, + architectures: Optional[List[str]] = None, + components: Optional[List[str]] = None, + formats: Optional[List[str]] = None, + name: str, + suites: List[str], + url: str, + ) -> bool: + """Install sources list configuration. + + Write config to: + /etc/apt/sources.list.d/snapcraft-.sources + + :returns: True if configuration was changed. + """ + config = _construct_deb822_source( + architectures=architectures, + components=components, + formats=formats, + suites=suites, + url=url, + ) + + if name not in ["default", "default-security"]: + name = "snapcraft-" + name + + config_path = self._sources_list_d / f"{name}.sources" + if config_path.exists() and config_path.read_text() == config: + # Already installed and matches, nothing to do. + logger.debug("Ignoring unchanged sources: %s", str(config_path)) + return False + + _sudo_write_file(dst_path=config_path, content=config.encode()) + logger.debug("Installed sources: %s", str(config_path)) + return True + + def _install_sources_apt( + self, *, package_repo: package_repository.PackageRepositoryApt + ) -> bool: + """Install repository configuration. + + 1) First check to see if package repo is implied path, + or "bare repository" config. This is indicated when no + path, components, or suites are indicated. + 2) If path is specified, convert path to a suite entry, + ending with "/". + + Relatedly, this assumes all of the error-checking has been + done already on the package_repository object in a proper + fashion, but do some sanity checks here anyways. + + :returns: True if source configuration was changed. + """ + if ( + not package_repo.path + and not package_repo.components + and not package_repo.suites + ): + suites = ["/"] + elif package_repo.path: + # Suites denoting exact path must end with '/'. + path = package_repo.path + if not path.endswith("/"): + path += "/" + suites = [path] + elif package_repo.suites: + suites = package_repo.suites + if not package_repo.components: + raise RuntimeError("no components with suite") + else: + raise RuntimeError("no suites or path") + + if package_repo.name: + name = package_repo.name + else: + name = re.sub(r"\W+", "_", package_repo.url) + + return self._install_sources( + architectures=package_repo.architectures, + components=package_repo.components, + formats=package_repo.formats, + name=name, + suites=suites, + url=package_repo.url, + ) + + def _install_sources_ppa( + self, *, package_repo: package_repository.PackageRepositoryAptPpa + ) -> bool: + """Install PPA formatted repository. + + Create a sources list config by: + - Looking up the codename of the host OS and using it as the "suites" + entry. + - Formulate deb URL to point to PPA. + - Enable only "deb" formats. + + :returns: True if source configuration was changed. + """ + owner, name = apt_ppa.split_ppa_parts(ppa=package_repo.ppa) + codename = os_release.OsRelease().version_codename() + + return self._install_sources( + components=["main"], + formats=["deb"], + name=f"ppa-{owner}_{name}", + suites=[codename], + url=f"http://ppa.launchpad.net/{owner}/{name}/ubuntu", + ) + + def install_package_repository_sources( + self, + *, + package_repo: package_repository.PackageRepository, + ) -> bool: + """Install configured package repositories. + + :param package_repo: Repository to install the source configuration for. + + :returns: True if source configuration was changed. + """ + logger.debug("Processing repo: %r", package_repo) + if isinstance(package_repo, package_repository.PackageRepositoryAptPpa): + return self._install_sources_ppa(package_repo=package_repo) + + if isinstance(package_repo, package_repository.PackageRepositoryApt): + return self._install_sources_apt(package_repo=package_repo) + + raise RuntimeError(f"unhandled package repository: {package_repository!r}") diff --git a/snapcraft_legacy/plugins/v1/_plugin.py b/snapcraft_legacy/plugins/v1/_plugin.py index fdc992311c..7c6e5801fa 100644 --- a/snapcraft_legacy/plugins/v1/_plugin.py +++ b/snapcraft_legacy/plugins/v1/_plugin.py @@ -21,9 +21,8 @@ from subprocess import CalledProcessError from typing import List -from craft_archives.repo.package_repository import PackageRepository - from snapcraft_legacy.internal import common, errors +from snapcraft_legacy.internal.meta.package_repository import PackageRepository from snapcraft_legacy.project import Project logger = logging.getLogger(__name__) diff --git a/snapcraft_legacy/plugins/v1/catkin.py b/snapcraft_legacy/plugins/v1/catkin.py index 73fe9de2e1..0bf45cf79d 100644 --- a/snapcraft_legacy/plugins/v1/catkin.py +++ b/snapcraft_legacy/plugins/v1/catkin.py @@ -81,13 +81,12 @@ import textwrap from typing import TYPE_CHECKING, List, Set -from craft_archives.repo.package_repository import ( +from snapcraft_legacy import file_utils, formatting_utils +from snapcraft_legacy.internal import common, errors, mangling, os_release, repo +from snapcraft_legacy.internal.meta.package_repository import ( PackageRepository, PackageRepositoryApt, ) - -from snapcraft_legacy import file_utils, formatting_utils -from snapcraft_legacy.internal import common, errors, mangling, os_release, repo from snapcraft_legacy.plugins.v1 import PluginV1, _python, _ros if TYPE_CHECKING: diff --git a/snapcraft_legacy/plugins/v1/colcon.py b/snapcraft_legacy/plugins/v1/colcon.py index 73737be1ce..522e2db918 100644 --- a/snapcraft_legacy/plugins/v1/colcon.py +++ b/snapcraft_legacy/plugins/v1/colcon.py @@ -66,13 +66,12 @@ import textwrap from typing import List -from craft_archives.repo.package_repository import ( +from snapcraft_legacy import file_utils +from snapcraft_legacy.internal import errors, mangling, os_release, repo +from snapcraft_legacy.internal.meta.package_repository import ( PackageRepository, PackageRepositoryApt, ) - -from snapcraft_legacy import file_utils -from snapcraft_legacy.internal import errors, mangling, os_release, repo from snapcraft_legacy.plugins.v1 import PluginV1, _python, _ros logger = logging.getLogger(__name__) diff --git a/tests/legacy/unit/meta/test_package_repository.py b/tests/legacy/unit/meta/test_package_repository.py new file mode 100644 index 0000000000..c81c6c5688 --- /dev/null +++ b/tests/legacy/unit/meta/test_package_repository.py @@ -0,0 +1,411 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2019 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import pytest + +from snapcraft_legacy.internal.meta import errors +from snapcraft_legacy.internal.meta.package_repository import ( + PackageRepository, + PackageRepositoryApt, + PackageRepositoryAptPpa, +) + + +def test_apt_name(): + repo = PackageRepositoryApt( + architectures=["amd64", "i386"], + components=["main", "multiverse"], + formats=["deb", "deb-src"], + key_id="A" * 40, + key_server="keyserver.ubuntu.com", + suites=["xenial", "xenial-updates"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert repo.name == "http_archive_ubuntu_com_ubuntu" + + +@pytest.mark.parametrize( + "arch", ["amd64", "armhf", "arm64", "i386", "ppc64el", "riscv", "s390x"] +) +def test_apt_valid_architectures(arch): + package_repo = PackageRepositoryApt( + key_id="A" * 40, url="http://test", architectures=[arch] + ) + + assert package_repo.architectures == [arch] + + +def test_apt_invalid_url(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + url="", + ) + + assert exc_info.value.brief == "Invalid URL ''." + assert exc_info.value.details == "URLs must be non-empty strings." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and ensure that 'url' is correctly specified." + ) + + +def test_apt_invalid_path(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + path="", + url="http://archive.ubuntu.com/ubuntu", + ) + + assert exc_info.value.brief == "Invalid path ''." + assert exc_info.value.details == "Paths must be non-empty strings." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and ensure that 'path' is a non-empty string such as '/'." + ) + + +def test_apt_invalid_path_with_suites(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + path="/", + suites=["xenial", "xenial-updates"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert ( + exc_info.value.brief + == "Suites ['xenial', 'xenial-updates'] cannot be combined with path '/'." + ) + assert exc_info.value.details == "Path and suites are incomptiable options." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and remove 'path' or 'suites'." + ) + + +def test_apt_invalid_path_with_components(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + path="/", + components=["main"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert ( + exc_info.value.brief == "Components ['main'] cannot be combined with path '/'." + ) + assert exc_info.value.details == "Path and components are incomptiable options." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and remove 'path' or 'components'." + ) + + +def test_apt_invalid_missing_components(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + suites=["xenial", "xenial-updates"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert exc_info.value.brief == "No components specified." + assert exc_info.value.details == "Components are required when using suites." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and ensure that 'components' is correctly specified." + ) + + +def test_apt_invalid_missing_suites(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + components=["main"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert exc_info.value.brief == "No suites specified." + assert exc_info.value.details == "Suites are required when using components." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and ensure that 'suites' is correctly specified." + ) + + +def test_apt_invalid_suites_as_path(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt( + key_id="A" * 40, + suites=["my-suite/"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert exc_info.value.brief == "Invalid suite 'my-suite/'." + assert exc_info.value.details == "Suites must not end with a '/'." + assert ( + exc_info.value.resolution + == "Verify the repository configuration and remove the trailing '/ from suites or use the 'path' property to define a path." + ) + + +def test_apt_marshal(): + repo = PackageRepositoryApt( + architectures=["amd64", "i386"], + components=["main", "multiverse"], + formats=["deb", "deb-src"], + key_id="A" * 40, + key_server="xkeyserver.ubuntu.com", + name="test-name", + suites=["xenial", "xenial-updates"], + url="http://archive.ubuntu.com/ubuntu", + ) + + assert repo.marshal() == { + "architectures": ["amd64", "i386"], + "components": ["main", "multiverse"], + "formats": ["deb", "deb-src"], + "key-id": "A" * 40, + "key-server": "xkeyserver.ubuntu.com", + "name": "test-name", + "suites": ["xenial", "xenial-updates"], + "type": "apt", + "url": "http://archive.ubuntu.com/ubuntu", + } + + +def test_apt_unmarshal_invalid_extra_keys(): + test_dict = { + "architectures": ["amd64", "i386"], + "components": ["main", "multiverse"], + "formats": ["deb", "deb-src"], + "key-id": "A" * 40, + "key-server": "keyserver.ubuntu.com", + "name": "test-name", + "suites": ["xenial", "xenial-updates"], + "type": "apt", + "url": "http://archive.ubuntu.com/ubuntu", + "foo": "bar", + "foo2": "bar", + } + + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt.unmarshal(test_dict) + + assert ( + exc_info.value.brief + == "Found unsupported package repository properties 'foo', 'foo2'." + ) + assert exc_info.value.details is None + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure it is correct." + ) + + +def test_apt_unmarshal_invalid_data(): + test_dict = "not-a-dict" + + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt.unmarshal(test_dict) + + assert exc_info.value.brief == "Invalid package repository object 'not-a-dict'." + assert ( + exc_info.value.details + == "Package repository must be a valid dictionary object." + ) + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure that the correct syntax is used." + ) + + +def test_apt_unmarshal_invalid_type(): + test_dict = { + "architectures": ["amd64", "i386"], + "components": ["main", "multiverse"], + "formats": ["deb", "deb-src"], + "key-id": "A" * 40, + "key-server": "keyserver.ubuntu.com", + "name": "test-name", + "suites": ["xenial", "xenial-updates"], + "type": "aptx", + "url": "http://archive.ubuntu.com/ubuntu", + } + + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryApt.unmarshal(test_dict) + + assert exc_info.value.brief == "Unsupported type 'aptx'." + assert exc_info.value.details == "The only currently supported type is 'apt'." + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure that 'type' is correctly specified." + ) + + +def test_ppa_marshal(): + repo = PackageRepositoryAptPpa(ppa="test/ppa") + + assert repo.marshal() == {"type": "apt", "ppa": "test/ppa"} + + +def test_ppa_invalid_ppa(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryAptPpa(ppa="") + + assert exc_info.value.brief == "Invalid PPA ''." + assert exc_info.value.details == "PPAs must be non-empty strings." + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure that 'ppa' is correctly specified." + ) + + +def test_ppa_unmarshal_invalid_data(): + test_dict = "not-a-dict" + + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryAptPpa.unmarshal(test_dict) + + assert exc_info.value.brief == "Invalid package repository object 'not-a-dict'." + assert ( + exc_info.value.details + == "Package repository must be a valid dictionary object." + ) + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure that the correct syntax is used." + ) + + +def test_ppa_unmarshal_invalid_apt_ppa_type(): + test_dict = {"type": "aptx", "ppa": "test/ppa"} + + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryAptPpa.unmarshal(test_dict) + + assert exc_info.value.brief == "Unsupported type 'aptx'." + assert exc_info.value.details == "The only currently supported type is 'apt'." + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure that 'type' is correctly specified." + ) + + +def test_ppa_unmarshal_invalid_apt_ppa_extra_keys(): + test_dict = {"type": "apt", "ppa": "test/ppa", "test": "foo"} + + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepositoryAptPpa.unmarshal(test_dict) + + assert ( + exc_info.value.brief + == "Found unsupported package repository properties 'test'." + ) + assert exc_info.value.details is None + assert ( + exc_info.value.resolution + == "Verify repository configuration and ensure that it is correct." + ) + + +def test_unmarshal_package_repositories_list_none(): + assert PackageRepository.unmarshal_package_repositories(None) == list() + + +def test_unmarshal_package_repositories_list_empty(): + assert PackageRepository.unmarshal_package_repositories(list()) == list() + + +def test_unmarshal_package_repositories_list_ppa(): + test_dict = {"type": "apt", "ppa": "test/foo"} + test_list = [test_dict] + + unmarshalled_list = [ + repo.marshal() + for repo in PackageRepository.unmarshal_package_repositories(test_list) + ] + + assert unmarshalled_list == test_list + + +def test_unmarshal_package_repositories_list_apt(): + test_dict = { + "architectures": ["amd64", "i386"], + "components": ["main", "multiverse"], + "formats": ["deb", "deb-src"], + "key-id": "A" * 40, + "key-server": "keyserver.ubuntu.com", + "name": "test-name", + "suites": ["xenial", "xenial-updates"], + "type": "apt", + "url": "http://archive.ubuntu.com/ubuntu", + } + + test_list = [test_dict] + + unmarshalled_list = [ + repo.marshal() + for repo in PackageRepository.unmarshal_package_repositories(test_list) + ] + + assert unmarshalled_list == test_list + + +def test_unmarshal_package_repositories_list_all(): + test_ppa = {"type": "apt", "ppa": "test/foo"} + + test_deb = { + "architectures": ["amd64", "i386"], + "components": ["main", "multiverse"], + "formats": ["deb", "deb-src"], + "key-id": "A" * 40, + "key-server": "keyserver.ubuntu.com", + "name": "test-name", + "suites": ["xenial", "xenial-updates"], + "type": "apt", + "url": "http://archive.ubuntu.com/ubuntu", + } + + test_list = [test_ppa, test_deb] + + unmarshalled_list = [ + repo.marshal() + for repo in PackageRepository.unmarshal_package_repositories(test_list) + ] + + assert unmarshalled_list == test_list + + +def test_unmarshal_package_repositories_invalid_data(): + with pytest.raises(errors.PackageRepositoryValidationError) as exc_info: + PackageRepository.unmarshal_package_repositories("not-a-list") + + assert ( + exc_info.value.brief == "Invalid package-repositories list object 'not-a-list'." + ) + assert exc_info.value.details == "Package repositories must be a list of objects." + assert ( + exc_info.value.resolution + == "Verify 'package-repositories' configuration and ensure that the correct syntax is used." + ) diff --git a/tests/legacy/unit/repo/test_apt_key_manager.py b/tests/legacy/unit/repo/test_apt_key_manager.py new file mode 100644 index 0000000000..e5e14bbaab --- /dev/null +++ b/tests/legacy/unit/repo/test_apt_key_manager.py @@ -0,0 +1,339 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2020 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import subprocess +from unittest import mock +from unittest.mock import call + +import gnupg +import pytest + +from snapcraft_legacy.internal.meta.package_repository import ( + PackageRepositoryApt, + PackageRepositoryAptPpa, +) +from snapcraft_legacy.internal.repo import apt_ppa, errors +from snapcraft_legacy.internal.repo.apt_key_manager import AptKeyManager + + +@pytest.fixture(autouse=True) +def mock_environ_copy(): + with mock.patch("os.environ.copy") as m: + yield m + + +@pytest.fixture(autouse=True) +def mock_gnupg(tmp_path, autouse=True): + with mock.patch("gnupg.GPG", spec=gnupg.GPG) as m: + m.return_value.import_keys.return_value.fingerprints = [ + "FAKE-KEY-ID-FROM-GNUPG" + ] + yield m + + +@pytest.fixture(autouse=True) +def mock_run(): + with mock.patch("subprocess.run", spec=subprocess.run) as m: + yield m + + +@pytest.fixture(autouse=True) +def mock_apt_ppa_get_signing_key(): + with mock.patch( + "snapcraft_legacy.internal.repo.apt_ppa.get_launchpad_ppa_key_id", + spec=apt_ppa.get_launchpad_ppa_key_id, + return_value="FAKE-PPA-SIGNING-KEY", + ) as m: + yield m + + +@pytest.fixture +def key_assets(tmp_path): + key_assets = tmp_path / "key-assets" + key_assets.mkdir(parents=True) + yield key_assets + + +@pytest.fixture +def gpg_keyring(tmp_path): + yield tmp_path / "keyring.gpg" + + +@pytest.fixture +def apt_gpg(key_assets, gpg_keyring): + yield AptKeyManager( + gpg_keyring=gpg_keyring, + key_assets=key_assets, + ) + + +def test_find_asset( + apt_gpg, + key_assets, +): + key_id = "8" * 40 + expected_key_path = key_assets / ("8" * 8 + ".asc") + expected_key_path.write_text("key") + + key_path = apt_gpg.find_asset_with_key_id(key_id=key_id) + + assert key_path == expected_key_path + + +def test_find_asset_none( + apt_gpg, +): + key_path = apt_gpg.find_asset_with_key_id(key_id="foo") + + assert key_path is None + + +def test_get_key_fingerprints( + apt_gpg, + mock_gnupg, +): + with mock.patch("tempfile.NamedTemporaryFile") as m: + m.return_value.__enter__.return_value.name = "/tmp/foo" + ids = apt_gpg.get_key_fingerprints(key="8" * 40) + + assert ids == ["FAKE-KEY-ID-FROM-GNUPG"] + assert mock_gnupg.mock_calls == [ + call(keyring="/tmp/foo"), + call().import_keys(key_data="8888888888888888888888888888888888888888"), + ] + + +@pytest.mark.parametrize( + "stdout,expected", + [ + (b"nothing exported", False), + (b"BEGIN PGP PUBLIC KEY BLOCK", True), + (b"invalid", False), + ], +) +def test_is_key_installed( + stdout, + expected, + apt_gpg, + mock_run, +): + mock_run.return_value.stdout = stdout + + is_installed = apt_gpg.is_key_installed(key_id="foo") + + assert is_installed is expected + assert mock_run.mock_calls == [ + call( + ["sudo", "apt-key", "export", "foo"], + check=True, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + ) + ] + + +def test_is_key_installed_with_apt_key_failure( + apt_gpg, + mock_run, +): + mock_run.side_effect = subprocess.CalledProcessError( + cmd=["apt-key"], returncode=1, output=b"some error" + ) + + is_installed = apt_gpg.is_key_installed(key_id="foo") + + assert is_installed is False + + +def test_install_key( + apt_gpg, + gpg_keyring, + mock_run, +): + key = "some-fake-key" + apt_gpg.install_key(key=key) + + assert mock_run.mock_calls == [ + call( + ["sudo", "apt-key", "--keyring", str(gpg_keyring), "add", "-"], + check=True, + env={"LANG": "C.UTF-8"}, + input=b"some-fake-key", + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + ) + ] + + +def test_install_key_with_apt_key_failure(apt_gpg, mock_run): + mock_run.side_effect = subprocess.CalledProcessError( + cmd=["foo"], returncode=1, output=b"some error" + ) + + with pytest.raises(errors.AptGPGKeyInstallError) as exc_info: + apt_gpg.install_key(key="FAKEKEY") + + assert exc_info.value._output == "some error" + assert exc_info.value._key == "FAKEKEY" + + +def test_install_key_from_keyserver(apt_gpg, gpg_keyring, mock_run): + apt_gpg.install_key_from_keyserver(key_id="FAKE_KEYID", key_server="key.server") + + assert mock_run.mock_calls == [ + call( + [ + "sudo", + "apt-key", + "--keyring", + str(gpg_keyring), + "adv", + "--keyserver", + "key.server", + "--recv-keys", + "FAKE_KEYID", + ], + check=True, + env={"LANG": "C.UTF-8"}, + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + ) + ] + + +def test_install_key_from_keyserver_with_apt_key_failure( + apt_gpg, gpg_keyring, mock_run +): + mock_run.side_effect = subprocess.CalledProcessError( + cmd=["apt-key"], returncode=1, output=b"some error" + ) + + with pytest.raises(errors.AptGPGKeyInstallError) as exc_info: + apt_gpg.install_key_from_keyserver( + key_id="fake-key-id", key_server="fake-server" + ) + + assert exc_info.value._output == "some error" + assert exc_info.value._key_id == "fake-key-id" + + +@mock.patch( + "snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.is_key_installed" +) +@pytest.mark.parametrize( + "is_installed", + [True, False], +) +def test_install_package_repository_key_already_installed( + mock_is_key_installed, + is_installed, + apt_gpg, +): + mock_is_key_installed.return_value = is_installed + package_repo = PackageRepositoryApt( + components=["main", "multiverse"], + key_id="8" * 40, + key_server="xkeyserver.com", + suites=["xenial"], + url="http://archive.ubuntu.com/ubuntu", + ) + + updated = apt_gpg.install_package_repository_key(package_repo=package_repo) + + assert updated is not is_installed + + +@mock.patch( + "snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.is_key_installed", + return_value=False, +) +@mock.patch("snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.install_key") +def test_install_package_repository_key_from_asset( + mock_install_key, + mock_is_key_installed, + apt_gpg, + key_assets, +): + key_id = "123456789012345678901234567890123456AABB" + expected_key_path = key_assets / "3456AABB.asc" + expected_key_path.write_text("key-data") + + package_repo = PackageRepositoryApt( + components=["main", "multiverse"], + key_id=key_id, + suites=["xenial"], + url="http://archive.ubuntu.com/ubuntu", + ) + + updated = apt_gpg.install_package_repository_key(package_repo=package_repo) + + assert updated is True + assert mock_install_key.mock_calls == [call(key="key-data")] + + +@mock.patch( + "snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.is_key_installed", + return_value=False, +) +@mock.patch( + "snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.install_key_from_keyserver" +) +def test_install_package_repository_key_apt_from_keyserver( + mock_install_key_from_keyserver, + mock_is_key_installed, + apt_gpg, +): + key_id = "8" * 40 + + package_repo = PackageRepositoryApt( + components=["main", "multiverse"], + key_id=key_id, + key_server="key.server", + suites=["xenial"], + url="http://archive.ubuntu.com/ubuntu", + ) + + updated = apt_gpg.install_package_repository_key(package_repo=package_repo) + + assert updated is True + assert mock_install_key_from_keyserver.mock_calls == [ + call(key_id=key_id, key_server="key.server") + ] + + +@mock.patch( + "snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.is_key_installed", + return_value=False, +) +@mock.patch( + "snapcraft_legacy.internal.repo.apt_key_manager.AptKeyManager.install_key_from_keyserver" +) +def test_install_package_repository_key_ppa_from_keyserver( + mock_install_key_from_keyserver, + mock_is_key_installed, + apt_gpg, +): + package_repo = PackageRepositoryAptPpa( + ppa="test/ppa", + ) + + updated = apt_gpg.install_package_repository_key(package_repo=package_repo) + + assert updated is True + assert mock_install_key_from_keyserver.mock_calls == [ + call(key_id="FAKE-PPA-SIGNING-KEY", key_server="keyserver.ubuntu.com") + ] diff --git a/tests/legacy/unit/repo/test_apt_ppa.py b/tests/legacy/unit/repo/test_apt_ppa.py new file mode 100644 index 0000000000..095f70113b --- /dev/null +++ b/tests/legacy/unit/repo/test_apt_ppa.py @@ -0,0 +1,62 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2020 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from unittest import mock +from unittest.mock import call + +import launchpadlib +import pytest + +from snapcraft_legacy.internal.repo import apt_ppa, errors + + +@pytest.fixture +def mock_launchpad(autouse=True): + with mock.patch( + "snapcraft_legacy.internal.repo.apt_ppa.Launchpad", + spec=launchpadlib.launchpad.Launchpad, + ) as m: + m.login_anonymously.return_value.load.return_value.signing_key_fingerprint = ( + "FAKE-PPA-SIGNING-KEY" + ) + yield m + + +def test_split_ppa_parts(): + owner, name = apt_ppa.split_ppa_parts(ppa="test-owner/test-name") + + assert owner == "test-owner" + assert name == "test-name" + + +def test_split_ppa_parts_invalid(): + with pytest.raises(errors.AptPPAInstallError) as exc_info: + apt_ppa.split_ppa_parts(ppa="ppa-missing-slash") + + assert exc_info.value._ppa == "ppa-missing-slash" + + +def test_get_launchpad_ppa_key_id( + mock_launchpad, +): + key_id = apt_ppa.get_launchpad_ppa_key_id(ppa="ppa-owner/ppa-name") + + assert key_id == "FAKE-PPA-SIGNING-KEY" + assert mock_launchpad.mock_calls == [ + call.login_anonymously("snapcraft", "production"), + call.login_anonymously().load("~ppa-owner/+archive/ppa-name"), + ] diff --git a/tests/legacy/unit/repo/test_apt_sources_manager.py b/tests/legacy/unit/repo/test_apt_sources_manager.py new file mode 100644 index 0000000000..2f8f59bd10 --- /dev/null +++ b/tests/legacy/unit/repo/test_apt_sources_manager.py @@ -0,0 +1,269 @@ +# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- +# +# Copyright (C) 2021 Canonical Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 3 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import pathlib +import subprocess +from textwrap import dedent +from unittest import mock +from unittest.mock import call + +import pytest + +from snapcraft_legacy.internal.meta.package_repository import ( + PackageRepositoryApt, + PackageRepositoryAptPpa, +) +from snapcraft_legacy.internal.repo import apt_ppa, apt_sources_manager, errors + + +@pytest.fixture(autouse=True) +def mock_apt_ppa_get_signing_key(): + with mock.patch( + "snapcraft_legacy.internal.repo.apt_ppa.get_launchpad_ppa_key_id", + spec=apt_ppa.get_launchpad_ppa_key_id, + return_value="FAKE-PPA-SIGNING-KEY", + ) as m: + yield m + + +@pytest.fixture(autouse=True) +def mock_environ_copy(): + with mock.patch("os.environ.copy") as m: + yield m + + +@pytest.fixture(autouse=True) +def mock_host_arch(): + with mock.patch( + "snapcraft_legacy.internal.repo.apt_sources_manager.ProjectOptions" + ) as m: + m.return_value.deb_arch = "FAKE-HOST-ARCH" + yield m + + +@pytest.fixture(autouse=True) +def mock_run(): + with mock.patch("subprocess.run") as m: + yield m + + +@pytest.fixture() +def mock_sudo_write(): + def write_file(*, dst_path: pathlib.Path, content: bytes) -> None: + dst_path.write_bytes(content) + + with mock.patch( + "snapcraft_legacy.internal.repo.apt_sources_manager._sudo_write_file" + ) as m: + m.side_effect = write_file + yield m + + +@pytest.fixture(autouse=True) +def mock_version_codename(): + with mock.patch( + "snapcraft_legacy.internal.os_release.OsRelease.version_codename", + return_value="FAKE-CODENAME", + ) as m: + yield m + + +@pytest.fixture +def apt_sources_mgr(tmp_path): + sources_list_d = tmp_path / "sources.list.d" + sources_list_d.mkdir(parents=True) + + yield apt_sources_manager.AptSourcesManager( + sources_list_d=sources_list_d, + ) + + +@mock.patch("tempfile.NamedTemporaryFile") +@mock.patch("os.unlink") +def test_sudo_write_file(mock_unlink, mock_tempfile, mock_run, tmp_path): + mock_tempfile.return_value.__enter__.return_value.name = "/tmp/foobar" + + apt_sources_manager._sudo_write_file(dst_path="/foo/bar", content=b"some-content") + + assert mock_tempfile.mock_calls == [ + call(delete=False), + call().__enter__(), + call().__enter__().write(b"some-content"), + call().__enter__().flush(), + call().__exit__(None, None, None), + ] + assert mock_run.mock_calls == [ + call( + [ + "sudo", + "install", + "--owner=root", + "--group=root", + "--mode=0644", + "/tmp/foobar", + "/foo/bar", + ], + check=True, + ) + ] + assert mock_unlink.mock_calls == [call("/tmp/foobar")] + + +def test_sudo_write_file_fails(mock_run): + mock_run.side_effect = subprocess.CalledProcessError( + cmd=["sudo"], returncode=1, output=b"some error" + ) + + with pytest.raises(RuntimeError) as error: + apt_sources_manager._sudo_write_file( + dst_path="/foo/bar", content=b"some-content" + ) + + assert ( + str(error.value).startswith( + "Failed to install repository config with: ['sudo', 'install'" + ) + is True + ) + + +@pytest.mark.parametrize( + "package_repo,name,content", + [ + ( + PackageRepositoryApt( + architectures=["amd64", "arm64"], + components=["test-component"], + formats=["deb", "deb-src"], + key_id="A" * 40, + suites=["test-suite1", "test-suite2"], + url="http://test.url/ubuntu", + ), + "snapcraft-http_test_url_ubuntu.sources", + dedent( + """\ + Types: deb deb-src + URIs: http://test.url/ubuntu + Suites: test-suite1 test-suite2 + Components: test-component + Architectures: amd64 arm64 + """ + ).encode(), + ), + ( + PackageRepositoryApt( + architectures=["amd64", "arm64"], + components=["test-component"], + key_id="A" * 40, + name="NO-FORMAT", + suites=["test-suite1", "test-suite2"], + url="http://test.url/ubuntu", + ), + "snapcraft-NO-FORMAT.sources", + dedent( + """\ + Types: deb + URIs: http://test.url/ubuntu + Suites: test-suite1 test-suite2 + Components: test-component + Architectures: amd64 arm64 + """ + ).encode(), + ), + ( + PackageRepositoryApt( + key_id="A" * 40, + name="WITH-PATH", + path="some-path", + url="http://test.url/ubuntu", + ), + "snapcraft-WITH-PATH.sources", + dedent( + """\ + Types: deb + URIs: http://test.url/ubuntu + Suites: some-path/ + Architectures: FAKE-HOST-ARCH + """ + ).encode(), + ), + ( + PackageRepositoryApt( + key_id="A" * 40, + name="IMPLIED-PATH", + url="http://test.url/ubuntu", + ), + "snapcraft-IMPLIED-PATH.sources", + dedent( + """\ + Types: deb + URIs: http://test.url/ubuntu + Suites: / + Architectures: FAKE-HOST-ARCH + """ + ).encode(), + ), + ( + PackageRepositoryAptPpa(ppa="test/ppa"), + "snapcraft-ppa-test_ppa.sources", + dedent( + """\ + Types: deb + URIs: http://ppa.launchpad.net/test/ppa/ubuntu + Suites: FAKE-CODENAME + Components: main + Architectures: FAKE-HOST-ARCH + """ + ).encode(), + ), + ], +) +def test_install(package_repo, name, content, apt_sources_mgr, mock_sudo_write): + sources_path = apt_sources_mgr._sources_list_d / name + + changed = apt_sources_mgr.install_package_repository_sources( + package_repo=package_repo + ) + + assert changed is True + assert sources_path.read_bytes() == content + assert mock_sudo_write.mock_calls == [ + call( + content=content, + dst_path=sources_path, + ) + ] + + # Verify a second-run does not incur any changes. + mock_sudo_write.reset_mock() + + changed = apt_sources_mgr.install_package_repository_sources( + package_repo=package_repo + ) + + assert changed is False + assert sources_path.read_bytes() == content + assert mock_sudo_write.mock_calls == [] + + +def test_install_ppa_invalid(apt_sources_mgr): + repo = PackageRepositoryAptPpa(ppa="ppa-missing-slash") + + with pytest.raises(errors.AptPPAInstallError) as exc_info: + apt_sources_mgr.install_package_repository_sources(package_repo=repo) + + assert exc_info.value._ppa == "ppa-missing-slash"