Skip to content

Commit

Permalink
feat: converted ipc plugin to new base class
Browse files Browse the repository at this point in the history
also added new jinja filter for sorting dict lists by key
  • Loading branch information
jstucke committed Sep 13, 2024
1 parent a928cdb commit a6093ed
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 96 deletions.
105 changes: 63 additions & 42 deletions src/plugins/analysis/ipc/code/ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,83 @@
import json
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, List, Union

from docker.types import Mount
from pydantic import BaseModel, Field
from semver import Version

from analysis.PluginBase import AnalysisBasePlugin
from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
from objects.file import FileObject
from io import FileIO

DOCKER_IMAGE = 'ipc'


class AnalysisPlugin(AnalysisBasePlugin):
"""
Inter-Process Communication Analysis
"""
class FunctionCall(BaseModel):
name: str = Field(
# Refer to sink_function_names in ../docker/ipc_analyzer/ipy_analyzer.py for a list of supported functions
description='The name of the function.',
)
target: Union[str, int] = Field(
description=(
'The first argument of the function call. '
'For all supported functions, this is either a pathname or a file descriptor.'
),
)
arguments: List[Any] = Field(
description=(
'The remaining arguments of the function call. Arguments of type `char*` are rendered as strings. '
'Arguments of type `char**` are rendered as array of strings. Integer arrays are rendered as such. '
'Everything else is rendered as integer.'
)
)

NAME = 'ipc_analyzer'
DESCRIPTION = 'Inter-Process Communication Analysis'
VERSION = '0.1.1'
FILE = __file__

MIME_WHITELIST = [ # noqa: RUF012
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
]
DEPENDENCIES = ['file_type'] # noqa: RUF012
TIMEOUT = 600 # 10 minutes
class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
calls: List[FunctionCall] = Field(description='An array of IPC function calls.')

def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
def __init__(self):
metadata = self.MetaData(
name='ipc_analyzer',
dependencies=['file_type'],
description='Inter-Process Communication Analysis',
mime_whitelist=[
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
],
timeout=600,
version=Version(1, 0, 0),
Schema=self.Schema,
)
super().__init__(metadata=metadata)

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del virtual_file_path, analyses
output = self._run_ipc_analyzer_in_docker(file_handle)
# output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
# we need to restructure this a bit so it lines up with the Schema
calls = [
{'target': target, 'name': call_dict['type'], 'arguments': call_dict['arguments']}
for target, call_list in output['ipcCalls'].items()
for call_dict in call_list
]
return self.Schema.model_validate({'calls': calls})

def _run_ipc_analyzer_in_docker(self, file_handle: FileIO) -> dict:
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(file_handle.name).absolute()
folder = Path(tmp_dir) / 'results'
mount = f'/input/{file_object.file_name}'
mount = f'/input/{path.name}'
if not folder.exists():
folder.mkdir()
output = folder / f'{file_object.file_name}.json'
output = folder / f'{path.name}.json'
output.write_text(json.dumps({'ipcCalls': {}}))
run_docker_container(
DOCKER_IMAGE,
Expand All @@ -49,28 +88,10 @@ def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
command=f'{mount} /results/',
mounts=[
Mount('/results/', str(folder.resolve()), type='bind'),
Mount(mount, file_object.file_path, type='bind'),
Mount(mount, str(path), type='bind'),
],
)
return json.loads(output.read_text())

def _do_full_analysis(self, file_object: FileObject) -> FileObject:
output = self._run_ipc_analyzer_in_docker(file_object)
file_object.processed_analysis[self.NAME] = {
'full': output,
'summary': self._create_summary(output['ipcCalls']),
}
return file_object

def process_object(self, file_object: FileObject) -> FileObject:
"""
This function handles only ELF executables. Otherwise, it returns an empty dictionary.
It calls the ipc docker container.
"""
return self._do_full_analysis(file_object)

@staticmethod
def _create_summary(output: dict) -> list[str]:
# output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
summary = {entry['type'] for result_list in output.values() for entry in result_list}
return sorted(summary)
def summarize(self, result: Schema) -> list[str]:
return sorted({call.name for call in result.calls})
36 changes: 19 additions & 17 deletions src/plugins/analysis/ipc/test/test_ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,31 @@

import pytest

from objects.file import FileObject

from ..code.ipc_analyzer import AnalysisPlugin

