Skip to content

Commit

Permalink
tests: cleanup test_seccomp
Browse files Browse the repository at this point in the history
- convert inline JSON to dicts
- use pytest temporary files instead of tempfile
- create a seccompiler fixture to make running it easy

Signed-off-by: Pablo Barbáchano <pablob@amazon.com>
  • Loading branch information
pb8o committed Dec 5, 2024
1 parent 35a1683 commit ccb1a10
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 287 deletions.
29 changes: 29 additions & 0 deletions tests/integration_tests/security/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Fixtures for security tests"""

import json
from pathlib import Path

import pytest

from host_tools.cargo_build import run_seccompiler_bin


@pytest.fixture()
def seccompiler(tmp_path):
"A seccompiler helper fixture"

class Seccompiler:
"A seccompiler helper class"

def compile(self, data: dict, basic=False) -> Path:
"Use seccompiler-bin to compile a filter from a dict"
inp = tmp_path / "input.json"
inp.write_text(json.dumps(data))
bpf = tmp_path / "output.bpfmap"
run_seccompiler_bin(bpf_path=bpf, json_path=inp, basic=basic)
return bpf

return Seccompiler()
198 changes: 47 additions & 151 deletions tests/integration_tests/security/test_custom_seccomp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,179 +2,76 @@
# SPDX-License-Identifier: Apache-2.0
"""Tests that the --seccomp-filter parameter works as expected."""

import os
import platform
import tempfile
import time
from pathlib import Path

import pytest
import requests

from framework import utils
from host_tools.cargo_build import run_seccompiler_bin


def _custom_filter_setup(test_microvm, json_filter):
json_temp = tempfile.NamedTemporaryFile(delete=False)
json_temp.write(json_filter)
json_temp.flush()
def install_filter(microvm, bpf_path):
"""Install seccomp filter in microvm."""
microvm.create_jailed_resource(bpf_path)
microvm.jailer.extra_args.update({"seccomp-filter": bpf_path.name})

bpf_path = os.path.join(test_microvm.path, "bpf.out")

run_seccompiler_bin(bpf_path=bpf_path, json_path=json_temp.name)
def test_allow_all(uvm_plain, seccompiler):
"""Test --seccomp-filter, allowing all syscalls."""
seccomp_filter = {
thread: {"default_action": "allow", "filter_action": "trap", "filter": []}
for thread in ["vmm", "api", "vcpu"]
}

os.unlink(json_temp.name)
test_microvm.create_jailed_resource(bpf_path)
test_microvm.jailer.extra_args.update({"seccomp-filter": "bpf.out"})


def _config_file_setup(test_microvm, vm_config_file):
test_microvm.create_jailed_resource(test_microvm.kernel_file)
test_microvm.create_jailed_resource(test_microvm.rootfs_file)

vm_config_path = os.path.join(test_microvm.path, os.path.basename(vm_config_file))
with open(vm_config_file, encoding="utf-8") as f1:
with open(vm_config_path, "w", encoding="utf-8") as f2:
for line in f1:
f2.write(line)
test_microvm.create_jailed_resource(vm_config_path)
test_microvm.jailer.extra_args = {"config-file": os.path.basename(vm_config_file)}

test_microvm.jailer.extra_args.update({"no-api": None})


def test_allow_all(uvm_plain):
"""
Test --seccomp-filter, allowing all syscalls.
"""
bpf_path = seccompiler.compile(seccomp_filter)
test_microvm = uvm_plain

_custom_filter_setup(
test_microvm,
"""{
"Vmm": {
"default_action": "allow",
"filter_action": "trap",
"filter": []
},
"Api": {
"default_action": "allow",
"filter_action": "trap",
"filter": []
},
"Vcpu": {
"default_action": "allow",
"filter_action": "trap",
"filter": []
}
}""".encode(
"utf-8"
),
)

install_filter(test_microvm, bpf_path)
test_microvm.spawn()

test_microvm.basic_config()

test_microvm.start()

utils.assert_seccomp_level(test_microvm.firecracker_pid, "2")


def test_working_filter(uvm_plain):
"""
Test --seccomp-filter, rejecting some dangerous syscalls.
"""
test_microvm = uvm_plain
def test_working_filter(uvm_plain, seccompiler):
"""Test --seccomp-filter, rejecting some dangerous syscalls."""

_custom_filter_setup(
test_microvm,
"""{
"Vmm": {
"default_action": "allow",
"filter_action": "kill_process",
"filter": [
{
"syscall": "clone"
},
{
"syscall": "execve"
}
]
},
"Api": {
"default_action": "allow",
"filter_action": "kill_process",
"filter": [
{
"syscall": "clone"
},
{
"syscall": "execve"
}
]
},
"Vcpu": {
seccomp_filter = {
thread: {
"default_action": "allow",
"filter_action": "kill_process",
"filter": [
{
"syscall": "clone"
},
{
"syscall": "execve",
"comment": "sample comment"
}
]
"filter": [{"syscall": "clone"}, {"syscall": "execve"}],
}
}""".encode(
"utf-8"
),
)
for thread in ["vmm", "api", "vcpu"]
}

bpf_path = seccompiler.compile(seccomp_filter)
test_microvm = uvm_plain
install_filter(test_microvm, bpf_path)
test_microvm.spawn()

test_microvm.basic_config()

test_microvm.start()

# level should be 2, with no additional errors
utils.assert_seccomp_level(test_microvm.firecracker_pid, "2")


def test_failing_filter(uvm_plain):
"""
Test --seccomp-filter, denying some needed syscalls.
"""
test_microvm = uvm_plain
def test_failing_filter(uvm_plain, seccompiler):
"""Test --seccomp-filter, denying some needed syscalls."""

_custom_filter_setup(
test_microvm,
"""{
"Vmm": {
seccomp_filter = {
"vmm": {"default_action": "allow", "filter_action": "trap", "filter": []},
"api": {"default_action": "allow", "filter_action": "trap", "filter": []},
"vcpu": {
"default_action": "allow",
"filter_action": "trap",
"filter": []
"filter": [{"syscall": "ioctl"}],
},
"Api": {
"default_action": "allow",
"filter_action": "trap",
"filter": []
},
"Vcpu": {
"default_action": "allow",
"filter_action": "trap",
"filter": [
{
"syscall": "ioctl"
}
]
}
}""".encode(
"utf-8"
),
)
}

bpf_path = seccompiler.compile(seccomp_filter)
test_microvm = uvm_plain
install_filter(test_microvm, bpf_path)
test_microvm.spawn()
test_microvm.basic_config(vcpu_count=1)

Expand All @@ -190,8 +87,7 @@ def test_failing_filter(uvm_plain):
# Check the logger output
ioctl_num = 16 if platform.machine() == "x86_64" else 29
test_microvm.check_log_message(
"Shutting down VM after intercepting a bad"
" syscall ({})".format(str(ioctl_num))
f"Shutting down VM after intercepting a bad syscall ({ioctl_num})"
)

# Check the metrics
Expand All @@ -208,28 +104,28 @@ def test_failing_filter(uvm_plain):
test_microvm.mark_killed()


@pytest.mark.parametrize("vm_config_file", ["framework/vm_config.json"])
def test_invalid_bpf(uvm_plain, vm_config_file):
"""
Test that FC does not start, given an invalid binary filter.
"""
def test_invalid_bpf(uvm_plain):
"""Test that FC does not start, given an invalid binary filter."""
test_microvm = uvm_plain

# Configure VM from JSON. Otherwise, the test will error because
# the process will be killed before configuring the API socket.
_config_file_setup(uvm_plain, vm_config_file)
test_microvm.create_jailed_resource(test_microvm.kernel_file)
test_microvm.create_jailed_resource(test_microvm.rootfs_file)

bpf_path = os.path.join(test_microvm.path, "bpf.out")
file = open(bpf_path, "w", encoding="utf-8")
file.write("Invalid BPF!")
file.close()
vm_config_file = Path("framework/vm_config.json")
test_microvm.create_jailed_resource(vm_config_file)
test_microvm.jailer.extra_args = {"config-file": vm_config_file.name}
test_microvm.jailer.extra_args.update({"no-api": None})

bpf_path = Path(test_microvm.path) / "bpf.out"
bpf_path.write_bytes(b"Invalid BPF!")
test_microvm.create_jailed_resource(bpf_path)
test_microvm.jailer.extra_args.update({"seccomp-filter": "bpf.out"})
test_microvm.jailer.extra_args.update({"seccomp-filter": bpf_path.name})

test_microvm.spawn()

# give time for the process to get killed
time.sleep(1)
assert "Seccomp error: Filter deserialization failed" in test_microvm.log_data

test_microvm.mark_killed()
Loading

0 comments on commit ccb1a10

Please sign in to comment.