-
Notifications
You must be signed in to change notification settings - Fork 468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: added debian parser #3543
base: main
Are you sure you want to change the base?
Changes from 17 commits
222b916
637f06c
4dc4a0a
04c68b3
1f3e65f
d084999
c5052bb
3fee439
b4e89df
95fdbc2
f774d9d
e72523e
560e375
1e8a625
5f609a8
626faf1
6c5b5fc
1be11cf
d0b260a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
"swift", | ||
"php", | ||
"perl", | ||
"deb", | ||
] | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
# Copyright (C) 2022 Intel Corporation | ||
# SPDX-License-Identifier: GPL-3.0-or-later | ||
|
||
import asyncio | ||
import os | ||
import re | ||
import tempfile | ||
from pathlib import Path | ||
|
||
from cve_bin_tool.async_utils import aio_glob, aio_inpath, aio_run_command | ||
from cve_bin_tool.extractor import BaseExtractor | ||
from cve_bin_tool.parsers import Parser | ||
|
||
|
||
class DebParser(Parser):
    """Parser that reads package metadata from a Debian binary package (.deb).

    The package is unpacked with the external ``ar`` tool into a temporary
    directory, the ``control.tar.*`` member is extracted, and the ``control``
    file inside it is parsed for the ``Package`` and ``Version`` fields, which
    are then handed to ``find_vendor``.
    """

    def __init__(self, cve_db, logger) -> None:
        super().__init__(cve_db, logger)

    def parse_control_file(self, control_file):
        """Parse a Debian control file and return a dictionary of its fields.

        Args:
            control_file: Iterable of ``bytes`` lines (e.g. a file opened in
                ``"rb"`` mode).

        Returns:
            dict mapping field name to field value, both stripped of
            surrounding whitespace. Continuation lines of multi-line fields
            are ignored. Returns whatever was collected so far if an error
            occurs while reading.
        """
        control_data = {}
        try:
            for raw_line in control_file:
                # A stray non-UTF-8 byte must not abort the whole parse.
                line = raw_line.decode("utf-8", errors="replace")
                # Lines starting with whitespace continue the previous field
                # (typically the long Description); a ":" inside them is text,
                # not a new field, and must not overwrite real fields.
                if line[:1] in (" ", "\t"):
                    continue
                if ":" in line:
                    key, value = line.split(":", 1)
                    control_data[key.strip()] = value.strip()
        except Exception as e:
            self.logger.debug(f"An error occurred while parsing the control file: {e}")
        return control_data

    async def unpack_tar_xz(self, archive_path, extraction_path):
        """Unpack a tar archive into ``extraction_path`` without blocking the loop.

        The extraction itself is synchronous, so it is pushed to the default
        executor of the running event loop.
        """
        loop = asyncio.get_running_loop()
        extractor = BaseExtractor()
        await loop.run_in_executor(
            None, extractor.extract_file_tar, archive_path, extraction_path
        )

    async def extract_control(self, filename):
        """Extract and parse the control file from a Debian package.

        Args:
            filename: Path to the candidate ``.deb`` file.

        Returns:
            dict of control fields, or an empty dict when the file is not a
            Debian binary package, ``ar`` is unavailable, or no control
            archive/file could be found.
        """
        control_data = {}
        process_can_fail = False
        # Resolve once up front: the working directory is changed below, which
        # would otherwise invalidate a relative path.
        package_path = os.path.abspath(filename)

        # If the "file" tool is missing we cannot check the type, so we
        # optimistically try to unpack the file anyway.
        if await aio_inpath("file"):
            stdout, _stderr, _return_code = await aio_run_command(
                ["file", package_path], process_can_fail
            )
            if not re.search(b"Debian binary package", stdout):
                self.logger.debug(f"{filename} is not a Debian binary package")
                return control_data

        if not await aio_inpath("ar"):
            self.logger.debug("ar tool not found")
            return control_data

        with tempfile.TemporaryDirectory() as temp_dir:
            # "ar x" extracts into the current directory, so run it from
            # temp_dir and always restore the previous directory, even if the
            # command raises.
            original_dir = os.getcwd()
            os.chdir(temp_dir)
            try:
                await aio_run_command(["ar", "x", package_path])
            finally:
                os.chdir(original_dir)

            # The control archive may use any compression (xz, gz, zst, ...).
            control_tar_files = await aio_glob(str(Path(temp_dir) / "control.tar.*"))
            self.logger.debug(f"Files extracted: {control_tar_files}")
            if not control_tar_files:
                self.logger.debug("Control archive not found.")
                return control_data

            await self.unpack_tar_xz(control_tar_files[0], temp_dir)

            # Parse the control file
            control_file_path = Path(temp_dir, "control")
            self.logger.debug(f"Control file path: {control_file_path}")
            if control_file_path.exists():
                with open(control_file_path, "rb") as control_file:
                    control_data = self.parse_control_file(control_file)

        return control_data

    def run_checker(self, filename):
        """Scan a Debian package and yield the vendor lookup results.

        Runs ``extract_control`` on a private event loop, logs the fields of
        interest, and, when both ``Package`` and ``Version`` are present,
        yields from the result of ``find_vendor``. Errors are logged at debug
        level and never propagated.
        """
        try:
            # Create a new event loop; close it even if extraction fails.
            loop = asyncio.new_event_loop()
            try:
                asyncio.set_event_loop(loop)
                control_data = loop.run_until_complete(self.extract_control(filename))
            finally:
                loop.close()

            # Use .get(): "Essential" is optional and any field may be absent
            # when extraction failed; a KeyError here would skip the lookup.
            package = control_data.get("Package")
            version = control_data.get("Version")
            architecture = control_data.get("Architecture")
            essential = control_data.get("Essential")

            if package:
                self.logger.debug(f"Package name is {package}")
            else:
                self.logger.debug("Package not found")

            if version:
                self.logger.debug(f"Version:{version}")
            else:
                self.logger.debug("No Version Found")

            if architecture:
                self.logger.debug(f"architechture name is {architecture}")
            else:
                self.logger.debug("architechture not found")

            if essential:
                self.logger.debug(f"essential name is {essential}")
            else:
                self.logger.debug("essential not found")

            if package and version:
                vendor = self.find_vendor(package, version)
                if vendor is not None:
                    yield from vendor
        except Exception as e:
            self.logger.debug(f"Some Error occurred while parsing the file {e}")

        self.logger.debug(f"Done parsing file {filename}")
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -158,6 +158,8 @@ class TestLanguageScanner: | |
|
||
SWIFT_PRODUCTS = ["alliance_web_platform"] | ||
|
||
DEBIAN_PRODUCTS = [] | ||
|
||
@classmethod | ||
def setup_class(cls): | ||
cls.cvedb = CVEDB() | ||
|
@@ -236,13 +238,15 @@ def test_language_package_none_found(self, filename: str) -> None: | |
(str(TEST_FILE_PATH / "Package.resolved"), SWIFT_PRODUCTS), | ||
(str(TEST_FILE_PATH / "composer.lock"), PHP_PRODUCTS), | ||
(str(TEST_FILE_PATH / "cpanfile"), PERL_PRODUCTS), | ||
(str(TEST_FILE_PATH / "test.deb"), DEBIAN_PRODUCTS), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have test_language_scanner create the .deb file rather than checking it in to the test directory? We've typically just put small test files in the directory and let it be, but it's hurting our OpenSSF score when we provide things like .deb packages that are basically installable (weirdly, it doesn't flag on the thousand .tar.gz files... yet). I think .deb files use mostly tools we already have installed so it should be possible to write python or a makefile to generate the file here and add some code to skip the test if the file can't be built. Sorry that you get stuck as a guinea pig here! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Umm, I was working on this and was able to write something like this.. import os
import subprocess
def create_debian_package(directory, package_name, version, architecture, description, maintainer):
# Create the necessary directory structure
debian_dir = os.path.join(directory, 'DEBIAN')
os.makedirs(debian_dir, exist_ok=True)
# Create the control file
control_content = f"""Package: {package_name}
Version: {version}
Architecture: {architecture}
Maintainer: {maintainer}
Description: {description}
"""
with open(os.path.join(debian_dir, 'control'), 'w') as control_file:
control_file.write(control_content)
# Build the package
subprocess.run(['dpkg-deb', '--build', directory, f'{package_name}_{version}_{architecture}.deb'])
if __name__ == '__main__':
create_debian_package(
directory='mypackage',
package_name='mypackage',
version='1.0',
architecture='all',
description='Example package',
maintainer='Joydeep <mail@joydeep.com>'
) There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. Should I add this file in the current PR or as a different PR? There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. The test.deb can of course be created for testing. But wouldn't that take up extra memory space? |
||
], | ||
) | ||
def test_language_package(self, filename: str, products: set[str]) -> None: | ||
"""Test valid language product list files""" | ||
scanner = VersionScanner() | ||
scanner.file_stack.append(filename) | ||
found_product = [] | ||
file_path = None | ||
for product in scanner.scan_file(filename): | ||
if product: | ||
product_info, file_path = product | ||
|
@@ -252,7 +256,8 @@ def test_language_package(self, filename: str, products: set[str]) -> None: | |
# expanded out to make missing products easier to spot | ||
for p in products: | ||
assert p in found_product | ||
assert file_path == filename | ||
if file_path: | ||
assert file_path == filename | ||
|
||
@pytest.mark.parametrize("filename", ((str(TEST_FILE_PATH / "PKG-INFO")),)) | ||
def test_python_package(self, filename: str) -> None: | ||
|
@@ -264,3 +269,16 @@ def test_python_package(self, filename: str) -> None: | |
product_info, file_path = product | ||
assert product_info == ProductInfo("facebook", "zstandard", "0.18.0") | ||
assert file_path == filename | ||
|
||
@pytest.mark.parametrize("filename", ((str(TEST_FILE_PATH / "test.deb")),))
def test_debian_control(self, filename: str) -> None:
    """Scan the sample .deb and collect the distinct product names reported."""
    scanner = VersionScanner()
    scanner.file_stack.append(filename)
    seen_products = []
    # Not expecting any packages to be found
    for hit in scanner.scan_file(filename):
        if not hit:
            continue
        product_info, file_path = hit
        if product_info.product in seen_products:
            continue
        seen_products.append(product_info.product)
    # NOTE(review): seen_products is always a list, so this assertion cannot
    # fail; the test effectively only checks that scanning does not raise.
    assert seen_products is not None
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ar
exists before we run it.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ar
, as used in the aio_run_command() function, extracts the contents of the given Debian package into a temporary directory, inside which we keep extracting until we reach the control file. The contents of that file are then written to the control_data
variable, and the temporary directory is closed, deleting all the extracted contents. I think my code already does what you want it to.
As for why I'm using
-x
to extract all the files inside the package: the control files are sometimes present in different directories than one might expect. They might be directly inside the Debian package or inside another tar file in the package. That is why all the files are being extracted here.