From 3ebbc4d77f433c60ce168f096082e4d9f3770d4c Mon Sep 17 00:00:00 2001 From: davidt99 Date: Wed, 1 Jun 2022 11:19:51 +0300 Subject: [PATCH] fix: add extraction info to sub analysis (#49) --- CHANGES | 4 ++++ intezer_sdk/__init__.py | 2 +- intezer_sdk/analysis.py | 1 + intezer_sdk/api.py | 36 +++++++++++++++++-------------- intezer_sdk/sub_analysis.py | 43 +++++++++++++++++++++---------------- tests/unit/test_analysis.py | 4 ++-- 6 files changed, 52 insertions(+), 38 deletions(-) diff --git a/CHANGES b/CHANGES index 1d507e6..84f2b47 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,7 @@ +1.8.3 +------- +- add extraction info to sub analysis + 1.8.1 ------- - Add space in note title diff --git a/intezer_sdk/__init__.py b/intezer_sdk/__init__.py index aa1a8c4..cfe6447 100644 --- a/intezer_sdk/__init__.py +++ b/intezer_sdk/__init__.py @@ -1 +1 @@ -__version__ = '1.8.2' +__version__ = '1.8.3' diff --git a/intezer_sdk/analysis.py b/intezer_sdk/analysis.py index c3cbf02..f825a53 100644 --- a/intezer_sdk/analysis.py +++ b/intezer_sdk/analysis.py @@ -136,6 +136,7 @@ def _init_sub_analyses(self): self.analysis_id, sub_analysis['sha256'], sub_analysis['source'], + sub_analysis.get('extraction_info'), api=self._api) if sub_analysis_object.source == 'root': self._root_analysis = sub_analysis_object diff --git a/intezer_sdk/api.py b/intezer_sdk/api.py index b57c9d3..5fb3a1a 100644 --- a/intezer_sdk/api.py +++ b/intezer_sdk/api.py @@ -1,7 +1,11 @@ import os -import typing from http import HTTPStatus +from typing import Any +from typing import BinaryIO +from typing import Dict +from typing import List from typing import Optional +from typing import Union import requests import requests.adapters @@ -13,12 +17,12 @@ from intezer_sdk.consts import IndexType from intezer_sdk.consts import OnPremiseVersion -_global_api: typing.Optional['IntezerApi'] = None +_global_api: Optional['IntezerApi'] = None def raise_for_status(response: requests.Response, - statuses_to_ignore: typing.List[typing.Union[HTTPStatus, int]] = None, - allowed_statuses: typing.List[typing.Union[HTTPStatus, int]] = None): + statuses_to_ignore: List[Union[HTTPStatus, int]] = None, + allowed_statuses: List[Union[HTTPStatus, int]] = None): """Raises stored :class:`HTTPError`, if one occurred.""" http_error_msg = '' @@ -124,7 +128,7 @@ def analyze_by_hash(self, return self._get_analysis_id_from_response(response) - def _analyze_file_stream(self, file_stream: typing.BinaryIO, file_name: str, options: dict) -> str: + def _analyze_file_stream(self, file_stream: BinaryIO, file_name: str, options: dict) -> str: file = {'file': (file_name, file_stream)} response = self.request_with_refresh_expired_access_token(path='/analyze', @@ -138,13 +142,13 @@ def _analyze_file_stream(self, file_stream: typing.BinaryIO, file_name: str, opt def analyze_by_file(self, file_path: str = None, - file_stream: typing.BinaryIO = None, + file_stream: BinaryIO = None, disable_dynamic_unpacking: bool = None, disable_static_unpacking: bool = None, file_name: str = None, code_item_type: str = None, zip_password: str = None, - **additional_parameters) -> typing.Optional[str]: + **additional_parameters) -> Optional[str]: options = self._param_initialize(disable_dynamic_unpacking, disable_static_unpacking, code_item_type, @@ -160,7 +164,7 @@ def analyze_by_file(self, def get_latest_analysis(self, file_hash: str, private_only: bool = False, - **additional_parameters) -> typing.Optional[dict]: + **additional_parameters) -> Optional[dict]: if not self.on_premise_version or self.on_premise_version > OnPremiseVersion.V21_11: options = {'should_get_only_private_analysis': private_only, **additional_parameters} @@ -196,14 +200,14 @@ def get_url_analysis_response(self, analyses_id: str, ignore_not_found: bool) -> return response - def get_iocs(self, analyses_id: str) -> typing.Optional[dict]: + def get_iocs(self, analyses_id: str) -> Optional[dict]: response = self.request_with_refresh_expired_access_token(path='/analyses/{}/iocs'.format(analyses_id), method='GET') raise_for_status(response) return response.json()['result'] - def get_dynamic_ttps(self, analyses_id: str) -> typing.Optional[dict]: + def get_dynamic_ttps(self, analyses_id: str) -> Optional[dict]: self.assert_on_premise_above_v21_11() response = self.request_with_refresh_expired_access_token(path='/analyses/{}/dynamic-ttps'.format(analyses_id), method='GET') @@ -211,7 +215,7 @@ def get_dynamic_ttps(self, analyses_id: str) -> typing.Optional[dict]: return response.json()['result'] - def get_family_info(self, family_id: str) -> typing.Optional[dict]: + def get_family_info(self, family_id: str) -> Optional[dict]: response = self.request_with_refresh_expired_access_token('GET', '/families/{}/info'.format(family_id)) if response.status_code == HTTPStatus.NOT_FOUND: return None @@ -219,7 +223,7 @@ def get_family_info(self, family_id: str) -> typing.Optional[dict]: raise_for_status(response, allowed_statuses=[HTTPStatus.OK]) return response.json()['result'] - def get_family_by_name(self, family_name: str) -> typing.Optional[typing.Dict[str, typing.Any]]: + def get_family_by_name(self, family_name: str) -> Optional[Dict[str, Any]]: response = self.request_with_refresh_expired_access_token('GET', '/families', {'family_name': family_name}) if response.status_code == HTTPStatus.NOT_FOUND: return None @@ -227,7 +231,7 @@ def get_family_by_name(self, family_name: str) -> typing.Optional[typing.Dict[st raise_for_status(response, allowed_statuses=[HTTPStatus.OK]) return response.json()['result'] - def get_sub_analyses_by_id(self, analysis_id: str) -> typing.Optional[list]: + def get_sub_analyses_by_id(self, analysis_id: str) -> Optional[List[dict]]: response = self.request_with_refresh_expired_access_token(path='/analyses/{}/sub-analyses'.format(analysis_id), method='GET') raise_for_status(response) @@ -236,7 +240,7 @@ def get_sub_analyses_by_id(self, analysis_id: str) -> typing.Optional[list]: def get_sub_analysis_code_reuse_by_id(self, composed_analysis_id: str, - sub_analysis_id: str) -> typing.Optional[dict]: + sub_analysis_id: str) -> Optional[dict]: response = self.request_with_refresh_expired_access_token(path='/analyses/{}/sub-analyses/{}/code-reuse' .format(composed_analysis_id, sub_analysis_id), method='GET') @@ -319,7 +323,7 @@ def get_string_related_samples_by_id(self, return response.json()['result_url'] - def get_url_result(self, url: str) -> typing.Optional[Response]: + def get_url_result(self, url: str) -> Optional[Response]: response = self.request_with_refresh_expired_access_token(path=url, method='GET') raise_for_status(response) @@ -401,7 +405,7 @@ def set_session(self): self._session.headers['Authorization'] = 'Bearer {}'.format(self._access_token) self._session.headers['User-Agent'] = consts.USER_AGENT - def analyze_url(self, url: str, **additional_parameters) -> typing.Optional[str]: + def analyze_url(self, url: str, **additional_parameters) -> Optional[str]: self.assert_on_premise_above_v21_11() response = self.request_with_refresh_expired_access_token(method='POST', path='/url/', diff --git a/intezer_sdk/sub_analysis.py b/intezer_sdk/sub_analysis.py index 63686e2..021dda1 100644 --- a/intezer_sdk/sub_analysis.py +++ b/intezer_sdk/sub_analysis.py @@ -1,24 +1,29 @@ import datetime -import typing +from typing import Optional +from typing import Union -from intezer_sdk import errors from intezer_sdk.api import IntezerApi from intezer_sdk.api import get_global_api from intezer_sdk.consts import AnalysisStatusCode -from intezer_sdk.consts import OnPremiseVersion from intezer_sdk.operation import Operation class SubAnalysis: - def __init__(self, analysis_id: str, composed_analysis_id: str, sha256: str, source: str, api: IntezerApi = None): + def __init__(self, + analysis_id: str, + composed_analysis_id: str, + sha256: str, + source: str, + extraction_info: Optional[dict], + api: IntezerApi = None): self.composed_analysis_id = composed_analysis_id self.analysis_id = analysis_id self.sha256 = sha256 self.source = source + self.extraction_info = extraction_info self._api = api or get_global_api() self._code_reuse = None self._metadata = None - self._capabilities = None self._operations = {} @property @@ -35,16 +40,16 @@ def metadata(self): def find_related_files(self, family_id: str, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation: + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None) -> Operation: result_url = self._api.get_sub_analysis_related_files_by_family_id(self.composed_analysis_id, self.analysis_id, family_id) return self._handle_operation(family_id, result_url, wait, wait_timeout) def get_account_related_samples(self, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None) -> typing.Optional[Operation]: + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None) -> Optional[Operation]: try: result_url = self._api.get_sub_analysis_account_related_samples_by_id(self.composed_analysis_id, self.analysis_id) @@ -54,27 +59,27 @@ def get_account_related_samples(self, return self._handle_operation('Account related samples', result_url, wait, wait_timeout) def generate_vaccine(self, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation: + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None) -> Operation: result_url = self._api.generate_sub_analysis_vaccine_by_id(self.composed_analysis_id, self.analysis_id) return self._handle_operation('Vaccine', result_url, wait, wait_timeout) def get_capabilities(self, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation: + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None) -> Operation: result_url = self._api.get_sub_analysis_capabilities_by_id(self.composed_analysis_id, self.analysis_id) return self._handle_operation('Capabilities', result_url, wait, wait_timeout) def get_strings(self, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation: + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None) -> Operation: result = self._api.get_strings_by_id(self.composed_analysis_id, self.analysis_id) return self._handle_operation('Strings', result['result_url'], wait, wait_timeout) def get_string_related_samples(self, string_value: str, - wait: typing.Union[bool, int] = False, - wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation: + wait: Union[bool, int] = False, + wait_timeout: Optional[datetime.timedelta] = None) -> Operation: result_url = self._api.get_string_related_samples_by_id(self.composed_analysis_id, self.analysis_id, string_value) @@ -83,8 +88,8 @@ def get_string_related_samples(self, def _handle_operation(self, operation: str, url: str, - wait: typing.Union[bool, int], - wait_timeout: typing.Optional[datetime.timedelta]) -> Operation: + wait: Union[bool, int], + wait_timeout: Optional[datetime.timedelta]) -> Operation: if operation not in self._operations: self._operations[operation] = Operation(AnalysisStatusCode.IN_PROGRESS, url, api=self._api) diff --git a/tests/unit/test_analysis.py b/tests/unit/test_analysis.py index 7b7f95f..d6635a7 100644 --- a/tests/unit/test_analysis.py +++ b/tests/unit/test_analysis.py @@ -661,7 +661,7 @@ def test_sub_analysis_operations(self): url=self.full_url + 'a/b/capabilities', status=200, json={'result': 'abd'}) - sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root') + sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root', None) # Act related_files_operation = sub_analysis.find_related_files('ax', wait=True) @@ -681,7 +681,7 @@ def test_sub_analysis_operations(self): def test_capabilities_raises_when_on_premise_21_11(self): # Arrange - sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root') + sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root', None) get_global_api().on_premise_version = OnPremiseVersion.V21_11 # Act and Assert