Skip to content

Commit

Permalink
fix: add extraction info to sub analysis (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidt99 authored Jun 1, 2022
1 parent 7ae743e commit 3ebbc4d
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 38 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.8.3
-------
- add extraction info to sub analysis

1.8.1
-------
- Add space in note title
Expand Down
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.8.2'
__version__ = '1.8.3'
1 change: 1 addition & 0 deletions intezer_sdk/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def _init_sub_analyses(self):
self.analysis_id,
sub_analysis['sha256'],
sub_analysis['source'],
sub_analysis.get('extraction_info'),
api=self._api)
if sub_analysis_object.source == 'root':
self._root_analysis = sub_analysis_object
Expand Down
36 changes: 20 additions & 16 deletions intezer_sdk/api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os
import typing
from http import HTTPStatus
from typing import Any
from typing import BinaryIO
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import requests
import requests.adapters
Expand All @@ -13,12 +17,12 @@
from intezer_sdk.consts import IndexType
from intezer_sdk.consts import OnPremiseVersion

_global_api: typing.Optional['IntezerApi'] = None
_global_api: Optional['IntezerApi'] = None


def raise_for_status(response: requests.Response,
statuses_to_ignore: typing.List[typing.Union[HTTPStatus, int]] = None,
allowed_statuses: typing.List[typing.Union[HTTPStatus, int]] = None):
statuses_to_ignore: List[Union[HTTPStatus, int]] = None,
allowed_statuses: List[Union[HTTPStatus, int]] = None):
"""Raises stored :class:`HTTPError`, if one occurred."""

