diff --git a/craft_application/errors.py b/craft_application/errors.py index fe4c3d0b..e75e9f45 100644 --- a/craft_application/errors.py +++ b/craft_application/errors.py @@ -104,3 +104,20 @@ def from_os_error(cls, err: OSError) -> Self: if err.filename2: details += f", filename2: {err.filename2!r}" return cls(message, details=details) + + +class SecretsCommandError(CraftError): + """Error when rendering a build-secret.""" + + def __init__(self, host_secret: str, error_message: str) -> None: + message = f'Error when processing secret "{host_secret}"' + details = f"Command output: {error_message}" + super().__init__(message=message, details=details) + + +class SecretsFieldError(CraftError): + """Error when using a build-secret in a disallowed field.""" + + def __init__(self, host_secret: str, field_name: str) -> None: + message = f'Build secret "{host_secret}" is not allowed on field "{field_name}"' + super().__init__(message=message) diff --git a/craft_application/secrets.py b/craft_application/secrets.py new file mode 100644 index 00000000..92867aa6 --- /dev/null +++ b/craft_application/secrets.py @@ -0,0 +1,118 @@ +# This file is part of craft_application. +# +# Copyright 2023 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY, +# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program. If not, see . +"""Handling of build-time secrets.""" +from __future__ import annotations + +import re +import subprocess +from typing import Any + +from craft_application import errors + +SECRET_REGEX = re.compile(r"\$\(HOST_SECRET:(?P.*)\)") + + +def render_secrets(yaml_data: dict[str, Any]) -> set[str]: + """Render/expand the build secrets in a project's yaml data (in-place). + + This function will process directives of the form $(HOST_SECRET:) in string + values in ``yaml_data``. For each such directive, the part is executed (with + bash) and the resulting output replaces the whole directive. The returned set + contains the outputs of all HOST_SECRET processing (for masking with craft-cli). + + Note that only a few fields are currently supported: + + - "source" and "build-environment" for parts. + + Using HOST_SECRET directives in any other field is an error. + """ + command_cache: dict[str, str] = {} + + # Process the fields where we allow build secrets + parts = yaml_data.get("parts", {}) + for part in parts.values(): + _render_part_secrets(part, command_cache) + + # Now loop over all the data to check for build secrets in disallowed fields + _check_for_secrets(yaml_data) + + return set(command_cache.values()) + + +def _render_part_secrets( + part_data: dict[str, Any], command_cache: dict[str, Any] +) -> None: + # Render "source" + source = part_data.get("source", "") + if (rendered := _render_secret(source, command_cache)) is not None: + part_data["source"] = rendered + + # Render "build-environment" + build_env = part_data.get("build-environment", []) + # "build-environment" is a list of dicts with a single item each + for single_entry_dict in build_env: + for var_name, var_value in single_entry_dict.items(): + if (rendered := _render_secret(var_value, command_cache)) is not None: + single_entry_dict[var_name] = rendered + + +def _render_secret(text: str, command_cache: dict[str, str]) -> str | None: + if match := SECRET_REGEX.search(text): + command = match.group("command") + host_directive = match.group(0) + + if command in command_cache: + output = command_cache[command] + else: + try: + output = _run_command(command) + except subprocess.CalledProcessError as err: + raise errors.SecretsCommandError( + host_directive, err.stderr.decode() + ) from err + command_cache[command] = output + + return text.replace(host_directive, output) + return None + + +def _run_command(command: str) -> str: + bash_command = f"set -euo pipefail; {command}" + return ( + subprocess.check_output(["bash", "-c", bash_command], stderr=subprocess.PIPE) + .decode("utf-8") + .strip() + ) + + +# pyright: reportUnknownVariableType=false,reportUnknownArgumentType=false +def _check_for_secrets(data: Any) -> None: # noqa: ANN401 (using Any on purpose) + if isinstance(data, dict): + for key, value in data.items(): + _check_str(value, field_name=key) + if isinstance(value, list): + for item in value: + _check_str(item, field_name=key) + _check_for_secrets(item) + elif isinstance(value, dict): + _check_for_secrets(value) + + +def _check_str( + value: Any, field_name: str # noqa: ANN401 (using Any on purpose) +) -> None: + if isinstance(value, str) and (match := SECRET_REGEX.search(value)): + raise errors.SecretsFieldError(host_secret=match.group(), field_name=field_name) diff --git a/tests/unit/test_secrets.py b/tests/unit/test_secrets.py new file mode 100644 index 00000000..99a8c2b1 --- /dev/null +++ b/tests/unit/test_secrets.py @@ -0,0 +1,103 @@ +# This file is part of craft_application. +# +# Copyright 2023 Canonical Ltd. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY, +# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along +# with this program. If not, see . +"""Tests for build secrets.""" + +import pytest +from craft_application import errors, secrets + + +def test_secrets_parts(monkeypatch): + p1_data = { + "source": "the source secret is $(HOST_SECRET:echo ${SECRET_1})", + "build-environment": [ + {"VAR1": "the env secret is $(HOST_SECRET:echo ${SECRET_2})"}, + {"VAR2": "some value"}, + ], + } + + monkeypatch.setenv("SECRET_1", "source-secret") + monkeypatch.setenv("SECRET_2", "env-secret") + + yaml_data = {"parts": {"p1": p1_data}} + secret_values = secrets.render_secrets(yaml_data) + + assert secret_values == {"source-secret", "env-secret"} + + assert p1_data["source"] == "the source secret is source-secret" + assert p1_data["build-environment"][0]["VAR1"] == "the env secret is env-secret" + + # Check that the rest of the build-environment is preserved + assert p1_data["build-environment"][1]["VAR2"] == "some value" + + +def test_secrets_command_error(): + yaml_data = {"parts": {"p1": {"source": "$(HOST_SECRET:echo ${I_DONT_EXIST})"}}} + + with pytest.raises(errors.SecretsCommandError) as exc: + secrets.render_secrets(yaml_data) + + expected_message = ( + 'Error when processing secret "$(HOST_SECRET:echo ${I_DONT_EXIST})"' + ) + expected_details = "I_DONT_EXIST: unbound variable" + + error = exc.value + assert str(error) == expected_message + assert error.details is not None + assert expected_details in error.details + + +def test_secrets_cache(mocker, monkeypatch): + monkeypatch.setenv("SECRET_1", "secret") + p1_data = { + "source": "the source secret is $(HOST_SECRET:echo ${SECRET_1})", + "build-environment": [ + {"VAR1": "the env secret is $(HOST_SECRET:echo ${SECRET_1})"} + ], + } + yaml_data = {"parts": {"p1": p1_data}} + + spied_run = mocker.spy(secrets, "_run_command") + secrets.render_secrets(yaml_data) + + # Even though the HOST_SECRET is used twice, only a single bash call is done because + # the command is the same. + spied_run.assert_called_once_with("echo ${SECRET_1}") + + +_SECRET = "$(HOST_SECRET:echo ${GIT_VERSION})" # noqa: S105 (this is not a password) + + +@pytest.mark.parametrize( + ("yaml_data", "field_name"), + [ + # A basic string field + ({"version": f"v{_SECRET}"}, "version"), + # A list item + ({"stage-packages": ["first", "second", _SECRET]}, "stage-packages"), + # A dict value + ({"parts": {"p1": {"source-version": f"v{_SECRET}"}}}, "source-version"), + ], +) +def test_secrets_bad_field(monkeypatch, yaml_data, field_name): + monkeypatch.setenv("GIT_VERSION", "1.0") + + with pytest.raises(errors.SecretsFieldError) as exc: + secrets.render_secrets(yaml_data) + + expected_error = f'Build secret "{_SECRET}" is not allowed on field "{field_name}"' + err = exc.value + assert str(err) == expected_error