TEST_DIR = Path(__file__).parent / 'data'


EXPECTED_SYSTEM_RESULT = {
'whoami': [{'type': 'system', 'arguments': ['']}],
'ls': [{'type': 'system', 'arguments': ['-l']}],
'echo': [{'type': 'system', 'arguments': ['hello']}],
'id': [{'type': 'system', 'arguments': ['']}],
'pwd': [{'type': 'system', 'arguments': ['']}],
'calls': [
{'arguments': [''], 'name': 'system', 'target': 'whoami'},
{'arguments': ['-l'], 'name': 'system', 'target': 'ls'},
{'arguments': ['hello'], 'name': 'system', 'target': 'echo'},
{'arguments': [''], 'name': 'system', 'target': 'id'},
{'arguments': [''], 'name': 'system', 'target': 'pwd'},
]
}

EXPECTED_WRITE_RESULT = {
'data.dat': [
{'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
'calls': [
{'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']], 'name': 'open', 'target': 'data.dat'},
{
'type': 'write',
'arguments': [
'',
['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
['Now is the winter of our discontent\\nMade ' 'glorious summer by this sun of York\\n'],
[77],
],
'name': 'write',
'target': 'data.dat',
},
]
}
Expand All @@ -40,8 +40,10 @@
('ipc_shared_files_test_bin', EXPECTED_WRITE_RESULT, ['open', 'write']),
],
)
def test_ipc_system(analysis_plugin, test_file, expected_result, expected_summary):
test_object = FileObject(file_path=str((TEST_DIR / test_file).resolve()))
result = analysis_plugin.process_object(test_object)
assert result.processed_analysis['ipc_analyzer']['full']['ipcCalls'] == expected_result
assert result.processed_analysis['ipc_analyzer']['summary'] == expected_summary
def test_ipc_analyze_summary(analysis_plugin, test_file, expected_result, expected_summary):
with (TEST_DIR / test_file).open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, {})
as_dict = result.model_dump()
assert as_dict == expected_result
summary = analysis_plugin.summarize(result)
assert summary == expected_summary
79 changes: 43 additions & 36 deletions src/plugins/analysis/ipc/view/ipc_analyzer.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,47 @@

{% block analysis_result_details %}

<table class="table table-bordered">
<colgroup>
<col style="width: 50px">
<col style="width: 150px">
<col style="width: 600px">
</colgroup>
<tbody class="table-analysis">
<tr>
<td class="table-head-light"><b>Target</b></td>
<td class="table-head-light"><b>Type</b></td>
<td class="table-head-light"><b>Arguments</b></td>
</tr>
{% set ipc_calls = analysis_result['full']['ipcCalls'] %}
{% for target in ipc_calls.keys()|sort %}
{% set row_count = 1 + ipc_calls[target]|length %}
<tr>
<td rowspan={{ row_count }}>{{ target }}</td>
</tr>
{% for ipc_call in ipc_calls[target] %}
<tr>
<td>{{ ipc_call['type'] }}</td>
<td>
<ul class="m-0">
{% for arg in ipc_call['arguments'] %}
{% if arg %}
<li>{{ arg }}</li>
{% endif %}
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
<tr>
<td colspan="2" class="p-0">

{% endblock %}
<table class="table table-bordered mb-0">
<colgroup>
<col style="width: 50px">
<col style="width: 150px">
<col style="width: 600px">
</colgroup>
<thead class="table-head-light">
<tr>
<th>Type</th>
<th>Target</th>
<th>Arguments</th>
</tr>
</thead>
<tbody>
{% for type, call_list in (analysis_result['calls'] | group_dict_list_by_key('name')).items() %}
{% set row_count = 1 + call_list | length %}
<tr>
<td rowspan="{{ row_count }}" style="font-family: monospace;">{{ type }}</td>
</tr>
{% for call_dict in call_list | sort_dict_list('target') %}
<tr>
<td style="font-family: monospace;">{{ call_dict.target }}</td>
<td>
<ul class="m-0">
{% for arg in call_dict.arguments %}
{% if arg %}
<li style="font-family: monospace;">{{ arg }}</li>
{% endif %}
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>

</td>
</tr>

