Skip to content

Commit

Permalink
Merge pull request #100 from intezer/feature/add-verdict-propery
Browse files Browse the repository at this point in the history
feat(analysis): add verdict property to UrlAnalysis and EndpointAnalysis
  • Loading branch information
davidt99 authored Jul 2, 2023
2 parents 3e0bb4c + 269e058 commit 64de124
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 126 deletions.
5 changes: 5 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.17.4
______
- Add verdict property to `UrlAnalysis` and `EndpointAnalysis`


1.17.3
______
- Raise `UrlOfflineError` when analyzing a url that seems offline.
Expand Down
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.17.3'
__version__ = '1.17.4'
22 changes: 22 additions & 0 deletions intezer_sdk/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,17 @@ def dynamic_ttps(self) -> list:

@property
def verdict(self) -> str:
"""
The analysis verdict.
"""
self._assert_analysis_finished()
return self._report['verdict']

@property
def sub_verdict(self) -> str:
"""
The analysis sub-verdict.
"""
self._assert_analysis_finished()
return self._report['sub_verdict']

Expand Down Expand Up @@ -399,6 +405,22 @@ def from_latest_analysis(cls,

return cls.from_analysis_id(analyses_ids[0], api=api)

@property
def verdict(self) -> str:
"""
The analysis verdict.
"""
self._assert_analysis_finished()
return self._report['summary']['verdict_type']

@property
def sub_verdict(self) -> str:
"""
The analysis sub-verdict.
"""
self._assert_analysis_finished()
return self._report['summary']['verdict_name']

def _set_report(self, report: dict):
super()._set_report(report)
if not self.url:
Expand Down
8 changes: 8 additions & 0 deletions intezer_sdk/base_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ def _query_status_from_api(self) -> Response:
def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> 'Analysis':
raise NotImplementedError()

@property
@abc.abstractmethod
def verdict(self) -> str:
"""
The analysis verdict.
"""
raise NotImplementedError()

def wait_for_completion(self,
interval: int = None,
sleep_before_first_check=False,
Expand Down
8 changes: 8 additions & 0 deletions intezer_sdk/endpoint_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None):
response = IntezerApi(api or get_global_api()).get_endpoint_analysis_response(analysis_id, True)
return cls._create_analysis_from_response(response, api, analysis_id)

@property
def verdict(self) -> str:
"""
The analysis verdict.
"""
self._assert_analysis_finished()
return self._report['verdict']

def _set_report(self, report: dict):
super()._set_report(report)
if 'scan_start_time' in report:
Expand Down
136 changes: 136 additions & 0 deletions tests/unit/test_endpoint_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import uuid
from http import HTTPStatus

import responses

from intezer_sdk import consts
from intezer_sdk import errors
from intezer_sdk.endpoint_analysis import EndpointAnalysis
from tests.unit.base_test import BaseTest


class EndpointAnalysisSpec(BaseTest):
def test_analysis_in_progress(self):
# Arrange
analysis_id = str(uuid.uuid4())
result = {'status': 'in_progress'}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED,
json=result)
# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)

# Assert
self.assertIsNotNone(analysis)
self.assertEqual(analysis_id, analysis.analysis_id)
self.assertEqual(consts.AnalysisStatusCode.IN_PROGRESS, analysis.status)

def test_wait_for_completion(self):
# Arrange
analysis_id = str(uuid.uuid4())
in_progress_result = {
'status': 'in_progress',
'result_url': 'foo'
}
success_result = {
'status': 'succeeded',
'result': {
'analysis_id': analysis_id,
'scan_status': 'done'
}
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED,
json=in_progress_result)
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK,
json=success_result)
# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)
analysis.wait_for_completion(sleep_before_first_check=False)

# Assert
self.assertIsNotNone(analysis)
self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status)

def test_analysis_done(self):
# Arrange
analysis_id = str(uuid.uuid4())
result = {
'status': 'succeeded',
'result': {
'analysis_id': analysis_id,
'scan_status': 'done',
'verdict': 'malicious',
}
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, json=result)
# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)

# Assert
self.assertIsNotNone(analysis)
self.assertEqual(analysis_id, analysis.analysis_id)
self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status)
self.assertEqual('malicious', analysis.verdict)
self.assertDictEqual(result['result'], analysis.result())

def test_analysis_failed(self):
# Arrange
analysis_id = str(uuid.uuid4())
result = {
'status': 'failed',
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK,
json=result)

# Act and Assert
with self.assertRaises(errors.AnalysisFailedError):
EndpointAnalysis.from_analysis_id(analysis_id)

