Skip to content

Commit

Permalink
Include additional metadata in vulnerability occurrences for CSAF.
Browse files Browse the repository at this point in the history
Minor CSAF changes

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

More metadata for CSAF

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

Added original date of vulnerability for use with CSAF

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

Additional mods to collect more csaf info

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

Modifications to enable cvssv3 to be stored in csaf.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>
  • Loading branch information
cerrussell committed Oct 4, 2023
1 parent c076ea8 commit 2c78bb6
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "appthreat-vulnerability-db"
version = "5.4.3"
version = "5.5.0"
description = "AppThreat's vulnerability database and package search library with a built-in file based storage. OSV, CVE, GitHub, npm are the primary sources of vulnerabilities."
authors = [
{name = "Team AppThreat", email = "cloud@appthreat.com"},
Expand Down
69 changes: 67 additions & 2 deletions vdb/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
)



class VulnerabilitySource(metaclass=ABCMeta):
@classmethod
@abstractmethod
Expand Down Expand Up @@ -155,6 +156,7 @@ def __init__(
details,
cvss_v3,
source_update_time,
source_orig_time,
):
self.id = vid
self.problem_type = problem_type
Expand All @@ -165,6 +167,7 @@ def __init__(
self.details = details
self.cvss_v3 = cvss_v3
self.source_update_time: datetime = convert_time(source_update_time)
self.source_orig_time: datetime = convert_time(source_orig_time)

def __repr__(self):
return json.dumps(
Expand All @@ -180,6 +183,9 @@ def __repr__(self):
"source_update_time": self.source_update_time.isoformat()
if isinstance(self.source_update_time, datetime)
else self.source_update_time,
"source_orig_time": self.source_orig_time.isoformat()
if isinstance(self.source_orig_time, datetime)
else self.source_orig_time,
}
)

Expand All @@ -201,6 +207,7 @@ def __init__(
package_type,
is_obsolete,
source_update_time,
source_orig_time,
):
parts = CPE_REGEX.match(cpe_uri)
self.cpe_uri = cpe_uri
Expand Down Expand Up @@ -247,6 +254,7 @@ def __init__(
self.package_type = VulnerabilityDetail.get_type(cpe_uri, package_type)
self.is_obsolete = is_obsolete
self.source_update_time: datetime = convert_time(source_update_time)
self.source_orig_time: datetime = convert_time(source_orig_time)

@staticmethod
def get_type(cpe_uri, package_type):
Expand Down Expand Up @@ -298,8 +306,30 @@ def from_dict(detail):
detail.get("package_type"),
detail.get("is_obsolete"),
detail.get("source_update_time"),
detail.get("source_orig_time"),
)

def to_dict(self):
return {
"cpe_uri": self.cpe_uri,
"package": self.package,
"mii": self.mii,
"mai": self.mai,
"mie": self.mie,
"mae": self.mae,
"severity": self.severity.value,
"description": self.description,
"fixed_location": self.fixed_location,
"package_type": self.package_type,
"is_obsolete": self.is_obsolete,
"source_update_time": self.source_update_time.isoformat()
if isinstance(self.source_update_time, datetime)
else self.source_update_time,
"source_orig_time": self.source_orig_time.isoformat()
if isinstance(self.source_orig_time, datetime)
else self.source_orig_time,
}


class PackageIssue(object):
"""Package issue class"""
Expand Down Expand Up @@ -342,6 +372,12 @@ def from_dict(package_issue):
package_issue.get("affected_location"), package_issue.get("fixed_location")
)

def to_dict(self):
return {
"affected_location": str(self.affected_location),
"fixed_location": self.fixed_location,
}

