Skip to content

Commit

Permalink
Feat: Add check-reachable command. (#51)
Browse files Browse the repository at this point in the history
Signed-off-by: Caroline Russell <caroline@appthreat.dev>
  • Loading branch information
cerrussell authored Jun 28, 2024
1 parent c9c6a5a commit 264c1d6
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 39 deletions.
69 changes: 55 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ e.g. `atom-tools help
convert`).

```
Atom Tools (version 0.5.0)
Atom Tools (version 0.6.0)
Usage:
command [options] [arguments]
Expand All @@ -44,11 +44,13 @@ Options:
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug.
Available commands:
convert Convert an atom slice to a different format.
filter Filter an atom slice based on specified criteria.
help Displays help for a command.
list Lists commands.
validate-lines Check the accuracy of the line numbers in an atom slice.
check-reachable Find out if there are hits for a given package:version or file:linenumber in an atom slice.
convert Convert an atom slice to a different format.
filter Filter an atom slice based on specified criteria.
help Displays help for a command.
list Lists commands.
query-endpoints List elements to display in the console.
validate-lines Check the accuracy of the line numbers in an atom slice.
```

## Features
Expand Down Expand Up @@ -134,21 +136,29 @@ This would be equivalent to

##### Available attributes (not case-sensitive):

*For usages slices*
- callName
- fileName
- fullName
- name
- resolvedMethod
- signature

| attribute | locations |
|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------|
| callName | objectSlices.usages.argToCalls, objectSlices.usages.invokedCalls, userDefinedTypes.procedures, |
| fileName | objectSlices, userDefinedTypes | |
| fullName | objectSlices |
| name | objectSlices.usages.targetObj, objectSlices.usages.definedBy, userDefinedTypes.fields |
| resolvedMethod | objectSlices.usages.targetObj, objectSlices.usages.definedBy, objectSlices.usages.argToCalls, objectSlices.usages.invokedCalls, userDefinedTypes.procedures |
| signature | objectSlices |
| attribute | locations searched | reachables locations |
|----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------|
| callName | objectSlices.usages.argToCalls<br/>objectSlices.usages.invokedCalls<br/>userDefinedTypes.procedures, | |
| fileName | objectSlices<br/>userDefinedTypes | | |
| fullName | objectSlices | |
| name | objectSlices.usages.targetObj<br/>objectSlices.usages.definedBy<br/>userDefinedTypes.fields | |
| purl | | reachables.purls<br/>reachables.flows.tags |
| resolvedMethod | objectSlices.usages.targetObj<br/>objectSlices.usages.definedBy<br/>objectSlices.usages.argToCalls<br/>objectSlices.usages.invokedCalls<br/>userDefinedTypes.procedures | |
| signature | objectSlices | | | |

#### Searching reachables for package name/version

This option filters reachables to the given package name and version in the format of name:version

`--package mypackage:1.0.0`

#### Criteria syntax

Expand Down Expand Up @@ -237,6 +247,37 @@ Query using filter command to target by both filename and line number range

`filter -i usages.slices -t js -c filename=server.ts -e "query-endpoints -f 50-70"`

### Check Reachable

The check-reachable command takes either a package:version or filename:line_number/line_number_range

`check-reachable -i reachable_slice.json -p colors:1.0.0`
`check-reachable -i reachable_slice.json -p @colors/colors:1.0.0`
`check-reachable -i reachable_slice.json -l file:20`
`check-reachable -i reachable_slice.json -l file:20-40`

```
Description:
Find out if there are hits for a given package:version or file:linenumber in an atom slice.
Usage:
check-reachable [options]
Options:
-i, --input-slice=INPUT-SLICE Slice file
-p, --pkg=PKG Package to search for in the format of <package_name>:<version>
-l, --location=LOCATION Filename with line number to search for in the format of <filename>:<linenumber>
-h, --help Display help for the given command. When no command is given display help for the list command.
-q, --quiet Do not output any message.
-V, --version Display this application version.
--ansi Force ANSI output.
--no-ansi Disable ANSI output.
-n, --no-interaction Do not ask any interactive question.
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug.
Help:
The check-reachables command checks for reachable flows for a package:version or file:linenumber in an atom slice.
```

### Validate Lines