http_error_msg = ''
Expand Down Expand Up @@ -124,7 +128,7 @@ def analyze_by_hash(self,

return self._get_analysis_id_from_response(response)

def _analyze_file_stream(self, file_stream: typing.BinaryIO, file_name: str, options: dict) -> str:
def _analyze_file_stream(self, file_stream: BinaryIO, file_name: str, options: dict) -> str:
file = {'file': (file_name, file_stream)}

response = self.request_with_refresh_expired_access_token(path='/analyze',
Expand All @@ -138,13 +142,13 @@ def _analyze_file_stream(self, file_stream: typing.BinaryIO, file_name: str, opt

def analyze_by_file(self,
file_path: str = None,
file_stream: typing.BinaryIO = None,
file_stream: BinaryIO = None,
disable_dynamic_unpacking: bool = None,
disable_static_unpacking: bool = None,
file_name: str = None,
code_item_type: str = None,
zip_password: str = None,
**additional_parameters) -> typing.Optional[str]:
**additional_parameters) -> Optional[str]:
options = self._param_initialize(disable_dynamic_unpacking,
disable_static_unpacking,
code_item_type,
Expand All @@ -160,7 +164,7 @@ def analyze_by_file(self,
def get_latest_analysis(self,
file_hash: str,
private_only: bool = False,
**additional_parameters) -> typing.Optional[dict]:
**additional_parameters) -> Optional[dict]:

if not self.on_premise_version or self.on_premise_version > OnPremiseVersion.V21_11:
options = {'should_get_only_private_analysis': private_only, **additional_parameters}
Expand Down Expand Up @@ -196,38 +200,38 @@ def get_url_analysis_response(self, analyses_id: str, ignore_not_found: bool) ->

return response

def get_iocs(self, analyses_id: str) -> typing.Optional[dict]:
def get_iocs(self, analyses_id: str) -> Optional[dict]:
response = self.request_with_refresh_expired_access_token(path='/analyses/{}/iocs'.format(analyses_id),
method='GET')
raise_for_status(response)

return response.json()['result']

def get_dynamic_ttps(self, analyses_id: str) -> typing.Optional[dict]:
def get_dynamic_ttps(self, analyses_id: str) -> Optional[dict]:
self.assert_on_premise_above_v21_11()
response = self.request_with_refresh_expired_access_token(path='/analyses/{}/dynamic-ttps'.format(analyses_id),
method='GET')
raise_for_status(response)

return response.json()['result']

def get_family_info(self, family_id: str) -> typing.Optional[dict]:
def get_family_info(self, family_id: str) -> Optional[dict]:
response = self.request_with_refresh_expired_access_token('GET', '/families/{}/info'.format(family_id))
if response.status_code == HTTPStatus.NOT_FOUND:
return None

raise_for_status(response, allowed_statuses=[HTTPStatus.OK])
return response.json()['result']

def get_family_by_name(self, family_name: str) -> typing.Optional[typing.Dict[str, typing.Any]]:
def get_family_by_name(self, family_name: str) -> Optional[Dict[str, Any]]:
response = self.request_with_refresh_expired_access_token('GET', '/families', {'family_name': family_name})
if response.status_code == HTTPStatus.NOT_FOUND:
return None

raise_for_status(response, allowed_statuses=[HTTPStatus.OK])
return response.json()['result']

def get_sub_analyses_by_id(self, analysis_id: str) -> typing.Optional[list]:
def get_sub_analyses_by_id(self, analysis_id: str) -> Optional[List[dict]]:
response = self.request_with_refresh_expired_access_token(path='/analyses/{}/sub-analyses'.format(analysis_id),
method='GET')
raise_for_status(response)
Expand All @@ -236,7 +240,7 @@ def get_sub_analyses_by_id(self, analysis_id: str) -> typing.Optional[list]:

def get_sub_analysis_code_reuse_by_id(self,
composed_analysis_id: str,
sub_analysis_id: str) -> typing.Optional[dict]:
sub_analysis_id: str) -> Optional[dict]:
response = self.request_with_refresh_expired_access_token(path='/analyses/{}/sub-analyses/{}/code-reuse'
.format(composed_analysis_id, sub_analysis_id),
method='GET')
Expand Down Expand Up @@ -319,7 +323,7 @@ def get_string_related_samples_by_id(self,

return response.json()['result_url']

def get_url_result(self, url: str) -> typing.Optional[Response]:
def get_url_result(self, url: str) -> Optional[Response]:
response = self.request_with_refresh_expired_access_token(path=url, method='GET')

raise_for_status(response)
Expand Down Expand Up @@ -401,7 +405,7 @@ def set_session(self):
self._session.headers['Authorization'] = 'Bearer {}'.format(self._access_token)
self._session.headers['User-Agent'] = consts.USER_AGENT

def analyze_url(self, url: str, **additional_parameters) -> typing.Optional[str]:
def analyze_url(self, url: str, **additional_parameters) -> Optional[str]:
self.assert_on_premise_above_v21_11()
response = self.request_with_refresh_expired_access_token(method='POST',
path='/url/',
Expand Down
43 changes: 24 additions & 19 deletions intezer_sdk/sub_analysis.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
import datetime
import typing
from typing import Optional
from typing import Union

from intezer_sdk import errors
from intezer_sdk.api import IntezerApi
from intezer_sdk.api import get_global_api
from intezer_sdk.consts import AnalysisStatusCode
from intezer_sdk.consts import OnPremiseVersion
from intezer_sdk.operation import Operation


class SubAnalysis:
def __init__(self, analysis_id: str, composed_analysis_id: str, sha256: str, source: str, api: IntezerApi = None):
def __init__(self,
analysis_id: str,
composed_analysis_id: str,
sha256: str,
source: str,
extraction_info: Optional[dict],
api: IntezerApi = None):
self.composed_analysis_id = composed_analysis_id
self.analysis_id = analysis_id
self.sha256 = sha256
self.source = source
self.extraction_info = extraction_info
self._api = api or get_global_api()
self._code_reuse = None
self._metadata = None
self._capabilities = None
self._operations = {}

@property
Expand All @@ -35,16 +40,16 @@ def metadata(self):

def find_related_files(self,
family_id: str,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
wait: Union[bool, int] = False,
wait_timeout: Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_sub_analysis_related_files_by_family_id(self.composed_analysis_id,
self.analysis_id,
family_id)
return self._handle_operation(family_id, result_url, wait, wait_timeout)

def get_account_related_samples(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> typing.Optional[Operation]:
wait: Union[bool, int] = False,
wait_timeout: Optional[datetime.timedelta] = None) -> Optional[Operation]:
try:
result_url = self._api.get_sub_analysis_account_related_samples_by_id(self.composed_analysis_id,
self.analysis_id)
Expand All @@ -54,27 +59,27 @@ def get_account_related_samples(self,
return self._handle_operation('Account related samples', result_url, wait, wait_timeout)

def generate_vaccine(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
wait: Union[bool, int] = False,
wait_timeout: Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.generate_sub_analysis_vaccine_by_id(self.composed_analysis_id, self.analysis_id)
return self._handle_operation('Vaccine', result_url, wait, wait_timeout)

def get_capabilities(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
wait: Union[bool, int] = False,
wait_timeout: Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_sub_analysis_capabilities_by_id(self.composed_analysis_id, self.analysis_id)
return self._handle_operation('Capabilities', result_url, wait, wait_timeout)

def get_strings(self,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
wait: Union[bool, int] = False,
wait_timeout: Optional[datetime.timedelta] = None) -> Operation:
result = self._api.get_strings_by_id(self.composed_analysis_id, self.analysis_id)
return self._handle_operation('Strings', result['result_url'], wait, wait_timeout)

def get_string_related_samples(self,
string_value: str,
wait: typing.Union[bool, int] = False,
wait_timeout: typing.Optional[datetime.timedelta] = None) -> Operation:
wait: Union[bool, int] = False,
wait_timeout: Optional[datetime.timedelta] = None) -> Operation:
result_url = self._api.get_string_related_samples_by_id(self.composed_analysis_id,
self.analysis_id,
string_value)
Expand All @@ -83,8 +88,8 @@ def get_string_related_samples(self,
def _handle_operation(self,
operation: str,
url: str,
wait: typing.Union[bool, int],
wait_timeout: typing.Optional[datetime.timedelta]) -> Operation:
wait: Union[bool, int],
wait_timeout: Optional[datetime.timedelta]) -> Operation:
if operation not in self._operations:
self._operations[operation] = Operation(AnalysisStatusCode.IN_PROGRESS, url, api=self._api)

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ def test_sub_analysis_operations(self):
url=self.full_url + 'a/b/capabilities',
status=200, json={'result': 'abd'})

sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root')
sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root', None)

# Act
related_files_operation = sub_analysis.find_related_files('ax', wait=True)
Expand All @@ -681,7 +681,7 @@ def test_sub_analysis_operations(self):

def test_capabilities_raises_when_on_premise_21_11(self):
# Arrange
sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root')
sub_analysis = SubAnalysis('ab', 'asd', 'axaxax', 'root', None)
get_global_api().on_premise_version = OnPremiseVersion.V21_11

# Act and Assert
Expand Down

0 comments on commit 3ebbc4d

Please sign in to comment.