diff --git a/src/helperFunctions/hash.py b/src/helperFunctions/hash.py index acbd870b9..8916fe51b 100644 --- a/src/helperFunctions/hash.py +++ b/src/helperFunctions/hash.py @@ -30,10 +30,3 @@ def get_md5(code: bytes | str) -> str: def get_tlsh_comparison(first, second): return tlsh.diff(first, second) - - -def normalize_lief_items(functions): - """ - Shorthand to convert a list of objects to a list of strings - """ - return [str(function) for function in functions] diff --git a/src/plugins/analysis/elf_analysis/code/elf_analysis.py b/src/plugins/analysis/elf_analysis/code/elf_analysis.py index 3ce8267b5..f073f170d 100644 --- a/src/plugins/analysis/elf_analysis/code/elf_analysis.py +++ b/src/plugins/analysis/elf_analysis/code/elf_analysis.py @@ -1,161 +1,278 @@ from __future__ import annotations import json -import logging -import re +import string from difflib import SequenceMatcher from pathlib import Path +from typing import TYPE_CHECKING, Iterable, List, Optional import lief +from pydantic import BaseModel, Field +from semver import Version -from analysis.PluginBase import AnalysisBasePlugin -from helperFunctions.hash import normalize_lief_items +from analysis.plugin import AnalysisPluginV0, Tag +from analysis.plugin.compat import AnalysisBasePluginAdapterMixin from helperFunctions.tag import TagColor -LIEF_DATA_ENTRIES = ( - 'dynamic_entries', - 'exported_functions', - 'header', - 'imported_functions', - 'libraries', - 'sections', - 'segments', - 'symbols_version', -) +FUNCTION_MATCHING_THRESHOLD = 0.85 + +if TYPE_CHECKING: + from io import FileIO + TEMPLATE_FILE_PATH = Path(__file__).parent.parent / 'internal/matching_template.json' BEHAVIOUR_CLASSES = json.loads(TEMPLATE_FILE_PATH.read_text()) +PRINTABLE_BYTES = set(string.printable.encode()) +ELF_SEGMENT_FLAGS = { + 0x1: 'execute', + 0x2: 'write', + 0x4: 'read', +} -class AnalysisPlugin(AnalysisBasePlugin): - NAME = 'elf_analysis' - DESCRIPTION = 'Analyzes and tags ELF executables and libraries' - VERSION = '0.3.4' - MIME_WHITELIST = [ # noqa: RUF012 - 'application/x-executable', - 'application/x-pie-executable', - 'application/x-object', - 'application/x-sharedlib', - ] - FILE = __file__ - - def process_object(self, file_object): - try: - elf_dict, parsed_binary = self._analyze_elf(file_object) - file_object.processed_analysis[self.NAME] = {'Output': elf_dict} - self.create_tags(parsed_binary, file_object) - file_object.processed_analysis[self.NAME]['summary'] = list(elf_dict.keys()) - except (RuntimeError, ValueError): - logging.error(f'lief could not parse {file_object.uid}', exc_info=True) - file_object.processed_analysis[self.NAME] = {'failed': 'lief could not parse the file'} - return file_object +class ElfHeader(BaseModel): + entrypoint: int + file_type: str + header_size: int + identity_abi_version: int + identity_class: str + identity_data: str + identity_os_abi: str + identity_version: str + machine_type: str + numberof_sections: int + numberof_segments: int + object_file_version: str + processor_flag: int + program_header_size: int + program_headers_offset: int + section_header_size: int + section_headers_offset: int + section_name_table_idx: int - @staticmethod - def _get_tags_from_library_list(libraries: list, behaviour_class: str, indicators: list, tags: list): - for library, indicator in ((lib, ind) for lib in libraries for ind in indicators): - if re.search(indicator, library): - tags.append(behaviour_class) + @classmethod + def from_lief_header(cls, header: lief.ELF.Header) -> ElfHeader: + return cls( + entrypoint=header.entrypoint, + file_type=header.file_type.__name__, + header_size=header.header_size, + identity_abi_version=header.identity_abi_version, + identity_class=header.identity_class.__name__, + identity_data=header.identity_data.__name__, + identity_os_abi=header.identity_os_abi.__name__, + identity_version=header.identity_version.__name__, + machine_type=header.machine_type.__name__.lower(), + numberof_sections=header.numberof_sections, + numberof_segments=header.numberof_segments, + object_file_version=header.object_file_version.__name__, + processor_flag=header.processor_flag, + program_header_size=header.program_header_size, + program_headers_offset=header.program_header_offset, + section_header_size=header.section_header_size, + section_headers_offset=header.section_header_offset, + section_name_table_idx=header.section_name_table_idx, + ) - @staticmethod - def _get_tags_from_function_list(functions: list, behaviour_class: str, indicators: list, tags: list): - for function, indicator in ((f, i) for f in functions for i in indicators): - if ( - indicator.lower() in function.lower() and SequenceMatcher(None, indicator, function).ratio() >= 0.85 # noqa: PLR2004 - ): - tags.append(behaviour_class) - def _get_tags(self, libraries: list, functions: list) -> list: - tags = [] - for behaviour_class in BEHAVIOUR_CLASSES: - if behaviour_class not in tags: - behaviour_indicators = BEHAVIOUR_CLASSES[behaviour_class] - self._get_tags_from_function_list(functions, behaviour_class, behaviour_indicators, tags) - self._get_tags_from_library_list(libraries, behaviour_class, behaviour_indicators, tags) - return list(set(tags)) +class ElfSection(BaseModel): + flags: List[str] + name: str + size: int + type: str + offset: int + virtual_address: int - @staticmethod - def _get_symbols_version_entries(symbol_versions): - imported_libs = [] - for sv in symbol_versions: - if str(sv) != '* Local *' and str(sv) != '* Global *': - imported_libs.append(str(sv).split('(', maxsplit=1)[0]) - return list(set(imported_libs)) + @classmethod + def from_lief_section(cls, section: lief.ELF.Section) -> ElfSection: + return cls( + flags=[f.__name__ for f in section.flags_list], + name=section.name, + size=section.size, + type=section.type.__name__, + offset=section.offset, + virtual_address=section.virtual_address, + ) - @staticmethod - def _get_relevant_imp_functions(imp_functions): - imp_functions[:] = [x for x in imp_functions if not x.startswith('__')] - return imp_functions + +class ElfSegment(BaseModel): + file_offset: int + flags: List[str] + physical_address: int + physical_size: int + type: str + virtual_address: int + virtual_size: int + + @classmethod + def from_lief_segment(cls, segment: lief.ELF.Segment) -> ElfSegment: + return cls( + file_offset=segment.file_offset, + flags=[ELF_SEGMENT_FLAGS.get(segment.flags.value, 'None')], + physical_address=segment.physical_address, + physical_size=segment.physical_size, + type=segment.type.__name__, + virtual_address=segment.virtual_address, + virtual_size=segment.virtual_size, + ) + + +class DynamicEntry(BaseModel): + tag: str + value: int + library: Optional[str] = None + flags: Optional[List[str]] = None + array: Optional[List[str]] = None + + @classmethod + def from_lief_dyn_entry(cls, entry: lief.ELF.DynamicEntry) -> DynamicEntry: + return cls( + tag=entry.tag.__name__, + value=entry.value, + library=getattr(entry, 'name', None), + flags=[f.__name__ for f in entry.flags] if hasattr(entry, 'flags') else None, + array=[str(i) for i in entry.array] if hasattr(entry, 'array') else None, + ) + + +class ElfSymbol(BaseModel): + name: str + offset: int + + +class InfoSectionData(BaseModel): + name: str + contents: str + + +class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): + class Schema(BaseModel): + header: ElfHeader + sections: List[ElfSection] + segments: List[ElfSegment] + dynamic_entries: List[DynamicEntry] + exported_functions: List[ElfSymbol] + imported_functions: List[str] + mod_info: Optional[List[str]] = Field(description='Key value pairs with module information.') + note_sections: List[InfoSectionData] + behavior_classes: List[str] = Field(description='List of behavior classes (e.g. "crypto" or "network").') + + def __init__(self): + metadata = self.MetaData( + name='elf_analysis', + description='Analyzes and tags ELF executables and libraries', + version=Version(1, 0, 0), + Schema=self.Schema, + mime_whitelist=[ + 'application/x-executable', + 'application/x-pie-executable', + 'application/x-object', + 'application/x-sharedlib', + ], + ) + super().__init__(metadata=metadata) + + def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema: + del virtual_file_path, analyses + elf = lief.ELF.parse(file_handle.name) + if elf is None: + raise TypeError('not a valid ELF file') + return self.Schema( + header=ElfHeader.from_lief_header(elf.header), + exported_functions=[ElfSymbol(name=f.name, offset=f.address) for f in elf.exported_functions], + imported_functions=[f.name for f in elf.imported_functions], + sections=[ElfSection.from_lief_section(s) for s in elf.sections], + segments=[ElfSegment.from_lief_segment(s) for s in elf.segments], + dynamic_entries=[DynamicEntry.from_lief_dyn_entry(e) for e in elf.dynamic_entries], + note_sections=[c for c in _get_note_sections_content(elf) if c], + mod_info=_get_modinfo(elf), + behavior_classes=_get_behavior_classes(elf), + ) + + def summarize(self, result: Schema) -> list[str]: + keys = ['sections', 'dynamic_entries', 'exported_functions', 'imported_functions', 'note_sections', 'mod_info'] + return [k for k, v in result.model_dump().items() if k in keys and v] + + def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]: + del summary + tags = [] + for behaviour_class in result.behavior_classes: + tags.append( + Tag( + name=behaviour_class, + value=behaviour_class, + color=self._get_color_codes(behaviour_class), + propagate=False, + ) + ) + return tags @staticmethod - def _get_color_codes(tag): - if tag == 'crypto': + def _get_color_codes(behavior_class: str) -> str: + if behavior_class == 'crypto': return TagColor.RED - if tag == 'file_system': + if behavior_class == 'file_system': return TagColor.BLUE - if tag == 'network': + if behavior_class == 'network': return TagColor.ORANGE - if tag == 'memory_operations': + if behavior_class == 'memory_operations': return TagColor.GREEN - if tag == 'randomize': + if behavior_class == 'randomize': return TagColor.LIGHT_BLUE return TagColor.GRAY - def create_tags(self, parsed_bin, file_object): - all_libs = self._get_symbols_version_entries(normalize_lief_items(parsed_bin.symbols_version)) - all_libs.extend(normalize_lief_items(parsed_bin.libraries)) - all_funcs = self._get_relevant_imp_functions(normalize_lief_items(parsed_bin.imported_functions)) - for entry in self._get_tags(all_libs, all_funcs): - self.add_analysis_tag( - file_object=file_object, - tag_name=entry, - value=entry, - color=self._get_color_codes(entry), - propagate=False, - ) - @staticmethod - def get_final_analysis_dict(binary_json_dict, elf_dict): - for key in binary_json_dict: - if key in LIEF_DATA_ENTRIES and binary_json_dict[key]: - elf_dict[key] = binary_json_dict[key] - - def _analyze_elf(self, file_object): - elf_dict = {} - try: - parsed_binary = lief.parse(file_object.file_path) - binary_json_dict = json.loads(lief.to_json(parsed_binary)) - if parsed_binary.exported_functions: - binary_json_dict['exported_functions'] = normalize_lief_items(parsed_binary.exported_functions) - if parsed_binary.imported_functions: - binary_json_dict['imported_functions'] = normalize_lief_items(parsed_binary.imported_functions) - if parsed_binary.libraries: - binary_json_dict['libraries'] = normalize_lief_items(parsed_binary.libraries) - modinfo_data = self.filter_modinfo(parsed_binary) - if modinfo_data: - elf_dict['modinfo'] = modinfo_data - - except (AttributeError, TypeError): - logging.error(f'Bad file for lief/elf analysis {file_object.uid}.', exc_info=True) - return elf_dict - - self.get_final_analysis_dict(binary_json_dict, elf_dict) - self._convert_address_values_to_hex(elf_dict) - - return elf_dict, parsed_binary +def _get_behavior_classes(elf: lief.ELF) -> list[str]: + libraries = _get_symbols_version_entries([str(s) for s in elf.symbols_version]) + libraries.extend([str(lib) for lib in elf.libraries]) + functions = _get_relevant_imp_functions([str(f) for f in elf.imported_functions]) - @staticmethod - def _convert_address_values_to_hex(elf_dict): - for category in {'sections', 'segments'}.intersection(elf_dict): - for entry in elf_dict[category]: - for key in {'virtual_address', 'offset'}.intersection(entry): - entry[key] = hex(entry[key]) + behaviour_classes = [] + for behaviour_class in BEHAVIOUR_CLASSES: + indicators = BEHAVIOUR_CLASSES[behaviour_class] + if _behaviour_class_applies(functions, libraries, indicators): + behaviour_classes.append(behaviour_class) + return behaviour_classes - @staticmethod - def filter_modinfo(binary) -> list[str] | None: - # getting the information from the *.ko files .modinfo section - modinfo = None - for section in binary.sections: - if section.name == '.modinfo': - modinfo = bytes(section.content).decode() - modinfo = [entry for entry in modinfo.split('\x00') if entry] - break - return modinfo + +def _get_relevant_imp_functions(imp_functions: list[str]) -> list[str]: + return [f for f in imp_functions if not f.startswith('__')] + + +def _get_symbols_version_entries(symbol_versions: list[str]) -> list[str]: + imported_libs = [] + for sv in symbol_versions: + if str(sv) != '* Local *' and str(sv) != '* Global *': + imported_libs.append(str(sv).split('(', maxsplit=1)[0]) + return list(set(imported_libs)) + + +def _behaviour_class_applies(functions: list[str], libraries: list[str], indicators: list[str]) -> bool: + for function in functions: + for indicator in indicators: + if ( + indicator.lower() in function.lower() + and SequenceMatcher(None, indicator, function).ratio() >= FUNCTION_MATCHING_THRESHOLD + ): + return True + for library in libraries: + for indicator in indicators: + if indicator.lower() in library.lower(): + return True + return False + + +def _get_modinfo(elf: lief.ELF) -> list[str] | None: + # getting the information from the *.ko files .modinfo section + modinfo = None + for section in elf.sections: + if section.name == '.modinfo': + modinfo = section.content.tobytes() + modinfo = [entry.decode(errors='replace') for entry in modinfo.split(b'\x00') if entry] + break + return modinfo + + +def _get_note_sections_content(elf: lief.ELF) -> Iterable[InfoSectionData]: + for section in elf.sections: # type: lief.ELF.Section + if section.type == lief.ELF.Section.TYPE.NOTE: + readable_content = bytes([c for c in section.content.tobytes() if c in PRINTABLE_BYTES]) + yield InfoSectionData(name=section.name, contents=readable_content.decode()) diff --git a/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py b/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py index f74b20b63..45a3bb661 100644 --- a/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py +++ b/src/plugins/analysis/elf_analysis/test/test_plugin_elf_analysis.py @@ -3,44 +3,55 @@ import pytest +from analysis.plugin import Tag from helperFunctions.tag import TagColor -from objects.file import FileObject -from test.common_helper import get_test_data_dir -from ..code.elf_analysis import AnalysisPlugin - -TEST_DATA = get_test_data_dir() / 'test_data_file.bin' +from ..code.elf_analysis import ( + AnalysisPlugin, + ElfHeader, + _behaviour_class_applies, + _get_behavior_classes, + _get_symbols_version_entries, +) TEST_DATA_DIR = Path(__file__).parent / 'data' - LiefResult = namedtuple( 'LiefResult', ['symbols_version', 'libraries', 'imported_functions', 'exported_functions', 'sections'] ) -MOCK_DATA = ( - '{"header": {"entrypoint": 109724, "file_type": "DYNAMIC", "header_size": 52, "identity_class": "CLASS32", "identity_data": "LSB", "identity_os_abi": "SYSTEMV"},' # noqa: E501 - '"dynamic_entries": [{"library": "libdl.so.2", "tag": "NEEDED", "value": 1}, {"library": "libc.so.6", "tag": "NEEDED", "value": 137}, {"tag": "INIT", "value": 99064}],' # noqa: E501 - '"sections": [{"alignment": 0, "entry_size": 0, "flags": [], "information": 0, "link": 0, "name": "", "offset": 0, "size": 0, "type": "NULL", "virtual_address": 0}],' # noqa: E501 - '"segments": [{"alignment": 4, "file_offset": 2269, "flags": 4, "physical_address": 2269, "physical_size": 8, ' - '"sections": [".ARM.exidx"], "type": "ARM_EXIDX", "virtual_address": 2269, "virtual_size": 8}],' - '"symbols_version": [{"value": 0}, {"symbol_version_auxiliary": "GLIBC_2.4", "value": 2}, {"symbol_version_auxiliary": "GLIBC_2.4", "value": 2}]}' # noqa: E501 -) - -MOCK_LIEF_RESULT = LiefResult( - libraries=['libdl.so.2', 'libc.so.6'], - imported_functions=['fdopen', 'calloc', 'strstr', 'raise', 'gmtime_r', 'strcmp'], - symbols_version=[], - exported_functions=['SHA256_Transform', 'GENERAL_NAMES_free', 'i2d_RSAPrivateKey', 'd2i_OCSP_REQUEST'], +MOCK_RESULT = AnalysisPlugin.Schema( + header=ElfHeader( + entrypoint=0, + file_type='test', + header_size=52, + identity_abi_version=0, + identity_class='CLASS32', + identity_data='LSB', + identity_os_abi='SYSTEMV', + identity_version='', + machine_type='', + numberof_sections=1, + numberof_segments=0, + object_file_version='', + processor_flag=0, + program_header_size=0, + program_headers_offset=0, + section_header_size=0, + section_headers_offset=0, + section_name_table_idx=0, + ), sections=[], + segments=[], + dynamic_entries=[], + exported_functions=[], + imported_functions=[], + mod_info=None, + note_sections=[], + behavior_classes=['crypto', 'network'], ) -@pytest.fixture -def stub_object(): - return FileObject(file_path=str(TEST_DATA)) - - @pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) class TestElfAnalysis: @pytest.mark.parametrize( @@ -58,42 +69,30 @@ def test_get_color_code(self, analysis_plugin, tag, tag_color): assert analysis_plugin._get_color_codes(tag) == tag_color @pytest.mark.parametrize( - ('indicators', 'behaviour_class', 'libraries', 'tags', 'expected'), + ('indicators', 'functions', 'libraries', 'should_apply'), [ - (['a'], 'b', ['c'], [], []), - (['a', 'b', 'c'], 'b', ['c'], [], ['b']), - (['a', 'b', 'c'], 'b', ['c'], ['b'], ['b', 'b']), - (['a', 'b', 'c'], 'b', ['c', 'a'], [], ['b', 'b']), - (['a', 'b', 'c'], 'b', ['d', 'e'], [], []), - (['a', 'b', 'c'], 'b', ['d', 'e'], ['x'], ['x']), + ([], [], [], False), + (['foo'], [], ['test1234'], False), + (['foo', 'test'], [], ['test1234'], True), # for libraries, any substring match is OK + (['foo'], ['test1234'], [], False), + (['test'], ['test1234'], [], False), # for function names, the coverage must be at least 85% + (['foobar123'], ['foobar1234'], [], True), ], ) - def test_get_tags_from_library_list(self, analysis_plugin, indicators, behaviour_class, libraries, tags, expected): - analysis_plugin._get_tags_from_library_list(libraries, behaviour_class, indicators, tags) - assert tags == expected + def test_behaviour_class_applies(self, indicators, functions, libraries, should_apply): + assert _behaviour_class_applies(functions, libraries, indicators) == should_apply - @pytest.mark.parametrize( - ('functions', 'behaviour_class', 'indicators', 'tags', 'expected_result'), - [ - ([], '', [], [], []), - (['a'], 'c', ['b'], [], []), - (['a'], 'c', ['b'], ['d'], ['d']), - (['a', 'b'], 'c', ['b'], ['d'], ['d', 'c']), - (['a', 'b', 'x', 'y'], 'c', ['o', 'p', 'y'], [], ['c']), - (['a', 'b'], 'c', ['b'], ['d', 'e'], ['d', 'e', 'c']), - ], - ) - def test_get_tags_from_function_list( - self, analysis_plugin, functions, behaviour_class, indicators, tags, expected_result - ): - analysis_plugin._get_tags_from_function_list(functions, behaviour_class, indicators, tags) - assert tags == expected_result - - def test_get_tags(self, analysis_plugin, monkeypatch): + def test_get_behavior_classes(self, analysis_plugin, monkeypatch): behaviour_classes = {'one': ['x', 'y'], 'two': ['z', 'a'], 'three': ['f', 'u']} monkeypatch.setattr('plugins.analysis.elf_analysis.code.elf_analysis.BEHAVIOUR_CLASSES', behaviour_classes) - tags = analysis_plugin._get_tags(libraries=['a', 'b', 'c'], functions=['d', 'e', 'f']) - assert sorted(tags) == ['three', 'two'] + elf = LiefResult( + libraries=['a', 'b', 'c'], + imported_functions=['d', 'e', 'f'], + symbols_version=[], + exported_functions=[], + sections=[], + ) + assert set(_get_behavior_classes(elf)) == {'three', 'two'} @pytest.mark.parametrize( ('symbol_versions', 'expected'), @@ -105,66 +104,37 @@ def test_get_tags(self, analysis_plugin, monkeypatch): ], ) def test_get_symbols_version_entries(self, analysis_plugin, symbol_versions, expected): - assert sorted(analysis_plugin._get_symbols_version_entries(symbol_versions)) == sorted(expected) - - def test_create_tags(self, analysis_plugin, stub_object): - stub_object.processed_analysis[analysis_plugin.NAME] = {} - stub_result = LiefResult( - libraries=['recvmsg', 'unknown'], - imported_functions=[], - symbols_version=[], - exported_functions=[], - sections=[], - ) - analysis_plugin.create_tags(stub_result, stub_object) + assert sorted(_get_symbols_version_entries(symbol_versions)) == sorted(expected) - assert 'network' in stub_object.processed_analysis[analysis_plugin.NAME]['tags'] - assert stub_object.processed_analysis[analysis_plugin.NAME]['tags']['network']['color'] == 'warning' + def test_get_tags(self, analysis_plugin): + tags = analysis_plugin.get_tags(MOCK_RESULT, []) + assert len(tags) == 2 + assert Tag(name='crypto', value='crypto', color='danger', propagate=False) in tags - def test_analyze_elf_bad_file(self, analysis_plugin, stub_object, tmpdir): + def test_analyze_elf_bad_file(self, analysis_plugin, tmpdir): random_file = Path(tmpdir.dirname, 'random') random_file.write_bytes(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ') - stub_object.file_path = str(random_file.absolute()) - - result = analysis_plugin._analyze_elf(stub_object) - assert result == {} - - final_analysis_test_data = [({}, {}, 0), ({'header': [], 'segments': [1, 2], 'a': []}, {}, 1)] # noqa: RUF012 - - @pytest.mark.parametrize(('binary_json_dict', 'elf_dict', 'expected'), final_analysis_test_data) - def test_get_final_analysis_dict(self, analysis_plugin, binary_json_dict, elf_dict, expected): - analysis_plugin.get_final_analysis_dict(binary_json_dict, elf_dict) - assert len(elf_dict) == expected - - def test_pie(self, analysis_plugin): - test_file = FileObject(file_path=str(TEST_DATA_DIR / 'x-pie-executable')) - elf_dict, _ = analysis_plugin._analyze_elf(test_file) - assert elf_dict != {} - - def test_plugin(self, analysis_plugin, stub_object, monkeypatch): - monkeypatch.setattr('lief.parse', lambda _: MOCK_LIEF_RESULT) - monkeypatch.setattr('lief.to_json', lambda _: MOCK_DATA) - - analysis_plugin.process_object(stub_object) - - output = stub_object.processed_analysis[analysis_plugin.NAME]['Output'] - assert output != {} - result_summary = sorted(stub_object.processed_analysis[analysis_plugin.NAME]['summary']) - assert result_summary == [ - 'dynamic_entries', - 'exported_functions', - 'header', - 'imported_functions', - 'libraries', - 'sections', - 'segments', - 'symbols_version', - ] - assert 'strcmp' in output['imported_functions'] - assert output['segments'][0]['virtual_address'].startswith('0x'), 'addresses should be converted to hex' + with pytest.raises(TypeError), random_file.open('rb') as fp: + analysis_plugin.analyze(fp, {}, {}) + + def test_analyze_summarize(self, analysis_plugin): + test_file = TEST_DATA_DIR / 'x-pie-executable' + with test_file.open('rb') as fp: + result = analysis_plugin.analyze(fp, {}, {}) + assert result is not None + assert result.header.machine_type == 'i386' + assert len(result.sections) == 36 + assert result.sections[2].type == 'NOTE' + assert len(result.segments) == 12 + assert result.segments[0].flags == ['read'] + assert result.behavior_classes == ['stringops', 'libc'] + assert 'puts' in result.imported_functions + + summary = analysis_plugin.summarize(result) + assert summary == ['sections', 'dynamic_entries', 'exported_functions', 'imported_functions', 'note_sections'] def test_modinfo(self, analysis_plugin): - test_file = FileObject(file_path=str(TEST_DATA_DIR / 'test_data.ko')) - _, binary = analysis_plugin._analyze_elf(test_file) - result = analysis_plugin.filter_modinfo(binary) - assert result[0] == 'this are test data\n' + test_file = TEST_DATA_DIR / 'test_data.ko' + with test_file.open('rb') as fp: + result = analysis_plugin.analyze(fp, {}, {}) + assert result.mod_info == ['this are test data\n'] diff --git a/src/plugins/analysis/elf_analysis/view/elf_analysis.html b/src/plugins/analysis/elf_analysis/view/elf_analysis.html index 9bce6941e..89d6f53da 100644 --- a/src/plugins/analysis/elf_analysis/view/elf_analysis.html +++ b/src/plugins/analysis/elf_analysis/view/elf_analysis.html @@ -2,61 +2,72 @@ {% block analysis_result_details %} -{% set analysis = analysis_result['Output'] %} -{% for key in analysis.keys() | sort %} +{% set analysis = analysis_result %} +{% for key, value in analysis.items() | sort %} {{ key }} - -
- -
- -
- - {% if analysis[key] | is_list %} - {% if not analysis[key] %} - - - - {% elif analysis[key][0] is string %} - {% for element in analysis[key] %} + {% if not value %} + + {% else %} + +
None
None +
+ +
+ {# FixMe: sort data by offset/name #} +
+ + {% if value | is_list %} + {% if value[0] is string %} + {% for element in value %} {% endfor %} - {% else %} - - {% for unique_key in analysis[key] | get_unique_keys_from_list_of_dicts | sort %} - - {% endfor %} - - {% for dictionary in analysis[key] %} + {% else %} + {% set unique_keys = value | get_unique_keys_from_list_of_dicts | sort %} - {% for unique_key in analysis[key] | get_unique_keys_from_list_of_dicts | sort %} - {% if unique_key in dictionary.keys() %} - - {% else %} - - {% endif %} + {% for unique_key in unique_keys %} + {% endfor %} + {% for dictionary in value %} + + {% for unique_key in unique_keys %} + {% if unique_key in dictionary and dictionary[unique_key] is not none %} + {% if "offset" in unique_key %} + + {% elif dictionary[unique_key] | is_list %} + + {% else %} + + {% endif %} + {% else %} + + {% endif %} + {% endfor %} + + {% endfor %} + {% endif %} + {% elif value %} + {% for k, v in value.items() %} + + + {% if "offset" in k or "entrypoint" in k %} + + {% else %} + + {% endif %} + {% endfor %} {% endif %} - {% elif analysis[key] %} - {% for k, v in analysis[key].items() %} - - - - - {% endfor %} - {% endif %} - -
{{ element }}
{{ unique_key }}
{{ dictionary[unique_key] }}{{ unique_key }}
+ {{ dictionary[unique_key] | hex }} ({{ dictionary[unique_key] }}) + {{ ', '.join(dictionary[unique_key]) }}{{ dictionary[unique_key] }}
{{ k }}{{ v | hex }} ({{ v }}){{ v }}
{{ k }}{{ v }}
-
- -
+
+ + {% endif %} {% endfor %} diff --git a/src/plugins/analysis/hardware_analysis/code/hardware_analysis.py b/src/plugins/analysis/hardware_analysis/code/hardware_analysis.py index 1595104eb..fc2d83ffe 100644 --- a/src/plugins/analysis/hardware_analysis/code/hardware_analysis.py +++ b/src/plugins/analysis/hardware_analysis/code/hardware_analysis.py @@ -9,9 +9,9 @@ class AnalysisPlugin(AnalysisBasePlugin): """ NAME = 'hardware_analysis' - DESCRIPTION = 'Hardware Analysis Plug-in' + DESCRIPTION = 'Hardware Analysis Plugin' DEPENDENCIES = ['cpu_architecture', 'elf_analysis', 'kernel_config'] # noqa: RUF012 - VERSION = '0.2' + VERSION = '0.2.1' FILE = __file__ def process_object(self, file_object): @@ -42,7 +42,7 @@ def cpu_architecture_analysis(file_object) -> str | None: @staticmethod def get_modinfo(file_object): # getting the information from the *.ko files .modinfo - return file_object.processed_analysis['elf_analysis']['result'].get('Output', {}).get('modinfo') + return file_object.processed_analysis['elf_analysis']['result'].get('mod_info') @staticmethod def filter_kernel_config(file_object): diff --git a/src/test/unit/helperFunctions/test_hash.py b/src/test/unit/helperFunctions/test_hash.py index 19a7e8896..7305e4c4b 100644 --- a/src/test/unit/helperFunctions/test_hash.py +++ b/src/test/unit/helperFunctions/test_hash.py @@ -1,7 +1,6 @@ from helperFunctions.hash import ( get_md5, get_sha256, - normalize_lief_items, ) TEST_STRING = 'test string' @@ -15,24 +14,3 @@ def test_get_sha256(): def test_get_md5(): assert get_md5(TEST_STRING) == TEST_MD5, 'not correct from string' - - -def test_normalize_items_from_strings(): - functions = ['printf', '__libc_start_main'] - assert normalize_lief_items(functions) == functions - - -def test_normalize_items_from_objects(): - class Function: - def __init__(self, name): - self.name = name - - def __str__(self): - return self.name - - functions = ['printf', '__libc_start_main'] - assert normalize_lief_items([Function(name) for name in functions]) == functions - - -def test_normalize_items_empty_list(): - assert normalize_lief_items([]) == []