diff --git a/src/plugins/analysis/ipc/code/ipc_analyzer.py b/src/plugins/analysis/ipc/code/ipc_analyzer.py
index 470f38fefa..7dba62e1b3 100644
--- a/src/plugins/analysis/ipc/code/ipc_analyzer.py
+++ b/src/plugins/analysis/ipc/code/ipc_analyzer.py
@@ -3,44 +3,69 @@
import json
import tempfile
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Union
from docker.types import Mount
+from pydantic import BaseModel, Field
+from semver import Version
-from analysis.PluginBase import AnalysisBasePlugin
+from analysis.plugin import AnalysisPluginV0
+from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.docker import run_docker_container
if TYPE_CHECKING:
- from objects.file import FileObject
+ from io import FileIO
DOCKER_IMAGE = 'ipc'
-class AnalysisPlugin(AnalysisBasePlugin):
- """
- Inter-Process Communication Analysis
- """
+class IpcTarget(BaseModel):
+ name: str = Field(description='target of the IPC (usually a file path or address)')
+ ipc_calls: List[IpcCall]
- NAME = 'ipc_analyzer'
- DESCRIPTION = 'Inter-Process Communication Analysis'
- VERSION = '0.1.1'
- FILE = __file__
- MIME_WHITELIST = [ # noqa: RUF012
- 'application/x-executable',
- 'application/x-object',
- 'application/x-sharedlib',
- ]
- DEPENDENCIES = ['file_type'] # noqa: RUF012
- TIMEOUT = 600 # 10 minutes
+class IpcCall(BaseModel):
+ type: str = Field(description='a type of call (usually a POSIX system call)')
+ arguments: List[Union[str, int, List[Union[str, int]]]] = Field(description='arguments of the IPC call')
- def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
+
+class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
+ class Schema(BaseModel):
+ targets: List[IpcTarget] = Field(description='An array of targets of IPC calls')
+
+ def __init__(self):
+ metadata = self.MetaData(
+ name='ipc_analyzer',
+ dependencies=['file_type'],
+ description='Inter-Process Communication Analysis',
+ mime_whitelist=[
+ 'application/x-executable',
+ 'application/x-object',
+ 'application/x-pie-executable',
+ 'application/x-sharedlib',
+ ],
+ timeout=600, # 10 minutes
+ version=Version(1, 0, 0),
+ Schema=self.Schema,
+ )
+ super().__init__(metadata=metadata)
+
+ def analyze(self, file_handle: FileIO, virtual_file_path: dict, analyses: dict[str, BaseModel]) -> Schema:
+ del virtual_file_path, analyses
+ output = self._run_ipc_analyzer_in_docker(file_handle)
+ # output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
+ # we need to restructure this a bit so it lines up with the Schema
+ targets = [{'name': target, 'ipc_calls': target_info} for target, target_info in output['ipcCalls'].items()]
+ return self.Schema.model_validate({'targets': targets})
+
+ def _run_ipc_analyzer_in_docker(self, file_handle: FileIO) -> dict:
with tempfile.TemporaryDirectory() as tmp_dir:
+ path = Path(file_handle.name).absolute()
folder = Path(tmp_dir) / 'results'
- mount = f'/input/{file_object.file_name}'
+ mount = f'/input/{path.name}'
if not folder.exists():
folder.mkdir()
- output = folder / f'{file_object.file_name}.json'
+ output = folder / f'{path.name}.json'
output.write_text(json.dumps({'ipcCalls': {}}))
run_docker_container(
DOCKER_IMAGE,
@@ -49,28 +74,11 @@ def _run_ipc_analyzer_in_docker(self, file_object: FileObject) -> dict:
command=f'{mount} /results/',
mounts=[
Mount('/results/', str(folder.resolve()), type='bind'),
- Mount(mount, file_object.file_path, type='bind'),
+ Mount(mount, str(path), type='bind'),
],
)
return json.loads(output.read_text())
- def _do_full_analysis(self, file_object: FileObject) -> FileObject:
- output = self._run_ipc_analyzer_in_docker(file_object)
- file_object.processed_analysis[self.NAME] = {
- 'full': output,
- 'summary': self._create_summary(output['ipcCalls']),
- }
- return file_object
-
- def process_object(self, file_object: FileObject) -> FileObject:
- """
- This function handles only ELF executables. Otherwise, it returns an empty dictionary.
- It calls the ipc docker container.
- """
- return self._do_full_analysis(file_object)
-
- @staticmethod
- def _create_summary(output: dict) -> list[str]:
- # output structure: { 'target': [{'type': 'type', 'arguments': [...]}, ...], ...}
- summary = {entry['type'] for result_list in output.values() for entry in result_list}
- return sorted(summary)
+ def summarize(self, result: Schema) -> list[str]:
+ ipc_types = {ipc_call.type for target in result.targets for ipc_call in target.ipc_calls}
+ return sorted(ipc_types)
diff --git a/src/plugins/analysis/ipc/test/test_ipc_analyzer.py b/src/plugins/analysis/ipc/test/test_ipc_analyzer.py
index 80807e9212..bb9b70edb9 100644
--- a/src/plugins/analysis/ipc/test/test_ipc_analyzer.py
+++ b/src/plugins/analysis/ipc/test/test_ipc_analyzer.py
@@ -2,32 +2,36 @@
import pytest
-from objects.file import FileObject
-
from ..code.ipc_analyzer import AnalysisPlugin
TEST_DIR = Path(__file__).parent / 'data'
-
EXPECTED_SYSTEM_RESULT = {
- 'whoami': [{'type': 'system', 'arguments': ['']}],
- 'ls': [{'type': 'system', 'arguments': ['-l']}],
- 'echo': [{'type': 'system', 'arguments': ['hello']}],
- 'id': [{'type': 'system', 'arguments': ['']}],
- 'pwd': [{'type': 'system', 'arguments': ['']}],
+ 'targets': [
+ {'name': 'whoami', 'ipc_calls': [{'type': 'system', 'arguments': ['']}]},
+ {'name': 'ls', 'ipc_calls': [{'type': 'system', 'arguments': ['-l']}]},
+ {'name': 'echo', 'ipc_calls': [{'type': 'system', 'arguments': ['hello']}]},
+ {'name': 'id', 'ipc_calls': [{'type': 'system', 'arguments': ['']}]},
+ {'name': 'pwd', 'ipc_calls': [{'type': 'system', 'arguments': ['']}]},
+ ]
}
EXPECTED_WRITE_RESULT = {
- 'data.dat': [
- {'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
+ 'targets': [
{
- 'type': 'write',
- 'arguments': [
- '',
- ['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
- [77],
+ 'name': 'data.dat',
+ 'ipc_calls': [
+ {'type': 'open', 'arguments': ['', ['O_RDWR | O_CREAT'], ['0666L']]},
+ {
+ 'type': 'write',
+ 'arguments': [
+ '',
+ ['Now is the winter of our discontent\\nMade glorious summer by this sun of York\\n'],
+ [77],
+ ],
+ },
],
- },
+ }
]
}
@@ -40,8 +44,10 @@
('ipc_shared_files_test_bin', EXPECTED_WRITE_RESULT, ['open', 'write']),
],
)
-def test_ipc_system(analysis_plugin, test_file, expected_result, expected_summary):
- test_object = FileObject(file_path=str((TEST_DIR / test_file).resolve()))
- result = analysis_plugin.process_object(test_object)
- assert result.processed_analysis['ipc_analyzer']['full']['ipcCalls'] == expected_result
- assert result.processed_analysis['ipc_analyzer']['summary'] == expected_summary
+def test_ipc_analyze_summary(analysis_plugin, test_file, expected_result, expected_summary):
+ with (TEST_DIR / test_file).open('rb') as fp:
+ result = analysis_plugin.analyze(fp, {}, {})
+ as_dict = result.model_dump()
+ assert as_dict == expected_result
+ summary = analysis_plugin.summarize(result)
+ assert summary == expected_summary
diff --git a/src/plugins/analysis/ipc/view/ipc_analyzer.html b/src/plugins/analysis/ipc/view/ipc_analyzer.html
index eb6fd392ca..e230f0f552 100644
--- a/src/plugins/analysis/ipc/view/ipc_analyzer.html
+++ b/src/plugins/analysis/ipc/view/ipc_analyzer.html
@@ -2,40 +2,47 @@
{% block analysis_result_details %}
-
-
-
-
-
-
-
-
- Target |
- Type |
- Arguments |
-
- {% set ipc_calls = analysis_result['full']['ipcCalls'] %}
- {% for target in ipc_calls.keys()|sort %}
- {% set row_count = 1 + ipc_calls[target]|length %}
-
- {{ target }} |
-
- {% for ipc_call in ipc_calls[target] %}
-
- {{ ipc_call['type'] }} |
-
-
- {% for arg in ipc_call['arguments'] %}
- {% if arg %}
- - {{ arg }}
- {% endif %}
- {% endfor %}
-
- |
-
- {% endfor %}
- {% endfor %}
-
-
+
+
-{% endblock %}
\ No newline at end of file
+
+
+
+
+
+
+
+
+ Target |
+ Type |
+ Arguments |
+
+
+
+ {% for target in analysis_result['targets'] | sort_dict_list('name') %}
+ {% set row_count = 1 + target.ipc_calls | length %}
+
+ {{ target.name }} |
+
+ {% for ipc_call in target.ipc_calls %}
+
+ {{ ipc_call.type }} |
+
+
+ {% for arg in ipc_call.arguments %}
+ {% if arg %}
+ - {{ arg }}
+ {% endif %}
+ {% endfor %}
+
+ |
+
+ {% endfor %}
+ {% endfor %}
+
+
+
+ |
+
+
+{% endblock %}
diff --git a/src/test/unit/web_interface/test_filter.py b/src/test/unit/web_interface/test_filter.py
index 5aa55dfbc3..72f2176d08 100644
--- a/src/test/unit/web_interface/test_filter.py
+++ b/src/test/unit/web_interface/test_filter.py
@@ -488,3 +488,16 @@ def test_as_ascii_table():
)
def test_str_to_hex(input_, expected_result):
assert flt.str_to_hex(input_) == expected_result
+
+
+@pytest.mark.parametrize(
+ ('input_', 'expected_result'),
+ [
+ ([], []),
+ ([{'a': 2}, {'a': 1}, {'a': 3}], [{'a': 1}, {'a': 2}, {'a': 3}]),
+ ([{'a': 2}, {'a': 1}, {'b': 3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
+ ([{'a': 2}, {'a': '1'}, {'a': 3.3}], [{'b': 3}, {'a': 1}, {'a': 2}]),
+ ],
+)
+def test_sort_dict_list_by_key(input_, expected_result):
+ assert flt.sort_dict_list_by_key(input_, 'a') == expected_result
diff --git a/src/web_interface/components/jinja_filter.py b/src/web_interface/components/jinja_filter.py
index 71bd04bcca..cd878ff6d5 100644
--- a/src/web_interface/components/jinja_filter.py
+++ b/src/web_interface/components/jinja_filter.py
@@ -233,6 +233,7 @@ def _setup_filters(self): # noqa: PLR0915
self._app.jinja_env.filters['sort_chart_list_by_value'] = flt.sort_chart_list_by_value
self._app.jinja_env.filters['sort_comments'] = flt.sort_comments
self._app.jinja_env.filters['sort_cve'] = flt.sort_cve_results
+ self._app.jinja_env.filters['sort_dict_list'] = flt.sort_dict_list_by_key
self._app.jinja_env.filters['sort_privileges'] = lambda privileges: sorted(
privileges, key=lambda role: len(privileges[role]), reverse=True
)
diff --git a/src/web_interface/filter.py b/src/web_interface/filter.py
index b41eb744af..5c99d1e1e8 100644
--- a/src/web_interface/filter.py
+++ b/src/web_interface/filter.py
@@ -14,7 +14,7 @@
from re import Match
from string import ascii_letters
from time import localtime, strftime, struct_time, time
-from typing import Iterable, Union
+from typing import Any, Iterable, Union
import packaging.version
import semver
@@ -433,6 +433,10 @@ def _cve_score_to_float(score: float | str) -> float:
return 0.0
+def sort_dict_list_by_key(dict_list: list[dict], key: Any) -> list[dict]:
+ return sorted(dict_list, key=lambda d: d.get(key, -9999))
+
+
def linter_reformat_issues(issues) -> dict[str, list[dict[str, str]]]:
reformatted = defaultdict(list, {})
for issue in issues: