Skip to content

Commit

Permalink
Bugfixes for search module queries and logic. (#185)
Browse files Browse the repository at this point in the history
* Log whether cache was successfully cleaned.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Check additional advisory prefixes in search-by-any.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Minor refactor to close bom file before processing.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Return results if no version.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Typing.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Bump version.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Check if search term is CVE-like with a regex.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Revert return results if no version.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

---------

Signed-off-by: Caroline Russell <caroline@appthreat.dev>
  • Loading branch information
cerrussell authored Oct 16, 2024
1 parent 923165e commit ccabaa3
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ vdb --search "cpe:2.3:a:npm:gitblame:*:*:*:*:*:*:*:*"
# Search by colon separated values
vdb --search "npm:gitblame:0.0.1"
# Search by CVE id
# Search by vulnerability id (CVE, GHSA, ALSA, DSA, etc.)
vdb --search CVE-2024-25169
# Search with wildcard for CVE
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "appthreat-vulnerability-db"
version = "6.0.14"
version = "6.1.0"
description = "AppThreat's vulnerability database and package search library with a built-in sqlite based storage. OSV, CVE, GitHub, npm are the primary sources of vulnerabilities."
authors = [
{name = "Team AppThreat", email = "cloud@appthreat.com"},
Expand Down
3 changes: 3 additions & 0 deletions test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,9 @@ def test_vers_compare():
assert not utils.vers_compare("6.0.13", "vers:maven/>=5.1.0.RELEASE|<5.1.1.RELEASE")
assert not utils.vers_compare("6.0.13", "vers:maven/>=4.3.0.RELEASE|<4.3.18.RELEASE")
assert not utils.vers_compare("6.0.13", "vers:maven/>=5.0.0.RELEASE|<5.0.7.RELEASE")
assert utils.vers_compare("1.12", "")



def test_clean_cpe_uri():
test_tuples = (
Expand Down
6 changes: 5 additions & 1 deletion vdb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def build_args():
parser.add_argument(
"--search",
dest="search",
help="Search for the package or CVE ID in the database. Use purl, cpe, or git http url.",
help="Search for the package or vulnerability ID (CVE, GHSA, ALSA, DSA, etc.) in the database. Use purl, cpe, or git http url.",
)
parser.add_argument(
"--list-malware",
Expand Down Expand Up @@ -267,6 +267,10 @@ def main():
if args.clean:
if os.path.exists(config.DATA_DIR):
shutil.rmtree(config.DATA_DIR, ignore_errors=True)
if not os.path.exists(config.DATA_DIR):
LOG.info("VDB cache cleaned successfully.")
else:
LOG.info("VDB cache at %s not cleaned successfully.", config.DATA_DIR)
if args.print_vdb_metadata:
print_db_file_metadata(config.VDB_METADATA_FILE)
if args.download_image or args.download_full_image:
Expand Down
17 changes: 9 additions & 8 deletions vdb/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
{"cve":{"data_type":"CVE","data_format":"MITRE","data_version":"4.0","CVE_data_meta":{"ID":"%(cve_id)s","ASSIGNER":"%(assigner)s"},"problemtype":{"problemtype_data":[{"description":[{"lang":"en","value":"%(cwe_id)s"}]}]},"references":{"reference_data": %(references)s},"description":{"description_data":[{"lang":"en","value":"%(description)s"}]}},"configurations":{"CVE_data_version":"4.0","nodes":[{"operator":"OR","cpe_match":[{"vulnerable":true,"cpe23Uri":"cpe:2.3:a:%(vendor)s:%(product)s:%(version)s:*:%(edition)s:*:*:*:*:*","versionStartExcluding":"%(version_start_excluding)s","versionEndExcluding":"%(version_end_excluding)s","versionStartIncluding":"%(version_start_including)s","versionEndIncluding":"%(version_end_including)s"}, {"vulnerable":false,"cpe23Uri":"cpe:2.3:a:%(vendor)s:%(product)s:%(fix_version_start_including)s:*:%(edition)s:*:*:*:*:*","versionStartExcluding":"%(fix_version_start_excluding)s","versionEndExcluding":"%(fix_version_end_excluding)s","versionStartIncluding":"%(fix_version_start_including)s","versionEndIncluding":"%(fix_version_end_including)s"}]}]},"impact":{"baseMetricV3":{"cvssV3":{"version":"3.1","vectorString":"%(vectorString)s","attackVector":"NETWORK","attackComplexity":"%(attackComplexity)s","privilegesRequired":"NONE","userInteraction":"%(userInteraction)s","scope":"UNCHANGED","confidentialityImpact":"%(severity)s","integrityImpact":"%(severity)s","availabilityImpact":"%(severity)s","baseScore":%(score).1f,"baseSeverity":"%(severity)s"},"exploitabilityScore":%(exploitabilityScore).1f,"impactScore":%(score).1f},"baseMetricV2":{"cvssV2":{"version":"2.0","vectorString":"AV:N/AC:M/Au:N/C:P/I:P/A:P","accessVector":"NETWORK","accessComplexity":"MEDIUM","authentication":"NONE","confidentialityImpact":"PARTIAL","integrityImpact":"PARTIAL","availabilityImpact":"PARTIAL","baseScore":%(score).1f},"severity":"%(severity)s","exploitabilityScore":%(exploitabilityScore).1f,"impactScore":%(score).1f,"acInsufInfo":false,"obtainAllPrivilege":false,"o
btainUserPrivilege":false,"obtainOtherPrivilege":false,"userInteractionRequired":false}},"publishedDate":"%(publishedDate)s","lastModifiedDate":"%(lastModifiedDate)s"}
"""

osv_url_dict = {
OSV_URL_DICT = {
"javascript": "https://osv-vulnerabilities.storage.googleapis.com/npm/all.zip",
"python": "https://osv-vulnerabilities.storage.googleapis.com/PyPI/all.zip",
"go": "https://osv-vulnerabilities.storage.googleapis.com/Go/all.zip",
Expand All @@ -71,24 +71,24 @@

# Support for disabling individual distro feeds
if os.getenv("VDB_IGNORE_ALMALINUX", "") not in ("true", "1"):
osv_url_dict["almalinux"] = "https://osv-vulnerabilities.storage.googleapis.com/AlmaLinux/all.zip"
OSV_URL_DICT["almalinux"] = "https://osv-vulnerabilities.storage.googleapis.com/AlmaLinux/all.zip"
if os.getenv("VDB_IGNORE_ALPINE", "") not in ("true", "1"):
osv_url_dict["alpine"] = "https://osv-vulnerabilities.storage.googleapis.com/Alpine/all.zip"
OSV_URL_DICT["alpine"] = "https://osv-vulnerabilities.storage.googleapis.com/Alpine/all.zip"
if os.getenv("VDB_IGNORE_DEBIAN", "") not in ("true", "1"):
osv_url_dict["debian"] = "https://osv-vulnerabilities.storage.googleapis.com/Debian/all.zip"
OSV_URL_DICT["debian"] = "https://osv-vulnerabilities.storage.googleapis.com/Debian/all.zip"
if os.getenv("VDB_IGNORE_ROCKYLINUX", "") not in ("true", "1"):
osv_url_dict["rockylinux"] = "https://osv-vulnerabilities.storage.googleapis.com/Rocky%20Linux/all.zip"
OSV_URL_DICT["rockylinux"] = "https://osv-vulnerabilities.storage.googleapis.com/Rocky%20Linux/all.zip"


# These feeds introduce too many false positives
if os.getenv("OSV_INCLUDE_FUZZ"):
osv_url_dict["linux"] = (
OSV_URL_DICT["linux"] = (
"https://osv-vulnerabilities.storage.googleapis.com/Linux/all.zip"
)
osv_url_dict["oss-fuzz"] = (
OSV_URL_DICT["oss-fuzz"] = (
"https://osv-vulnerabilities.storage.googleapis.com/OSS-Fuzz/all.zip"
)
osv_url_dict["android"] = (
OSV_URL_DICT["android"] = (
"https://osv-vulnerabilities.storage.googleapis.com/Android/all.zip",
)

Expand Down Expand Up @@ -191,3 +191,4 @@
"ubuntu": ["ubuntu"],
"wolfi": ["wolfi"]
}

70 changes: 35 additions & 35 deletions vdb/lib/search.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from typing import Any, Generator
import re
from typing import Generator, List, Tuple

import apsw
import orjson

from vdb.lib import db6, utils
from vdb.lib.cve_model import CVE, CVE1
from vdb.lib.utils import load_json


def _filter_hits(raw_hits: list, compare_ver: str) -> list:
IS_ADVISORY = re.compile("^[A-Z]{1,7}-")


def filter_hits(raw_hits: List, compare_ver: str) -> List:
filtered_list = []
for ahit in raw_hits:
cve_id = ahit[0]
Expand All @@ -25,9 +31,7 @@ def _filter_hits(raw_hits: list, compare_ver: str) -> list:
return filtered_list


def get_cve_data(
db_conn, index_hits: list[dict, Any], search_str: str
) -> Generator | list[dict[str, str | CVE | None]]:
def get_cve_data(db_conn: apsw.Connection | None, index_hits: List, search_str: str) -> Generator:
"""Get CVE data for the index results
Args:
Expand Down Expand Up @@ -65,43 +69,39 @@ def get_cve_data(
}


def search_by_any(any_str: str, with_data: bool = False) -> list | None:
def search_by_any(any_str: str, with_data: bool = False) -> List:
"""Convenient method to search by a string"""
if any_str.startswith("pkg:"):
return search_by_purl_like(any_str, with_data)
if (
any_str.startswith("CVE-")
or any_str.startswith("GHSA-")
or any_str.startswith("MAL-")
):
if IS_ADVISORY.search(any_str):
return search_by_cve(any_str, with_data)
if any_str.startswith("http"):
return search_by_url(any_str, with_data)
return search_by_cpe_like(any_str, with_data)


def search_by_cpe_like(cpe: str, with_data=False) -> list | None:
def search_by_cpe_like(cpe: str, with_data: bool = False) -> List:
"""Search by CPE or colon-separate strings"""
db_conn, index_conn = db6.get(read_only=True)
if cpe.startswith("cpe:"):
vendor, package, version, _ = utils.parse_cpe(cpe)
elif cpe.count(":") == 2:
vendor, package, version = cpe.split(":")
else:
return None
return []
# check for vendor name in both namespace and type
raw_hits = exec_query(
index_conn,
"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where (namespace = ? OR type = ?) AND name = ?;",
(vendor, vendor, package),
)
filtered_list = _filter_hits(raw_hits, version)
filtered_list = filter_hits(raw_hits, version)
if with_data:
return get_cve_data(db_conn, filtered_list, cpe)
return list(get_cve_data(db_conn, filtered_list, cpe))
return filtered_list


def search_by_purl_like(purl: str, with_data=False) -> list | None:
def search_by_purl_like(purl: str, with_data: bool = False) -> List:
"""Search by purl like string"""
db_conn, index_conn = db6.get(read_only=True)
purl_obj = utils.parse_purl(purl)
Expand All @@ -124,38 +124,38 @@ def search_by_purl_like(purl: str, with_data=False) -> list | None:
"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where purl_prefix = ?;",
args,
)
filtered_list = _filter_hits(raw_hits, version)
filtered_list = filter_hits(raw_hits, version)
if with_data:
return get_cve_data(db_conn, filtered_list, purl)
return list(get_cve_data(db_conn, filtered_list, purl))
return filtered_list
return None

return []

def search_by_cve(cve_id: str, with_data=False, with_limit=None) -> list | None:
def search_by_cve(cve_id: str, with_data: bool = False, with_limit: int | None = None) -> List:
"""Search by CVE"""
db_conn, index_conn = db6.get(read_only=True)
filter_part = "cve_id LIKE ?" if "%" in cve_id else "cve_id = ?"
filter_part = f"{filter_part} ORDER BY cve_id DESC"
args = [cve_id]
if with_limit:
if with_limit and isinstance(with_limit, int):
filter_part = f"{filter_part} LIMIT ?"
args.append(with_limit)
args = tuple(args)
raw_hits = exec_query(
index_conn,
f"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where {filter_part}",
args,
)
filtered_list = _filter_hits(raw_hits, "*")
filtered_list = filter_hits(raw_hits, "*")
if with_data:
return get_cve_data(db_conn, filtered_list, cve_id)
return list(get_cve_data(db_conn, filtered_list, cve_id))
return filtered_list


def search_by_url(url: str, with_data=False) -> list | None:
def search_by_url(url: str, with_data: bool = False) -> List:
"""Search by URL"""
purl_obj = utils.url_to_purl(url)
if not purl_obj:
return None
return []
name = purl_obj["name"]
purl_str = (
f"pkg:{purl_obj['type']}/{purl_obj['namespace']}/{name}"
Expand All @@ -167,21 +167,21 @@ def search_by_url(url: str, with_data=False) -> list | None:
return search_by_purl_like(purl_str, with_data)


def search_by_cdx_bom(bom_file: str, with_data=False) -> Generator:
def search_by_cdx_bom(bom_file: str, with_data: bool = False) -> Generator:
"""Search by CycloneDX BOM file"""
with open(bom_file, encoding="utf-8", mode="r") as fp:
cdx_obj = orjson.loads(fp.read())
for component in cdx_obj.get("components"):
if component.get("purl"):
yield search_by_purl_like(component.get("purl"), with_data)
if component.get("cpe"):
yield search_by_cpe_like(component.get("cpe"), with_data)
cdx_obj = load_json(bom_file)
for component in cdx_obj.get("components", []):
if component.get("purl"):
yield search_by_purl_like(component["purl"], with_data)
if component.get("cpe"):
yield search_by_cpe_like(component["cpe"], with_data)


def latest_malware(with_limit: int = 20, with_data: bool = False) -> Generator:
    """Search for the latest malware entries, i.e. ids beginning with MAL-.

    Args:
        with_limit: Forwarded to search_by_cve as ``with_limit``; defaults to 20.
        with_data: Forwarded to search_by_cve as ``with_data``; defaults to False.

    Yields:
        Exactly one item: the whole result returned by search_by_cve for the
        wildcard id "MAL-%". Callers that want the individual hits must
        iterate over that yielded result themselves.
    """
    # The "%" makes search_by_cve match on an id pattern rather than an exact id.
    yield search_by_cve("MAL-%", with_data=with_data, with_limit=with_limit)


def exec_query(conn, query: str, args: tuple[str, ...]) -> list:
def exec_query(conn, query: str, args: Tuple[str, ...]) -> list:
res = conn.execute(query, args)
return res.fetchall()
42 changes: 24 additions & 18 deletions vdb/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from datetime import date, datetime
from enum import Enum
from hashlib import blake2b
from typing import Dict, List
from urllib.parse import parse_qs, urlparse

import orjson
from cvss import CVSS3
from packageurl import PackageURL
from semver import VersionInfo
Expand Down Expand Up @@ -450,7 +452,7 @@ def trim_epoch(
)


def vers_compare(compare_ver: str | int | float, vers: str) -> bool:
def vers_compare(compare_ver: str | int | float | None, vers: str) -> bool:
"""Purl vers based version comparison"""
min_version, max_version, min_excluding, max_excluding = None, None, None, None
if vers == "*" or compare_ver is None or compare_ver == "*":
Expand Down Expand Up @@ -489,8 +491,8 @@ def vers_compare(compare_ver: str | int | float, vers: str) -> bool:

def version_compare(
compare_ver: str | int | float,
min_version: str | int | float,
max_version: str | int | float,
min_version: str | int | float | None,
max_version: str | int | float | None,
mie: str | int | float | None = None,
mae: str | int | float | None = None,
) -> bool:
Expand Down Expand Up @@ -897,7 +899,7 @@ def get_default_cve_data(severity):
return score, severity, vector_string, attack_complexity


def get_cvss3_from_vector(vector: str) -> dict:
def get_cvss3_from_vector(vector: str) -> Dict:
"""
Return CVE metadata for the given vector
:param vector: Vector
Expand Down Expand Up @@ -982,7 +984,7 @@ def fix_text(text):
return text


def convert_md_references(md_text):
def convert_md_references(md_text: str) -> List:
"""Method to convert markdown list to references url format"""
if not md_text:
return []
Expand All @@ -999,7 +1001,7 @@ def convert_md_references(md_text):
return ref_list


def parse_purl(purl_str: str) -> dict:
def parse_purl(purl_str: str) -> Dict:
"""Method to parse a package url string safely"""
purl_obj = None
if purl_str and purl_str.startswith("pkg:"):
Expand Down Expand Up @@ -1071,7 +1073,7 @@ def decompress_str(s):
return s


def to_purl_vers(vendor: str, versions: list) -> str:
def to_purl_vers(vendor: str, versions: List) -> str:
vers_list = []
scheme = VENDOR_TO_VERS_SCHEME.get(vendor, vendor)
if vendor.startswith("git") or not vendor.isalpha():
Expand Down Expand Up @@ -1125,18 +1127,18 @@ def to_purl_vers(vendor: str, versions: list) -> str:
else:
vers_list.append(f"<={less_than_or_equal}")

return f"vers:{scheme}/{'|'.join(vers_list)}" if vers_list else None
return f"vers:{scheme}/{'|'.join(vers_list)}" if vers_list else ""


def calculate_hash(content: str, digest_size=16) -> str:
def calculate_hash(content: str, digest_size: int = 16) -> str:
"""Function to calculate has using blake2b algorithm"""
h = blake2b(digest_size=digest_size)
h.update(content.encode())
return h.hexdigest()


def url_to_purl(url: str) -> dict | None:
"""Convert a given http url to purl objecg"""
def url_to_purl(url: str) -> Dict:
"""Convert a given http url to purl object"""
url_obj = urlparse(url)
git_repo_name = url_obj.hostname
version = None
Expand Down Expand Up @@ -1165,9 +1167,7 @@ def url_to_purl(url: str) -> dict | None:
for v in ("commit", "tag", "hash", "version", "id"):
if query_obj.get(v):
version = query_obj.get(v)[0].split(";")[0]
git_repo_name = (
git_repo_name.removesuffix("-").removesuffix("/commit").removesuffix(".git")
)
git_repo_name = git_repo_name.removesuffix("-").removesuffix("/commit").removesuffix(".git")
url_obj = urlparse(f"https://{git_repo_name}")
# Fix for #112
pkg_type = "generic"
Expand All @@ -1179,20 +1179,20 @@ def url_to_purl(url: str) -> dict | None:
# Filter repo names without a path
# eg: github.com
if not url_obj.path:
return None
return {}
purl_obj = parse_purl(
f"pkg:{pkg_type}/{url_obj.hostname}/{url_obj.path}"
if hostname
else f"pkg:{pkg_type}/{url_obj.path}"
)
if not purl_obj or not purl_obj["namespace"] or not purl_obj["name"]:
return None
return {}
if not purl_obj["version"] and version:
purl_obj["version"] = version
return purl_obj


def clean_cpe_uri(cpe_uri):
def clean_cpe_uri(cpe_uri: str) -> str:
if not cpe_uri:
return cpe_uri
cpe_uri = re.sub(r"[\\!&,()+\[\]]" , "", cpe_uri)
Expand All @@ -1203,7 +1203,7 @@ def clean_cpe_uri(cpe_uri):
return cpe_uri


def extract_affected_symbols(description: str) -> dict:
def extract_affected_symbols(description: str) -> Dict:
"""
Method to extract affected_modules and affected_functions from the description
"""
Expand All @@ -1226,3 +1226,9 @@ def extract_affected_symbols(description: str) -> dict:
"affected_functions": sorted(affected_functions),
"affected_modules": sorted(affected_modules),
}


def load_json(filepath: str) -> Dict:
    """Read the file at ``filepath`` as UTF-8 text and return the parsed JSON.

    Args:
        filepath: Path of the JSON document to read.

    Returns:
        Whatever ``orjson.loads`` produces for the file content. The
        annotation says ``Dict``; presumably callers only pass documents
        whose top-level value is an object (TODO confirm).
    """
    with open(filepath, mode="r", encoding="utf-8") as json_file:
        raw_text = json_file.read()
    # Parsing happens after the handle is released; the text is already in memory.
    return orjson.loads(raw_text)

0 comments on commit ccabaa3

Please sign in to comment.