diff --git a/src/plugins/analysis/ipc/code/ipc_analyzer.py b/src/plugins/analysis/ipc/code/ipc_analyzer.py
index 470f38fefa..66b7c563f4 100644
--- a/src/plugins/analysis/ipc/code/ipc_analyzer.py
+++ b/src/plugins/analysis/ipc/code/ipc_analyzer.py
@@ -3,44 +3,83 @@
import json
import tempfile
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any, List, Union
from docker.types import Mount
+from pydantic import BaseModel, Field
+from semver import Version
-from analysis.PluginBase import AnalysisBasePlugin
+from analysis.plugin import AnalysisPluginV0
+from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.docker import run_docker_container
if TYPE_CHECKING:
- from objects.file import FileObject
+ from io import FileIO
DOCKER_IMAGE = 'ipc'
-class AnalysisPlugin(AnalysisBasePlugin):
- """
- Inter-Process Communication Analysis
- """
+class FunctionCall(BaseModel):
+ name: str = Field(
+ # Refer to sink_function_names in ../docker/ipc_analyzer/ipy_analyzer.py for a list of supported functions
+ description='The name of the function.',
+ )
+ target: Union[str, int] = Field(
+ description=(
+ 'The first argument of the function call. '
+ 'For all supported functions, this is either a pathname or a file descriptor.'
+ ),
+ )
+ arguments: List[Any] = Field(
+ description=(
+ 'The remaining arguments of the function call. Arguments of type `char*` are rendered as strings. '
+ 'Arguments of type `char**` are rendered as array of strings. Integer arrays are rendered as such. '
+ 'Everything else is rendered as integer.'
+ )
+ )
- NAME = 'ipc_analyzer'
- DESCRIPTION = 'Inter-Process Communication Analysis'
- VERSION = '0.1.1'
- FILE = __file__
- MIME_WHITELIST = [ # noqa: RUF012
- 'application/x-executable',
- 'application/x-object',
- 'application/x-sharedlib',
- ]
- DEPENDENCIES = ['file_type'] # noqa: RUF012
- TIMEOUT = 600 # 10 minutes
+class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
+ class Schema(BaseModel):
+ calls: List[FunctionCall] = Field(description='An array of IPC function calls.')
- def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
+ def __init__(self):
+ metadata = self.MetaData(
+ name='ipc_analyzer',
+ dependencies=['file_type'],
+ description='Inter-Process Communication Analysis',
+ mime_whitelist=[
+ 'application/x-executable',
+ 'application/x-object',
+ 'application/x-pie-executable',
+ 'application/x-sharedlib',
+ ],
+ timeout=600,
+ version=Version(1, 0, 0),
+ Schema=self.Schema,
+ )
+ super().__init__(metadata=metadata)
+
+ def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
+ del virtual_file_path, analyses
+ output = self._run_ipc_analyzer_in_docker(file_handle)
+ # output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
+ # we need to restructure this a bit so it lines up with the Schema
+ calls = [
+ {'target': target, 'name': call_dict['type'], 'arguments': call_dict['arguments']}
+ for target, call_list in output['ipcCalls'].items()
+ for call_dict in call_list
+ ]
+ return self.Schema.model_validate({'calls': calls})
+
+ def _run_ipc_analyzer_in_docker(self, file_handle: FileIO) -> dict:
with tempfile.TemporaryDirectory() as tmp_dir:
+ path = Path(file_handle.name).absolute()
folder = Path(tmp_dir) / 'results'
- mount = f'/input/{file_object.file_name}'
+ mount = f'/input/{path.name}'
if not folder.exists():
folder.mkdir()
- output = folder / f'{file_object.file_name}.json'
+ output = folder / f'{path.name}.json'
output.write_text(json.dumps({'ipcCalls': {}}))
run_docker_container(
DOCKER_IMAGE,
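Review bot: `analyze` restructures the container's JSON with plain indexing (`output['ipcCalls']`, `call_dict['type']`, `call_dict['arguments']`) before `model_validate` ever sees it, so a results file with a missing or differently shaped entry fails with a KeyError instead of the pydantic ValidationError the Schema exists to produce; the `{'ipcCalls': {}}` seed written in `_run_ipc_analyzer_in_docker` only covers the case where the container wrote nothing. Since the container's output is the untrusted input of this plugin, letting the model do the rejecting is cleaner: `{'target': target, 'name': call_dict.get('type'), 'arguments': call_dict.get('arguments', [])}`. The old class docstring was dropped and `analyze` has none; a one-liner such as `"""Run the ipc container on the file and map its per-target call lists onto Schema."""` would restore it. `_run_ipc_analyzer_in_docker` continues past the end of this hunk.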
@@ -49,28 +88,10 @@ def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
command=f'{mount} /results/',
mounts=[
Mount('/results/', str(folder.resolve()), type='bind'),
- Mount(mount, file_object.file_path, type='bind'),
+ Mount(mount, str(path), type='bind'),
],
)
return json.loads(output.read_text())
- def _do_full_analysis(self, file_object: FileObject) -> FileObject:
- output = self._run_ipc_analyzer_in_docker(file_object)
- file_object.processed_analysis[self.NAME] = {
- 'full': output,
- 'summary': self._create_summary(output['ipcCalls']),
- }
- return file_object
-
- def process_object(self, file_object: FileObject) -> FileObject:
- """
- This function handles only ELF executables. Otherwise, it returns an empty dictionary.
- It calls the ipc docker container.
- """
- return self._do_full_analysis(file_object)
-
- @staticmethod
- def _create_summary(output: dict) -> list[str]:
- # output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
- summary = {entry['type'] for result_list in output.values() for entry in result_list}
- return sorted(summary)
+ def summarize(self, result: Schema) -> list[str]:
+ return sorted({call.name for call in result.calls})
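Review bot: The bind target `mount`, built from `path.name` in the previous hunk, is reused verbatim as the first word of `command=f'{mount} /results/'` and as the `Mount` target here, so a file name containing whitespace or quote characters would be split by whatever parses that command string; this predates the commit, but the name now comes from `file_handle.name` rather than `file_object.file_name`. If the handle can ever be opened on something other than the uid-named storage file, a fixed container path such as `mount = '/input/target'` removes the dependency on the host name (the `{path.name}.json` results file would presumably need the same treatment, depending on how the container names its output). `_run_ipc_analyzer_in_docker` starts outside this hunk. `summarize` lacks a docstring, e.g. `"""Return the sorted, de-duplicated names of all IPC functions found."""`.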
diff --git a/src/plugins/analysis/ipc/test/test_ipc_analyzer.py b/src/plugins/analysis/ipc/test/test_ipc_analyzer.py
index 80807e9212..aa11ec63b8 100644
--- a/src/plugins/analysis/ipc/test/test_ipc_analyzer.py
+++ b/src/plugins/analysis/ipc/test/test_ipc_analyzer.py
@@ -2,31 +2,31 @@
import pytest
-from objects.file import FileObject
-
from ..code.ipc_analyzer import AnalysisPlugin
TEST_DIR = Path(__file__).parent / 'data'
-
EXPECTED_SYSTEM_RESULT = {
- 'whoami': [{'type': 'system', 'arguments': ['']}],
- 'ls': [{'type': 'system', 'arguments': ['-l']}],
- 'echo': [{'type': 'system', 'arguments': ['hello']}],
- 'id': [{'type': 'system', 'arguments': ['']}],
- 'pwd': [{'type': 'system', 'arguments': ['']}],
+ 'calls': [
+ {'arguments': [''], 'name': 'system', 'target': 'whoami'},
+ {'arguments': ['-l'], 'name': 'system', 'target': 'ls'},
+ {'arguments': ['hello'], 'name': 'system', 'target': 'echo'},
+ {'arguments': [''], 'name': 'system', 'target': 'id'},
+ {'arguments': [''], 'name': 'system', 'target': 'pwd'},
+ ]
}
EXPECTED_WRITE_RESULT = {
- 'data.dat': [
- {'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
+ 'calls': [
+ {'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']], 'name': 'open', 'target': 'data.dat'},
{
- 'type': 'write',
'arguments': [
'',
- ['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
+ ['Now is the winter of our discontent\\nMade ' 'glorious summer by this sun of York\\n'],
[77],
],
+ 'name': 'write',
+ 'target': 'data.dat',
},
]
}
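Review bot: The expected write payload is now split into two adjacent literals, `'...Made ' 'glorious summer...'`; implicit concatenation inside a list literal is easy to misread as two elements (ruff ISC001 flags it). Keeping the single literal the old line used, or joining explicitly with `+`, reads better.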
@@ -40,8 +40,10 @@
('ipc_shared_files_test_bin', EXPECTED_WRITE_RESULT, ['open', 'write']),
],
)
-def test_ipc_system(analysis_plugin, test_file, expected_result, expected_summary):
- test_object = FileObject(file_path=str((TEST_DIR / test_file).resolve()))
- result = analysis_plugin.process_object(test_object)
- assert result.processed_analysis['ipc_analyzer']['full']['ipcCalls'] == expected_result
- assert result.processed_analysis['ipc_analyzer']['summary'] == expected_summary
+def test_ipc_analyze_summary(analysis_plugin, test_file, expected_result, expected_summary):
+ with (TEST_DIR / test_file).open('rb') as fp:
+ result = analysis_plugin.analyze(fp, {}, {})
+ as_dict = result.model_dump()
+ assert as_dict == expected_result
+ summary = analysis_plugin.summarize(result)
+ assert summary == expected_summary
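Review bot: `assert as_dict == expected_result` compares `calls` as a list, so the test now depends on the order in which the container emits targets and, within a target, calls; the old assertion compared a dict keyed by target and was order-insensitive at that level. If the analyzer does not guarantee that order, comparing a sorted view is more robust, e.g. `order = lambda c: (str(c['target']), c['name'])` followed by `assert sorted(as_dict['calls'], key=order) == sorted(expected_result['calls'], key=order)`. The fixture `analysis_plugin` is defined outside this hunk.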
diff --git a/src/plugins/analysis/ipc/view/ipc_analyzer.html b/src/plugins/analysis/ipc/view/ipc_analyzer.html
index eb6fd392ca..cef5a5e8dc 100644
--- a/src/plugins/analysis/ipc/view/ipc_analyzer.html
+++ b/src/plugins/analysis/ipc/view/ipc_analyzer.html
@@ -2,40 +2,47 @@
{% block analysis_result_details %}
-
-
-
-
-
-
-
-
- Target |
- Type |
- Arguments |
-
- {% set ipc_calls = analysis_result['full']['ipcCalls'] %}
- {% for target in ipc_calls.keys()|sort %}
- {% set row_count = 1 + ipc_calls[target]|length %}
-
- {{ target }} |
-
- {% for ipc_call in ipc_calls[target] %}
-
- {{ ipc_call['type'] }} |
-
-
- {% for arg in ipc_call['arguments'] %}
- {% if arg %}
- - {{ arg }}
- {% endif %}
- {% endfor %}
-
- |
-
- {% endfor %}
- {% endfor %}
-
-
+
+
-{% endblock %}
\ No newline at end of file
+
+
+
+
+
+
+
+
+ Type |
+ Target |
+ Arguments |
+
+
+
+ {% for type, call_list in (analysis_result['calls'] | group_dict_list_by_key('name')).items() %}
+ {% set row_count = 1 + call_list | length %}
+
+ {{ type }} |
+
+ {% for call_dict in call_list | sort_dict_list('target') %}
+
+ {{ call_dict.target }} |
+
+
+ {% for arg in call_dict.arguments %}
+ {% if arg %}
+ - {{ arg }}
+ {% endif %}
+ {% endfor %}
+
+ |
+
+ {% endfor %}
+ {% endfor %}
+
+
+
+ |
+
+
+{% endblock %}
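Review bot: The outer loop iterates `(analysis_result['calls'] | group_dict_list_by_key('name')).items()` in insertion order, so the function-name groups appear in whatever order the container emitted them, whereas the old template sorted its outer keys with `|sort`; piping the grouped mapping through `| dictsort` instead of `.items()` restores a deterministic order. The `{{ arg }}` values are strings recovered from the analysed binary, so the cell relies on Jinja autoescaping (on by default for Flask `.html` templates); that is fine as long as no `|safe` is added later. The tag markup appears stripped in this rendering of the hunk, so the table structure itself could not be checked.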
diff --git a/src/test/unit/web_interface/test_filter.py b/src/test/unit/web_interface/test_filter.py
index 8f29b859ba..6db481f10f 100644
--- a/src/test/unit/web_interface/test_filter.py
+++ b/src/test/unit/web_interface/test_filter.py
@@ -302,6 +302,17 @@ def test_get_unique_keys_from_list_of_dicts(list_of_dicts, expected_result):
assert flt.get_unique_keys_from_list_of_dicts(list_of_dicts) == expected_result
+@pytest.mark.parametrize(
+ ('list_of_dicts', 'key', 'expected_result'),
+ [
+ ([], '', {}),
+ ([{'a': '1'}, {'a': '1'}, {'a': '2'}], 'a', {'1': [{'a': '1'}, {'a': '1'}], '2': [{'a': '2'}]}),
+ ],
+)
+def test_group_dict_list_by_key(list_of_dicts, key, expected_result):
+ assert flt.group_dict_list_by_key(list_of_dicts, key) == expected_result
+
+
@pytest.mark.parametrize(
('function', 'input_data', 'expected_output', 'error_message'),
[
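Review bot: The parametrization does not exercise a dict that lacks `key`; `group_dict_list_by_key` buckets such entries under `None` (via `dictionary.get(key)`), which is the one non-obvious part of its contract and worth pinning, e.g. `([{'a': '1'}, {'b': '2'}], 'a', {'1': [{'a': '1'}], None: [{'b': '2'}]})`.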
@@ -503,3 +514,16 @@ def test_str_to_hex(input_, expected_result):
)
def test_octal_to_readable(input_, include_type, expected_result):
assert flt.octal_to_readable(input_, include_type=include_type) == expected_result
+
+
+@pytest.mark.parametrize(
+ ('input_', 'expected_result'),
+ [
+ ([], []),
+ ([{'a': 2}, {'a': 1}, {'a': 3}], [{'a': 1}, {'a': 2}, {'a': 3}]),
+ ([{'a': 2}, {'a': 1}, {'b': 3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
+ ([{'a': 'b'}, {'a': 'c'}, {'a': 'a'}], [{'a': 'a'}, {'a': 'b'}, {'a': 'c'}]),
+ ],
+)
+def test_sort_dict_list_by_key(input_, expected_result):
+ assert flt.sort_dict_list_by_key(input_, 'a') == expected_result
diff --git a/src/web_interface/components/jinja_filter.py b/src/web_interface/components/jinja_filter.py
index 622d4afb2f..8711fcb729 100644
--- a/src/web_interface/components/jinja_filter.py
+++ b/src/web_interface/components/jinja_filter.py
@@ -190,6 +190,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['get_canvas_height'] = flt.get_canvas_height
self._app.jinja_env.filters['get_searchable_crypto_block'] = flt.get_searchable_crypto_block
self._app.jinja_env.filters['get_unique_keys_from_list_of_dicts'] = flt.get_unique_keys_from_list_of_dicts
+ self._app.jinja_env.filters['group_dict_list_by_key'] = flt.group_dict_list_by_key
self._app.jinja_env.filters['hex'] = hex
self._app.jinja_env.filters['hide_dts_binary_data'] = flt.hide_dts_binary_data
self._app.jinja_env.filters['infection_color'] = flt.infection_color
@@ -228,6 +229,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['sort_chart_list_by_value'] = flt.sort_chart_list_by_value
self._app.jinja_env.filters['sort_comments'] = flt.sort_comments
self._app.jinja_env.filters['sort_cve'] = flt.sort_cve_results
+ self._app.jinja_env.filters['sort_dict_list'] = flt.sort_dict_list_by_key
self._app.jinja_env.filters['sort_privileges'] = lambda privileges: sorted(
privileges, key=lambda role: len(privileges[role]), reverse=True
)
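Review bot: The filter is registered as `sort_dict_list` while the function is `sort_dict_list_by_key`, and the sibling registered earlier in `_setup_filters` keeps its full function name `group_dict_list_by_key`; registering this one as `self._app.jinja_env.filters['sort_dict_list_by_key'] = flt.sort_dict_list_by_key` (and updating the call in ipc_analyzer.html) keeps the pair consistent and greppable. `_setup_filters` begins and ends outside this hunk.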
diff --git a/src/web_interface/filter.py b/src/web_interface/filter.py
index 236e9ac9e8..f99ad74c06 100644
--- a/src/web_interface/filter.py
+++ b/src/web_interface/filter.py
@@ -14,7 +14,7 @@
from re import Match
from string import ascii_letters
from time import localtime, strftime, struct_time, time
-from typing import Iterable, Union
+from typing import Any, Iterable, Union
import packaging.version
import semver
@@ -366,6 +366,13 @@ def get_unique_keys_from_list_of_dicts(list_of_dicts: list[dict]):
return unique_keys
+def group_dict_list_by_key(dict_list: list[dict], key: Any) -> dict[str, list[dict]]:
+ result = {}
+ for dictionary in dict_list:
+ result.setdefault(dictionary.get(key), []).append(dictionary)
+ return result
+
+
def random_collapse_id():
return ''.join(random.choice(ascii_letters) for _ in range(10))
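Review bot: The return annotation `dict[str, list[dict]]` does not match what the body builds: the group key is whatever `dictionary.get(key)` returns, i.e. `None` for entries that lack the key and any hashable value otherwise, so `dict[Any, list[dict]]` would be honest. A docstring should spell out the `None` bucket, e.g. `"""Group dicts by the value of ``key``; dicts without it land under ``None``. Insertion order is preserved."""`. The module already imports `defaultdict`, which would also express the grouping, though `setdefault` is fine as written.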
@@ -433,6 +440,10 @@ def _cve_score_to_float(score: float | str) -> float:
return 0.0
+def sort_dict_list_by_key(dict_list: list[dict], key: Any) -> list[dict]:
+ return sorted(dict_list, key=lambda d: str(d.get(key, '')))
+
+
def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]:
reformatted = defaultdict(list, {})
for issue in issues:
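Review bot: `sort_dict_list_by_key` sorts on `str(d.get(key, ''))`, so integer values compare lexicographically (`10` sorts before `2`); the ipc template applies this filter to `target`, which `FunctionCall.target` declares as `Union[str, int]` because file descriptors are numeric, so descriptor rows will come out in string order. The `str()` cast is presumably there to avoid a TypeError when str and int values are mixed; a tuple key keeps that safety while sorting ints numerically, and it preserves every case in the new test_filter.py parametrization (entries lacking the key still sort first), which currently has no numeric case and should gain one such as `([{'a': 10}, {'a': 2}], [{'a': 2}, {'a': 10}])`. The function also has no docstring; the one below would do. `linter_reformat_issues` continues past the end of this hunk.
```python
def sort_dict_list_by_key(dict_list: list[dict], key: Any) -> list[dict]:
    """Sort dicts by ``key``: entries lacking it first, ints numerically, everything else by ``str()``."""

    def _sort_key(d: dict) -> tuple:
        if key not in d:
            return (0, 0, '')
        value = d[key]
        if isinstance(value, int):
            return (1, 0, value)
        return (1, 1, str(value))

    return sorted(dict_list, key=_sort_key)
```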