-
Notifications
You must be signed in to change notification settings - Fork 227
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: converted IP&URI plugin to new base class
- Loading branch information
Showing
6 changed files
with
148 additions
and
176 deletions.
There are no files selected for viewing
169 changes: 91 additions & 78 deletions
169
src/plugins/analysis/ip_and_uri_finder/code/ip_and_uri_finder.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,105 +1,118 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from contextlib import suppress | ||
from itertools import product | ||
from pathlib import Path | ||
from re import search | ||
from typing import TYPE_CHECKING, List, Optional | ||
|
||
import geoip2.database | ||
from common_analysis_ip_and_uri_finder import CommonAnalysisIPAndURIFinder | ||
from geoip2.errors import AddressNotFoundError | ||
from maxminddb.errors import InvalidDatabaseError | ||
from pydantic import BaseModel | ||
|
||
from analysis.plugin import AnalysisPluginV0 | ||
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin | ||
|
||
from analysis.PluginBase import AnalysisBasePlugin | ||
if TYPE_CHECKING: | ||
from io import FileIO | ||
|
||
GEOIP_DATABASE_PATH = Path(__file__).parent.parent / 'bin/GeoLite2-City/GeoLite2-City.mmdb' | ||
|
||
IP_V4_BLACKLIST = [r'127.0.[0-9]+.1', r'255.[0-9]+.[0-9]+.[0-9]+'] # localhost # subnet masks | ||
IP_V6_BLACKLIST = [r'^[0-9A-Za-z]::$', r'^::[0-9A-Za-z]$', r'^[0-9A-Za-z]::[0-9A-Za-z]$', r'^::$'] # trivial addresses | ||
|
||
|
||
class AnalysisPlugin(AnalysisBasePlugin): | ||
NAME = 'ip_and_uri_finder' | ||
DEPENDENCIES = [] # noqa: RUF012 | ||
MIME_WHITELIST = [ # noqa: RUF012 | ||
'text/plain', | ||
'application/octet-stream', | ||
'application/x-executable', | ||
'application/x-object', | ||
'application/x-sharedlib', | ||
'application/x-dosexec', | ||
] | ||
DESCRIPTION = 'Search file for IP addresses and URIs based on regular expressions.' | ||
VERSION = '0.4.2' | ||
FILE = __file__ | ||
|
||
def additional_setup(self): | ||
class IpAddress(BaseModel):
    """An IP address found in a file, together with its geo location if one could be determined."""

    # the address as a string
    address: str
    # filled from `AnalysisPlugin.find_geo_location`, which returns None when no GeoIP reader is
    # available or the lookup fails
    location: Optional[Location]
|
||
|
||
class Location(BaseModel):
    """Geographic coordinates of an IP address as reported by the GeoIP database reader."""

    longitude: float
    latitude: float
|
||
|
||
class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin): | ||
class Schema(BaseModel):
    """Result model of the plugin: the IP addresses and URIs found in the analyzed file."""

    # de-duplicated IPv4 addresses with the entries matching IP_V4_BLACKLIST removed
    ips_v4: List[IpAddress]
    # de-duplicated IPv6 addresses with the entries matching IP_V6_BLACKLIST removed
    ips_v6: List[IpAddress]
    # de-duplicated URIs
    uris: List[str]
