Skip to content

Commit

Permalink
Misc: Exclusion list, add targetObj line numbers, standardize input s…
Browse files Browse the repository at this point in the history
…lice param (#35)

* Ignore case in regex rather than using lower().

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Add exclusions list.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Standardize input slice for filtering.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Fix NoneType, check for X- paths to exclude.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Bump version, linting.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

* Improve jmespath expression.

Signed-off-by: Caroline Russell <caroline@appthreat.dev>

---------

Signed-off-by: Caroline Russell <caroline@appthreat.dev>
  • Loading branch information
cerrussell authored Mar 13, 2024
1 parent d048a69 commit 74a41dc
Show file tree
Hide file tree
Showing 11 changed files with 764 additions and 632 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Usage:
Options:
-f, --format=FORMAT Destination format [default: "openapi3.0.1"]
-u, --usages-slice=USAGES-SLICE Usages slice file
-i, --input-slice=INPUT-SLICE Usages slice file
-t, --type=TYPE Origin type of source on which the atom slice was generated. [default: "java"]
-o, --output-file=OUTPUT-FILE Output file [default: "openapi_from_slice.json"]
-s, --server=SERVER The server url to be included in the server object.
Expand All @@ -84,7 +84,7 @@ Help:
```

**Example**
>`atom-tools convert -u usages.slices.json -f openapi3.0.1 -o openapi_usages.json -t java -s https://myserver.com`
>`atom-tools convert -i usages.slices.json -f openapi3.0.1 -o openapi_usages.json -t java -s https://myserver.com`

### Validate Lines
Expand Down
2 changes: 1 addition & 1 deletion atom_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""
A cli, classes and functions for converting an atom slice to a different format
"""
__version__ = '0.4.1'
__version__ = '0.4.2'
10 changes: 9 additions & 1 deletion atom_tools/cli/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,15 @@ def __init__(self) -> None:
self.set_command_loader(command_loader)

@staticmethod
def register_command_loggers(event: Event, event_name: str, _: EventDispatcher) -> None:
def register_command_loggers(event: Event, event_name: str, _: EventDispatcher) -> None: # pylint: disable=unused-argument
"""
Registers the command loggers.
Args:
event (Event): The event.
event_name (str): The event name.
_: EventDispatcher: The event dispatcher.
"""
assert isinstance(event, ConsoleCommandEvent)
command = event.command
if not isinstance(command, Command):
Expand Down
2 changes: 1 addition & 1 deletion atom_tools/cli/commands/command.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# pylint: disable=missing-class-docstring,missing-function-docstring,missing-module-docstring
# pylint: disable-all
from __future__ import annotations

from typing import ClassVar
Expand Down
6 changes: 3 additions & 3 deletions atom_tools/cli/commands/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class ConvertCommand(Command):
default='openapi3.0.1',
),
option(
'usages-slice',
'u',
'input-slice',
'i',
'Usages slice file',
flag=False,
default=None,
Expand Down Expand Up @@ -88,7 +88,7 @@ def handle(self):
converter = OpenAPI(
self.option('format'),
self.option('type'),
self.option('usages-slice'),
self.option('input-slice'),
)

if not (result := converter.endpoints_to_openapi(self.option('server'))):
Expand Down
2 changes: 2 additions & 0 deletions atom_tools/cli/logging_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable-all
# Adapted from poetry
from __future__ import annotations

import sys
Expand Down
136 changes: 90 additions & 46 deletions atom_tools/lib/converter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Classes and functions used to convert slices.
"""
import contextlib
import json.encoder
import logging
import re
Expand All @@ -21,6 +22,8 @@

logger = logging.getLogger(__name__)
regex = OpenAPIRegexCollection()
exclusions = ['/content-type', '/application/javascript', '/application/json', '/application/text',
'/application/xml', '/*', '/*/*', '/allow']


class OpenAPI:
Expand All @@ -42,6 +45,7 @@ class OpenAPI:
Methods:
_create_ln_entries: Creates an x-atom-usages entry.
_identify_target_line_nums: Identifies targetObj line numbers.
_filter_matches: Filters a list of matches based on certain criteria.
_js_helper: Formats path sections which are parameters correctly.
_process_methods_helper: Utility for process_methods.
Expand Down Expand Up @@ -75,6 +79,7 @@ def __init__(
self.file_endpoint_map: Dict = {}
self.params: Dict[str, List[Dict]] = {}
self.regex_param_count = 0
self.target_line_nums: Dict[str, Dict] = {}

def endpoints_to_openapi(self, server: str = '') -> Any:
"""
Expand All @@ -95,19 +100,19 @@ def create_file_to_method_dict(self, method_map):
"""
Creates a dictionary of endpoints and methods.
"""
full_names = list(method_map.get('full_names').keys())
file_endpoint_map = {i: [] for i in full_names}
for full_name in full_names:
for values in method_map['full_names'][full_name]['resolved_methods'].values():
file_names = list(method_map.get('file_names').keys())
file_endpoint_map = {i: [] for i in file_names}
for full_name in file_names:
for values in method_map['file_names'][full_name]['resolved_methods'].values():
file_endpoint_map[full_name].extend(values.get('endpoints'))
for k, v in file_endpoint_map.items():
filename = k.split(':')[0]
# filename = k.split(':')[0]
endpoints = set(v)
for i in endpoints:
if self.file_endpoint_map.get(i):
self.file_endpoint_map[i].add(filename)
self.file_endpoint_map[i].add(k)
else:
self.file_endpoint_map[i] = {filename}
self.file_endpoint_map[i] = {k}
self.file_endpoint_map = {k: list(v) for k, v in self.file_endpoint_map.items()}

def convert_usages(self) -> Dict[str, Any]:
Expand All @@ -117,9 +122,29 @@ def convert_usages(self) -> Dict[str, Any]:
methods = self.process_methods()
methods = self.methods_to_endpoints(methods)
self.create_file_to_method_dict(methods)
self._identify_target_line_nums(methods)
methods = self.process_calls(methods)
return self.populate_endpoints(methods)

def _identify_target_line_nums(self, methods):
file_names = list(methods['file_names'].keys())
if not file_names:
return
conditional = [f'fileName==`{json.dumps(i)}`' for i in file_names]
conditional = '*[?' + ' || '.join(conditional) + (
'][].{file_name: fileName, methods: usages[].targetObj[].{resolved_method: '
'resolvedMethod || callName || code || name, line_number: lineNumber}}')
pattern = jmespath.compile(conditional)
result = pattern.search(self.usages.content)
result = {i['file_name']: i['methods'] for i in result if i['methods']}
targets = {i: {} for i in result}

for k, v in result.items():
for i in v:
targets[k] |= {i['resolved_method']: i['line_number']}

self.target_line_nums = targets

def generic_params_helper(self, endpoint: str, orig_endpoint: str) -> List[Dict[str, Any]]:
"""
Extracts generic path parameters from the given endpoint.
Expand All @@ -145,17 +170,17 @@ def generic_params_helper(self, endpoint: str, orig_endpoint: str) -> List[Dict[

def process_methods(self) -> Dict[str, List[str]]:
"""
Create a dictionary of full names and their corresponding methods.
Create a dictionary of file names and their corresponding methods.
"""
method_map = self._process_methods_helper(
'objectSlices[].{full_name: fullName, resolved_methods: usages[].*.resolvedMethod[]}')
'objectSlices[].{file_name: fileName, resolved_methods: usages[].*.resolvedMethod[]}')

calls = self._process_methods_helper(
'objectSlices[].{full_name: fullName, resolved_methods: usages[].*[?resolvedMethod][]'
'objectSlices[].{file_name: fileName, resolved_methods: usages[].*[?resolvedMethod][]'
'[].resolvedMethod[]}')

user_defined_types = self._process_methods_helper(
'userDefinedTypes[].{full_name: name, resolved_methods: fields[].name}')
'userDefinedTypes[].{file_name: name, resolved_methods: fields[].name}')

for key, value in calls.items():
if method_map.get(key):
Expand All @@ -174,54 +199,54 @@ def process_methods(self) -> Dict[str, List[str]]:

return method_map

def query_calls(self, full_name: str, resolved_methods: List[str]) -> List:
def query_calls(self, file_name: str, resolved_methods: List[str]) -> List:
"""
Query calls for the given function name and resolved methods.
Args:
full_name (str): The name of the function to query calls for.
file_name (str): The name of the function to query calls for.
resolved_methods (list[str]): List of resolved methods.
Returns:
list[dict]: List of invoked calls and argument to calls.
"""
result = self._query_calls_helper(full_name)
result = self._query_calls_helper(file_name)
calls = []
for call in result:
m = call.get('resolvedMethod', '')
if m and m in resolved_methods:
calls.append(call)
return calls

def _query_calls_helper(self, full_name: str) -> List[Dict]:
def _query_calls_helper(self, file_name: str) -> List[Dict]:
"""
A function to help query calls.
Args:
full_name (str): The name of the function to query calls for.
file_name (str): The name of the function to query calls for.
Returns:
list: The result of searching for the calls pattern in the usages.
"""
pattern = f'objectSlices[?fullName==`{json.dumps(full_name)}`].usages[].*[?callName][][]'
pattern = f'objectSlices[?fileName==`{json.dumps(file_name)}`].usages[].*[?callName][][]'
compiled_pattern = jmespath.compile(pattern)
return compiled_pattern.search(self.usages.content)

def process_calls(self, method_map: Dict) -> Dict[str, Any]:
"""
Process calls and return a new method map.
Args:
method_map (dict): A mapping of full names to resolved methods.
method_map (dict): A mapping of file names to resolved methods.
Returns:
dict: A new method map containing calls.
"""
for full_name, resolved_methods in method_map['full_names'].items():
if res := self.query_calls(full_name, resolved_methods['resolved_methods'].keys()):
for file_name, resolved_methods in method_map['file_names'].items():
if res := self.query_calls(file_name, resolved_methods['resolved_methods'].keys()):
mmap = self.filter_calls(res, resolved_methods)
else:
mmap = self.filter_calls([], resolved_methods)

method_map['full_names'][full_name]['resolved_methods'] = mmap.get('resolved_methods')
method_map['file_names'][file_name]['resolved_methods'] = mmap.get('resolved_methods')

return method_map

Expand Down Expand Up @@ -260,10 +285,10 @@ def methods_to_endpoints(self, method_map: Dict[str, Any]) -> Dict[str, Any]:
Returns:
dict: A new method map containing endpoints.
"""
new_method_map: Dict = {'full_names': {}}
for full_name, resolved_methods in method_map.items():
new_method_map: Dict = {'file_names': {}}
for file_name, resolved_methods in method_map.items():
if new_resolved := self.process_resolved_methods(resolved_methods):
new_method_map['full_names'][full_name] = {
new_method_map['file_names'][file_name] = {
'resolved_methods': new_resolved
}

Expand Down Expand Up @@ -298,16 +323,18 @@ def _process_methods_helper(self, pattern: str) -> Dict[str, Any]:
"""
dict_resolved_pattern = jmespath.compile(pattern)
result = [
i for i in dict_resolved_pattern.search(self.usages.content)
if i.get('resolved_methods')
]
result = []
if dict_resolved_pattern:
result = [
i for i in dict_resolved_pattern.search(self.usages.content)
if i.get('resolved_methods')
]

resolved: Dict = {}
for r in result:
full_name = r['full_name']
file_name = r['file_name']
methods = r['resolved_methods']
resolved.setdefault(full_name, {'resolved_methods': []})[
resolved.setdefault(file_name, {'resolved_methods': []})[
'resolved_methods'].extend(methods)

return resolved
Expand Down Expand Up @@ -364,17 +391,21 @@ def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
"""
endpoints = paths_dict[1].get('endpoints')
calls = paths_dict[1].get('calls')
line_numbers = paths_dict[1].get('line_nos')
call_line_numbers = paths_dict[1].get('line_nos')
target_line_number = None
if self.target_line_nums:
with contextlib.suppress(KeyError):
target_line_number = self.target_line_nums[filename][paths_dict[0]]

paths_object: Dict = {}

for ep in set(endpoints):
if ep.startswith('/ftp(?!/quarantine)'):
print('found')
ep, paths_item_object = self._paths_object_helper(
calls,
ep,
filename,
line_numbers,
call_line_numbers,
target_line_number
)
if paths_object.get(ep):
paths_object[ep] |= paths_item_object
Expand All @@ -384,7 +415,13 @@ def create_paths_item(self, filename: str, paths_dict: Dict) -> Dict:
return self._remove_nested_parameters(paths_object)

def _paths_object_helper(
self, calls: List, ep: str, filename: str, line_numbers: List) -> Tuple[str, Dict]:
self,
calls: List,
ep: str,
filename: str,
call_line_numbers: List,
line_number: int | None
) -> Tuple[str, Dict]:
"""
Creates a paths item object.
"""
Expand All @@ -401,9 +438,11 @@ def _paths_object_helper(
if calls:
for call in calls:
paths_item_object |= self.calls_to_params(ep, orig_ep, call)
if line_numbers and (line_nos := self._create_ln_entries(
filename, list(set(line_numbers)))):
if (call_line_numbers or line_number) and (line_nos := self._create_ln_entries(
filename, list(set(call_line_numbers)), line_number)):
paths_item_object |= line_nos
# if line_number:
# paths_item_object['x-atom-usages-target'] = {filename: line_number}
return ep, paths_item_object

def _extract_params(self, ep: str) -> Tuple[str, bool, List]:
Expand All @@ -417,19 +456,24 @@ def _extract_params(self, ep: str) -> Tuple[str, bool, List]:
return ep, py_special_case, tmp_params

@staticmethod
def _create_ln_entries(filename, line_numbers):
def _create_ln_entries(filename, call_line_numbers, line_numbers):
"""
Creates line number entries for a given filename and line numbers.
Args:
filename (str): The name of the file.
line_numbers (list): A list of line numbers.
call_line_numbers (list): A list of line numbers.
Returns:
dict: A dictionary containing line number entries.
"""
fn = filename.split(':')[0]
return {'x-atom-usages': {fn: line_numbers}}
x_atom = {'x-atom-usages': {}}
if call_line_numbers:
x_atom['x-atom-usages']['call'] = {fn: call_line_numbers}
if line_numbers:
x_atom['x-atom-usages']['target'] = {fn: line_numbers}
return x_atom

@staticmethod
def _remove_nested_parameters(data: Dict) -> Dict[str, Dict | List]:
Expand Down Expand Up @@ -525,13 +569,13 @@ def extract_endpoints(self, method: str) -> List[str]:
list: A list of endpoints extracted from the code.
"""
endpoints: List[str] = []
if not method:
return endpoints
if not (matches := re.findall(regex.endpoints, method)):
return endpoints
if not method or not (matches := re.findall(regex.endpoints, method)):
return []
matches = self._filter_matches(matches, method)
return [v for v in matches if v]
return [
v for v in matches
if v and v not in exclusions and not v.lower().startswith('/x-')
]

def _filter_matches(self, matches: List[str], code: str) -> List[str]:
"""
Expand Down
Loading

0 comments on commit 74a41dc

Please sign in to comment.