{% endblock %}
29 changes: 29 additions & 0 deletions src/test/unit/web_interface/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ def test_get_unique_keys_from_list_of_dicts(list_of_dicts, expected_result):
assert flt.get_unique_keys_from_list_of_dicts(list_of_dicts) == expected_result


@pytest.mark.parametrize(
('list_of_dicts', 'key', 'expected_result'),
[
([], '', {}),
([{'a': '1'}, {'a': '1'}, {'a': '2'}], 'a', {'1': [{'a': '1'}, {'a': '1'}], '2': [{'a': '2'}]}),
],
)
def test_group_dict_list_by_key(list_of_dicts, key, expected_result):
assert flt.group_dict_list_by_key(list_of_dicts, key) == expected_result


@pytest.mark.parametrize(
('function', 'input_data', 'expected_output', 'error_message'),
[
Expand Down Expand Up @@ -488,3 +499,21 @@ def test_as_ascii_table():
)
def test_str_to_hex(input_, expected_result):
assert flt.str_to_hex(input_) == expected_result


@pytest.mark.parametrize(
('input_', 'expected_result'),
[
([], []),
([{'a': 2}, {'a': 1}, {'a': 3}], [{'a': 1}, {'a': 2}, {'a': 3}]),
([{'a': 2}, {'a': 1}, {'b': 3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
([{'a': 'b'}, {'a': 'c'}, {'a': 'a'}], [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]),
],
)
def test_sort_dict_list_by_key(input_, expected_result):
assert flt.sort_dict_list_by_key(input_, 'a') == expected_result


def test_sort_dict_list_by_key_error():
with pytest.raises(ValueError, match='Dict values must be of the same type'):
flt.sort_dict_list_by_key([{'a': 2}, {'a': '1'}, {'a': {}}], 'a')
2 changes: 2 additions & 0 deletions src/web_interface/components/jinja_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['get_canvas_height'] = flt.get_canvas_height
self._app.jinja_env.filters['get_searchable_crypto_block'] = flt.get_searchable_crypto_block
self._app.jinja_env.filters['get_unique_keys_from_list_of_dicts'] = flt.get_unique_keys_from_list_of_dicts
self._app.jinja_env.filters['group_dict_list_by_key'] = flt.group_dict_list_by_key
self._app.jinja_env.filters['hex'] = hex
self._app.jinja_env.filters['hide_dts_binary_data'] = flt.hide_dts_binary_data
self._app.jinja_env.filters['infection_color'] = flt.infection_color
Expand Down Expand Up @@ -233,6 +234,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['sort_chart_list_by_value'] = flt.sort_chart_list_by_value
self._app.jinja_env.filters['sort_comments'] = flt.sort_comments
self._app.jinja_env.filters['sort_cve'] = flt.sort_cve_results
self._app.jinja_env.filters['sort_dict_list'] = flt.sort_dict_list_by_key
self._app.jinja_env.filters['sort_privileges'] = lambda privileges: sorted(
privileges, key=lambda role: len(privileges[role]), reverse=True
)
Expand Down
16 changes: 15 additions & 1 deletion src/web_interface/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from re import Match
from string import ascii_letters
from time import localtime, strftime, struct_time, time
from typing import Iterable, Union
from typing import Any, Iterable, Union

import packaging.version
import semver
Expand Down Expand Up @@ -366,6 +366,13 @@ def get_unique_keys_from_list_of_dicts(list_of_dicts: list[dict]):
return unique_keys


def group_dict_list_by_key(dict_list: list[dict], key: Any) -> dict[str, list[dict]]:
result = {}
for dictionary in dict_list:
result.setdefault(dictionary.get(key), []).append(dictionary)
return result


def random_collapse_id():
return ''.join(random.choice(ascii_letters) for _ in range(10))

Expand Down Expand Up @@ -433,6 +440,13 @@ def _cve_score_to_float(score: float | str) -> float:
return 0.0


def sort_dict_list_by_key(dict_list: list[dict], key: Any) -> list[dict]:
types = {type(d.get(key)) for d in dict_list if key in d}
if types not in [set(), {str}, {int}]:
raise ValueError(f'Dict values must be of the same type, not {types}')
return sorted(dict_list, key=lambda d: d.get(key, -9999 if types == {int} else ''))


def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]:
reformatted = defaultdict(list, {})
for issue in issues:
Expand Down

0 comments on commit a6093ed

Please sign in to comment.