diff --git a/src/backend/findings/models.py b/src/backend/findings/models.py index 5c116ceb4..992d67604 100644 --- a/src/backend/findings/models.py +++ b/src/backend/findings/models.py @@ -30,11 +30,15 @@ class OSINT(Finding): def parse( self, target: Target = None, accumulated: Dict[str, Any] = {} ) -> Dict[str, Any]: - return { - InputKeyword.TARGET.name.lower(): self.data, - InputKeyword.HOST.name.lower(): self.data, - InputKeyword.URL.name.lower(): self._get_url(self.data), - } if self.data_type in [OSINTDataType.IP, OSINTDataType.DOMAIN] else {} + return ( + { + InputKeyword.TARGET.name.lower(): self.data, + InputKeyword.HOST.name.lower(): self.data, + InputKeyword.URL.name.lower(): self._get_url(self.data), + } + if self.data_type in [OSINTDataType.IP, OSINTDataType.DOMAIN] + else {} + ) def defect_dojo(self) -> Dict[str, Any]: return { @@ -165,31 +169,43 @@ class Path(Finding): Finding.Filter(str, "path", contains=True, processor=lambda p: p.lower()), ] - def _clean_path_value(self, value: str) -> str: + def _clean_comparison_path(self, value: str) -> str: if len(value) > 1: - if value[0] != "/": - value = f"/{value}" + value = self._clean_path(value) if value[-1] != "/": value += "/" return value + def _clean_path(self, value: str) -> str: + return f"/{value}" if len(value) > 1 and value[0] != "/" else value + def parse( self, target: Target = None, accumulated: Dict[str, Any] = {} ) -> Dict[str, Any]: - output = self.port.parse(target, accumulated) if self.port else {} target_port = TargetPort.objects.filter(target=target, port=self.port.port) - include_path_data = True if target_port.exists(): - include_path_data = self._clean_path_value(self.path).startswith( - self._clean_path_value(target_port.first().path or self.path) - ) - if include_path_data: - output[InputKeyword.ENDPOINT.name.lower()] = self.path - if self.port: - output[InputKeyword.URL.name.lower()] = self._get_url( - self.port.host.address, self.port.port, self.path + target_port_path = target_port.first().path + path = ( + self.path + if target_port_path + and self._clean_comparison_path(self.path).startswith( + self._clean_comparison_path(target_port_path) ) - return output + else target_port_path + ) + else: + path = self.path + output = ( + { + **self.port.parse(target, accumulated), + InputKeyword.URL.name.lower(): self._get_url( + self.port.host.address, self.port.port, self._clean_path(path) + ), + } + if self.port + else {} + ) + return {**output, InputKeyword.ENDPOINT.name.lower(): self._clean_path(path)} def defect_dojo(self) -> Dict[str, Any]: return { diff --git a/src/backend/tests/executors/test_base.py b/src/backend/tests/executors/test_base.py index 5eafffaa4..1b0ed23b3 100644 --- a/src/backend/tests/executors/test_base.py +++ b/src/backend/tests/executors/test_base.py @@ -63,6 +63,16 @@ def test_get_arguments_only_findings(self) -> None: "-p 10.10.10.11 -p http://10.10.10.10:80/index.php -p 80 -p /index.php -p WordPress -p admin -p CVE-2023-1111 -p ReverseShell", self.findings, ) + self.vulnerability.technology = None + self.vulnerability.port = self.port + self.vulnerability.save(update_fields=["port", "technology"]) + self.exploit.vulnerability = None + self.exploit.technology = self.technology + self.exploit.save(update_fields=["vulnerability", "technology"]) + self._success_get_arguments( + "-p 10.10.10.11 -p http://10.10.10.10:80/index.php -p 80 -p /index.php -p WordPress -p admin -p CVE-2023-1111 -p ReverseShell", + self.findings, + ) @mock.patch("framework.models.BaseInput._get_url", get_url) def test_get_arguments_only_required_findings(self) -> None: @@ -86,6 +96,23 @@ def test_get_arguments_multiple_ports(self) -> None: ], ) + @mock.patch("framework.models.BaseInput._get_url", get_url) + def test_get_arguments_with_path_filter(self) -> None: + self._setup_task_user_provided_entities() + self._success_get_arguments( + "-p 10.10.10.10 -p http://10.10.10.10:80/login.php -p 80 -p /login.php -p WordPress -p CVE-2023-1111 -p root", + [self.port, self.path, self.technology, self.vulnerability], + ) + self.path.path = "rootpath/test" + self.path.save(update_fields=["path"]) + self.target_port.path = "rootpath" + self.target_port.save(update_fields=["path"]) + self._success_get_arguments( + "-p 10.10.10.10 -p http://10.10.10.10:80/rootpath/test -p 80 -p /rootpath/test -p WordPress -p CVE-2023-1111 -p root", + [self.port, self.path, self.technology, self.vulnerability], + ) + + @mock.patch("framework.models.BaseInput._get_url", get_url) def _test_get_arguments_no_findings(self) -> None: self.target.target = "10.10.10.12" self.target.save(update_fields=["target"])