Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/enhance bom diff #7

Merged
merged 3 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@ ignore in the comparison and sorts all fields.

## CLI Usage
```
usage: cjd [-h] -i INPUT INPUT [-o OUTPUT] (-c CONFIG | -x EXCLUDE [EXCLUDE ...] | -p {cdxgen})
usage: cjd [-h] -i INPUT INPUT [-o OUTPUT] [-b] (-c CONFIG | -x EXCLUDE [EXCLUDE ...] | -p {cdxgen,cdxgen-extended})
options:
-h, --help show this help message and exit
-i INPUT INPUT, --input INPUT INPUT
Two JSON files to compare
Two JSON files to compare.
-o OUTPUT, --output OUTPUT
Export JSON of differences to this file
Export JSON of differences to this file.
-a, --allow-new-versions
Allow new versions in BOM comparison.
-b, --bom-diff Produce a comparison of CycloneDX BOMs.
-c CONFIG, --config-file CONFIG
Import TOML configuration file
Import TOML configuration file.
-x EXCLUDE [EXCLUDE ...], --exclude EXCLUDE [EXCLUDE ...]
Exclude field(s) from comparison
-p {cdxgen}, --preset {cdxgen}
Exclude field(s) from comparison.
-p {cdxgen,cdxgen-extended}, --preset {cdxgen,cdxgen-extended}
Preset to use
```
Expand Down Expand Up @@ -58,10 +61,9 @@ is flattened to:
```

To exclude field2, you would specify `field1.field2`. To exclude the `a` field in the array of
objects, you would specify `field1.field3.[].a`. custom-json-diff will create a regex which will
account for the array index in the field name. Multiple fields may be specified separated by a
space. To better understand what your fields should be, check out json-flatten, which is the
package used for this function.
objects, you would specify `field1.field3.[].a` (do NOT include the array index, just do `[]`).
Multiple fields may be specified separated by a space. To better understand what your fields should
be, check out json-flatten, which is the package used for this function.

## Sorting

Expand All @@ -75,4 +77,7 @@ The first key located from the provided keys that is present in the object will
[settings]
excluded_fields = ["serialNumber", "metadata.timestamp"]
sort_keys = ["url", "content", "ref", "name", "value"]

[bom_diff]
allow_new_versions = false
```
30 changes: 14 additions & 16 deletions custom_json_diff/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import argparse

from custom_json_diff.custom_diff import (
compare_dicts,
get_common,
get_diffs,
perform_bom_diff,
report_results
compare_dicts, get_diff, perform_bom_diff, report_results
)


