From 4b87ffa74e3db50ee146a779ab40c663cd446bd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Tue, 29 Oct 2024 16:55:59 +0100 Subject: [PATCH] feat: converted IP&URI plugin to new base class --- .../code/ip_and_uri_finder.py | 169 ++++++++++-------- .../test/test_ip_and_uri_finder.py | 129 +++++-------- .../view/ip_and_uri_finder.html | 14 +- src/statistic/update.py | 4 +- src/storage/db_interface_stats.py | 4 +- .../templates/show_statistic.html | 4 +- 6 files changed, 148 insertions(+), 176 deletions(-) diff --git a/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py b/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py index 83dde8aae..93bfd272d 100644 --- a/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py +++ b/src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py @@ -1,15 +1,23 @@ +from __future__ import annotations + import logging from contextlib import suppress from itertools import product from pathlib import Path from re import search +from typing import TYPE_CHECKING, List, Optional import geoip2.database from common_analysis_ip_and_uri_finder import CommonAnalysisIPAndURIFinder from geoip2.errors import AddressNotFoundError from maxminddb.errors import InvalidDatabaseError +from pydantic import BaseModel + +from analysis.plugin import AnalysisPluginV0 +from analysis.plugin.compat import AnalysisBasePluginAdapterMixin -from analysis.PluginBase import AnalysisBasePlugin +if TYPE_CHECKING: + from io import FileIO GEOIP_DATABASE_PATH = Path(__file__).parent.parent / 'bin/GeoLite2-City/GeoLite2-City.mmdb' @@ -17,22 +25,23 @@ IP_V6_BLACKLIST = [r'^[0-9A-Za-z]::$', r'^::[0-9A-Za-z]$', r'^[0-9A-Za-z]::[0-9A-Za-z]$', r'^::$'] # trivial addresses -class AnalysisPlugin(AnalysisBasePlugin): - NAME = 'ip_and_uri_finder' - DEPENDENCIES = [] # noqa: RUF012 - MIME_WHITELIST = [ # noqa: RUF012 - 'text/plain', - 'application/octet-stream', - 'application/x-executable', - 'application/x-object', - 'application/x-sharedlib', - 'application/x-dosexec', - ] - DESCRIPTION = 'Search file for IP addresses and URIs based on regular expressions.' - VERSION = '0.4.2' - FILE = __file__ - - def additional_setup(self): +class IpAddress(BaseModel): + address: str + location: Optional[Location] + + +class Location(BaseModel): + longitude: float + latitude: float + + +class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): + class Schema(BaseModel): + ips_v4: List[IpAddress] + ips_v6: List[IpAddress] + uris: List[str] + + def __init__(self): self.ip_and_uri_finder = CommonAnalysisIPAndURIFinder() try: self.reader = geoip2.database.Reader(str(GEOIP_DATABASE_PATH)) @@ -40,66 +49,70 @@ def additional_setup(self): logging.error('could not load GeoIP database') self.reader = None - def process_object(self, file_object): - result = self.ip_and_uri_finder.analyze_file(file_object.file_path, separate_ipv6=True) - - for key in ['uris', 'ips_v4', 'ips_v6']: - result[key] = self._remove_duplicates(result[key]) - result['ips_v4'] = self._remove_blacklisted(result['ips_v4'], IP_V4_BLACKLIST) - result['ips_v6'] = self._remove_blacklisted(result['ips_v6'], IP_V6_BLACKLIST) - - file_object.processed_analysis[self.NAME] = self._get_augmented_result(self.add_geo_uri_to_ip(result)) - - return file_object - - def _get_augmented_result(self, result): - result['summary'] = self._get_summary(result) - result['system_version'] = self.ip_and_uri_finder.system_version - return result - - def add_geo_uri_to_ip(self, result): - for key in ['ips_v4', 'ips_v6']: - result[key] = self.link_ips_with_geo_location(result[key]) - return result - - def find_geo_location(self, ip_address): - response = self.reader.city(ip_address) - return f'{response.location.latitude}, {response.location.longitude}' - - def link_ips_with_geo_location(self, ip_addresses): - linked_ip_geo_list = [] - for ip in ip_addresses: - try: - ip_tuple = ip, self.find_geo_location(ip) - except ( - AttributeError, - AddressNotFoundError, - FileNotFoundError, - ValueError, - InvalidDatabaseError, - ) as exception: - logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True) - ip_tuple = ip, '' - linked_ip_geo_list.append(ip_tuple) - return linked_ip_geo_list - - @staticmethod - def _get_summary(results): - summary = [] - summary.extend(results['uris']) - for key in ['ips_v4', 'ips_v6']: - for ip, *_ in results[key]: # IP results come in tuples (ip, latitude, longitude) - summary.append(ip) + super().__init__( + metadata=self.MetaData( + name='ip_and_uri_finder', + description='Search file for IP addresses and URIs based on regular expressions.', + version='1.0.0', + Schema=self.Schema, + mime_whitelist=[ + 'text/plain', + 'application/octet-stream', + 'application/x-executable', + 'application/x-object', + 'application/x-sharedlib', + 'application/x-dosexec', + ], + system_version=self.ip_and_uri_finder.system_version, + ), + ) + + def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema: + del virtual_file_path, analyses + ip_data = self.ip_and_uri_finder.analyze_file(file_handle.name, separate_ipv6=True) + ip_v4_results = _remove_blacklisted(_remove_duplicates(ip_data['ips_v4']), IP_V4_BLACKLIST) + ip_v6_results = _remove_blacklisted(_remove_duplicates(ip_data['ips_v6']), IP_V6_BLACKLIST) + uris = _remove_duplicates(ip_data['uris']) + return self.Schema( + ips_v4=[IpAddress(address=ip, location=self.find_geo_location(ip)) for ip in ip_v4_results], + ips_v6=[IpAddress(address=ip, location=self.find_geo_location(ip)) for ip in ip_v6_results], + uris=uris, + ) + + def find_geo_location(self, ip_address: str) -> Location | None: + if self.reader is None: + return None + try: + response = self.reader.city(ip_address) + return Location( + longitude=float(response.location.longitude), + latitude=float(response.location.latitude), + ) + except ( + AttributeError, + AddressNotFoundError, + FileNotFoundError, + ValueError, + InvalidDatabaseError, + ) as exception: + logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True) + return None + + def summarize(self, result: Schema) -> list: + summary = [*result.uris] + for ip_list in [result.ips_v4, result.ips_v6]: + for ip in ip_list: + summary.append(ip.address) return summary - @staticmethod - def _remove_duplicates(input_list): - return list(set(input_list)) - - @staticmethod - def _remove_blacklisted(ip_list, blacklist): - for ip, blacklist_entry in product(ip_list, blacklist): - if search(blacklist_entry, ip): - with suppress(ValueError): - ip_list.remove(ip) - return ip_list + +def _remove_duplicates(input_list: list[str]) -> list[str]: + return list(set(input_list)) + + +def _remove_blacklisted(ip_list: list[str], blacklist: list[str]) -> list[str]: + for ip, blacklist_entry in product(ip_list, blacklist): + if search(blacklist_entry, ip): + with suppress(ValueError): + ip_list.remove(ip) + return ip_list diff --git a/src/plugins/analysis/ip_and_uri_finder/test/test_ip_and_uri_finder.py b/src/plugins/analysis/ip_and_uri_finder/test/test_ip_and_uri_finder.py index 883443f11..49d958dfc 100644 --- a/src/plugins/analysis/ip_and_uri_finder/test/test_ip_and_uri_finder.py +++ b/src/plugins/analysis/ip_and_uri_finder/test/test_ip_and_uri_finder.py @@ -2,13 +2,12 @@ import tempfile from collections import namedtuple +from pathlib import Path import pytest from geoip2.errors import AddressNotFoundError -from objects.file import FileObject - -from ..code.ip_and_uri_finder import AnalysisPlugin +from ..code.ip_and_uri_finder import AnalysisPlugin, _remove_blacklisted MockResponse = namedtuple('MockResponse', ['location']) MockLocation = namedtuple('MockLocation', ['latitude', 'longitude']) @@ -51,97 +50,59 @@ def ip_and_uri_finder_plugin(analysis_plugin): @pytest.mark.AnalysisPluginTestConfig(plugin_class=AnalysisPlugin) class TestAnalysisPluginIpAndUriFinder: def test_process_object_ips(self, ip_and_uri_finder_plugin): - with tempfile.NamedTemporaryFile() as tmp: - with open(tmp.name, 'w') as fp: # noqa: PTH123 - fp.write( - '1.2.3.4 abc 1.1.1.1234 abc 3. 3. 3. 3 abc 1255.255.255.255 1234:1234:abcd:abcd:1234:1234:abcd:abc' - 'd xyz 2001:db8::8d3:: xyz 2001:db8:0:0:8d3::' - ) - tmp_fo = FileObject(file_path=tmp.name) - processed_object = ip_and_uri_finder_plugin.process_object(tmp_fo) - results = processed_object.processed_analysis[ip_and_uri_finder_plugin.NAME] - assert results['uris'] == [] - assert { - ('1.2.3.4', '47.913, -122.3042'), - ('1.1.1.123', '-37.7, 145.1833'), - } == set(results['ips_v4']) - assert len( - [ - ('1.2.3.4', '47.913, -122.3042'), - ('1.1.1.123', '-37.7, 145.1833'), - ] - ) == len(results['ips_v4']) - assert { - ('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1'), - ('2001:db8:0:0:8d3::', '3.1, 3.1'), - } == set(results['ips_v6']) - assert len( - [ - ('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1'), - ('2001:db8:0:0:8d3::', '3.1, 3.1'), - ] - ) == len(results['ips_v6']) + with tempfile.NamedTemporaryFile() as tmp, Path(tmp.name).open('w') as fp: + fp.write( + '1.2.3.4 abc 1.1.1.1234 abc 3. 3. 3. 3 abc 1255.255.255.255 1234:1234:abcd:abcd:1234:1234:abcd:abc' + 'd xyz 2001:db8::8d3:: xyz 2001:db8:0:0:8d3::' + ) + fp.seek(0) + results = ip_and_uri_finder_plugin.analyze(fp, {}, {}) + assert results.uris == [] + assert len(results.ips_v4) == 2 + ip_v4_addresses = {ipa.address: f'{ipa.location.latitude}, {ipa.location.longitude}' for ipa in results.ips_v4} + assert ip_v4_addresses == { + '1.2.3.4': '47.913, -122.3042', + '1.1.1.123': '-37.7, 145.1833', + } + assert len(results.ips_v6) == 2 + ip_v6_addresses = {ipa.address: f'{ipa.location.latitude}, {ipa.location.longitude}' for ipa in results.ips_v6} + assert ip_v6_addresses == { + '1234:1234:abcd:abcd:1234:1234:abcd:abcd': '2.1, 2.1', + '2001:db8:0:0:8d3::': '3.1, 3.1', + } + + assert set(ip_and_uri_finder_plugin.summarize(results)) == {*ip_v4_addresses, *ip_v6_addresses} def test_process_object_uris(self, ip_and_uri_finder_plugin): - with tempfile.NamedTemporaryFile() as tmp: - with open(tmp.name, 'w') as fp: # noqa: PTH123 - fp.write( - 'http://www.google.de https://www.test.de/test/?x=y&1=2 ftp://ftp.is.co.za/rfc/rfc1808.txt ' - 'telnet://192.0.2.16:80/' - ) - tmp_fo = FileObject(file_path=tmp.name) - processed_object = ip_and_uri_finder_plugin.process_object(tmp_fo) - results = processed_object.processed_analysis[ip_and_uri_finder_plugin.NAME] - assert { + with tempfile.NamedTemporaryFile() as tmp, Path(tmp.name).open('w') as fp: + fp.write( + 'http://www.google.de https://www.test.de/test/?x=y&1=2 ftp://ftp.is.co.za/rfc/rfc1808.txt ' + 'telnet://192.0.2.16:80/' + ) + fp.seek(0) + results = ip_and_uri_finder_plugin.analyze(fp, {}, {}) + assert set(results.uris) == { 'http://www.google.de', 'https://www.test.de/test/', 'ftp://ftp.is.co.za/rfc/rfc1808.txt', 'telnet://192.0.2.16:80/', - } == set(results['uris']) - assert len( - [ - 'http://www.google.de', - 'https://www.test.de/test/', - 'ftp://ftp.is.co.za/rfc/rfc1808.txt', - 'telnet://192.0.2.16:80/', - ] - ) == len(results['uris']) - - def test_add_geo_uri_to_ip(self, ip_and_uri_finder_plugin): - test_data = { - 'ips_v4': ['128.101.101.101', '255.255.255.255'], - 'ips_v6': ['1234:1234:abcd:abcd:1234:1234:abcd:abcd'], - 'uris': 'http://www.google.de', } - results = ip_and_uri_finder_plugin.add_geo_uri_to_ip(test_data) - assert results['uris'] == 'http://www.google.de' - assert [('128.101.101.101', '44.9759, -93.2166'), ('255.255.255.255', '0.0, 0.0')] == results['ips_v4'] - assert [('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1')] == results['ips_v6'] + assert len(results.uris) == 4 + + assert set(ip_and_uri_finder_plugin.summarize(results)) == set(results.uris).union({'192.0.2.16'}) def test_find_geo_location(self, ip_and_uri_finder_plugin): - assert ip_and_uri_finder_plugin.find_geo_location('128.101.101.101') == '44.9759, -93.2166' - assert ip_and_uri_finder_plugin.find_geo_location('127.101.101.101') == '4.1, 4.1' - - with pytest.raises(AddressNotFoundError): - ip_and_uri_finder_plugin.find_geo_location('1.1.2.345') - with pytest.raises(ValueError): # noqa: PT011 - ip_and_uri_finder_plugin.find_geo_location('aaa') - - def test_link_ips_with_geo_location(self, ip_and_uri_finder_plugin): - ip_addresses = ['128.101.101.101', '255.255.255.255'] - expected_results = [('128.101.101.101', '44.9759, -93.2166'), ('255.255.255.255', '0.0, 0.0')] - assert ip_and_uri_finder_plugin.link_ips_with_geo_location(ip_addresses) == expected_results - - def test_get_summary(self): - results = { - 'uris': ['http://www.google.de'], - 'ips_v4': [('128.101.101.101', '44.9759, -93.2166')], - 'ips_v6': [('1234:1234:abcd:abcd:1234:1234:abcd:abcd', '2.1, 2.1')], - } - expected_results = ['http://www.google.de', '128.101.101.101', '1234:1234:abcd:abcd:1234:1234:abcd:abcd'] - assert AnalysisPlugin._get_summary(results), expected_results + location = ip_and_uri_finder_plugin.find_geo_location('128.101.101.101') + assert location.latitude == 44.9759 + assert location.longitude == -93.2166 + location = ip_and_uri_finder_plugin.find_geo_location('127.101.101.101') + assert location.latitude == 4.1 + assert location.longitude == 4.1 + + assert ip_and_uri_finder_plugin.find_geo_location('1.1.2.345') is None + assert ip_and_uri_finder_plugin.find_geo_location('aaa') is None def test_remove_blacklisted(self, ip_and_uri_finder_plugin): input_list = ['1.1.1.1', 'blah', '0.0.0.0'] blacklist = [r'[0-9].{4}', r'x.y'] - assert ip_and_uri_finder_plugin._remove_blacklisted(input_list, blacklist) == ['blah'] + assert _remove_blacklisted(input_list, blacklist) == ['blah'] diff --git a/src/plugins/analysis/ip_and_uri_finder/view/ip_and_uri_finder.html b/src/plugins/analysis/ip_and_uri_finder/view/ip_and_uri_finder.html index b0e42d7ab..8045fc0e1 100644 --- a/src/plugins/analysis/ip_and_uri_finder/view/ip_and_uri_finder.html +++ b/src/plugins/analysis/ip_and_uri_finder/view/ip_and_uri_finder.html @@ -8,7 +8,6 @@ {% if key == "ips_v4" %}IPv4 {% elif key == "ips_v6" %}IPv6 - {% elif key == "ips" %}IP {% else %}URI{% endif %} @@ -16,20 +15,16 @@ {% for item in value %} {% if key == "ips_v6" or key == "ips_v4" %}
  • - {{ item[0] }} - {% if item[1] %} - + {{ item.address }} + {% if item.location %} + {% endif %}
  • {% else %}
  • - {% if key != 'ips' %} - {{ item }} - {% else %} - {{ item }} - {% endif %} + {{ item }}
  • {% endif %} {% endfor %} @@ -40,4 +35,3 @@ {% endfor %} {% endblock %} - diff --git a/src/statistic/update.py b/src/statistic/update.py index 7e7600f8b..d5b06a1d0 100644 --- a/src/statistic/update.py +++ b/src/statistic/update.py @@ -192,12 +192,14 @@ def get_ip_stats(self) -> dict[str, Stats]: @staticmethod def _remove_location_info(ip_stats: dict[str, Stats]): - # IP data can contain location info -> just use the IP string (which is the first element in a list) for key in ['ips_v4', 'ips_v6']: for index, (ip, count) in enumerate(ip_stats[key]): if isinstance(ip, list): + # FixMe: deprecated format of the old plugin version => remove in future release ip_without_gps_info = ip[0] ip_stats[key][index] = (ip_without_gps_info, count) + elif isinstance(ip, dict): + ip_stats[key][index] = (ip['address'], count) def get_time_stats(self): release_date_stats = self.db.get_release_date_stats(q_filter=self.match) diff --git a/src/storage/db_interface_stats.py b/src/storage/db_interface_stats.py index 262ac700e..433b73063 100644 --- a/src/storage/db_interface_stats.py +++ b/src/storage/db_interface_stats.py @@ -295,7 +295,9 @@ def count_occurrences(result_list: list[str]) -> Stats: def _sort_tuples(query_result: Stats) -> Stats: # Sort stats tuples by count in ascending order - return sorted(_convert_to_tuples(query_result), key=lambda e: (e[1], e[0])) + return sorted( + _convert_to_tuples(query_result), key=lambda e: (e[1], e[0]) if not isinstance(e[0], dict) else (e[1],) + ) def _convert_to_tuples(query_result) -> Iterator[tuple[str, int]]: diff --git a/src/web_interface/templates/show_statistic.html b/src/web_interface/templates/show_statistic.html index 1a9dd5cc1..154f8ac7b 100644 --- a/src/web_interface/templates/show_statistic.html +++ b/src/web_interface/templates/show_statistic.html @@ -209,7 +209,7 @@

    Firmware Statistics

    {% call macros.stats_panel("IPv4 Addresses (Top {}/{})".format([10, ips_v4_num] | min, ips_v4_num), "globe") %} {% for ip, count in (stats["ip_and_uri_stats"]["ips_v4"] | sort_chart_list_by_value)[:10] %} - {% set query = {"processed_analysis.ip_and_uri_finder.ips_v4": {"$contains": ip}} %} + {% set query = {"processed_analysis.ip_and_uri_finder.ips_v4": {"$contains": [{"address": ip}]}} %} {{ macros.stats_table_row(ip, count, link=query_url + query | json_dumps | urlencode) }} {% endfor %}
    @@ -221,7 +221,7 @@

    Firmware Statistics

    {% call macros.stats_panel("IPv6 Addresses (Top {}/{})".format([10, ips_v6_num] | min, ips_v6_num), "globe") %} {% for ip, count in (stats["ip_and_uri_stats"]["ips_v6"] | sort_chart_list_by_value)[:10] %} - {% set query = {"processed_analysis.ip_and_uri_finder.ips_v6": {"$contains": ip}} %} + {% set query = {"processed_analysis.ip_and_uri_finder.ips_v6": {"$contains": [{"address": ip}]}} %} {{ macros.stats_table_row(ip, count, link=query_url + query | json_dumps | urlencode) }} {% endfor %}