Skip to content

Commit

Permalink
tests: add test to validate seccomp filters
Browse files Browse the repository at this point in the history
Add a test to validate that a seccomp filter works as defined in the
JSON description.

To do this we use a simple C program that just loads a given seccomp
filter and calls a syscall also given in the arguments.

Signed-off-by: Pablo Barbáchano <pablob@amazon.com>
  • Loading branch information
pb8o committed Dec 11, 2024
1 parent 5104188 commit 9b15435
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 0 deletions.
77 changes: 77 additions & 0 deletions tests/host_tools/test_syscalls.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// This is used by `test_seccomp_validate.py`

#include <linux/types.h>
#include <linux/filter.h>
#include <linux/seccomp.h>
#include <sys/prctl.h>
#include <sys/stat.h>

#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>


void install_bpf_filter(char *bpf_file) {
int fd = open(bpf_file, O_RDONLY);
if (fd == -1) {
perror("open");
exit(EXIT_FAILURE);
}
struct stat sb;
if (fstat(fd, &sb) == -1) {
perror("stat");
exit(EXIT_FAILURE);
}
size_t size = sb.st_size;
size_t insn_len = size / sizeof(struct sock_filter);
struct sock_filter *filterbuf = (struct sock_filter*)malloc(size);
if (read(fd, filterbuf, size) == -1) {
perror("read");
exit(EXIT_FAILURE);
}

/* Install seccomp filter */
struct sock_fprog prog = {
.len = (unsigned short)(insn_len),
.filter = filterbuf,
};
if (prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0)) {
perror("prctl(NO_NEW_PRIVS)");
exit(EXIT_FAILURE);
}
if (prctl(PR_SET_SECCOMP, SECCOMP_MODE_FILTER, &prog)) {
perror("prctl(SECCOMP)");
exit(EXIT_FAILURE);
}
}


int main(int argc, char **argv) {
/* parse arguments */
if (argc < 3) {
fprintf(stderr, "Usage: %s BPF_FILE ARG0..\n", argv[0]);
exit(EXIT_FAILURE);
}
char *bpf_file = argv[1];
long syscall_id = atoi(argv[2]);
long arg0, arg1, arg2, arg3;
arg0 = arg1 = arg2 = arg3 = 0;
if (argc > 3) arg0 = atoi(argv[3]);
if (argc > 4) arg1 = atoi(argv[4]);
if (argc > 5) arg2 = atoi(argv[5]);
if (argc > 6) arg3 = atoi(argv[6]);

/* read seccomp filter from file */
if (strcmp(bpf_file, "/dev/null") != 0) {
install_bpf_filter(bpf_file);
}

long res = syscall(syscall_id, arg0, arg1, arg2, arg3);
printf("%ld\n", res);
return EXIT_SUCCESS;
}
134 changes: 134 additions & 0 deletions tests/integration_tests/security/test_seccomp_validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""Test that validates that seccompiler filters work as expected"""

import json
import platform
import struct
from pathlib import Path

import pytest
import seccomp

from framework import utils
from host_tools import cargo_build

ARCH = platform.machine()


@pytest.fixture(scope="session")
def bin_test_syscall(test_fc_session_root_path):
"""Build a simple vsock client/server application."""
test_syscall_bin = Path(test_fc_session_root_path) / "test_syscall"
cargo_build.gcc_compile("host_tools/test_syscalls.c", test_syscall_bin)
assert test_syscall_bin.exists()
yield test_syscall_bin


class BpfMapReader:
"""
Simple reader for the files that seccompiler-bin produces
The files are serialized with bincode[1] in format that is easy to parse.
sock_filter = <ushort uchar uchar uint>
BpfProgram = Vec<sock_filter>
BpfMap = BTreeMap(str, BpfProgram)
str = Vec<uchar>
[1] https://github.com/bincode-org/bincode/blob/trunk/docs/spec.md
"""

INSN_FMT = "<HBBI"
INSN_SIZEOF = struct.calcsize(INSN_FMT)

def __init__(self, buf):
self.buf = buf
self.offset = 0

@classmethod
def from_file(cls, file):
"""Initialize a buffer from a file"""
return cls(Path(file).read_bytes())

def read_format(self, fmt):
"""Read a struct format string from the buffer"""
val = struct.unpack_from(fmt, self.buf, offset=self.offset)
self.offset += struct.calcsize(fmt)
if len(val) == 1:
return val[0]
return val

def is_eof(self):
"""Are we at the end of the buffer?"""
return self.offset == len(self.buf)

def lookahead(self, size):
"""Look ahead `size` bytes"""
return self.buf[self.offset : self.offset + size]

def split(self):
"""Return separate filters"""
threads = {}
# how many entries in the map
map_len = self.read_format("<Q")
for _ in range(map_len):
# read key
key_str_len = self.read_format("<Q")
key_str = self.read_format(f"{key_str_len}s")
# read value: vec of instructions
insn_len = self.read_format("<Q")
data = self.lookahead(insn_len * self.INSN_SIZEOF)
threads[key_str.decode("ascii")] = data
self.offset += len(data)

assert self.is_eof()
return threads


def test_validate_filter(seccompiler, bin_test_syscall, monkeypatch, tmp_path):
"""Assert that the seccomp filter matches the JSON description."""

fc_filter_path = Path(f"../resources/seccomp/{ARCH}-unknown-linux-musl.json")
fc_filter = json.loads(fc_filter_path.read_text(encoding="ascii"))

# cd to a tmp dir because we are going to generate lots of coredumps
monkeypatch.chdir(tmp_path)
bpf_path = seccompiler.compile(fc_filter)
filters = BpfMapReader.from_file(bpf_path).split()
arch = seccomp.Arch.X86_64 if ARCH == "x86_64" else seccomp.Arch.AARCH64
for thread, filter_data in fc_filter.items():
filter_path = Path(f"{thread}.bpf")
filter_path.write_bytes(filters[thread])
# for each rule, run the helper program and execute a syscall
for rule in filter_data["filter"]:
print(filter_path, rule)
syscall = rule["syscall"]
# this one cannot be called directly
if syscall in ["rt_sigreturn"]:
continue
syscall_id = seccomp.resolve_syscall(arch, syscall)
cmd = f"{bin_test_syscall} {filter_path} {syscall_id}"
if "args" not in rule:
# syscall should be allowed with any arguments and exit 0
assert utils.run_cmd(cmd).returncode == 0
else:
allowed_args = [0] * 4
# if we call it with allowed args, it should exit 0
for arg in rule["args"]:
allowed_args[arg["index"]] = arg["val"]
allowed_str = " ".join(str(x) for x in allowed_args)
assert utils.run_cmd(f"{cmd} {allowed_str}").returncode == 0
# for each allowed arg try a different number
for arg in rule["args"]:
# We just add 1000000 to the allowed arg and assume it is
# not something we allow in another rule. While not perfect
# it works in practice.
bad_args = allowed_args.copy()
bad_args[arg["index"]] = str(arg["val"] + 1_000_000)
unallowed_str = " ".join(str(x) for x in bad_args)
outcome = utils.run_cmd(f"{cmd} {unallowed_str}")
# if we call it with unallowed args, it should exit 159
# 159 = 128 (abnormal termination) + 31 (SIGSYS)
assert outcome.returncode == 159

0 comments on commit 9b15435

Please sign in to comment.