From 269e0582ca65cbbeddfcd090731e7924afe4e5b0 Mon Sep 17 00:00:00 2001 From: davidt99 Date: Thu, 29 Jun 2023 13:34:53 +0300 Subject: [PATCH] feat(analysis): add verdict property to UrlAnalysis and EndpointAnalysis --- CHANGES | 5 + intezer_sdk/__init__.py | 2 +- intezer_sdk/analysis.py | 22 +++ intezer_sdk/base_analysis.py | 8 ++ intezer_sdk/endpoint_analysis.py | 8 ++ tests/unit/test_endpoint_analysis.py | 136 ++++++++++++++++++ ...test_analysis.py => test_file_analysis.py} | 126 +--------------- ...s.py => test_offline_endpoint_analysis.py} | 0 8 files changed, 181 insertions(+), 126 deletions(-) create mode 100644 tests/unit/test_endpoint_analysis.py rename tests/unit/{test_analysis.py => test_file_analysis.py} (92%) rename tests/unit/{test_endpoint_scan_analysis.py => test_offline_endpoint_analysis.py} (100%) diff --git a/CHANGES b/CHANGES index 48126f8..db110fe 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,8 @@ +1.17.4 +______ +- Add verdict property to `UrlAnalysis` and `EndpointAnalysis` + + 1.17.3 ______ - Raise `UrlOfflineError` when analyzing a url that seems offline. diff --git a/intezer_sdk/__init__.py b/intezer_sdk/__init__.py index de37c9e..c92360e 100644 --- a/intezer_sdk/__init__.py +++ b/intezer_sdk/__init__.py @@ -1 +1 @@ -__version__ = '1.17.3' +__version__ = '1.17.4' diff --git a/intezer_sdk/analysis.py b/intezer_sdk/analysis.py index 9cc3440..75bdf01 100644 --- a/intezer_sdk/analysis.py +++ b/intezer_sdk/analysis.py @@ -295,11 +295,17 @@ def dynamic_ttps(self) -> list: @property def verdict(self) -> str: + """ + The analysis verdict. + """ self._assert_analysis_finished() return self._report['verdict'] @property def sub_verdict(self) -> str: + """ + The analysis sub-verdict. + """ self._assert_analysis_finished() return self._report['sub_verdict'] @@ -399,6 +405,22 @@ def from_latest_analysis(cls, return cls.from_analysis_id(analyses_ids[0], api=api) + @property + def verdict(self) -> str: + """ + The analysis verdict. + """ + self._assert_analysis_finished() + return self._report['summary']['verdict_type'] + + @property + def sub_verdict(self) -> str: + """ + The analysis sub-verdict. + """ + self._assert_analysis_finished() + return self._report['summary']['verdict_name'] + def _set_report(self, report: dict): super()._set_report(report) if not self.url: diff --git a/intezer_sdk/base_analysis.py b/intezer_sdk/base_analysis.py index e25b84a..42e4171 100644 --- a/intezer_sdk/base_analysis.py +++ b/intezer_sdk/base_analysis.py @@ -42,6 +42,14 @@ def _query_status_from_api(self) -> Response: def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> 'Analysis': raise NotImplementedError() + @property + @abc.abstractmethod + def verdict(self) -> str: + """ + The analysis verdict. + """ + raise NotImplementedError() + def wait_for_completion(self, interval: int = None, sleep_before_first_check=False, diff --git a/intezer_sdk/endpoint_analysis.py b/intezer_sdk/endpoint_analysis.py index c27d1de..5628b6e 100644 --- a/intezer_sdk/endpoint_analysis.py +++ b/intezer_sdk/endpoint_analysis.py @@ -73,6 +73,14 @@ def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None): response = IntezerApi(api or get_global_api()).get_endpoint_analysis_response(analysis_id, True) return cls._create_analysis_from_response(response, api, analysis_id) + @property + def verdict(self) -> str: + """ + The analysis verdict. + """ + self._assert_analysis_finished() + return self._report['verdict'] + def _set_report(self, report: dict): super()._set_report(report) if 'scan_start_time' in report: diff --git a/tests/unit/test_endpoint_analysis.py b/tests/unit/test_endpoint_analysis.py new file mode 100644 index 0000000..fbe6a46 --- /dev/null +++ b/tests/unit/test_endpoint_analysis.py @@ -0,0 +1,136 @@ +import uuid +from http import HTTPStatus + +import responses + +from intezer_sdk import consts +from intezer_sdk import errors +from intezer_sdk.endpoint_analysis import EndpointAnalysis +from tests.unit.base_test import BaseTest + + +class EndpointAnalysisSpec(BaseTest): + def test_analysis_in_progress(self): + # Arrange + analysis_id = str(uuid.uuid4()) + result = {'status': 'in_progress'} + + with responses.RequestsMock() as mock: + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED, + json=result) + # Act + analysis = EndpointAnalysis.from_analysis_id(analysis_id) + + # Assert + self.assertIsNotNone(analysis) + self.assertEqual(analysis_id, analysis.analysis_id) + self.assertEqual(consts.AnalysisStatusCode.IN_PROGRESS, analysis.status) + + def test_wait_for_completion(self): + # Arrange + analysis_id = str(uuid.uuid4()) + in_progress_result = { + 'status': 'in_progress', + 'result_url': 'foo' + } + success_result = { + 'status': 'succeeded', + 'result': { + 'analysis_id': analysis_id, + 'scan_status': 'done' + } + } + + with responses.RequestsMock() as mock: + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED, + json=in_progress_result) + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, + json=success_result) + # Act + analysis = EndpointAnalysis.from_analysis_id(analysis_id) + analysis.wait_for_completion(sleep_before_first_check=False) + + # Assert + self.assertIsNotNone(analysis) + self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status) + + def test_analysis_done(self): + # Arrange + analysis_id = str(uuid.uuid4()) + result = { + 'status': 'succeeded', + 'result': { + 'analysis_id': analysis_id, + 'scan_status': 'done', + 'verdict': 'malicious', + } + } + + with responses.RequestsMock() as mock: + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, json=result) + # Act + analysis = EndpointAnalysis.from_analysis_id(analysis_id) + + # Assert + self.assertIsNotNone(analysis) + self.assertEqual(analysis_id, analysis.analysis_id) + self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status) + self.assertEqual('malicious', analysis.verdict) + self.assertDictEqual(result['result'], analysis.result()) + + def test_analysis_failed(self): + # Arrange + analysis_id = str(uuid.uuid4()) + result = { + 'status': 'failed', + } + + with responses.RequestsMock() as mock: + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, + json=result) + + # Act and Assert + with self.assertRaises(errors.AnalysisFailedError): + EndpointAnalysis.from_analysis_id(analysis_id) + + def test_analysis_not_found(self): + # Arrange + analysis_id = str(uuid.uuid4()) + + with responses.RequestsMock() as mock: + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.NOT_FOUND) + + # Act + analysis = EndpointAnalysis.from_analysis_id(analysis_id) + + # Assert + self.assertIsNone(analysis) + + def test_get_sub_analyses(self): + # Arrange + analysis_id = str(uuid.uuid4()) + sub_analysis_id = str(uuid.uuid4()) + analysis = EndpointAnalysis() + analysis.status = consts.AnalysisStatusCode.FINISHED + analysis.analysis_id = analysis_id + sha256 = 'a' * 64 + verdict = 'malicious' + result = { + 'sub_analyses': [ + {'sub_analysis_id': sub_analysis_id, + 'source': 'endpoint', + 'sha256': sha256, + 'verdict': verdict} + ] + } + + with responses.RequestsMock() as mock: + mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}/sub-analyses', status=HTTPStatus.OK, + json=result) + + sub_analyses = analysis.get_sub_analyses()[0] + + # Assert + self.assertEqual(sub_analysis_id, sub_analyses.analysis_id) + self.assertEqual(verdict, sub_analyses.verdict) + self.assertEqual(sha256, sub_analyses.sha256) diff --git a/tests/unit/test_analysis.py b/tests/unit/test_file_analysis.py similarity index 92% rename from tests/unit/test_analysis.py rename to tests/unit/test_file_analysis.py index d53a393..36239df 100644 --- a/tests/unit/test_analysis.py +++ b/tests/unit/test_file_analysis.py @@ -983,6 +983,7 @@ def test_get_latest_analysis_analysis_returns_none_when_latest_analysis_found_bu analysis = FileAnalysis.from_latest_hash_analysis(file_hash, days_threshold_for_latest_analysis=1) self.assertIsNone(analysis) + def test_get_latest_analysis_analysis_returns_analysis_when_latest_analysis_found_and_young_than_requested(self): # Arrange file_hash = 'hash' @@ -1324,128 +1325,3 @@ def test_compare_returns_false_when_analysis_not_the_same_type(self): # Assert self.assertNotEqual(endpoint_analysis, file_analysis) - - -class EndpointAnalysisSpec(BaseTest): - def test_analysis_in_progress(self): - # Arrange - analysis_id = str(uuid.uuid4()) - result = {'status': 'in_progress'} - - with responses.RequestsMock() as mock: - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED, - json=result) - # Act - analysis = EndpointAnalysis.from_analysis_id(analysis_id) - - # Assert - self.assertIsNotNone(analysis) - self.assertEqual(analysis_id, analysis.analysis_id) - self.assertEqual(consts.AnalysisStatusCode.IN_PROGRESS, analysis.status) - - def test_wait_for_completion(self): - # Arrange - analysis_id = str(uuid.uuid4()) - in_progress_result = { - 'status': 'in_progress', - 'result_url': 'foo' - } - success_result = { - 'status': 'succeeded', - 'result': { - 'analysis_id': analysis_id, - 'scan_status': 'done' - } - } - - with responses.RequestsMock() as mock: - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.ACCEPTED, - json=in_progress_result) - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, - json=success_result) - # Act - analysis = EndpointAnalysis.from_analysis_id(analysis_id) - analysis.wait_for_completion(sleep_before_first_check=False) - - # Assert - self.assertIsNotNone(analysis) - self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status) - - def test_analysis_done(self): - # Arrange - analysis_id = str(uuid.uuid4()) - result = { - 'status': 'succeeded', - 'result': { - 'analysis_id': analysis_id, - 'scan_status': 'done' - } - } - - with responses.RequestsMock() as mock: - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, json=result) - # Act - analysis = EndpointAnalysis.from_analysis_id(analysis_id) - - # Assert - self.assertIsNotNone(analysis) - self.assertEqual(analysis_id, analysis.analysis_id) - self.assertEqual(consts.AnalysisStatusCode.FINISHED, analysis.status) - self.assertDictEqual(result['result'], analysis.result()) - - def test_analysis_failed(self): - # Arrange - analysis_id = str(uuid.uuid4()) - result = { - 'status': 'failed', - } - - with responses.RequestsMock() as mock: - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.OK, - json=result) - - # Act and Assert - with self.assertRaises(errors.AnalysisFailedError): - EndpointAnalysis.from_analysis_id(analysis_id) - - def test_analysis_not_found(self): - # Arrange - analysis_id = str(uuid.uuid4()) - - with responses.RequestsMock() as mock: - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}', status=HTTPStatus.NOT_FOUND) - - # Act - analysis = EndpointAnalysis.from_analysis_id(analysis_id) - - # Assert - self.assertIsNone(analysis) - - def test_get_sub_analyses(self): - # Arrange - analysis_id = str(uuid.uuid4()) - sub_analysis_id = str(uuid.uuid4()) - analysis = EndpointAnalysis() - analysis.status = consts.AnalysisStatusCode.FINISHED - analysis.analysis_id = analysis_id - sha256 = 'a' * 64 - verdict = 'malicious' - result = { - 'sub_analyses': [ - {'sub_analysis_id': sub_analysis_id, - 'source': 'endpoint', - 'sha256': sha256, - 'verdict': verdict} - ] - } - - with responses.RequestsMock() as mock: - mock.add('GET', url=f'{self.full_url}/endpoint-analyses/{analysis_id}/sub-analyses', status=HTTPStatus.OK, - json=result) - - sub_analyses = analysis.get_sub_analyses()[0] - - # Assert - self.assertEqual(sub_analysis_id, sub_analyses.analysis_id) - self.assertEqual(verdict, sub_analyses.verdict) - self.assertEqual(sha256, sub_analyses.sha256) diff --git a/tests/unit/test_endpoint_scan_analysis.py b/tests/unit/test_offline_endpoint_analysis.py similarity index 100% rename from tests/unit/test_endpoint_scan_analysis.py rename to tests/unit/test_offline_endpoint_analysis.py