Skip to content

Commit

Permalink
Add support for MIME block list instead of allow list (#7)
Browse files Browse the repository at this point in the history
I also document and test the (existing) behaviour with generic text and
binary files.

Closes element-hq/customer-success#197
  • Loading branch information
reivilibre committed Nov 19, 2024
1 parent 912ebff commit af7f726
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 3 deletions.
11 changes: 11 additions & 0 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,20 @@ scan:

# List of allowed MIME types. If a file has a MIME type that's not in this list, its
# scan is considered failed.
# Unrecognised binary files are considered to be `application/octet-stream`.
# Unrecognised text files are considered to be `text/plain`.
# Optional, defaults to allowing all MIME types.
allowed_mimetypes: ["image/jpeg"]

# List of blocked MIME types.
# If specified, `allowed_mimetypes` must not be specified as well.
# If specified, a file whose MIME type is on this list will produce a scan that is
# considered failed.
# Unrecognised binary files are considered to be `application/octet-stream`.
# Unrecognised text files are considered to be `text/plain`.
# Optional.
# blocked_mimetypes: ["image/jpeg"]

# Configuration of scan result caching.
#
# Results are stored in a cache to avoid having to download and scan a file twice. There
Expand Down
14 changes: 14 additions & 0 deletions src/matrix_content_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def _parse_size(size: Optional[Union[str, float]]) -> Optional[float]:
"temp_directory": {"type": "string"},
"removal_command": {"type": "string"},
"allowed_mimetypes": {"type": "array", "items": {"type": "string"}},
"blocked_mimetypes": {"type": "array", "items": {"type": "string"}},
},
},
"download": {
Expand Down Expand Up @@ -131,6 +132,7 @@ class ScanConfig:
temp_directory: str
removal_command: str = "rm"
allowed_mimetypes: Optional[List[str]] = None
blocked_mimetypes: Optional[List[str]] = None


@attr.s(auto_attribs=True, frozen=True, slots=True)
Expand Down Expand Up @@ -175,3 +177,15 @@ def __init__(self, config_dict: Dict[str, Any]):
self.crypto = CryptoConfig(**(config_dict.get("crypto") or {}))
self.download = DownloadConfig(**(config_dict.get("download") or {}))
self.result_cache = ResultCacheConfig(**(config_dict.get("result_cache") or {}))

# Don't allow both allowlist and blocklist for MIME types, since we do not document
# the semantics for that and it is in any case pointless.
# This could have been expressed in JSONSchema but I suspect the error message would be poor
# in that case.
if (
self.scan.allowed_mimetypes is not None
and self.scan.blocked_mimetypes is not None
):
raise ConfigError(
"Both `scan.allowed_mimetypes` and `scan.blocked_mimetypes` are specified, which is not allowed!"
)
23 changes: 22 additions & 1 deletion src/matrix_content_scanner/scanner/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,18 @@ def __init__(self, mcs: "MatrixContentScanner"):

self._max_size_to_cache = mcs.config.result_cache.max_file_size

# List of MIME types we should allow. If None, we don't fail files based on their
# List of MIME types we should allow.
# If None, we fall back to `_blocked_mimetypes`.
# If that's also None, we don't fail files based on their
# MIME types (besides comparing it with the Content-Type header from the server
# for unencrypted files).
self._allowed_mimetypes = mcs.config.scan.allowed_mimetypes

# List of MIME types we should block.
# Must not be specified at the same time as `_allowed_mimetypes`.
# See the comment for `_allowed_mimetypes` for the semantics.
self._blocked_mimetypes = mcs.config.scan.blocked_mimetypes

# Cache of futures for files that are currently scanning and downloading, so that
# concurrent requests don't cause a file to be downloaded and scanned twice.
self._current_scans: Dict[str, Future[MediaDescription]] = {}
Expand Down Expand Up @@ -505,3 +512,17 @@ def _check_mimetype(self, media_content: bytes) -> None:
raise FileMimeTypeForbiddenError(
f"File type: {detected_mimetype} not allowed"
)

# If there's a block list for MIME types, check that the MIME type detected for
# this file is NOT in it.
if (
self._blocked_mimetypes is not None
and detected_mimetype in self._blocked_mimetypes
):
logger.error(
"MIME type for file is forbidden: %s",
detected_mimetype,
)
raise FileMimeTypeForbiddenError(
f"File type: {detected_mimetype} not allowed"
)
66 changes: 64 additions & 2 deletions tests/scanner/test_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
from tests.testutils import (
ENCRYPTED_FILE_METADATA,
MEDIA_PATH,
SMALL_BINARY_FILE,
SMALL_PNG,
SMALL_PNG_ENCRYPTED,
SMALL_TEXT_FILE,
get_content_scanner,
to_thumbnail_params,
)
Expand Down Expand Up @@ -219,7 +221,7 @@ async def test_different_encryption_key(self) -> None:
# But it also causes it to be downloaded again because its metadata have changed.
self.assertEqual(self.downloader_mock.call_count, 2)

async def test_mimetype(self) -> None:
async def test_allowlist_mimetype(self) -> None:
"""Tests that, if there's an allow list for MIME types and the file's MIME type
isn't in it, the file's scan fails.
"""
Expand All @@ -230,7 +232,7 @@ async def test_mimetype(self) -> None:
with self.assertRaises(FileMimeTypeForbiddenError):
await self.scanner.scan_file(MEDIA_PATH)

async def test_mimetype_encrypted(self) -> None:
async def test_allowlist_mimetype_encrypted(self) -> None:
"""Tests that the file's MIME type is correctly detected and compared with the
allow list (if set), even if it's encrypted.
"""
Expand All @@ -243,6 +245,66 @@ async def test_mimetype_encrypted(self) -> None:
with self.assertRaises(FileMimeTypeForbiddenError):
await self.scanner.scan_file(MEDIA_PATH, ENCRYPTED_FILE_METADATA)

async def test_blocklist_mimetype(self) -> None:
"""Tests that, if there's an allow list for MIME types and the file's MIME type
isn't in it, the file's scan fails.
"""
# Set a block list that blocks PNG images.
self.scanner._blocked_mimetypes = ["image/png"]

# Check that the scan fails since the file is a PNG.
with self.assertRaises(FileMimeTypeForbiddenError):
await self.scanner.scan_file(MEDIA_PATH)

async def test_blocklist_mimetype_encrypted(self) -> None:
"""Tests that the file's MIME type is correctly detected and compared with the
allow list (if set), even if it's encrypted.
"""
self._setup_encrypted()

# Set a block list that blocks PNG images.
self.scanner._blocked_mimetypes = ["image/png"]

# Check that the scan fails since the file is a PNG.
with self.assertRaises(FileMimeTypeForbiddenError):
await self.scanner.scan_file(MEDIA_PATH, ENCRYPTED_FILE_METADATA)

async def test_blocklist_mimetype_fallback_binary_file(self) -> None:
"""Tests that unrecognised binary files' MIME type is assumed to be
`application/octet-stream` and that they can be blocked in this way.
"""

self.downloader_res = MediaDescription(
# This is the *claimed* content-type by the uploader
content_type="application/vnd.io.element.generic_binary_file",
content=SMALL_BINARY_FILE,
response_headers=CIMultiDictProxy(CIMultiDict()),
)

# Set a block list that blocks uncategorised binary files.
self.scanner._blocked_mimetypes = ["application/octet-stream"]

with self.assertRaises(FileMimeTypeForbiddenError):
await self.scanner.scan_file(MEDIA_PATH)

async def test_blocklist_mimetype_fallback_text_file(self) -> None:
"""Tests that unrecognised text files' MIME type is assumed to be
`text/plain` and that they can be blocked in this way.
"""

self.downloader_res = MediaDescription(
# This is the *claimed* content-type by the uploader
content_type="application/vnd.io.element.generic_file",
content=SMALL_TEXT_FILE,
response_headers=CIMultiDictProxy(CIMultiDict()),
)

# Set a block list that blocks uncategorised text files.
self.scanner._blocked_mimetypes = ["text/plain"]

with self.assertRaises(FileMimeTypeForbiddenError):
await self.scanner.scan_file(MEDIA_PATH)

async def test_dont_cache_exit_codes(self) -> None:
"""Tests that if the configuration specifies exit codes to ignore when running
the scanning script, we don't cache them.
Expand Down
6 changes: 6 additions & 0 deletions tests/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
b"0a2db40000000049454e44ae426082"
)

# A small binary file without any specific format.
SMALL_BINARY_FILE = unhexlify(b"010203")

# A small text file without any specific format.
SMALL_TEXT_FILE = b"Hello world\nThis is a tiny text file"

# A small, encrypted PNG.
SMALL_PNG_ENCRYPTED = unhexlify(
b"9fd28dd7a1d845a04948f13af104e39402c888f7b601bce313ad"
Expand Down

0 comments on commit af7f726

Please sign in to comment.