generated from canonical/starbase
-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This commit adds the functionality to support build-time secrets; this is placed in a new `secrets` module with a single public `render_secrets()` function. Note that this new code is not used by the Application itself yet.
- Loading branch information
Showing
3 changed files
with
238 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
# This file is part of craft_application. | ||
# | ||
# Copyright 2023 Canonical Ltd. | ||
# | ||
# This program is free software: you can redistribute it and/or modify it | ||
# under the terms of the GNU Lesser General Public License version 3, as | ||
# published by the Free Software Foundation. | ||
# | ||
# This program is distributed in the hope that it will be useful, but WITHOUT | ||
# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY, | ||
# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE. | ||
# See the GNU Lesser General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Lesser General Public License along | ||
# with this program. If not, see <http://www.gnu.org/licenses/>. | ||
"""Handling of build-time secrets.""" | ||
from __future__ import annotations | ||
|
||
import re | ||
import subprocess | ||
from typing import Any | ||
|
||
from craft_application import errors | ||
|
||
SECRET_REGEX = re.compile(r"\$\(HOST_SECRET:(?P<command>.*)\)") | ||
|
||
|
||
def render_secrets(yaml_data: dict[str, Any]) -> set[str]: | ||
"""Render/expand the build secrets in a project's yaml data (in-place). | ||
This function will process directives of the form $(HOST_SECRET:<cmd>) in string | ||
values in ``yaml_data``. For each such directive, the <cmd> part is executed (with | ||
bash) and the resulting output replaces the whole directive. The returned set | ||
contains the outputs of all HOST_SECRET processing (for masking with craft-cli). | ||
Note that only a few fields are currently supported: | ||
- "source" and "build-environment" for parts. | ||
Using HOST_SECRET directives in any other field is an error. | ||
""" | ||
command_cache: dict[str, str] = {} | ||
|
||
# Process the fields where we allow build secrets | ||
parts = yaml_data.get("parts", {}) | ||
for part in parts.values(): | ||
_render_part_secrets(part, command_cache) | ||
|
||
# Now loop over all the data to check for build secrets in disallowed fields | ||
_check_for_secrets(yaml_data) | ||
|
||
return set(command_cache.values()) | ||
|
||
|
||
def _render_part_secrets( | ||
part_data: dict[str, Any], command_cache: dict[str, Any] | ||
) -> None: | ||
# Render "source" | ||
source = part_data.get("source", "") | ||
if (rendered := _render_secret(source, command_cache)) is not None: | ||
part_data["source"] = rendered | ||
|
||
# Render "build-environment" | ||
build_env = part_data.get("build-environment", []) | ||
# "build-environment" is a list of dicts with a single item each | ||
for single_entry_dict in build_env: | ||
for var_name, var_value in single_entry_dict.items(): | ||
if (rendered := _render_secret(var_value, command_cache)) is not None: | ||
single_entry_dict[var_name] = rendered | ||
|
||
|
||
def _render_secret(text: str, command_cache: dict[str, str]) -> str | None: | ||
if match := SECRET_REGEX.search(text): | ||
command = match.group("command") | ||
host_directive = match.group(0) | ||
|
||
if command in command_cache: | ||
output = command_cache[command] | ||
else: | ||
try: | ||
output = _run_command(command) | ||
except subprocess.CalledProcessError as err: | ||
raise errors.SecretsCommandError( | ||
host_directive, err.stderr.decode() | ||
) from err | ||
command_cache[command] = output | ||
|
||
return text.replace(host_directive, output) | ||
return None | ||
|
||
|
||
def _run_command(command: str) -> str: | ||
bash_command = f"set -euo pipefail; {command}" | ||
return ( | ||
subprocess.check_output(["bash", "-c", bash_command], stderr=subprocess.PIPE) | ||
.decode("utf-8") | ||
.strip() | ||
) | ||
|
||
|
||
# pyright: reportUnknownVariableType=false,reportUnknownArgumentType=false | ||
def _check_for_secrets(data: Any) -> None: # noqa: ANN401 (using Any on purpose) | ||
if isinstance(data, dict): | ||
for key, value in data.items(): | ||
_check_str(value, field_name=key) | ||
if isinstance(value, list): | ||
for item in value: | ||
_check_str(item, field_name=key) | ||
_check_for_secrets(item) | ||
elif isinstance(value, dict): | ||
_check_for_secrets(value) | ||
|
||
|
||
def _check_str( | ||
value: Any, field_name: str # noqa: ANN401 (using Any on purpose) | ||
) -> None: | ||
if isinstance(value, str) and (match := SECRET_REGEX.search(value)): | ||
raise errors.SecretsFieldError(host_secret=match.group(), field_name=field_name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
# This file is part of craft_application. | ||
# | ||
# Copyright 2023 Canonical Ltd. | ||
# | ||
# This program is free software: you can redistribute it and/or modify it | ||
# under the terms of the GNU Lesser General Public License version 3, as | ||
# published by the Free Software Foundation. | ||
# | ||
# This program is distributed in the hope that it will be useful, but WITHOUT | ||
# ANY WARRANTY; without even the implied warranties of MERCHANTABILITY, | ||
# SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | ||
# Lesser General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Lesser General Public License along | ||
# with this program. If not, see <http://www.gnu.org/licenses/>. | ||
"""Tests for build secrets.""" | ||
|
||
import pytest | ||
from craft_application import errors, secrets | ||
|
||
|
||
def test_secrets_parts(monkeypatch): | ||
p1_data = { | ||
"source": "the source secret is $(HOST_SECRET:echo ${SECRET_1})", | ||
"build-environment": [ | ||
{"VAR1": "the env secret is $(HOST_SECRET:echo ${SECRET_2})"}, | ||
{"VAR2": "some value"}, | ||
], | ||
} | ||
|
||
monkeypatch.setenv("SECRET_1", "source-secret") | ||
monkeypatch.setenv("SECRET_2", "env-secret") | ||
|
||
yaml_data = {"parts": {"p1": p1_data}} | ||
secret_values = secrets.render_secrets(yaml_data) | ||
|
||
assert secret_values == {"source-secret", "env-secret"} | ||
|
||
assert p1_data["source"] == "the source secret is source-secret" | ||
assert p1_data["build-environment"][0]["VAR1"] == "the env secret is env-secret" | ||
|
||
# Check that the rest of the build-environment is preserved | ||
assert p1_data["build-environment"][1]["VAR2"] == "some value" | ||
|
||
|
||
def test_secrets_command_error(): | ||
yaml_data = {"parts": {"p1": {"source": "$(HOST_SECRET:echo ${I_DONT_EXIST})"}}} | ||
|
||
with pytest.raises(errors.SecretsCommandError) as exc: | ||
secrets.render_secrets(yaml_data) | ||
|
||
expected_message = ( | ||
'Error when processing secret "$(HOST_SECRET:echo ${I_DONT_EXIST})"' | ||
) | ||
expected_details = "I_DONT_EXIST: unbound variable" | ||
|
||
error = exc.value | ||
assert str(error) == expected_message | ||
assert error.details is not None | ||
assert expected_details in error.details | ||
|
||
|
||
def test_secrets_cache(mocker, monkeypatch): | ||
monkeypatch.setenv("SECRET_1", "secret") | ||
p1_data = { | ||
"source": "the source secret is $(HOST_SECRET:echo ${SECRET_1})", | ||
"build-environment": [ | ||
{"VAR1": "the env secret is $(HOST_SECRET:echo ${SECRET_1})"} | ||
], | ||
} | ||
yaml_data = {"parts": {"p1": p1_data}} | ||
|
||
spied_run = mocker.spy(secrets, "_run_command") | ||
secrets.render_secrets(yaml_data) | ||
|
||
# Even though the HOST_SECRET is used twice, only a single bash call is done because | ||
# the command is the same. | ||
spied_run.assert_called_once_with("echo ${SECRET_1}") | ||
|
||
|
||
_SECRET = "$(HOST_SECRET:echo ${GIT_VERSION})" # noqa: S105 (this is not a password) | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("yaml_data", "field_name"), | ||
[ | ||
# A basic string field | ||
({"version": f"v{_SECRET}"}, "version"), | ||
# A list item | ||
({"stage-packages": ["first", "second", _SECRET]}, "stage-packages"), | ||
# A dict value | ||
({"parts": {"p1": {"source-version": f"v{_SECRET}"}}}, "source-version"), | ||
], | ||
) | ||
def test_secrets_bad_field(monkeypatch, yaml_data, field_name): | ||
monkeypatch.setenv("GIT_VERSION", "1.0") | ||
|
||
with pytest.raises(errors.SecretsFieldError) as exc: | ||
secrets.render_secrets(yaml_data) | ||
|
||
expected_error = f'Build secret "{_SECRET}" is not allowed on field "{field_name}"' | ||
err = exc.value | ||
assert str(err) == expected_error |