Skip to content

Commit

Permalink
Darktrace update for model breaches (#36451) (#36515)
Browse files Browse the repository at this point in the history
* updated with demisto-sdk and fixed comment command

* updating release notes

* fixed post comment test and added error check for login page

* removing validate.json

Co-authored-by: seanmacdonald8 <86425481+seanmacdonald8@users.noreply.github.com>
  • Loading branch information
content-bot and seanmacdonald8 authored Sep 26, 2024
1 parent c7112b6 commit 8de5cf6
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 106 deletions.
54 changes: 27 additions & 27 deletions Packs/Darktrace/Integrations/DarktraceAIA/DarktraceAIA.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import json
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, List, Mapping, Optional, Tuple, cast
from typing import Any, cast
from collections.abc import Mapping

import dateparser
import urllib3
Expand Down Expand Up @@ -46,7 +47,7 @@ class Client(BaseClient):
Most calls use _http_request() that handles proxy, SSL verification, etc.
"""

def get(self, query_uri: str, params: Dict[str, str] = None):
def get(self, query_uri: str, params: dict[str, str] = None):
"""Handles Darktrace GET API calls"""
return self._darktrace_api_call(query_uri, method="GET", params=params)

Expand All @@ -61,7 +62,7 @@ def _darktrace_api_call(
params: dict = None,
data: dict = None,
json: dict = None,
headers: Dict[str, str] = None,
headers: dict[str, str] = None,
):
"""Handles Darktrace API calls"""
headers = {
Expand Down Expand Up @@ -111,14 +112,14 @@ def error_handler(self, res: requests.Response):
elif res.status_code >= 300:
raise Exception(DARKTRACE_API_ERRORS['UNDETERMINED_ERROR'])

def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> Dict[str, str]:
def _create_headers(self, query_uri: str, query_data: dict = None, is_json: bool = False) -> dict[str, str]:
"""Create headers required for successful authentication"""
public_token, _ = self._auth
date = (datetime.now(timezone.utc)).isoformat(timespec="auto")
signature = _create_signature(self._auth, query_uri, date, query_data, is_json=is_json)
return {"DTAPI-Token": public_token, "DTAPI-Date": date, "DTAPI-Signature": signature}

def get_ai_analyst_incident_event(self, event_id: str) -> List[Dict[str, Any]]:
def get_ai_analyst_incident_event(self, event_id: str) -> list[dict[str, Any]]:
"""Searches for a single AI Analyst Incident alerts using '/incidentevents?uuid=<event_id>'
:type event_id: ``str``
:param event_id: unique event identifier
Expand All @@ -127,7 +128,7 @@ def get_ai_analyst_incident_event(self, event_id: str) -> List[Dict[str, Any]]:
"""
return self.get(AI_ANALYST_ENDPOINT, params={"uuid": event_id})

def search_ai_analyst_incident_events(self, min_score: int, start_time: Optional[int]) -> List[Dict[str, Any]]:
def search_ai_analyst_incident_events(self, min_score: int, start_time: int | None) -> list[dict[str, Any]]:
"""Searches all AI Analyst Incident alerts from a certain date and score'
:type min_score: ``str``
:param min_score: minimum score for data to be pulled
Expand All @@ -143,7 +144,7 @@ def search_ai_analyst_incident_events(self, min_score: int, start_time: Optional
}
return self.get(query_uri, params)

def get_comments_for_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]:
def get_comments_for_ai_analyst_incident_event(self, event_id: str) -> dict[str, Any]:
""" Returns all comments for a specified incident event id
:type event_id: ``str``
:param event_id: unique event identifier
Expand All @@ -156,7 +157,7 @@ def get_comments_for_ai_analyst_incident_event(self, event_id: str) -> Dict[str,
}
return self.get(query_uri, params)

def post_comment_to_ai_analyst_incident_event(self, event_id: str, comment: str) -> Dict[str, Any]:
def post_comment_to_ai_analyst_incident_event(self, event_id: str, comment: str) -> dict[str, Any]:
""" Posts a message to an incident event id
:type event_id: ``str``
:param event_id: unique event identifier
Expand All @@ -172,7 +173,7 @@ def post_comment_to_ai_analyst_incident_event(self, event_id: str, comment: str)
}
return self.post(query_uri, json=body)

def acknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]:
def acknowledge_ai_analyst_incident_event(self, event_id: str) -> dict[str, Any]:
""" acknowledges an incident event
:type event_id: ``str``
:param event_id: unique event identifier
Expand All @@ -182,7 +183,7 @@ def acknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]
query_uri = AI_ANALYST_ACKNOWLEDGE_ENDPOINT
return self.post(query_uri, data={'uuid': str(event_id)})

def unacknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, Any]:
def unacknowledge_ai_analyst_incident_event(self, event_id: str) -> dict[str, Any]:
""" unacknowledges an incident event
:type event_id: ``str``
:param event_id: unique event identifier
Expand All @@ -192,7 +193,7 @@ def unacknowledge_ai_analyst_incident_event(self, event_id: str) -> Dict[str, An
query_uri = AI_ANALYST_UNACKNOWLEDGE_ENDPOINT
return self.post(query_uri, data={'uuid': str(event_id)})

def get_ai_analyst_incident_group_from_eventId(self, event_id: str) -> List[Dict[str, Any]]:
def get_ai_analyst_incident_group_from_eventId(self, event_id: str) -> list[dict[str, Any]]:
"""Searches for a single AI Analyst Group alerts using '/groups?uuid=<event_id>'
:type event_id: ``str``
:param event_id: unique event identifier
Expand All @@ -205,7 +206,7 @@ def get_ai_analyst_incident_group_from_eventId(self, event_id: str) -> List[Dict
"""*****HELPER FUNCTIONS****"""


def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optional[int]:
def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> int | None:
"""Converts an XSOAR argument to a timestamp (seconds from epoch)
This function is used to quickly validate an argument provided to XSOAR
via ``demisto.args()`` into an ``int`` containing a timestamp (seconds
Expand Down Expand Up @@ -244,7 +245,7 @@ def arg_to_timestamp(arg: Any, arg_name: str, required: bool = False) -> Optiona
raise ValueError(f'Invalid date: {arg_name}')

return int(date.timestamp())
if isinstance(arg, (int, float)):
if isinstance(arg, int | float):
# Convert to int if the input is a float
return int(arg)
raise ValueError(f'Invalid date: "{arg_name}"')
Expand All @@ -270,7 +271,7 @@ def stringify_data(data: Mapping) -> str:
return "&".join([f"{k}={v}" for k, v in data.items()])


def format_JSON_for_ai_analyst_incident(aia_incident: Dict[str, Any], details: bool = False) -> Dict[str, Any]:
def format_JSON_for_ai_analyst_incident(aia_incident: dict[str, Any], details: bool = False) -> dict[str, Any]:
"""Formats JSON for get-ai-incident-event command
:type aia_incident: ``Dict[str, Any]``
:param aia_incident: JSON incident event as returned by API for fetch incident
Expand Down Expand Up @@ -319,7 +320,7 @@ def check_required_fields(args, *fields):
raise ValueError(f'Argument error could not find {field} in {args}')


def test_module(client: Client, first_fetch_time: Optional[int]) -> str:
def test_module(client: Client, first_fetch_time: int | None) -> str:
"""
Returning 'ok' indicates that the integration works like it is supposed to. Connection to the service is successful.
Expand All @@ -344,8 +345,8 @@ def test_module(client: Client, first_fetch_time: Optional[int]) -> str:
return 'ok'


def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int],
first_fetch_time: Optional[int], min_score: int) -> Tuple[Dict[str, int], List[dict]]:
def fetch_incidents(client: Client, max_alerts: int, last_run: dict[str, int],
first_fetch_time: int | None, min_score: int) -> tuple[dict[str, int], list[dict]]:
"""This function retrieves new ai analyst incident event every minute. It will use last_run
to save the timestamp of the last incident it processed. If last_run is not provided,
it should use the integration parameter first_fetch to determine when to start fetching
Expand Down Expand Up @@ -386,7 +387,7 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int],
latest_created_time = cast(int, last_fetch)

# Each incident is a dict with a string as a key
incidents: List[Dict[str, Any]] = []
incidents: list[dict[str, Any]] = []

ai_analyst_alerts = client.search_ai_analyst_incident_events(
min_score=min_score, # Scale the min score from [0,100] to [0 to 1] for API calls
Expand All @@ -396,9 +397,8 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int],
for alert in ai_analyst_alerts:
incident_created_time = int(alert.get('createdAt', 0))
alert['time'] = timestamp_to_datestring(incident_created_time)
if last_fetch:
if incident_created_time <= last_fetch:
continue
if last_fetch and incident_created_time <= last_fetch:
continue
id = str(alert['id'])
title = str(alert['title'])
incident_name = f'DT eventId #{id}: {title}'
Expand Down Expand Up @@ -426,7 +426,7 @@ def fetch_incidents(client: Client, max_alerts: int, last_run: Dict[str, int],
return next_run, incidents


def get_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults:
def get_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults:
"""get-ai-analyst-incident-event-command: Returns a Darktrace incident event details
:type client: ``Client``
Expand Down Expand Up @@ -464,7 +464,7 @@ def get_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any])
)


def get_comments_for_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults:
def get_comments_for_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults:
"""darktrace-get-comments-for-ai-analyst-incident-event-command: Returns all comments associated with an
incident event.
Expand Down Expand Up @@ -502,7 +502,7 @@ def get_comments_for_ai_analyst_incident_event_command(client: Client, args: Dic
)


def post_comment_to_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults:
def post_comment_to_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults:
"""darktrace-post-comment-to-ai-analyst-incident-event-command: Posts a comment to an ai analyst event
:type client: ``Client``
Expand Down Expand Up @@ -541,7 +541,7 @@ def post_comment_to_ai_analyst_incident_event_command(client: Client, args: Dict
)


def acknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults:
def acknowledge_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults:
"""acknowledge-ai-analyst-incident-event-command: Acknowledges an ai analyst event
:type client: ``Client``
Expand Down Expand Up @@ -578,7 +578,7 @@ def acknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[str
)


def unacknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[str, Any]) -> CommandResults:
def unacknowledge_ai_analyst_incident_event_command(client: Client, args: dict[str, Any]) -> CommandResults:
"""unacknowledge-ai-analyst-incident-event-command: Unacknowledges an ai analyst event
:type client: ``Client``
Expand Down Expand Up @@ -615,7 +615,7 @@ def unacknowledge_ai_analyst_incident_event_command(client: Client, args: Dict[s
)


def get__ai_analyst_incident_group_from_eventId_command(client: Client, args: Dict[str, Any]) -> CommandResults:
def get__ai_analyst_incident_group_from_eventId_command(client: Client, args: dict[str, Any]) -> CommandResults:
"""darktrace-get-incident-group-from-event: Pulls all events belonging to the same investigation group.
:type client: ``Client``
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import io
import json


def util_load_json(path):
with io.open(path, mode='r', encoding='utf-8') as f:
with open(path, encoding='utf-8') as f:
return json.loads(f.read())


Expand Down
Loading

0 comments on commit 8de5cf6

Please sign in to comment.