diff --git a/CHANGES b/CHANGES index 9bd85ee..967e62f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,8 @@ +1.6.2 +------- +- Fix: analysis summary didn't look for genes in root analysis + + 1.6.1 ------- - Fix: Handle no iocs correctly diff --git a/README.md b/README.md index ff2cbe3..28b9f70 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Basic SDK for Intezer Analyze API 2.0 [View full API documentation](https://analyze.intezer.com/api/docs/documentation) (Notice - You must be logged in to Intezer Analyze to access the documentation) -Currently the following options are available in the SDK: +Currently, the following options are available in the SDK: - Analyze by file - Analyze by SHA256 @@ -13,7 +13,8 @@ Currently the following options are available in the SDK: - Index by SHA256 - Get Latest Analysis - Account and file related samples -- Code reuse and metadata +- Code reuse and Metadata +- IOCs, Dynamic TTPs and Capabilities - Strings related samples - Search a family @@ -138,6 +139,9 @@ You can find more code examples under [analyze-python-sdk/examples/](https://git ## Changelog +### 1.6.2 +- Fix: analysis summary didn't look for genes in root analysis + ### 1.6.1 - Fix: Handle no iocs correctly diff --git a/examples/sentinel_one_integration.py b/examples/sentinel_one_integration.py index cbd202d..1ede358 100755 --- a/examples/sentinel_one_integration.py +++ b/examples/sentinel_one_integration.py @@ -4,7 +4,9 @@ import datetime import io import logging +import logging.handlers import secrets +import sys import time import urllib.parse from http import HTTPStatus @@ -13,16 +15,18 @@ import requests import requests.adapters + from intezer_sdk import api from intezer_sdk import errors +from intezer_sdk import util from intezer_sdk.analysis import Analysis -from intezer_sdk.util import get_analysis_summary _s1_session: Optional[requests.Session] = None _logger = logging.getLogger('intezer') class BaseUrlSession(requests.Session): + """Taken from https://github.com/requests/toolbelt/blob/master/requests_toolbelt/sessions.py""" base_url = None def __init__(self, base_url=None): @@ -31,23 +35,23 @@ def __init__(self, base_url=None): super(BaseUrlSession, self).__init__() def request(self, method, url, *args, **kwargs): - 'Send the request after generating the complete URL.' + """Send the request after generating the complete URL.""" url = self.create_url(url) return super(BaseUrlSession, self).request( method, url, *args, **kwargs ) def prepare_request(self, request): - 'Prepare the request after generating the complete URL.' + """Prepare the request after generating the complete URL.""" request.url = self.create_url(request.url) return super(BaseUrlSession, self).prepare_request(request) def create_url(self, url): - 'Create the URL based off this partial path.' + """Create the URL based off this partial path.""" return urllib.parse.urljoin(self.base_url, url) -def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verification: bool=False): +def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verification: bool): headers = {'Authorization': 'ApiToken ' + api_token} global _s1_session _s1_session = BaseUrlSession(base_url) @@ -60,6 +64,7 @@ def init_s1_requests_session(api_token: str, base_url: str, skip_ssl_verificatio def analyze_by_file(threat_id: str): download_url, zipp_password = fetch_file(threat_id) file = download_file(download_url) + _logger.debug('starting to analyze file') analysis = Analysis(file_stream=file, file_name=f'{threat_id}.zip', zip_password=zipp_password) return analysis @@ -68,12 +73,13 @@ def fetch_file(threat_id: str) -> Tuple[str, Optional[str]]: zip_password = secrets.token_urlsafe(32) fetch_file_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=5) + _logger.debug('sending fetch command to the endpoint') response = _s1_session.post('/web/api/v2.1/threats/fetch-file', json={'data': {'password': zip_password}, 'filter': {'ids': [threat_id]}}) assert_s1_response(response) - for c in range(20): - _logger.debug(f'starting to fetch file with request number {c}') + for count in range(20): + _logger.debug(f'waiting for s1 to fetch the file from the endpoint ({count})') time.sleep(10) response = _s1_session.get('/web/api/v2.1/activities', params={'threatIds': threat_id, @@ -87,20 +93,18 @@ def fetch_file(threat_id: str) -> Tuple[str, Optional[str]]: if download_url: return download_url, zip_password else: - err_msg = 'Time out fetching the file, this is most likely when the endpoint is powered off' \ - 'or the agent is shut down' + err_msg = ('Time out fetching the file, this is most likely when the endpoint is powered off' + 'or the agent is shut down') _logger.debug(err_msg) raise Exception(err_msg) def download_file(download_url: str): - _logger.debug(f'starting to download file from s1 with download url of {download_url}') + _logger.debug(f'downloading file from s1 (download url of {download_url})') response = _s1_session.get('/web/api/v2.1' + download_url) - _logger.debug(f'got this response from s1 - {response}') - assert_s1_response(response) - _logger.debug(f'assert s1 response finished successfully') + _logger.debug(f'download finished') file = io.BytesIO(response.content) return file @@ -139,8 +143,8 @@ def filter_threat(threat_info: dict) -> bool: return threat_info['agentDetectionInfo']['agentOsName'].lower().startswith(('linux', 'windows')) -def send_note(threat_id: str, analysis: Analysis, no_emojis: bool): - note = get_analysis_summary(analysis, no_emojis) +def send_note(threat_id: str, analysis: Analysis): + note = util.get_analysis_summary(analysis) response = _s1_session.post('/web/api/v2.1/threats/notes', json={'data': {'text': note}, 'filter': {'ids': [threat_id]}}) @@ -153,12 +157,11 @@ def send_failure_note(note: str, threat_id: str): assert_s1_response(response) -def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str, threat_id: str, skip_ssl_verification: bool=False, no_emojis: bool=False): - api.set_global_api(intezer_api_key) - init_s1_requests_session(s1_api_key, s1_base_address, skip_ssl_verification) +def analyze_threat(threat_id: str, threat: dict = None): _logger.info(f'incoming threat: {threat_id}') try: - threat = get_threat(threat_id) + if not threat: + threat = get_threat(threat_id) if not filter_threat(threat): _logger.info(f'threat {threat_id} is been filtered') return @@ -176,7 +179,6 @@ def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str, analysis = None if not analysis: - _logger.debug('starting to analyze file') analysis = analyze_by_file(threat_id) analysis.send(requester='s1') @@ -184,7 +186,7 @@ def analyze_threat(intezer_api_key: str, s1_api_key: str, s1_base_address: str, analysis.wait_for_completion() _logger.debug('analysis completed') - send_note(threat_id, analysis, no_emojis) + send_note(threat_id, analysis) except Exception as ex: send_failure_note(str(ex), threat_id) @@ -197,22 +199,52 @@ def parse_argparse_args(): parser.add_argument('-i', '--intezer-api-key', help='Intezer API key', required=True) parser.add_argument('-s', '--s1-api-key', help='S1 API Key', required=True) parser.add_argument('-a', '--s1-base-address', help='S1 base address', required=True) - parser.add_argument('-t', '--threat-id', help='S1 threat id', required=True) parser.add_argument('-sv', '--skip-ssl-verification', action='store_true', help='Skipping SSL verification on S1 request') - parser.add_argument('-ne', '--no-emojis', action='store_true', help="Don't show emojis") + subparser_options = {} + if sys.version_info >= (3, 7): + subparser_options['required'] = True + + subparsers = parser.add_subparsers(title='valid subcommands', dest='subcommand', **subparser_options) + threat_parser = subparsers.add_parser('threat', help='Get a threat ID and analyze it') + threat_parser.add_argument('threat_id', help='SentinelOne threat id') + subparsers.add_parser('query', help='Analyze new incoming threat') return parser.parse_args() -if __name__ == '__main__': - args = parse_argparse_args() +def _init_logger(): + _logger.setLevel(logging.DEBUG) + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s')) + _logger.addHandler(handler) - analyze_threat(args.intezer_api_key, - args.s1_api_key, - args.s1_base_address, - args.threat_id, - args.skip_ssl_verification, - args.no_emojis) +def query_threats(): + next_time_query = datetime.datetime.utcnow() + while True: + _logger.info('checking for new threats...') + response = _s1_session.get('/web/api/v2.1/threats', params={'createdAt__gte': next_time_query.isoformat()}) + next_time_query = datetime.datetime.utcnow() + assert_s1_response(response) + threats = response.json()['data'] + for threat in threats: + analyze_threat(threat['id'], threat) + if not threats: + _logger.info('no new threats found') + time.sleep(10) + + +if __name__ == '__main__': + _args = parse_argparse_args() + api.set_global_api(_args.intezer_api_key) + init_s1_requests_session(_args.s1_api_key, _args.s1_base_address, _args.skip_ssl_verification) + _init_logger() + if _args.subcommand == 'threat': + analyze_threat(_args.threat_id) + elif _args.subcommand == 'query': + query_threats() + else: + print('error: the following arguments are required: subcommand') + sys.exit(1) diff --git a/intezer_sdk/__init__.py b/intezer_sdk/__init__.py index bb64aa4..4a9b978 100644 --- a/intezer_sdk/__init__.py +++ b/intezer_sdk/__init__.py @@ -1 +1 @@ -__version__ = '1.6.1' +__version__ = '1.6.2' diff --git a/intezer_sdk/util.py b/intezer_sdk/util.py index a882ad8..e2eb08e 100644 --- a/intezer_sdk/util.py +++ b/intezer_sdk/util.py @@ -1,4 +1,5 @@ import collections +import itertools from typing import List from typing import Optional from typing import Tuple @@ -14,7 +15,7 @@ } -def get_analysis_summary(analysis: Analysis, no_emojis: bool=False) -> str: +def get_analysis_summary(analysis: Analysis, no_emojis: bool = False) -> str: result = analysis.result() metadata = analysis.get_root_analysis().metadata @@ -96,7 +97,7 @@ def get_analysis_family(analysis: Analysis, software_type_priorities: List[str]) def get_analysis_family_by_family_id(analysis: Analysis, family_id: str) -> int: reused_gene_count = 0 - for sub_analysis in analysis.get_sub_analyses(): + for sub_analysis in itertools.chain([analysis.get_root_analysis()], analysis.get_sub_analyses()): if not sub_analysis.code_reuse: continue @@ -104,13 +105,14 @@ def get_analysis_family_by_family_id(analysis: Analysis, family_id: str) -> int: if family['family_id'] == family_id: if family['reused_gene_count'] > reused_gene_count: reused_gene_count = family['reused_gene_count'] + break return reused_gene_count def find_largest_family(analysis: Analysis) -> dict: largest_family_by_software_type = collections.defaultdict(lambda: {'reused_gene_count': 0}) - for sub_analysis in analysis.get_sub_analyses(): + for sub_analysis in itertools.chain([analysis.get_root_analysis()], analysis.get_sub_analyses()): for family in sub_analysis.code_reuse['families']: software_type = family['family_type'] if family['reused_gene_count'] > largest_family_by_software_type[software_type]['reused_gene_count']: