From ccb1a10ae407817612deb2c162e206ca6dc058cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Barb=C3=A1chano?= Date: Tue, 5 Nov 2024 13:25:40 +0100 Subject: [PATCH] tests: cleanup test_seccomp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - convert inline JSON to dicts - use pytest temporary files instead of tempfile - create a seccompiler fixture to make running it easy Signed-off-by: Pablo Barbáchano --- tests/integration_tests/security/conftest.py | 29 +++ .../security/test_custom_seccomp.py | 198 +++++------------- .../security/test_seccomp.py | 196 ++++++----------- 3 files changed, 136 insertions(+), 287 deletions(-) create mode 100644 tests/integration_tests/security/conftest.py diff --git a/tests/integration_tests/security/conftest.py b/tests/integration_tests/security/conftest.py new file mode 100644 index 00000000000..1cce3067b2f --- /dev/null +++ b/tests/integration_tests/security/conftest.py @@ -0,0 +1,29 @@ +# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Fixtures for security tests""" + +import json +from pathlib import Path + +import pytest + +from host_tools.cargo_build import run_seccompiler_bin + + +@pytest.fixture() +def seccompiler(tmp_path): + "A seccompiler helper fixture" + + class Seccompiler: + "A seccompiler helper class" + + def compile(self, data: dict, basic=False) -> Path: + "Use seccompiler-bin to compile a filter from a dict" + inp = tmp_path / "input.json" + inp.write_text(json.dumps(data)) + bpf = tmp_path / "output.bpfmap" + run_seccompiler_bin(bpf_path=bpf, json_path=inp, basic=basic) + return bpf + + return Seccompiler() diff --git a/tests/integration_tests/security/test_custom_seccomp.py b/tests/integration_tests/security/test_custom_seccomp.py index 5fffb83f6cc..05f9b9aa96e 100644 --- a/tests/integration_tests/security/test_custom_seccomp.py +++ b/tests/integration_tests/security/test_custom_seccomp.py @@ -2,179 +2,76 @@ # SPDX-License-Identifier: Apache-2.0 """Tests that the --seccomp-filter parameter works as expected.""" -import os import platform -import tempfile import time +from pathlib import Path -import pytest import requests from framework import utils -from host_tools.cargo_build import run_seccompiler_bin -def _custom_filter_setup(test_microvm, json_filter): - json_temp = tempfile.NamedTemporaryFile(delete=False) - json_temp.write(json_filter) - json_temp.flush() +def install_filter(microvm, bpf_path): + """Install seccomp filter in microvm.""" + microvm.create_jailed_resource(bpf_path) + microvm.jailer.extra_args.update({"seccomp-filter": bpf_path.name}) - bpf_path = os.path.join(test_microvm.path, "bpf.out") - run_seccompiler_bin(bpf_path=bpf_path, json_path=json_temp.name) +def test_allow_all(uvm_plain, seccompiler): + """Test --seccomp-filter, allowing all syscalls.""" + seccomp_filter = { + thread: {"default_action": "allow", "filter_action": "trap", "filter": []} + for thread in ["vmm", "api", "vcpu"] + } - os.unlink(json_temp.name) - test_microvm.create_jailed_resource(bpf_path) - test_microvm.jailer.extra_args.update({"seccomp-filter": "bpf.out"}) - - -def _config_file_setup(test_microvm, vm_config_file): - test_microvm.create_jailed_resource(test_microvm.kernel_file) - test_microvm.create_jailed_resource(test_microvm.rootfs_file) - - vm_config_path = os.path.join(test_microvm.path, os.path.basename(vm_config_file)) - with open(vm_config_file, encoding="utf-8") as f1: - with open(vm_config_path, "w", encoding="utf-8") as f2: - for line in f1: - f2.write(line) - test_microvm.create_jailed_resource(vm_config_path) - test_microvm.jailer.extra_args = {"config-file": os.path.basename(vm_config_file)} - - test_microvm.jailer.extra_args.update({"no-api": None}) - - -def test_allow_all(uvm_plain): - """ - Test --seccomp-filter, allowing all syscalls. - """ + bpf_path = seccompiler.compile(seccomp_filter) test_microvm = uvm_plain - - _custom_filter_setup( - test_microvm, - """{ - "Vmm": { - "default_action": "allow", - "filter_action": "trap", - "filter": [] - }, - "Api": { - "default_action": "allow", - "filter_action": "trap", - "filter": [] - }, - "Vcpu": { - "default_action": "allow", - "filter_action": "trap", - "filter": [] - } - }""".encode( - "utf-8" - ), - ) - + install_filter(test_microvm, bpf_path) test_microvm.spawn() - test_microvm.basic_config() - test_microvm.start() - utils.assert_seccomp_level(test_microvm.firecracker_pid, "2") -def test_working_filter(uvm_plain): - """ - Test --seccomp-filter, rejecting some dangerous syscalls. - """ - test_microvm = uvm_plain +def test_working_filter(uvm_plain, seccompiler): + """Test --seccomp-filter, rejecting some dangerous syscalls.""" - _custom_filter_setup( - test_microvm, - """{ - "Vmm": { - "default_action": "allow", - "filter_action": "kill_process", - "filter": [ - { - "syscall": "clone" - }, - { - "syscall": "execve" - } - ] - }, - "Api": { - "default_action": "allow", - "filter_action": "kill_process", - "filter": [ - { - "syscall": "clone" - }, - { - "syscall": "execve" - } - ] - }, - "Vcpu": { + seccomp_filter = { + thread: { "default_action": "allow", "filter_action": "kill_process", - "filter": [ - { - "syscall": "clone" - }, - { - "syscall": "execve", - "comment": "sample comment" - } - ] + "filter": [{"syscall": "clone"}, {"syscall": "execve"}], } - }""".encode( - "utf-8" - ), - ) + for thread in ["vmm", "api", "vcpu"] + } + bpf_path = seccompiler.compile(seccomp_filter) + test_microvm = uvm_plain + install_filter(test_microvm, bpf_path) test_microvm.spawn() - test_microvm.basic_config() - test_microvm.start() # level should be 2, with no additional errors utils.assert_seccomp_level(test_microvm.firecracker_pid, "2") -def test_failing_filter(uvm_plain): - """ - Test --seccomp-filter, denying some needed syscalls. - """ - test_microvm = uvm_plain +def test_failing_filter(uvm_plain, seccompiler): + """Test --seccomp-filter, denying some needed syscalls.""" - _custom_filter_setup( - test_microvm, - """{ - "Vmm": { + seccomp_filter = { + "vmm": {"default_action": "allow", "filter_action": "trap", "filter": []}, + "api": {"default_action": "allow", "filter_action": "trap", "filter": []}, + "vcpu": { "default_action": "allow", "filter_action": "trap", - "filter": [] + "filter": [{"syscall": "ioctl"}], }, - "Api": { - "default_action": "allow", - "filter_action": "trap", - "filter": [] - }, - "Vcpu": { - "default_action": "allow", - "filter_action": "trap", - "filter": [ - { - "syscall": "ioctl" - } - ] - } - }""".encode( - "utf-8" - ), - ) + } + bpf_path = seccompiler.compile(seccomp_filter) + test_microvm = uvm_plain + install_filter(test_microvm, bpf_path) test_microvm.spawn() test_microvm.basic_config(vcpu_count=1) @@ -190,8 +87,7 @@ def test_failing_filter(uvm_plain): # Check the logger output ioctl_num = 16 if platform.machine() == "x86_64" else 29 test_microvm.check_log_message( - "Shutting down VM after intercepting a bad" - " syscall ({})".format(str(ioctl_num)) + f"Shutting down VM after intercepting a bad syscall ({ioctl_num})" ) # Check the metrics @@ -208,28 +104,28 @@ def test_failing_filter(uvm_plain): test_microvm.mark_killed() -@pytest.mark.parametrize("vm_config_file", ["framework/vm_config.json"]) -def test_invalid_bpf(uvm_plain, vm_config_file): - """ - Test that FC does not start, given an invalid binary filter. - """ +def test_invalid_bpf(uvm_plain): + """Test that FC does not start, given an invalid binary filter.""" test_microvm = uvm_plain # Configure VM from JSON. Otherwise, the test will error because # the process will be killed before configuring the API socket. - _config_file_setup(uvm_plain, vm_config_file) + test_microvm.create_jailed_resource(test_microvm.kernel_file) + test_microvm.create_jailed_resource(test_microvm.rootfs_file) - bpf_path = os.path.join(test_microvm.path, "bpf.out") - file = open(bpf_path, "w", encoding="utf-8") - file.write("Invalid BPF!") - file.close() + vm_config_file = Path("framework/vm_config.json") + test_microvm.create_jailed_resource(vm_config_file) + test_microvm.jailer.extra_args = {"config-file": vm_config_file.name} + test_microvm.jailer.extra_args.update({"no-api": None}) + bpf_path = Path(test_microvm.path) / "bpf.out" + bpf_path.write_bytes(b"Invalid BPF!") test_microvm.create_jailed_resource(bpf_path) - test_microvm.jailer.extra_args.update({"seccomp-filter": "bpf.out"}) + test_microvm.jailer.extra_args.update({"seccomp-filter": bpf_path.name}) test_microvm.spawn() - # give time for the process to get killed time.sleep(1) + assert "Seccomp error: Filter deserialization failed" in test_microvm.log_data test_microvm.mark_killed() diff --git a/tests/integration_tests/security/test_seccomp.py b/tests/integration_tests/security/test_seccomp.py index 0fa9aefafcb..8ae87271daf 100644 --- a/tests/integration_tests/security/test_seccomp.py +++ b/tests/integration_tests/security/test_seccomp.py @@ -2,91 +2,47 @@ # SPDX-License-Identifier: Apache-2.0 """Tests that the seccomp filters don't let denied syscalls through.""" -import json as json_lib +import json import os import platform -import tempfile +from pathlib import Path from framework import utils -from host_tools.cargo_build import run_seccompiler_bin + +ARCH = platform.machine() def _get_basic_syscall_list(): """Return the JSON list of syscalls that the demo jailer needs.""" - if platform.machine() == "x86_64": - sys_list = [ - "rt_sigprocmask", - "rt_sigaction", - "execve", - "mmap", - "mprotect", + sys_list = [ + "rt_sigprocmask", + "rt_sigaction", + "execve", + "mmap", + "mprotect", + "set_tid_address", + "read", + "close", + "brk", + "sched_getaffinity", + "sigaltstack", + "munmap", + "exit_group", + ] + if ARCH == "x86_64": + sys_list += [ "arch_prctl", - "set_tid_address", "readlink", "open", - "read", - "close", - "brk", - "sched_getaffinity", - "sigaltstack", - "munmap", - "exit_group", "poll", ] - else: - # platform.machine() == "aarch64" - sys_list = [ - "rt_sigprocmask", - "rt_sigaction", - "execve", - "mmap", - "mprotect", - "set_tid_address", - "read", - "close", - "brk", - "sched_getaffinity", - "sigaltstack", - "munmap", - "exit_group", - "ppoll", - ] - - json = "" - for syscall in sys_list[0:-1]: - json += """ - {{ - "syscall": \"{}\" - }}, - """.format( - syscall - ) - - json += """ - {{ - "syscall": \"{}\" - }} - """.format( - sys_list[-1] - ) - - return json - - -def _run_seccompiler_bin(json_data, basic=False): - json_temp = tempfile.NamedTemporaryFile(delete=False) - json_temp.write(json_data.encode("utf-8")) - json_temp.flush() - - bpf_temp = tempfile.NamedTemporaryFile(delete=False) + elif ARCH == "aarch64": + sys_list += ["ppoll"] - run_seccompiler_bin(bpf_path=bpf_temp.name, json_path=json_temp.name, basic=basic) + return sys_list - os.unlink(json_temp.name) - return bpf_temp.name - -def test_seccomp_ls(bin_seccomp_paths): +def test_seccomp_ls(bin_seccomp_paths, seccompiler): """ Assert that the seccomp filter denies an unallowed syscall. """ @@ -99,32 +55,26 @@ def test_seccomp_ls(bin_seccomp_paths): demo_jailer = bin_seccomp_paths["demo_jailer"] assert os.path.exists(demo_jailer) - json_filter = """{{ - "main": {{ + json_filter = { + "main": { "default_action": "trap", "filter_action": "allow", - "filter": [ - {} - ] - }} - }}""".format( - _get_basic_syscall_list() - ) + "filter": [{"syscall": x} for x in _get_basic_syscall_list()], + } + } # Run seccompiler-bin. - bpf_path = _run_seccompiler_bin(json_filter) + bpf_path = seccompiler.compile(json_filter) # Run the mini jailer. outcome = utils.run_cmd([demo_jailer, ls_command_path, bpf_path], shell=False) - os.unlink(bpf_path) - # The seccomp filters should send SIGSYS (31) to the binary. `ls` doesn't # handle it, so it will exit with error. assert outcome.returncode != 0 -def test_advanced_seccomp(bin_seccomp_paths): +def test_advanced_seccomp(bin_seccomp_paths, seccompiler): """ Test seccompiler-bin with `demo_jailer`. @@ -143,39 +93,37 @@ def test_advanced_seccomp(bin_seccomp_paths): assert os.path.exists(demo_harmless) assert os.path.exists(demo_malicious) - json_filter = """{{ - "main": {{ + json_filter = { + "main": { "default_action": "trap", "filter_action": "allow", "filter": [ - {}, - {{ + *[{"syscall": x} for x in _get_basic_syscall_list()], + { "syscall": "write", "args": [ - {{ + { "index": 0, "type": "dword", "op": "eq", "val": 1, - "comment": "stdout fd" - }}, - {{ + "comment": "stdout fd", + }, + { "index": 2, "type": "qword", "op": "eq", "val": 14, - "comment": "nr of bytes" - }} - ] - }} - ] - }} - }}""".format( - _get_basic_syscall_list() - ) + "comment": "nr of bytes", + }, + ], + }, + ], + } + } # Run seccompiler-bin. - bpf_path = _run_seccompiler_bin(json_filter) + bpf_path = seccompiler.compile(json_filter) # Run the mini jailer for harmless binary. outcome = utils.run_cmd([demo_jailer, demo_harmless, bpf_path], shell=False) @@ -189,10 +137,8 @@ def test_advanced_seccomp(bin_seccomp_paths): # The demo malicious binary should have received `SIGSYS`. assert outcome.returncode == -31 - os.unlink(bpf_path) - # Run seccompiler-bin with `--basic` flag. - bpf_path = _run_seccompiler_bin(json_filter, basic=True) + bpf_path = seccompiler.compile(json_filter, basic=True) # Run the mini jailer for malicious binary. outcome = utils.run_cmd([demo_jailer, demo_malicious, bpf_path], shell=False) @@ -201,28 +147,20 @@ def test_advanced_seccomp(bin_seccomp_paths): # disables all argument checks. assert outcome.returncode == 0 - os.unlink(bpf_path) - # Run the mini jailer with an empty allowlist. It should trap on any # syscall. - json_filter = """{ - "main": { - "default_action": "trap", - "filter_action": "allow", - "filter": [] - } - }""" + json_filter = { + "main": {"default_action": "trap", "filter_action": "allow", "filter": []} + } # Run seccompiler-bin. - bpf_path = _run_seccompiler_bin(json_filter) + bpf_path = seccompiler.compile(json_filter) outcome = utils.run_cmd([demo_jailer, demo_harmless, bpf_path], shell=False) # The demo binary should have received `SIGSYS`. assert outcome.returncode == -31 - os.unlink(bpf_path) - def test_no_seccomp(uvm_plain): """ @@ -231,11 +169,8 @@ def test_no_seccomp(uvm_plain): test_microvm = uvm_plain test_microvm.jailer.extra_args.update({"no-seccomp": None}) test_microvm.spawn() - test_microvm.basic_config() - test_microvm.start() - utils.assert_seccomp_level(test_microvm.firecracker_pid, "0") @@ -245,15 +180,12 @@ def test_default_seccomp_level(uvm_plain): """ test_microvm = uvm_plain test_microvm.spawn() - test_microvm.basic_config() - test_microvm.start() - utils.assert_seccomp_level(test_microvm.firecracker_pid, "2") -def test_seccomp_rust_panic(bin_seccomp_paths): +def test_seccomp_rust_panic(bin_seccomp_paths, seccompiler): """ Test seccompiler-bin with `demo_panic`. @@ -266,19 +198,15 @@ def test_seccomp_rust_panic(bin_seccomp_paths): demo_panic = bin_seccomp_paths["demo_panic"] assert os.path.exists(demo_panic) - fc_filters_path = "../resources/seccomp/{}-unknown-linux-musl.json".format( - platform.machine() - ) - with open(fc_filters_path, "r", encoding="utf-8") as fc_filters: - filter_threads = list(json_lib.loads(fc_filters.read())) + fc_filters = Path(f"../resources/seccomp/{ARCH}-unknown-linux-musl.json") + fc_filters_data = json.loads(fc_filters.read_text(encoding="ascii")) + filter_threads = list(fc_filters_data) - bpf_temp = tempfile.NamedTemporaryFile(delete=False) - run_seccompiler_bin(bpf_path=bpf_temp.name, json_path=fc_filters_path) - bpf_path = bpf_temp.name + bpf_path = seccompiler.compile(fc_filters_data) # Run the panic binary with all filters. for thread in filter_threads: - code, _, _ = utils.run_cmd([demo_panic, bpf_path, thread], shell=False) + code, _, _ = utils.run_cmd([demo_panic, str(bpf_path), thread], shell=False) # The demo panic binary should have terminated with SIGABRT # and not with a seccomp violation. # On a seccomp violation, the program exits with code -31 for @@ -286,8 +214,4 @@ def test_seccomp_rust_panic(bin_seccomp_paths): # is for SIGABRT. assert ( code == -6 - ), "Panic binary failed with exit code {} on {} " "filters.".format( - code, thread - ) - - os.unlink(bpf_path) + ), f"Panic binary failed with exit code {code} on {thread} filters."