
Commit

feat: add task_type option for job tasks
ImMin5 committed Dec 20, 2024
1 parent 7419e61 commit ea46baf
Showing 4 changed files with 307 additions and 195 deletions.
57 changes: 33 additions & 24 deletions src/plugin/main.py
@@ -7,9 +7,9 @@
app = DataSourcePluginServer()


-@app.route('DataSource.init')
+@app.route("DataSource.init")
def data_source_init(params: dict) -> dict:
""" init plugin by options
"""init plugin by options
Args:
params (DataSourceInitRequest): {
@@ -22,15 +22,15 @@ def data_source_init(params: dict) -> dict:
'metadata': 'dict'
}
"""
-    options = params['options']
+    options = params["options"]

data_source_mgr = DataSourceManager()
return data_source_mgr.init_response(options)


-@app.route('DataSource.verify')
+@app.route("DataSource.verify")
def data_source_verify(params: dict) -> None:
""" Verifying data source plugin
"""Verifying data source plugin
Args:
params (CollectorVerifyRequest): {
@@ -44,18 +44,18 @@ def data_source_verify(params: dict) -> None:
None
"""

-    options = params['options']
-    secret_data = params['secret_data']
-    domain_id = params.get('domain_id')
-    schema = params.get('schema')
+    options = params["options"]
+    secret_data = params["secret_data"]
+    domain_id = params.get("domain_id")
+    schema = params.get("schema")

data_source_mgr = DataSourceManager()
data_source_mgr.verify_plugin(options, secret_data, domain_id, schema)


-@app.route('Job.get_tasks')
+@app.route("Job.get_tasks")
def job_get_tasks(params: dict) -> dict:
""" Get job tasks
"""Get job tasks
Args:
params (JobGetTaskRequest): {
@@ -75,20 +75,29 @@ def job_get_tasks(params: dict) -> dict:
"""

-    domain_id = params['domain_id']
-    options = params['options']
-    secret_data = params['secret_data']
-    schema = params.get('schema')
-    start = params.get('start')
-    last_synchronized_at = params.get('last_synchronized_at')
+    domain_id = params["domain_id"]
+    options = params["options"]
+    secret_data = params["secret_data"]
+    schema = params.get("schema")
+    start = params.get("start")
+    last_synchronized_at = params.get("last_synchronized_at")

job_mgr = JobManager()
-    return job_mgr.get_tasks(domain_id, options, secret_data, schema, start, last_synchronized_at)
+    task_type = options.get("task_type", "identity")
+
+    if task_type == "identity":
+        return job_mgr.get_tasks(
+            domain_id, options, secret_data, schema, start, last_synchronized_at
+        )
+    else:
+        return job_mgr.get_tasks_directory_type(
+            domain_id, options, secret_data, schema, start, last_synchronized_at
+        )

-@app.route('Cost.get_data')
+@app.route("Cost.get_data")
def cost_get_data(params: dict) -> Generator[dict, None, None]:
""" Get external cost data
"""Get external cost data
Args:
params (CostGetDataRequest): {
@@ -117,11 +126,11 @@ def cost_get_data(params: dict) -> Generator[dict, None, None]:
}
"""

-    options = params['options']
-    secret_data = params['secret_data']
+    options = params["options"]
+    secret_data = params["secret_data"]

-    task_options = params.get('task_options', {})
-    schema = params.get('schema')
+    task_options = params.get("task_options", {})
+    schema = params.get("schema")

cost_mgr = CostManager()
return cost_mgr.get_data(options, secret_data, task_options, schema)
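
In short, Job.get_tasks now reads an optional "task_type" key from the plugin options (defaulting to "identity") and dispatches to either JobManager.get_tasks or the new JobManager.get_tasks_directory_type. Below is a minimal sketch of option payloads that drive the two paths; the values are illustrative only, and "directory" stands in for any value other than "identity".

# Illustrative DataSource options for the new dispatch in job_get_tasks.
# "task_type" is the only key the handler inspects here; omitting it keeps the
# previous per-service-account ("identity") behavior.
identity_options = {"task_type": "identity"}    # equivalent to omitting the key
directory_options = {"task_type": "directory"}  # any non-"identity" value routes to
                                                # JobManager.get_tasks_directory_type()
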
210 changes: 109 additions & 101 deletions src/plugin/manager/cost_manager.py
@@ -12,38 +12,38 @@
_LOGGER = logging.getLogger(__name__)

_REGION_MAP = {
-    'APE1': 'ap-east-1',
-    'APN1': 'ap-northeast-1',
-    'APN2': 'ap-northeast-2',
-    'APN3': 'ap-northeast-3',
-    'APS1': 'ap-southeast-1',
-    'APS2': 'ap-southeast-2',
-    'APS3': 'ap-south-1',
-    'CAN1': 'ca-central-1',
-    'CPT': 'af-south-1',
-    'EUN1': 'eu-north-1',
-    'EUC1': 'eu-central-1',
-    'EU': 'eu-west-1',
-    'EUW2': 'eu-west-2',
-    'EUW3': 'eu-west-3',
-    'MES1': 'me-south-1',
-    'SAE1': 'sa-east-1',
-    'UGW1': 'AWS GovCloud (US-West)',
-    'UGE1': 'AWS GovCloud (US-East)',
-    'USE1': 'us-east-1',
-    'USE2': 'us-east-2',
-    'USW1': 'us-west-1',
-    'USW2': 'us-west-2',
-    'AP': 'Asia Pacific',
-    'AU': 'Australia',
-    'CA': 'Canada',
+    "APE1": "ap-east-1",
+    "APN1": "ap-northeast-1",
+    "APN2": "ap-northeast-2",
+    "APN3": "ap-northeast-3",
+    "APS1": "ap-southeast-1",
+    "APS2": "ap-southeast-2",
+    "APS3": "ap-south-1",
+    "CAN1": "ca-central-1",
+    "CPT": "af-south-1",
+    "EUN1": "eu-north-1",
+    "EUC1": "eu-central-1",
+    "EU": "eu-west-1",
+    "EUW2": "eu-west-2",
+    "EUW3": "eu-west-3",
+    "MES1": "me-south-1",
+    "SAE1": "sa-east-1",
+    "UGW1": "AWS GovCloud (US-West)",
+    "UGE1": "AWS GovCloud (US-East)",
+    "USE1": "us-east-1",
+    "USE2": "us-east-2",
+    "USW1": "us-west-1",
+    "USW2": "us-west-2",
+    "AP": "Asia Pacific",
+    "AU": "Australia",
+    "CA": "Canada",
# 'EU': 'Europe and Israel',
-    'IN': 'India',
-    'JP': 'Japan',
-    'ME': 'Middle East',
-    'SA': 'South America',
-    'US': 'United States',
-    'ZA': 'South Africa',
+    "IN": "India",
+    "JP": "Japan",
+    "ME": "Middle East",
+    "SA": "South America",
+    "US": "United States",
+    "ZA": "South Africa",
}


@@ -54,41 +54,47 @@ def __init__(self, *args, **kwargs):
self.aws_s3_connector = AWSS3Connector()
self.space_connector = SpaceONEConnector()

-    def get_data(self, options: dict, secret_data: dict, task_options: dict, schema: str = None) \
-            -> Generator[dict, None, None]:
+    def get_data(
+        self, options: dict, secret_data: dict, task_options: dict, schema: str = None
+    ) -> Generator[dict, None, None]:
+        task_type = task_options.get("task_type", "identity")

self.aws_s3_connector.create_session(options, secret_data, schema)
self._check_task_options(task_options)

-        start = task_options['start']
-        account_id = task_options['account_id']
-        service_account_id = task_options['service_account_id']
-        database = task_options['database']
-        is_sync = task_options['is_sync']
+        start = task_options["start"]
+        account_id = task_options["account_id"]
+        database = task_options["database"]

-        if is_sync == 'false':
-            self._update_sync_state(options, secret_data, schema, service_account_id)
+        if task_type == "identity":
+            service_account_id = task_options["service_account_id"]
+            is_sync = task_options["is_sync"]
+            if is_sync == "false":
+                self._update_sync_state(
+                    options, secret_data, schema, service_account_id
+                )

date_ranges = self._get_date_range(start)

for date in date_ranges:
-            year, month = date.split('-')
-            path = f'SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}'
+            year, month = date.split("-")
+            path = f"SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}"
response = self.aws_s3_connector.list_objects(path)
-            contents = response.get('Contents', [])
+            contents = response.get("Contents", [])
for content in contents:
-                response_stream = self.aws_s3_connector.get_cost_data(content['Key'])
+                response_stream = self.aws_s3_connector.get_cost_data(content["Key"])
for results in response_stream:
yield self._make_cost_data(results, account_id)

-        yield {
-            'results': []
-        }
+        yield {"results": []}

def _update_sync_state(self, options, secret_data, schema, service_account_id):
self.space_connector.init_client(options, secret_data, schema)
-        service_account_info = self.space_connector.get_service_account(service_account_id)
-        tags = service_account_info.get('tags', {})
-        tags['is_sync'] = 'true'
+        service_account_info = self.space_connector.get_service_account(
+            service_account_id
+        )
+        tags = service_account_info.get("tags", {})
+        tags["is_sync"] = "true"
self.space_connector.update_service_account(service_account_id, tags)

def _make_cost_data(self, results, account_id):
@@ -113,95 +119,97 @@ class CostSummaryItem(BaseModel):

for result in results:
try:
-                region = result['region'] or 'USE1'
-                service_code = result['service_code']
-                usage_type = result['usage_type']
+                region = result["region"] or "USE1"
+                service_code = result["service_code"]
+                usage_type = result["usage_type"]
                data = {
-                    'cost': result.get('usage_cost', 0.0) or 0.0,
-                    'usage_quantity': result['usage_quantity'],
-                    'usage_unit': None,
-                    'provider': 'aws',
-                    'region_code': _REGION_MAP.get(region, region),
-                    'product': service_code,
-                    'usage_type': usage_type,
-                    'billed_date': result['usage_date'],
-                    'additional_info': {
-                        'Instance Type': result['instance_type'],
-                        'Account ID': account_id
+                    "cost": result.get("usage_cost", 0.0) or 0.0,
+                    "usage_quantity": result["usage_quantity"],
+                    "usage_unit": None,
+                    "provider": "aws",
+                    "region_code": _REGION_MAP.get(region, region),
+                    "product": service_code,
+                    "usage_type": usage_type,
+                    "billed_date": result["usage_date"],
+                    "additional_info": {
+                        "Instance Type": result["instance_type"],
+                        "Account ID": account_id,
                    },
-                    'tags': self._get_tags_from_cost_data(result)
+                    "tags": self._get_tags_from_cost_data(result),
                }

-                if service_code == 'AWSDataTransfer':
-                    data['usage_unit'] = 'Bytes'
-                    if usage_type.find('-In-Bytes') > 0:
-                        data['additional_info']['Usage Type Details'] = 'Transfer In'
-                    elif usage_type.find('-Out-Bytes') > 0:
-                        data['additional_info']['Usage Type Details'] = 'Transfer Out'
+                if service_code == "AWSDataTransfer":
+                    data["usage_unit"] = "Bytes"
+                    if usage_type.find("-In-Bytes") > 0:
+                        data["additional_info"]["Usage Type Details"] = "Transfer In"
+                    elif usage_type.find("-Out-Bytes") > 0:
+                        data["additional_info"]["Usage Type Details"] = "Transfer Out"
                    else:
-                        data['additional_info']['Usage Type Details'] = 'Transfer Etc'
-                elif service_code == 'AmazonCloudFront':
-                    if usage_type.find('-HTTPS') > 0:
-                        data['usage_unit'] = 'Count'
-                        data['additional_info']['Usage Type Details'] = 'HTTPS Requests'
-                    elif usage_type.find('-Out-Bytes') > 0:
-                        data['usage_unit'] = 'GB'
-                        data['additional_info']['Usage Type Details'] = 'Transfer Out'
+                        data["additional_info"]["Usage Type Details"] = "Transfer Etc"
+                elif service_code == "AmazonCloudFront":
+                    if usage_type.find("-HTTPS") > 0:
+                        data["usage_unit"] = "Count"
+                        data["additional_info"]["Usage Type Details"] = "HTTPS Requests"
+                    elif usage_type.find("-Out-Bytes") > 0:
+                        data["usage_unit"] = "GB"
+                        data["additional_info"]["Usage Type Details"] = "Transfer Out"
                    else:
-                        data['usage_unit'] = 'Count'
-                        data['additional_info']['Usage Type Details'] = 'HTTP Requests'
+                        data["usage_unit"] = "Count"
+                        data["additional_info"]["Usage Type Details"] = "HTTP Requests"
                else:
-                    data['additional_info']['Usage Type Details'] = None
+                    data["additional_info"]["Usage Type Details"] = None

except Exception as e:
-                _LOGGER.error(f'[_make_cost_data] make data error: {e}', exc_info=True)
+                _LOGGER.error(f"[_make_cost_data] make data error: {e}", exc_info=True)
raise e

costs_data.append(data)

-        return {
-            'results': costs_data
-        }
+        return {"results": costs_data}

@staticmethod
def _get_tags_from_cost_data(cost_data: dict) -> dict:
tags = {}

-        if tags_str := cost_data.get('tags'):
+        if tags_str := cost_data.get("tags"):
try:
tags_dict: dict = utils.load_json(tags_str)
for key, value in tags_dict.items():
-                    key = key.replace('user:', '')
+                    key = key.replace("user:", "")
tags[key] = value
except Exception as e:
_LOGGER.debug(e)

return tags

@staticmethod
-    def _check_task_options(task_options):
-        if 'start' not in task_options:
-            raise ERROR_REQUIRED_PARAMETER(key='task_options.start')
+    def _check_task_options(task_options: dict):
+        task_type = task_options.get("task_type", "identity")
+
+        if "start" not in task_options:
+            raise ERROR_REQUIRED_PARAMETER(key="task_options.start")

+        if "database" not in task_options:
+            raise ERROR_REQUIRED_PARAMETER(key="task_options.database")

-        if 'account_id' not in task_options:
-            raise ERROR_REQUIRED_PARAMETER(key='task_options.account_id')
+        if "is_sync" not in task_options:
+            raise ERROR_REQUIRED_PARAMETER(key="task_options.is_sync")

-        if 'service_account_id' not in task_options:
-            raise ERROR_REQUIRED_PARAMETER(key='task_options.service_account_id')
+        if task_type == "identity":

-        if 'database' not in task_options:
-            raise ERROR_REQUIRED_PARAMETER(key='task_options.database')
+            if "account_id" not in task_options:
+                raise ERROR_REQUIRED_PARAMETER(key="task_options.account_id")

-        if 'is_sync' not in task_options:
-            raise ERROR_REQUIRED_PARAMETER(key='task_options.is_sync')
+            if "service_account_id" not in task_options:
+                raise ERROR_REQUIRED_PARAMETER(key="task_options.service_account_id")

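For reference, a hedged sketch of task_options payloads that would pass the validation above. All field values are placeholders; the diff only changes which keys are required per task type, not their formats. Note that the new get_data still reads "account_id" unconditionally, so it is included in both examples.

# Illustrative task_options (placeholder values). Per _check_task_options,
# "start", "database", and "is_sync" are always required; "account_id" and
# "service_account_id" are enforced only for the default "identity" task type.
identity_task_options = {
    "task_type": "identity",
    "start": "2024-01",
    "database": "billing-db",           # placeholder database name
    "account_id": "123456789012",       # placeholder AWS account ID
    "service_account_id": "sa-abc123",  # placeholder SpaceONE service account ID
    "is_sync": "false",
}

directory_task_options = {
    "task_type": "directory",           # any value other than "identity"
    "start": "2024-01",
    "database": "billing-db",
    "account_id": "123456789012",       # still read unconditionally by get_data
    "is_sync": "true",
}
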
@staticmethod
def _get_date_range(start):
date_ranges = []
-        start_time = datetime.strptime(start, '%Y-%m')
+        start_time = datetime.strptime(start, "%Y-%m")
now = datetime.utcnow()
for dt in rrule.rrule(rrule.MONTHLY, dtstart=start_time, until=now):
-            billed_month = dt.strftime('%Y-%m')
+            billed_month = dt.strftime("%Y-%m")
date_ranges.append(billed_month)

return date_ranges
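
As a worked example of the date-range helper above, here is a standalone restatement of the same logic, assuming "now" falls in December 2024 (around the commit date):

from datetime import datetime
from dateutil import rrule

start_time = datetime.strptime("2024-10", "%Y-%m")
now = datetime(2024, 12, 20)  # stand-in for datetime.utcnow()
months = [
    dt.strftime("%Y-%m")
    for dt in rrule.rrule(rrule.MONTHLY, dtstart=start_time, until=now)
]
# months == ["2024-10", "2024-11", "2024-12"]; get_data turns each entry into an
# S3 prefix of the form
# SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}
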
(Diffs for the remaining 2 changed files are not shown.)
