Skip to content

Commit

Permalink
feat: converted ipc plugin to new base class
Browse files Browse the repository at this point in the history
also added new jinja filter for sorting dict lists by key
  • Loading branch information
jstucke committed Sep 6, 2024
1 parent a928cdb commit 937bc7e
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 100 deletions.
92 changes: 50 additions & 42 deletions src/plugins/analysis/ipc/code/ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,69 @@
import json
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List, Union

from docker.types import Mount
from pydantic import BaseModel, Field
from semver import Version

from analysis.PluginBase import AnalysisBasePlugin
from analysis.plugin import AnalysisPluginV0
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.docker import run_docker_container

if TYPE_CHECKING:
from objects.file import FileObject
from io import FileIO

DOCKER_IMAGE = 'ipc'


class AnalysisPlugin(AnalysisBasePlugin):
"""
Inter-Process Communication Analysis
"""
class IpcTarget(BaseModel):
name: str = Field(description='target of the IPC (usually a file path or address)')
ipc_calls: List[IpcCall]

NAME = 'ipc_analyzer'
DESCRIPTION = 'Inter-Process Communication Analysis'
VERSION = '0.1.1'
FILE = __file__

MIME_WHITELIST = [ # noqa: RUF012
'application/x-executable',
'application/x-object',
'application/x-sharedlib',
]
DEPENDENCIES = ['file_type'] # noqa: RUF012
TIMEOUT = 600 # 10 minutes
class IpcCall(BaseModel):
type: str = Field(description='a type of call (usually a POSIX system call)')
arguments: List[Union[str, int, List[Union[str, int]]]] = Field(description='arguments of the IPC call')

def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:

class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
targets: List[IpcTarget] = Field(description='An array of targets of IPC calls')

def __init__(self):
metadata = self.MetaData(
name='ipc_analyzer',
dependencies=['file_type'],
description='Inter-Process Communication Analysis',
mime_whitelist=[
'application/x-executable',
'application/x-object',
'application/x-pie-executable',
'application/x-sharedlib',
],
timeout=600, # 10 minutes
version=Version(1, 0, 0),
Schema=self.Schema,
)
super().__init__(metadata=metadata)

def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
del virtual_file_path, analyses
output = self._run_ipc_analyzer_in_docker(file_handle)
# output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
# we need to restructure this a bit so it lines up with the Schema
targets = [{'name': target, 'ipc_calls': target_info} for target, target_info in output['ipcCalls'].items()]
return self.Schema.model_validate({'targets': targets})

def _run_ipc_analyzer_in_docker(self, file_handle: FileIO) -> dict:
with tempfile.TemporaryDirectory() as tmp_dir:
path = Path(file_handle.name).absolute()
folder = Path(tmp_dir) / 'results'
mount = f'/input/{file_object.file_name}'
mount = f'/input/{path.name}'
if not folder.exists():
folder.mkdir()
output = folder / f'{file_object.file_name}.json'
output = folder / f'{path.name}.json'
output.write_text(json.dumps({'ipcCalls': {}}))
run_docker_container(
DOCKER_IMAGE,
Expand All @@ -49,28 +74,11 @@ def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
command=f'{mount} /results/',
mounts=[
Mount('/results/', str(folder.resolve()), type='bind'),
Mount(mount, file_object.file_path, type='bind'),
Mount(mount, str(path), type='bind'),
],
)
return json.loads(output.read_text())

def _do_full_analysis(self, file_object: FileObject) -> FileObject:
output = self._run_ipc_analyzer_in_docker(file_object)
file_object.processed_analysis[self.NAME] = {
'full': output,
'summary': self._create_summary(output['ipcCalls']),
}
return file_object

def process_object(self, file_object: FileObject) -> FileObject:
"""
This function handles only ELF executables. Otherwise, it returns an empty dictionary.
It calls the ipc docker container.
"""
return self._do_full_analysis(file_object)

@staticmethod
def _create_summary(output: dict) -> list[str]:
# output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
summary = {entry['type'] for result_list in output.values() for entry in result_list}
return sorted(summary)
def summarize(self, result: Schema) -> list[str]:
ipc_types = {ipc_call.type for target in result.targets for ipc_call in target.ipc_calls}
return sorted(ipc_types)
48 changes: 27 additions & 21 deletions src/plugins/analysis/ipc/test/test_ipc_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,36 @@

import pytest

from objects.file import FileObject

from ..code.ipc_analyzer import AnalysisPlugin

TEST_DIR = Path(__file__).parent / 'data'


EXPECTED_SYSTEM_RESULT = {
'whoami': [{'type': 'system', 'arguments': ['']}],
'ls': [{'type': 'system', 'arguments': ['-l']}],
'echo': [{'type': 'system', 'arguments': ['hello']}],
'id': [{'type': 'system', 'arguments': ['']}],
'pwd': [{'type': 'system', 'arguments': ['']}],
'targets': [
{'name': 'whoami', 'ipc_calls': [{'type': 'system', 'arguments': ['']}]},
{'name': 'ls', 'ipc_calls': [{'type': 'system', 'arguments': ['-l']}]},
{'name': 'echo', 'ipc_calls': [{'type': 'system', 'arguments': ['hello']}]},
{'name': 'id', 'ipc_calls': [{'type': 'system', 'arguments': ['']}]},
{'name': 'pwd', 'ipc_calls': [{'type': 'system', 'arguments': ['']}]},
]
}

