diff --git a/cve_bin_tool/parsers/__init__.py b/cve_bin_tool/parsers/__init__.py index c3ac1fc6ed..9c70e620a9 100644 --- a/cve_bin_tool/parsers/__init__.py +++ b/cve_bin_tool/parsers/__init__.py @@ -18,6 +18,7 @@ "php", "perl", "dart", + "debian", ] diff --git a/cve_bin_tool/parsers/deb.py b/cve_bin_tool/parsers/deb.py new file mode 100644 index 0000000000..89170b1fbe --- /dev/null +++ b/cve_bin_tool/parsers/deb.py @@ -0,0 +1,138 @@ +# Copyright (C) 2022 Intel Corporation +# SPDX-License-Identifier: GPL-3.0-or-later + +import asyncio +import os +import re +import tempfile +from pathlib import Path + +from cve_bin_tool.async_utils import aio_glob, aio_inpath, aio_run_command +from cve_bin_tool.extractor import BaseExtractor +from cve_bin_tool.parsers import Parser + + +class DebParser(Parser): + def __init__(self, cve_db, logger) -> None: + super().__init__(cve_db, logger) + + def parse_control_file(self, control_file): + """Parse the Debian control file and return a dictionary of its contents.""" + control_data = {} + try: + for line in control_file: + line = line.decode("utf-8") + if ":" in line: + key, value = line.split(":", 1) + control_data[key.strip()] = value.strip() + except Exception as e: + self.logger.debug(f"An error occurred while parsing the control file: {e}") + return control_data + + async def unpack_tar_xz(self, archive_path, extraction_path): + """Unpack a tar.xz file asynchronously.""" + loop = asyncio.get_event_loop() + extractor = BaseExtractor() + await loop.run_in_executor( + None, extractor.extract_file_tar, archive_path, extraction_path + ) + + async def extract_control(self, filename): + """Extract and parse the control file from a debian package.""" + is_ar = True + control_data = {} + process_can_fail = False + if await aio_inpath("file"): + stdout, stderr, return_code = await aio_run_command( + ["file", filename], process_can_fail + ) + if not re.search(b"Debian binary package", stdout): + is_ar = False + + if is_ar: + if not await aio_inpath("ar"): + self.logger.debug("ar tool not found") + return control_data + else: + with tempfile.TemporaryDirectory() as temp_dir: + # Extract the .deb package + original_dir = os.getcwd() + + # Change the working directory to the temp_dir for extraction + os.chdir(temp_dir) + await aio_run_command(["ar", "x", filename]) + + # Change the working directory to original after extraction + os.chdir(original_dir) + + # Use aio_glob to find control.tar.xz + control_tar_files = await aio_glob( + str(Path(temp_dir) / "control.tar.*") + ) + self.logger.debug("Files extracted", control_tar_files) + if control_tar_files: + control_tar_path = control_tar_files[0] + await self.unpack_tar_xz(control_tar_path, temp_dir) + + # Parse the control file + control_file_path = Path(temp_dir, "./control") + self.logger.debug(control_file_path) + if control_file_path.exists(): + with open(control_file_path, "rb") as control_file: + control_data = self.parse_control_file(control_file) + else: + self.logger.debug("Control archive not found.") + else: + self.logger.debug(f"{filename} is not a Debian binary package") + + return control_data + + def run_checker(self, filename): + try: + # Create a new event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Run the async function and wait for the result + control_data = loop.run_until_complete(self.extract_control(filename)) + + # Clean up and close the loop + loop.close() + + package = control_data["Package"] + version = control_data["Version"] + architechture = control_data["Architechture"] + essential = control_data["Essential"] + # priority= control_data['Priority'] + # depends= control_data['Depends'] + # maintainer= control_data['Maintainer'] + # description = control_data['Description'] + + if package: + self.logger.debug(f"Package name is {package}") + else: + self.logger.debug("Package not found") + + if version: + self.logger.debug(f"Version:{version}") + else: + self.logger.debug("No Version Found") + + if architechture: + self.logger.debug(f"architechture name is {architechture}") + else: + self.logger.debug("architechture not found") + + if essential: + self.logger.debug(f"essential name is {essential}") + else: + self.logger.debug("essential not found") + + if package and version: + vendor = self.find_vendor(package, version) + if vendor is not None: + yield from vendor + except Exception as e: + self.logger.debug(f"Some Error occurred while parsing the file {e}") + + self.logger.debug(f"Done parsing file {filename}") diff --git a/cve_bin_tool/parsers/parse.py b/cve_bin_tool/parsers/parse.py index acb8fc328a..0e7f645bbc 100644 --- a/cve_bin_tool/parsers/parse.py +++ b/cve_bin_tool/parsers/parse.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: GPL-3.0-or-later from cve_bin_tool.parsers.dart import DartParser +from cve_bin_tool.parsers.deb import DebParser from cve_bin_tool.parsers.go import GoParser from cve_bin_tool.parsers.java import JavaParser from cve_bin_tool.parsers.javascript import JavascriptParser @@ -26,6 +27,7 @@ "Package.resolved": SwiftParser, "composer.lock": PhpParser, "cpanfile": PerlParser, + "test.deb": DebParser, "pubspec.lock": DartParser, } diff --git a/test/language_data/test.deb b/test/language_data/test.deb new file mode 100644 index 0000000000..7be3d48a30 Binary files /dev/null and b/test/language_data/test.deb differ diff --git a/test/test_language_scanner.py b/test/test_language_scanner.py index d30dfb325e..b9f3cf95e2 100644 --- a/test/test_language_scanner.py +++ b/test/test_language_scanner.py @@ -158,6 +158,8 @@ class TestLanguageScanner: SWIFT_PRODUCTS = ["alliance_web_platform"] + DEBIAN_PRODUCTS = [] + DART_PRODUCTS = ["dio", "archive"] @classmethod @@ -225,6 +227,7 @@ def test_language_package_none_found(self, filename: str) -> None: (str(TEST_FILE_PATH / "Package.resolved"), SWIFT_PRODUCTS), (str(TEST_FILE_PATH / "composer.lock"), PHP_PRODUCTS), (str(TEST_FILE_PATH / "cpanfile"), PERL_PRODUCTS), + (str(TEST_FILE_PATH / "test.deb"), DEBIAN_PRODUCTS), (str(TEST_FILE_PATH / "pubspec.lock"), DART_PRODUCTS), ], ) @@ -233,6 +236,7 @@ def test_language_package(self, filename: str, products: set[str]) -> None: scanner = VersionScanner() scanner.file_stack.append(filename) found_product = [] + file_path = None for product in scanner.scan_file(filename): if product: product_info, file_path = product @@ -242,7 +246,8 @@ def test_language_package(self, filename: str, products: set[str]) -> None: # expanded out to make missing products easier to spot for p in products: assert p in found_product - assert file_path == filename + if file_path: + assert file_path == filename @pytest.mark.parametrize("filename", ((str(TEST_FILE_PATH / "PKG-INFO")),)) def test_python_package(self, filename: str) -> None: @@ -256,3 +261,16 @@ def test_python_package(self, filename: str) -> None: "facebook", "zstandard", "0.18.0", "/usr/local/bin/product" ) assert file_path == filename + + @pytest.mark.parametrize("filename", ((str(TEST_FILE_PATH / "test.deb")),)) + def test_debian_control(self, filename: str) -> None: + scanner = VersionScanner() + scanner.file_stack.append(filename) + found_product = [] + # Not expecting any packages to be found + for product in scanner.scan_file(filename): + if product: + product_info, file_path = product + if product_info.product not in found_product: + found_product.append(product_info.product) + assert found_product is not None