Expand Down
2 changes: 1 addition & 1 deletion atom_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
A cli, classes and functions for converting an atom slice to a different format
"""
__version__ = '0.5.5'
__version__ = '0.6.0'
1 change: 1 addition & 0 deletions atom_tools/cli/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def _load() -> Command:
'convert',
'filter',
'query-endpoints',
'check-reachable',
'validate-lines',
]

Expand Down
64 changes: 64 additions & 0 deletions atom_tools/cli/commands/check_reachable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# pylint: disable=R0801
"""Query Reachables Command for the atom-tools CLI."""
import logging

from cleo.helpers import option

from atom_tools.cli.commands.command import Command
from atom_tools.lib.slices import AtomSlice
from atom_tools.lib.utils import check_reachable


logger = logging.getLogger(__name__)


class CheckReachableCommand(Command):
"""
This command handles the conversion of an atom slice to a specified
destination format.
Attributes:
name (str): The name of the command.
description (str): The description of the command.
options (list): The list of options for the command.
help (str): The help message for the command.
Methods:
handle: Executes the command and performs the conversion.
"""

name = 'check-reachable'
description = ('Find out if there are hits for a given package:version or file:linenumber in '
'an atom slice.')
options = [
option(
'input-slice',
'i',
'Slice file',
flag=False,
value_required=True,
),
option(
'pkg',
'p',
'Package to search for in the format of <package_name>:<version>',
flag=False,
),
option(
'location',
'l',
'Filename with line number to search for in the format of <filename>:<linenumber>',
flag=False,
),
]
help = """Checks for reachable flows for a pkg:version or file:linenumber in an atom slice."""

loggers = ['atom_tools.lib.filtering', 'atom_tools.lib.regex_utils', 'atom_tools.lib.slices',
'atom_tools.lib.utils']

def handle(self):
"""
Executes the query command and performs the search.
"""
atom_slice = AtomSlice(self.option('input-slice'))
print(check_reachable(atom_slice.content, self.option('pkg'), self.option('location')))
7 changes: 7 additions & 0 deletions atom_tools/cli/commands/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ class FilterCommand(Command):
'expression. Please see documentation for syntax.',
flag=False,
),
option(
'package-version',
'p',
description='Filter a reachables slice based on a package name and version in format '
'package:version. May include multiple separated by a comma.',
flag=False,
),
option(
'outfile',
'o',
Expand Down
2 changes: 1 addition & 1 deletion atom_tools/cli/commands/query_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# pylint: disable=R0801
"""Query Command for the atom-tools CLI."""
"""Query Endpoints Command for the atom-tools CLI."""
import logging

from cleo.helpers import option
Expand Down
87 changes: 81 additions & 6 deletions atom_tools/lib/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


logger = logging.getLogger(__name__)
filtering = FilteringPatternCollection()
patterns = FilteringPatternCollection()


@dataclass
Expand Down Expand Up @@ -62,7 +62,7 @@ def filter_slice(self) -> Dict:
if self.slc.slice_type == 'usages':
return self.filter_usages()
if self.slc.slice_type == 'reachables':
return self.filter_reachables()
return self.filter_usages()
raise ValueError(f'Unknown slice type: {self.slc.slice_type}')

def filter_usages(self) -> Dict:
Expand Down Expand Up @@ -114,10 +114,10 @@ def _process_slice_indexes(self) -> Dict:
include_indexes = set()
exclude_indexes = set()
for k in self.results:
if matched := filtering.top_level_flat_loc_index.search(k):
if matched := patterns.top_level_flat_loc_index.search(k):
include_indexes.add(matched)
for k in self.negative_results:
if matched := filtering.top_level_flat_loc_index.search(k):
if matched := patterns.top_level_flat_loc_index.search(k):
exclude_indexes.add(matched)
return self._exclude_indexes(include_indexes, exclude_indexes)

Expand Down Expand Up @@ -152,13 +152,19 @@ def _search_values_fuzzy(self, f: AttributeFilter) -> None:
self._process_fuzzy_results(f, result)


def check_reachable_purl(data: Dict, purl: str) -> bool:
"""Checks if purl is reachable"""
purls = enumerate_reachable_purls(data)
return purl.lower() in purls


def create_attribute_filter(key: str, value: str, fuzz_pct: int | None) -> Tuple:
"""Create an attribute filter"""
lns = ()
fn_only = False
if key.lower() == 'filename' and '/' not in value and '\\' not in value:
if (key.lower() in {'filename', 'parentfilename'}) and '/' not in value and '\\' not in value:
fn_only = True
if ':' in value and (match := filtering.attribute_and_line.search(value)):
if ':' in value and (match := patterns.attribute_and_line.search(value)):
value = match.group('attrib')
lns = get_ln_range(match.group('line_nums'))
if fuzz_pct:
Expand All @@ -170,6 +176,40 @@ def create_attribute_filter(key: str, value: str, fuzz_pct: int | None) -> Tuple
return new_value, lns, fn_only


def create_purl_map(data: Dict) -> Dict:
"""Map purls to package:version strings"""
purls = set(patterns.jmespath_purls.search(data))
purl_dict = {}
for purl in purls:
formatted_purls = parse_purl(purl)
for p in formatted_purls:
purl_dict[p] = purl
return purl_dict


def enumerate_reachable_purls(data: Dict) -> Set[str]:
"""Enumerate reachable purls"""
all_purls = set(patterns.jmespath_purls.search(data))
purls = []
for purl in all_purls:
purls.extend(parse_purl(purl))
return set(purls)


def filter_flows(reachables: List[Dict], filename: str, ln: Tuple[int, int]) -> bool:
"""Filters flows"""
if not reachables:
return False
for flows in reachables:
for f in flows.get('flows', []):
num = f.get('lineNumber')
if num and num not in ln:
continue
if f.get('parentFileName').endswith(filename):
return True
return False


def get_ln_range(value: str) -> Tuple[int, int] | Tuple:
"""
Extracts line numbers from arguments and returns a tuple of (start, end)
Expand All @@ -195,3 +235,38 @@ def parse_filters(filter_options: str) -> Generator[Tuple[str, str, str], None,
if condition == '=':
condition = '=='
yield target, value, condition


def parse_purl_pkgs(match: re.Match) -> List[str]:
"""Extract package and version variations from purl"""
pkgs = [match.group('p1')]
pkgs.append(match.group('p2'))
pkgs = list(set(pkgs))
for i, p in enumerate(pkgs):
pkgs[i] = p.replace('pypi/', '').replace('npm/', '').replace('%40', '@') # type: ignore
return pkgs


def parse_purl_versions(match: re.Match) -> List[str]:
"""Returns a list of version variations from a purl"""
versions = {match.group('v1')}
versions.add(match.group('v2'))
if match.group('ext'):
versions.add(f"{match.group('v1')}{match.group('ext')}")
versions.add(f"{match.group('v2')}{match.group('ext')}")
return list(versions)


def parse_purl(purl: str) -> List[str]:
"""Returns a list of permutations of pkg:version from a purl"""
purl = patterns.purl_trailing_version.sub('', purl)
result: List[str] = []
pkgs: List[str] = []
versions: List[str] = []
if match := patterns.purl_version.search(purl):
versions = parse_purl_versions(match)
if match := patterns.purl_pkg.search(purl):
pkgs = parse_purl_pkgs(match)
for i in pkgs:
result.extend(f"{i}:{j}" for j in versions)
return list(set(result))
6 changes: 6 additions & 0 deletions atom_tools/lib/regex_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass
from typing import Tuple, List, Dict, Any

import jmespath

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,6 +72,11 @@ class FilteringPatternCollection:
'{objectSlices: objectSlices[?ATTRIBUTECONDITION`TARGET_VALUE`], '
'userDefinedTypes: userDefinedTypes[?ATTRIBUTECONDITION`TARGET_VALUE`]}'
)
jmespath_purls = jmespath.compile('reachables[].purls[]')
purl_pkg = re.compile(r'(?P<p1>[^/:]+/(?P<p2>[^/]+))(?:(?:.|/)v\d+)?(?=@)')
purl_trailing_version = re.compile(r'(?:.|/)v\d+(?=@)')
purl_version = re.compile(r'(?<=@)(?P<v1>v?(?P<v2>[\d.]+){1,3})(?P<ext>[^?\s]+)?')
filename = re.compile(r'[^/]+(?!/)')