|
||
def __init__(self): | ||
self.ip_and_uri_finder = CommonAnalysisIPAndURIFinder() | ||
try: | ||
self.reader = geoip2.database.Reader(str(GEOIP_DATABASE_PATH)) | ||
except FileNotFoundError: | ||
logging.error('could not load GeoIP database') | ||
self.reader = None | ||
|
||
def process_object(self, file_object): | ||
result = self.ip_and_uri_finder.analyze_file(file_object.file_path, separate_ipv6=True) | ||
|
||
for key in ['uris', 'ips_v4', 'ips_v6']: | ||
result[key] = self._remove_duplicates(result[key]) | ||
result['ips_v4'] = self._remove_blacklisted(result['ips_v4'], IP_V4_BLACKLIST) | ||
result['ips_v6'] = self._remove_blacklisted(result['ips_v6'], IP_V6_BLACKLIST) | ||
|
||
file_object.processed_analysis[self.NAME] = self._get_augmented_result(self.add_geo_uri_to_ip(result)) | ||
|
||
return file_object | ||
|
||
def _get_augmented_result(self, result): | ||
result['summary'] = self._get_summary(result) | ||
result['system_version'] = self.ip_and_uri_finder.system_version | ||
return result | ||
|
||
def add_geo_uri_to_ip(self, result): | ||
for key in ['ips_v4', 'ips_v6']: | ||
result[key] = self.link_ips_with_geo_location(result[key]) | ||
return result | ||
|
||
def find_geo_location(self, ip_address): | ||
response = self.reader.city(ip_address) | ||
return f'{response.location.latitude}, {response.location.longitude}' | ||
|
||
def link_ips_with_geo_location(self, ip_addresses): | ||
linked_ip_geo_list = [] | ||
for ip in ip_addresses: | ||
try: | ||
ip_tuple = ip, self.find_geo_location(ip) | ||
except ( | ||
AttributeError, | ||
AddressNotFoundError, | ||
FileNotFoundError, | ||
ValueError, | ||
InvalidDatabaseError, | ||
) as exception: | ||
logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True) | ||
ip_tuple = ip, '' | ||
linked_ip_geo_list.append(ip_tuple) | ||
return linked_ip_geo_list | ||
|
||
@staticmethod | ||
def _get_summary(results): | ||
summary = [] | ||
summary.extend(results['uris']) | ||
for key in ['ips_v4', 'ips_v6']: | ||
for ip, *_ in results[key]: # IP results come in tuples (ip, latitude, longitude) | ||
summary.append(ip) | ||
super().__init__( | ||
metadata=self.MetaData( | ||
name='ip_and_uri_finder', | ||
description='Search file for IP addresses and URIs based on regular expressions.', | ||
version='1.0.0', | ||
Schema=self.Schema, | ||
mime_whitelist=[ | ||
'text/plain', | ||
'application/octet-stream', | ||
'application/x-executable', | ||
'application/x-object', | ||
'application/x-sharedlib', | ||
'application/x-dosexec', | ||
], | ||
system_version=self.ip_and_uri_finder.system_version, | ||
), | ||
) | ||
|
||
def analyze(self, file_handle: FileIO, virtual_file_path: dict[str, list[str]], analyses: dict) -> Schema:
    """
    Search the file behind `file_handle` for IP addresses and URIs.

    The finder is run on the path of the file (`file_handle.name`). Its IPv4 and IPv6 results are
    de-duplicated and stripped of blacklisted entries, and each remaining address is paired with
    the result of `find_geo_location`. URIs are only de-duplicated.

    :param file_handle: Handle of the file to analyze; only its `name` is used.
    :param virtual_file_path: Unused.
    :param analyses: Unused.
    :return: The found addresses and URIs as `Schema` instance.
    """
    del virtual_file_path, analyses
    findings = self.ip_and_uri_finder.analyze_file(file_handle.name, separate_ipv6=True)

    v4_addresses = _remove_blacklisted(_remove_duplicates(findings['ips_v4']), IP_V4_BLACKLIST)
    v6_addresses = _remove_blacklisted(_remove_duplicates(findings['ips_v6']), IP_V6_BLACKLIST)
    unique_uris = _remove_duplicates(findings['uris'])

    located_v4 = [
        IpAddress(address=address, location=self.find_geo_location(address)) for address in v4_addresses
    ]
    located_v6 = [
        IpAddress(address=address, location=self.find_geo_location(address)) for address in v6_addresses
    ]
    return self.Schema(ips_v4=located_v4, ips_v6=located_v6, uris=unique_uris)
|
||
def find_geo_location(self, ip_address: str) -> Location | None:
    """
    Look up the geographic coordinates of an IP address in the GeoIP database.

    :param ip_address: The IP address to look up.
    :return: The coordinates as `Location`, or `None` if no database reader is available, the
        lookup fails, or the database entry carries no coordinates.
    """
    if self.reader is None:
        return None
    try:
        response = self.reader.city(ip_address)
        longitude = response.location.longitude
        latitude = response.location.latitude
    except (
        AttributeError,
        AddressNotFoundError,
        FileNotFoundError,
        ValueError,
        InvalidDatabaseError,
    ) as exception:
        logging.debug(f'Error during {self.NAME} analysis: {exception!s}', exc_info=True)
        return None
    # geoip2 declares both coordinates as optional; float(None) would raise a TypeError that is
    # not covered by the handler above and would abort the whole analysis
    if longitude is None or latitude is None:
        return None
    return Location(longitude=float(longitude), latitude=float(latitude))
|
||
def summarize(self, result: Schema) -> list:
    """
    Build the summary of an analysis result.

    :param result: The result of `analyze`.
    :return: All URIs, followed by the IPv4 and then the IPv6 addresses (without locations).
    """
    v4_addresses = [entry.address for entry in result.ips_v4]
    v6_addresses = [entry.address for entry in result.ips_v6]
    return [*result.uris, *v4_addresses, *v6_addresses]
|
||
@staticmethod | ||
def _remove_duplicates(input_list): | ||
return list(set(input_list)) | ||
|
||
@staticmethod | ||
def _remove_blacklisted(ip_list, blacklist): | ||
for ip, blacklist_entry in product(ip_list, blacklist): | ||
if search(blacklist_entry, ip): | ||
with suppress(ValueError): | ||
ip_list.remove(ip) | ||
return ip_list | ||
|
||
def _remove_duplicates(input_list: list[str]) -> list[str]: | ||
return list(set(input_list)) | ||
|
||
|
||
def _remove_blacklisted(ip_list: list[str], blacklist: list[str]) -> list[str]: | ||
for ip, blacklist_entry in product(ip_list, blacklist): | ||
if search(blacklist_entry, ip): | ||
with suppress(ValueError): | ||
ip_list.remove(ip) | ||
return ip_list |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.