EXPECTED_WRITE_RESULT = {
'data.dat': [
{'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
'targets': [
{
'type': 'write',
'arguments': [
'',
['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
[77],
'name': 'data.dat',
'ipc_calls': [
{'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
{
'type': 'write',
'arguments': [
'',
['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
[77],
],
},
],
},
}
]
}

Expand All @@ -40,8 +44,10 @@
('ipc_shared_files_test_bin', EXPECTED_WRITE_RESULT, ['open', 'write']),
],
)
def test_ipc_system(analysis_plugin, test_file, expected_result, expected_summary):
test_object = FileObject(file_path=str((TEST_DIR / test_file).resolve()))
result = analysis_plugin.process_object(test_object)
assert result.processed_analysis['ipc_analyzer']['full']['ipcCalls'] == expected_result
assert result.processed_analysis['ipc_analyzer']['summary'] == expected_summary
def test_ipc_analyze_summary(analysis_plugin, test_file, expected_result, expected_summary):
with (TEST_DIR / test_file).open('rb') as fp:
result = analysis_plugin.analyze(fp, {}, {})
as_dict = result.model_dump()
assert as_dict == expected_result
summary = analysis_plugin.summarize(result)
assert summary == expected_summary
79 changes: 43 additions & 36 deletions src/plugins/analysis/ipc/view/ipc_analyzer.html
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,47 @@

{% block analysis_result_details %}

<table class="table table-bordered">
<colgroup>
<col style="width: 50px">
<col style="width: 150px">
<col style="width: 600px">
</colgroup>
<tbody class="table-analysis">
<tr>
<td class="table-head-light"><b>Target</b></td>
<td class="table-head-light"><b>Type</b></td>
<td class="table-head-light"><b>Arguments</b></td>
</tr>
{% set ipc_calls = analysis_result['full']['ipcCalls'] %}
{% for target in ipc_calls.keys()|sort %}
{% set row_count = 1 + ipc_calls[target]|length %}
<tr>
<td rowspan={{ row_count }}>{{ target }}</td>
</tr>
{% for ipc_call in ipc_calls[target] %}
<tr>
<td>{{ ipc_call['type'] }}</td>
<td>
<ul class="m-0">
{% for arg in ipc_call['arguments'] %}
{% if arg %}
<li>{{ arg }}</li>
{% endif %}
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
<tr>
<td colspan="2" class="p-0">

{% endblock %}
<table class="table table-bordered mb-0">
<colgroup>
<col style="width: 50px">
<col style="width: 150px">
<col style="width: 600px">
</colgroup>
<thead class="table-head-light">
<tr>
<th>Target</th>
<th>Type</th>
<th>Arguments</th>
</tr>
</thead>
<tbody>
{% for target in analysis_result['targets'] | sort_dict_list('name') %}
{% set row_count = 1 + target.ipc_calls | length %}
<tr>
<td rowspan={{ row_count }}>{{ target.name }}</td>
</tr>
{% for ipc_call in target.ipc_calls %}
<tr>
<td>{{ ipc_call.type }}</td>
<td>
<ul class="m-0">
{% for arg in ipc_call.arguments %}
{% if arg %}
<li>{{ arg }}</li>
{% endif %}
{% endfor %}
</ul>
</td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>

</td>
</tr>

{% endblock %}
13 changes: 13 additions & 0 deletions src/test/unit/web_interface/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,16 @@ def test_as_ascii_table():
)
def test_str_to_hex(input_, expected_result):
assert flt.str_to_hex(input_) == expected_result


@pytest.mark.parametrize(
('input_', 'expected_result'),
[
([], []),
([{'a': 2}, {'a': 1}, {'a': 3}], [{'a': 1}, {'a': 2}, {'a': 3}]),
([{'a': 2}, {'a': 1}, {'b': 3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
([{'a': 2}, {'a': '1'}, {'a': 3.3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
],
)
def test_sort_dict_list_by_key(input_, expected_result):
assert flt.sort_dict_list_by_key(input_, 'a') == expected_result
1 change: 1 addition & 0 deletions src/web_interface/components/jinja_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['sort_chart_list_by_value'] = flt.sort_chart_list_by_value
self._app.jinja_env.filters['sort_comments'] = flt.sort_comments
self._app.jinja_env.filters['sort_cve'] = flt.sort_cve_results
self._app.jinja_env.filters['sort_dict_list'] = flt.sort_dict_list_by_key
self._app.jinja_env.filters['sort_privileges'] = lambda privileges: sorted(
privileges, key=lambda role: len(privileges[role]), reverse=True
)
Expand Down
6 changes: 5 additions & 1 deletion src/web_interface/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from re import Match
from string import ascii_letters
from time import localtime, strftime, struct_time, time
from typing import Iterable, Union
from typing import Any, Iterable, Union

import packaging.version
import semver
Expand Down Expand Up @@ -433,6 +433,10 @@ def _cve_score_to_float(score: float | str) -> float:
return 0.0


def sort_dict_list_by_key(dict_list: list[dict], key: Any) -> list[dict]:
return sorted(dict_list, key=lambda d: d.get(key, -9999))


def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]:
reformatted = defaultdict(list, {})
for issue in issues:
Expand Down

0 comments on commit 937bc7e

Please sign in to comment.