def test_analysis_not_found(self):
# Arrange
analysis_id = str(uuid.uuid4())

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.NOT_FOUND)

# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)

# Assert
self.assertIsNone(analysis)

def test_get_sub_analyses(self):
# Arrange
analysis_id = str(uuid.uuid4())
sub_analysis_id = str(uuid.uuid4())
analysis = EndpointAnalysis()
analysis.status = consts.AnalysisStatusCode.FINISHED
analysis.analysis_id = analysis_id
sha256 = 'a' * 64
verdict = 'malicious'
result = {
'sub_analyses': [
{'sub_analysis_id': sub_analysis_id,
'source': 'endpoint',
'sha256': sha256,
'verdict': verdict}
]
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}/sub-analyses', status=HTTPStatus.OK,
json=result)

sub_analyses = analysis.get_sub_analyses()[0]

# Assert
self.assertEqual(sub_analysis_id, sub_analyses.analysis_id)
self.assertEqual(verdict, sub_analyses.verdict)
self.assertEqual(sha256, sub_analyses.sha256)
126 changes: 1 addition & 125 deletions tests/unit/test_analysis.py → tests/unit/test_file_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ def test_get_latest_analysis_analysis_returns_none_when_latest_analysis_found_bu
analysis = FileAnalysis.from_latest_hash_analysis(file_hash, days_threshold_for_latest_analysis=1)

self.assertIsNone(analysis)

def test_get_latest_analysis_analysis_returns_analysis_when_latest_analysis_found_and_young_than_requested(self):
# Arrange
file_hash = 'hash'
Expand Down Expand Up @@ -1324,128 +1325,3 @@ def test_compare_returns_false_when_analysis_not_the_same_type(self):

# Assert
self.assertNotEqual(endpoint_analysis, file_analysis)


class EndpointAnalysisSpec(BaseTest):
def test_analysis_in_progress(self):
# Arrange
analysis_id = str(uuid.uuid4())
result = {'status': 'in_progress'}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED,
json=result)
# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)

# Assert
self.assertIsNotNone(analysis)
self.assertEqual(analysis_id, analysis.analysis_id)
self.assertEqual(consts.AnalysisStatusCode.IN_PROGRESS, analysis.status)

def test_wait_for_completion(self):
# Arrange
analysis_id = str(uuid.uuid4())
in_progress_result = {
'status': 'in_progress',
'result_url': 'foo'
}
success_result = {
'status': 'succeeded',
'result': {
'analysis_id': analysis_id,
'scan_status': 'done'
}
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED,
json=in_progress_result)
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK,
json=success_result)
# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)
analysis.wait_for_completion(sleep_before_first_check=False)

# Assert
self.assertIsNotNone(analysis)
self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status)

def test_analysis_done(self):
# Arrange
analysis_id = str(uuid.uuid4())
result = {
'status': 'succeeded',
'result': {
'analysis_id': analysis_id,
'scan_status': 'done'
}
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, json=result)
# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)

# Assert
self.assertIsNotNone(analysis)
self.assertEqual(analysis_id, analysis.analysis_id)
self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status)
self.assertDictEqual(result['result'], analysis.result())

def test_analysis_failed(self):
# Arrange
analysis_id = str(uuid.uuid4())
result = {
'status': 'failed',
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK,
json=result)

# Act and Assert
with self.assertRaises(errors.AnalysisFailedError):
EndpointAnalysis.from_analysis_id(analysis_id)

def test_analysis_not_found(self):
# Arrange
analysis_id = str(uuid.uuid4())

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.NOT_FOUND)

# Act
analysis = EndpointAnalysis.from_analysis_id(analysis_id)

# Assert
self.assertIsNone(analysis)

def test_get_sub_analyses(self):
# Arrange
analysis_id = str(uuid.uuid4())
sub_analysis_id = str(uuid.uuid4())
analysis = EndpointAnalysis()
analysis.status = consts.AnalysisStatusCode.FINISHED
analysis.analysis_id = analysis_id
sha256 = 'a' * 64
verdict = 'malicious'
result = {
'sub_analyses': [
{'sub_analysis_id': sub_analysis_id,
'source': 'endpoint',
'sha256': sha256,
'verdict': verdict}
]
}

with responses.RequestsMock() as mock:
mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}/sub-analyses', status=HTTPStatus.OK,
json=result)

sub_analyses = analysis.get_sub_analyses()[0]

# Assert
self.assertEqual(sub_analysis_id, sub_analyses.analysis_id)
self.assertEqual(verdict, sub_analyses.verdict)
self.assertEqual(sha256, sub_analyses.sha256)
File renamed without changes.

0 comments on commit 64de124

Please sign in to comment.