Expand All @@ -27,12 +23,13 @@ def build_args():
help="Export JSON of differences to this file.",
dest="output",
)
# parser.add_argument(
# "--common",
# action="store_true",
# help="Include common elements as well as differences",
# dest="common",
# )
parser.add_argument(
"-a",
"--allow-new-versions",
action="store_true",
help="Allow new versions in BOM comparison.",
dest="allow_new_versions",
)
parser.add_argument(
"-b",
"--bom-diff",
Expand Down Expand Up @@ -70,13 +67,14 @@ def build_args():

def main():
args = build_args()
result, j1, j2 = compare_dicts(args.input[0], args.input[1], args.preset, args.exclude, args.config)
diffs = get_diffs(args.input[0], args.input[1], j1, j2)
settings = args.preset or args.config or args.exclude
result, j1, j2 = compare_dicts(args.input[0], args.input[1], settings, args.bom_diff, args.allow_new_versions)

if args.bom_diff:
common = get_common(j1, j2)
perform_bom_diff(result, diffs, common, args.input[0], args.input[1], args.output)
result_summary = perform_bom_diff(j1, j2)
else:
report_results(result, diffs, args.output)
result_summary = get_diff(args.input[0], args.input[1], j1, j2)
report_results(result, result_summary, args.output)


if __name__ == "__main__":
Expand Down
201 changes: 75 additions & 126 deletions custom_json_diff/custom_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,94 +8,24 @@
import toml
from json_flatten import flatten, unflatten # type: ignore


DELIM = "|>"


class FlatDicts:
def __init__(self, elements):
self.data, self.search_keys = import_flat_dict(elements)

def __eq__(self, other):
return all(i in other.data for i in self.data)

def __ne__(self, other):
return not self == other

def __sub__(self, other):
missing = [i for i in self.data if i not in other.data]
return {i.key: i.value for i in missing}
# new_flat_dict = {}
# for i in missing:
# for j in self.data:
# if i == j.search_key:
# new_flat_dict[j.key] = j.value
# return new_flat_dict

def to_dict(self, unflat: bool = False):
result = {i.key: i.value for i in self.data}
if unflat:
result = unflatten(result)
return result

def intersection(self, other):
intersection = {}
for i in self.data:
if i.search_key in other.search_keys:
intersection[i.key] = i.value
return intersection

def filter_out_keys(self, exclude_keys):
filtered_data = []
for i in self.data:
if check_key(i.search_key, exclude_keys):
filtered_data.append(i)
self.data = filtered_data
return self


class FlatElement:
def __init__(self, key, value):
self.key = key
self.value = value
self.search_key = create_search_key(key, value)

def __eq__(self, other):
return self.search_key == other.search_key


def check_key(key: str, exclude_keys: Set[str]) -> bool:
return not any(key.startswith(k) for k in exclude_keys)
from custom_json_diff.custom_diff_classes import BomDicts, FlatDicts


def check_regex(regex_keys: Set[re.Pattern], key: str) -> bool:
return any(regex.match(key) for regex in regex_keys)


def compare_dicts(
json1: str, json2: str, preset: str | None = None,
excluded: List[str] | None = None, config: str | None = None
) -> Tuple[int, FlatDicts, FlatDicts]:
if preset:
exclude_keys, sort_keys = set_excluded_fields(preset)
elif config:
exclude_keys, sort_keys = import_toml(config)
else:
exclude_keys, sort_keys = set(excluded), [] # type: ignore
json_1_data = load_json(json1, exclude_keys, sort_keys)
json_2_data = load_json(json2, exclude_keys, sort_keys)
def compare_dicts(json1: str, json2: str, settings: str | List[str], bom_diff: bool, allow_new_versions: bool) -> Tuple[int, FlatDicts | BomDicts, FlatDicts | BomDicts]:
json_1_data = load_json(json1, allow_new_versions=allow_new_versions, settings=settings,
bom_diff=bom_diff)
json_2_data = load_json(json2, allow_new_versions=allow_new_versions, settings=settings,
bom_diff=bom_diff)
if json_1_data.data == json_2_data.data:
return 0, json_1_data, json_2_data
else:
return 1, json_1_data, json_2_data


def create_search_key(key: str, value: str) -> str:
combined_key = re.sub(r"(?<=\[)[0-9]+(?=])", "", key)
combined_key += f"|>{value}"
return combined_key


def export_results(outfile: str, diffs: Dict) -> None:
with open(outfile, "w", encoding="utf-8") as f:
f.write(json.dumps(diffs, indent=2))
Expand All @@ -106,21 +36,26 @@ def filter_dict(data: Dict, exclude_keys: Set[str], sort_keys: List[str]) -> Fla
return FlatDicts(data).filter_out_keys(exclude_keys)


def filter_simple(flattened_data: Dict, exclude_keys: Set[str]) -> Dict:
return {
key: value
for key, value in flattened_data.items()
if check_key(key, exclude_keys)
}
def get_bom_commons(bom_1: BomDicts, bom_2: BomDicts) -> Dict:
commons = {"metadata": (bom_1.data.intersection(bom_2.data)).to_dict(True)}
libraries = [i.original_data for i in bom_1.components if i in bom_2.components and i.component_type == "library"]
frameworks = [i.original_data for i in bom_1.components if i in bom_2.components and i.component_type == "framework"]
commons["components"] = {"libraries": libraries, "frameworks": frameworks}
commons["services"] = [i.original_data for i in bom_1.services if i in bom_2.services] # type: ignore
commons["dependencies"] = [i.original_data for i in bom_1.dependencies if i in bom_2.dependencies] # type: ignore
return commons


def get_common(json_1_data: FlatDicts, json_2_data: FlatDicts) -> Dict:
return unflatten(json_1_data.intersection(json_2_data))
def get_bom_diff(bom_1: BomDicts, bom_2: BomDicts) -> Dict:
diff = get_diff(bom_1.filename, bom_2.filename, bom_1.data, bom_2.data)
diff[bom_1.filename] |= populate_bom_diff(bom_1, bom_2)
diff[bom_2.filename] |= populate_bom_diff(bom_2, bom_1)
return diff


def get_diffs(f1: str | Path, f2: str | Path, j1: FlatDicts, j2: FlatDicts) -> Dict:
diff_1 = unflatten(j1 - j2)
diff_2 = unflatten(j2 - j1)
def get_diff(f1: str | Path, f2: str | Path, j1: FlatDicts, j2: FlatDicts) -> Dict:
diff_1 = (j1 - j2).to_dict(unflat=True)
diff_2 = (j2 - j1).to_dict(unflat=True)
return {str(f1): diff_1, str(f2): diff_2}


Expand All @@ -131,36 +66,27 @@ def get_sort_key(data: Dict, sort_keys: List[str]) -> str | bool:
def handle_results(outfile: str, diffs: Dict) -> None:
if outfile:
export_results(outfile, diffs)
else:
print("Differences found:")
if not outfile:
print(json.dumps(diffs, indent=2))


def import_flat_dict(my_dict: Dict) -> Tuple[List[FlatElement], Set[str]]:
searchable = []
search_keys = set()
for key, value in my_dict.items():
ele = FlatElement(key, value)
searchable.append(ele)
search_keys.add(ele.search_key)
return searchable, search_keys


def import_toml(toml_file_path: str) -> Tuple[Set[str], List[str]]:
def import_toml(toml_file_path: str) -> Tuple[Set[str], List[str], bool]:
with open(toml_file_path, "r", encoding="utf-8") as f:
try:
toml_data = toml.load(f)
except toml.TomlDecodeError:
logging.error("Invalid TOML.")
sys.exit(1)
try:
return toml_data["settings"]["excluded_fields"], toml_data["settings"]["sort_keys"]
except KeyError:
logging.error("Invalid TOML.")
sys.exit(1)
return (
set(toml_data.get("settings", {}).get("excluded_fields", [])),
toml_data.get("settings", {}).get("sort_keys", []),
toml_data.get("bom_diff", {}).get("allow_new_versions", False))


def load_json(json_file: str, exclude_keys: Set[str], sort_keys: List[str]) -> FlatDicts:
def load_json(json_file: str, allow_new_versions: bool,
settings: str | List[str] | None = None, exclude_keys: Set[str] | None = None,
sort_keys: List[str] | None = None,
bom_diff: bool | None = False) -> FlatDicts | BomDicts:
try:
with open(json_file, "r", encoding="utf-8") as f:
data = json.load(f)
Expand All @@ -170,38 +96,61 @@ def load_json(json_file: str, exclude_keys: Set[str], sort_keys: List[str]) -> F
except json.JSONDecodeError:
logging.error("Invalid JSON: %s", json_file)
sys.exit(1)
if bom_diff:
data = sort_dict(data, ["url", "content", "ref", "name", "value"])
return BomDicts(allow_new_versions, json_file, data)
if settings:
exclude_keys, sort_keys, allow_new_versions = load_settings(settings)
elif not exclude_keys:
exclude_keys = set()
if not sort_keys:
sort_keys = []
return filter_dict(data, exclude_keys, sort_keys)


def perform_bom_diff(status: int, diff: Dict, commons: Dict, f1: str, f2: str, outfile: str):
diff_elements = produce_bom_diff(diff, commons, f1, f2)
handle_results(outfile, diff_elements)


def populate_bom_diff(diff: Dict) -> Dict:
if not diff:
return {"components": [], "services": []}
return {
"components": [i.get("purl") for i in diff.get("components", []) if i.get("purl")],
"services": [i.get("bom-ref") for i in diff.get("services", []) if i.get("bom-ref")],
def load_settings(settings: str | List[str]) -> Tuple[Set[str], List[str], bool]:
if isinstance(settings, str):
if settings.endswith(".toml"):
exclude_keys, sort_keys, allow_new_versions = import_toml(settings)
else:
exclude_keys, sort_keys, allow_new_versions = set_excluded_fields(settings)
else:
exclude_keys, sort_keys, allow_new_versions = set(excluded), [], False # type: ignore
return exclude_keys, sort_keys, allow_new_versions


def perform_bom_diff(bom_1: BomDicts, bom_2: BomDicts) -> Dict:
return {"commons_summary":get_bom_commons(bom_1, bom_2), "diff_summary": get_bom_diff(bom_1, bom_2)}


def populate_bom_diff(bom_1: BomDicts, bom_2: BomDicts) -> Dict:
diff: Dict = {}
diff |= {
"components": {
"libraries": [
i.original_data
for i in bom_1.components
if i not in bom_2.components and i.component_type == "library"
],
"frameworks": [
i.original_data for i in bom_1.components if
i not in bom_2.components and i.component_type == "framework"
]}
}


def produce_bom_diff(diff: Dict, common: Dict, f1: str, f2: str) -> Dict:
diff_summary = {f1: populate_bom_diff(diff.get(f1, {})), f2: populate_bom_diff(diff.get(f2, {}))}
if common:
diff_summary["common"] = populate_bom_diff(common)
return diff_summary
diff |= {"services": [i.original_data for i in bom_1.services if i not in bom_2.services]}
diff |= {"dependencies": [i.original_data for i in bom_1.dependencies if i not in bom_2.dependencies]}
return diff


def report_results(status: int, diffs: Dict, outfile: str):
if status == 0:
print("No differences found.")
else:
print("Differences found.")
handle_results(outfile, diffs)


def set_excluded_fields(preset: str) -> Tuple[Set[str], List[str]]:
def set_excluded_fields(preset: str) -> Tuple[Set[str], List[str], bool]:
excluded = []
sort_fields = []
if preset.startswith("cdxgen"):
Expand All @@ -215,7 +164,7 @@ def set_excluded_fields(preset: str) -> Tuple[Set[str], List[str]]:
if preset == "cdxgen-extended":
excluded.append("components.[].licenses")
sort_fields.extend(["url", "content", "ref", "name", "value"])
return set(excluded), sort_fields
return set(excluded), sort_fields, False


def sort_dict(result: Dict, sort_keys: List[str]) -> Dict:
Expand Down
Loading