def __str__(self):
return json.dumps(
{
Expand All @@ -365,6 +401,7 @@ class CvssV3(object):
confidentiality_impact: str
integrity_impact: str
availability_impact: str
vector_string: str

def __init__(
self,
Expand All @@ -379,6 +416,7 @@ def __init__(
confidentiality_impact,
integrity_impact,
availability_impact,
vector_string=None,
):
self.base_score = base_score
self.exploitability_score = exploitability_score
Expand All @@ -391,6 +429,23 @@ def __init__(
self.confidentiality_impact = confidentiality_impact
self.integrity_impact = integrity_impact
self.availability_impact = availability_impact
self.vector_string = vector_string

def to_dict(self):
return {
"base_score": self.base_score,
"exploitability_score": self.exploitability_score,
"impact_score": self.impact_score,
"attack_vector": self.attack_vector,
"attack_complexity": self.attack_complexity,
"privileges_required": self.privileges_required,
"user_interaction": self.user_interaction,
"scope": self.scope,
"confidentiality_impact": self.confidentiality_impact,
"integrity_impact": self.integrity_impact,
"availability_impact": self.availability_impact,
"vector_string": self.vector_string,
}


class VulnerabilityLocation(object):
Expand Down Expand Up @@ -465,12 +520,14 @@ class VulnerabilityOccurrence:
type: str
severity: Severity
cvss_score: str
cvss_v3: CvssV3
package_issue: PackageIssue
short_description: str
long_description: str
related_urls: list
effective_severity: Severity
matched_by: str
vdetails: VulnerabilityDetail

def __init__(
self,
Expand All @@ -479,39 +536,47 @@ def __init__(
type,
severity,
cvss_score,
cvss_v3,
package_issue,
short_description,
long_description,
related_urls,
effective_severity,
matched_by,
vdetails,
):
self.id = id
self.problem_type = problem_type
self.type = type
self.severity = severity
self.cvss_score = cvss_score
self.cvss_v3 = cvss_v3 if cvss_v3 else None
self.package_issue = package_issue
self.short_description = short_description
self.long_description = long_description
self.related_urls = related_urls
self.effective_severity = effective_severity
self.matched_by = matched_by
self.vdetails = vdetails if vdetails else None

def to_dict(self):
"""Convert the object to dict"""
return {
"id": self.id,
"problem_type": self.problem_type,
"problem_type": self.problem_type
if type(self.problem_type) == str
else ",".join(self.problem_type),
"type": self.type,
"severity": str(self.severity),
"cvss_score": str(self.cvss_score),
"package_issue": str(self.package_issue),
"cvss_v3": self.cvss_v3.to_dict() if self.cvss_v3 else None,
"package_issue": self.package_issue.to_dict(),
"short_description": self.short_description,
"long_description": self.long_description,
"related_urls": self.related_urls,
"effective_severity": str(self.effective_severity),
"matched_by": self.matched_by,
"vdetails": self.vdetails.to_dict() if self.vdetails else None,
}


Expand Down
3 changes: 3 additions & 0 deletions vdb/lib/nvd.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def convert_vuln(vuln):
confidentiality_impact=cvss_data["confidentialityImpact"],
integrity_impact=cvss_data["integrityImpact"],
availability_impact=cvss_data["availabilityImpact"],
vector_string=cvss_data["vectorString"],
)
severity = cvss_data["baseSeverity"]
base_score = cvss_v3.base_score
Expand All @@ -178,6 +179,7 @@ def convert_vuln(vuln):
details,
cvss_v3,
vuln["lastModifiedDate"],
vuln["publishedDate"],
)

@staticmethod
Expand Down Expand Up @@ -209,6 +211,7 @@ def convert_vuln_detail(vuln):
detail["mai"] = cpe.get("versionEndIncluding")
detail["mae"] = cpe.get("versionEndExcluding")
detail["source_update_time"] = vuln["lastModifiedDate"]
detail["source_orig_time"] = vuln["publishedDate"]
cpe_details_list.append(detail)
else: # cpe is not vulnerable
if node["operator"] == "OR":
Expand Down
11 changes: 10 additions & 1 deletion vdb/lib/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import codecs
import importlib
import os
import re
import string
from datetime import date, datetime
Expand Down Expand Up @@ -493,7 +494,13 @@ def version_compare(
# Easy check
if compare_ver and mae and compare_ver == mae:
return False
(tcompare_ver, tmin_version, tmax_version, tmie, tmae,) = trim_epoch(
(
tcompare_ver,
tmin_version,
tmax_version,
tmie,
tmae,
) = trim_epoch(
compare_ver,
min_version,
max_version,
Expand Down Expand Up @@ -904,6 +911,7 @@ def convert_to_occurrence(datas):
type=package_type,
severity=vobj["severity"],
cvss_score=vobj["score"],
cvss_v3=vobj["cvss_v3"],
package_issue=PackageIssue(
affected_location=cpe_uri,
fixed_location=fixed_location,
Expand All @@ -917,6 +925,7 @@ def convert_to_occurrence(datas):
related_urls=vobj["related_urls"],
effective_severity=vobj["severity"],
matched_by=match,
vdetails=vdetails,
)
id_list.append(unique_key)
data_list.append(occ)
Expand Down

0 comments on commit 2c78bb6

Please sign in to comment.