Skip to content

Commit

Permalink
Handle tarfile.extractall() issue
Browse files Browse the repository at this point in the history
As reported by `bandit` B202 handler, the `tarfile.extractall()` method
is not safe with some file paths.

We previously ignored this handler, and decided to handle things in
the `e3.archive` module now. The  `tarfile.data_filter()` is now used
when it exists, and a fallback handling absolute paths and traversal
paths has been created.

Ref: #694
  • Loading branch information
grouigrokon committed Oct 23, 2024
1 parent e9800c8 commit 31ee075
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 11 deletions.
85 changes: 77 additions & 8 deletions src/e3/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import tarfile
import tempfile
import zipfile

from contextlib import closing
from pathlib import Path
from typing import TYPE_CHECKING, cast

import e3
Expand Down Expand Up @@ -44,7 +46,7 @@

if sys.platform == "win32":
# On Windows force the executable bit on all files. This ensures that when
# using cygwin unziped files get an executable bit set (the executable
# using cygwin unzipped files get an executable bit set (the executable
# does not exist in win32 but is simulated in Cygwin).

class E3ZipInfo(zipfile.ZipInfo):
Expand All @@ -66,6 +68,7 @@ def _extract_member(
path: str | PathLike[str] | None,
pwd: bytes | None,
) -> str:
# noinspection PyProtectedMember
result = super()._extract_member(member, path, pwd) # type: ignore

if sys.platform != "win32":
Expand Down Expand Up @@ -226,7 +229,7 @@ def unpack_archive(
ext = check_type(filename, force_extension=force_extension)

# If remove_root_dir is set then extract to a temp directory first.
# Otherwise extract directly to the final destination
# Otherwise, extract directly to the final destination
if remove_root_dir:
if tmp_dir_root is None:
tmp_dir_root = os.path.dirname(os.path.abspath(dest))
Expand Down Expand Up @@ -255,6 +258,62 @@ def unpack_archive(
) as fd:
check_selected = set(selected_files)

def filter_invalid_paths(
destination: str, members: list[tarfile.TarInfo]
) -> list[tarfile.TarInfo]:
"""Filter invalid archive members.
As per CWE-22 (and tracked by bandit's B202 report),
using TarFile.extractall() should at least filter
members on invalid paths (like absolute paths, or
traversal path - containing "..").
:param destination: The destination path of the
uncompressed archive content. This is used to make
sure no element may be uncompressed outside this
path.
:param members: The list of elements of the archive to
perform path filtering for.
:return: The list of elements from the archive which are
allowed to be uncompressed.
"""
filtered: list[tarfile.TarInfo] = []
if hasattr(tarfile, "data_filter"):
# Use the tarfile filter.
for member in members:
try:
# The call to data_filter() raises an
# exception on any problematic member. We
# may build the list of members from
# unfiltered.
tarfile.data_filter(member, destination)
filtered.append(member)
except Exception:
logger.error(
f"Ignoring invalid member {member.name}"
" from tarball"
)
else:
# At least remove some problematic paths.
for member in members:
member_path: Path = Path(member.name)
if member_path.is_absolute():
logger.error(
f"Ignoring absolute path {member.name}"
)
elif ".." in member_path.parts:
logger.error(
f"Ignoring traversal path {member.name}"
)
elif member_path.is_reserved():
logger.error(
f"Ignoring reserved path {member.name}"
)
else:
filtered.append(member)
return filtered

def is_match(name: str, files: Sequence[str]) -> bool:
"""check if name match any of the expression in files.
Expand All @@ -272,12 +331,12 @@ def is_match(name: str, files: Sequence[str]) -> bool:
dirs: list[str] = []

# IMPORTANT: don't use the method extract. Always use the
# extractall function. Indeed extractall will set file
# extractall function. Indeed, extractall will set file
# permissions only once all selected members are unpacked.
# Using extract can lead to permission denied for example
# if a read-only directory is created.
member_list: list[tarfile.TarInfo] = []
if selected_files:
member_list = []
for tinfo in fd:
if is_match(
tinfo.name, selected_files
Expand All @@ -293,10 +352,16 @@ def is_match(name: str, files: Sequence[str]) -> bool:
raise ArchiveError(
"unpack_archive", f"Cannot untar {filename} "
)

fd.extractall(path=tmp_dest, members=member_list)
else:
fd.extractall(path=tmp_dest)
member_list = fd.getmembers()

fd.extractall(
path=tmp_dest,
members=filter_invalid_paths(
destination=tmp_dest,
members=member_list,
),
)

except tarfile.TarError as e:
raise ArchiveError(
Expand All @@ -309,8 +374,12 @@ def is_match(name: str, files: Sequence[str]) -> bool:
with closing(
E3ZipFile(fileobj if fileobj is not None else filename, mode="r")
) as zip_fd:
# Note that the ZipFile.extractall() already takes care
# of file names with "." or "..". Still bandit thinks this
# is a call to TarFile.extractall(), adding a flag to let
# `bandit` ignore this.
zip_fd.extractall(
tmp_dest, selected_files if selected_files else None
tmp_dest, selected_files if selected_files else None # nosec
)
except zipfile.BadZipfile as e:
raise ArchiveError(
Expand Down
42 changes: 41 additions & 1 deletion tests/tests_e3/archive/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,47 @@
from unittest.mock import patch


@pytest.mark.parametrize("ext", (".tar.gz", ".tar.bz2", ".tar.xz", ".tar", ".zip"))
E3_ARCHIVE_EXTENSIONS: list[str] = [".tar.gz", ".tar.bz2", ".tar.xz", ".tar", ".zip"]


@pytest.mark.parametrize("ext", E3_ARCHIVE_EXTENSIONS)
def test_bandit_b202(ext) -> None:
archive_name: str = f"test_b202{ext}"

# Create a tarball with ".." everywhere
e3.fs.mkdir("archive")
e3.fs.mkdir("archive/dir1")
with open("archive/dir1/file1", "w") as f:
f.write("file1")
e3.fs.mkdir("archive/dir2/")
with open("archive/dir2/file2", "w") as f:
f.write("file2")
dest = os.getcwd()
os.chdir("archive/dir1")
e3.archive.create_archive(archive_name, "..", dest)
os.chdir(dest)

# Make sure e3.archive.unpack_archive() unpacks it with caring about the
# ".." issues.
os.chdir(dest)
e3.fs.mkdir("unpack/level1")
e3.archive.unpack_archive(
os.path.join(dest, archive_name), os.path.join(dest, "unpack/level1")
)
# As we packed with (for instance) "../dir1/file1", make sure that none of
# - unpack/dir1
# - unpack/dir2
# exist.

assert (
os.path.exists("unpack/dir1") is False
), "Directory unpack/dir1 should not exist"
assert (
os.path.exists("unpack/dir2") is False
), "Directory unpack/dir2 should not exist"


@pytest.mark.parametrize("ext", E3_ARCHIVE_EXTENSIONS)
def test_unpack(ext):
dir_to_pack = os.path.dirname(__file__)

Expand Down
4 changes: 2 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ commands =
# is needed by e3. There is also e3.env.tmp_dir that returns the TMPDIR
# environment variable. Don't check for that.
# Ignore B324 that is no longer similar to B303 since Python3.9.
bandit -r {toxinidir}/src -ll -ii -s B102,B108,B301,B506,B303,B324,B202
mypy {toxinidir}/src
bandit -r {toxinidir}/src -ll -ii -s B102,B108,B301,B506,B303,B324
mypy {toxinidir}/src

[testenv:docs]
changedir = docs
Expand Down

0 comments on commit 31ee075

Please sign in to comment.