def py_helper(endpoint: str, regex: OpenAPIRegexCollection) -> Tuple[str, List[Dict]]:
Expand Down
11 changes: 7 additions & 4 deletions atom_tools/lib/slices.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@

import json_flatten # type: ignore

from atom_tools.lib.regex_utils import FilteringPatternCollection


logger = logging.getLogger(__name__)
patterns = FilteringPatternCollection()


def create_flattened_dicts(data: Dict) -> Dict[str, Dict]:
def create_attrib_dicts(data: Dict) -> Dict[str, Dict]:
"""Creates a flattened slice and individual attribute dictionaries."""
attributes: Dict[str, Dict] = {
'filename': {},
Expand All @@ -27,7 +30,7 @@ def create_flattened_dicts(data: Dict) -> Dict[str, Dict]:
}

for k, v in data.items():
if 'fileName' in k:
if 'fileName' in k or 'parentFileName' in k:
attributes['filename'] = process_attrib_dict(attributes['filename'], k, v)
elif 'fullName' in k:
attributes['fullname'] = process_attrib_dict(attributes['fullname'], k, v)
Expand All @@ -48,7 +51,7 @@ def import_flat_slice(content: Dict) -> Dict[str, Dict]:
Import a slice from a JSON file.
Args:
filename (str): The path to the JSON file.
content (dict): The contents of the JSON file
Returns:
tuple[dict, str]: The contents of the JSON file and the type of slice
Expand All @@ -62,7 +65,7 @@ def import_flat_slice(content: Dict) -> Dict[str, Dict]:
If the JSON file is not a valid slice, a warning is logged.
"""
content = json_flatten.flatten(content)
return create_flattened_dicts(content)
return create_attrib_dicts(content)


def import_slice(filename: str | Path) -> Tuple[Dict, str, str]:
Expand Down
Loading

0 comments on commit 264c1d6

Please sign in to comment.