From ea46baf74ea865bea98c767ce31e3f8318f3840a Mon Sep 17 00:00:00 2001 From: ImMin5 Date: Fri, 20 Dec 2024 17:46:27 +0900 Subject: [PATCH] feat: add task_type option for job tasks --- src/plugin/main.py | 57 +++--- src/plugin/manager/cost_manager.py | 210 +++++++++++----------- src/plugin/manager/data_source_manager.py | 62 +++++-- src/plugin/manager/job_manager.py | 173 ++++++++++++------ 4 files changed, 307 insertions(+), 195 deletions(-) diff --git a/src/plugin/main.py b/src/plugin/main.py index 5b72b3a..8222635 100644 --- a/src/plugin/main.py +++ b/src/plugin/main.py @@ -7,9 +7,9 @@ app = DataSourcePluginServer() -@app.route('DataSource.init') +@app.route("DataSource.init") def data_source_init(params: dict) -> dict: - """ init plugin by options + """init plugin by options Args: params (DataSourceInitRequest): { @@ -22,15 +22,15 @@ def data_source_init(params: dict) -> dict: 'metadata': 'dict' } """ - options = params['options'] + options = params["options"] data_source_mgr = DataSourceManager() return data_source_mgr.init_response(options) -@app.route('DataSource.verify') +@app.route("DataSource.verify") def data_source_verify(params: dict) -> None: - """ Verifying data source plugin + """Verifying data source plugin Args: params (CollectorVerifyRequest): { @@ -44,18 +44,18 @@ def data_source_verify(params: dict) -> None: None """ - options = params['options'] - secret_data = params['secret_data'] - domain_id = params.get('domain_id') - schema = params.get('schema') + options = params["options"] + secret_data = params["secret_data"] + domain_id = params.get("domain_id") + schema = params.get("schema") data_source_mgr = DataSourceManager() data_source_mgr.verify_plugin(options, secret_data, domain_id, schema) -@app.route('Job.get_tasks') +@app.route("Job.get_tasks") def job_get_tasks(params: dict) -> dict: - """ Get job tasks + """Get job tasks Args: params (JobGetTaskRequest): { @@ -75,20 +75,29 @@ def job_get_tasks(params: dict) -> dict: """ - domain_id = params['domain_id'] - options = params['options'] - secret_data = params['secret_data'] - schema = params.get('schema') - start = params.get('start') - last_synchronized_at = params.get('last_synchronized_at') + domain_id = params["domain_id"] + options = params["options"] + secret_data = params["secret_data"] + schema = params.get("schema") + start = params.get("start") + last_synchronized_at = params.get("last_synchronized_at") job_mgr = JobManager() - return job_mgr.get_tasks(domain_id, options, secret_data, schema, start, last_synchronized_at) + task_type = options.get("task_type", "identity") + if task_type == "identity": + return job_mgr.get_tasks( + domain_id, options, secret_data, schema, start, last_synchronized_at + ) + else: + return job_mgr.get_tasks_directory_type( + domain_id, options, secret_data, schema, start, last_synchronized_at + ) -@app.route('Cost.get_data') + +@app.route("Cost.get_data") def cost_get_data(params: dict) -> Generator[dict, None, None]: - """ Get external cost data + """Get external cost data Args: params (CostGetDataRequest): { @@ -117,11 +126,11 @@ def cost_get_data(params: dict) -> Generator[dict, None, None]: } """ - options = params['options'] - secret_data = params['secret_data'] + options = params["options"] + secret_data = params["secret_data"] - task_options = params.get('task_options', {}) - schema = params.get('schema') + task_options = params.get("task_options", {}) + schema = params.get("schema") cost_mgr = CostManager() return cost_mgr.get_data(options, secret_data, task_options, schema) diff --git a/src/plugin/manager/cost_manager.py b/src/plugin/manager/cost_manager.py index 6480da1..2a47370 100644 --- a/src/plugin/manager/cost_manager.py +++ b/src/plugin/manager/cost_manager.py @@ -12,38 +12,38 @@ _LOGGER = logging.getLogger(__name__) _REGION_MAP = { - 'APE1': 'ap-east-1', - 'APN1': 'ap-northeast-1', - 'APN2': 'ap-northeast-2', - 'APN3': 'ap-northeast-3', - 'APS1': 'ap-southeast-1', - 'APS2': 'ap-southeast-2', - 'APS3': 'ap-south-1', - 'CAN1': 'ca-central-1', - 'CPT': 'af-south-1', - 'EUN1': 'eu-north-1', - 'EUC1': 'eu-central-1', - 'EU': 'eu-west-1', - 'EUW2': 'eu-west-2', - 'EUW3': 'eu-west-3', - 'MES1': 'me-south-1', - 'SAE1': 'sa-east-1', - 'UGW1': 'AWS GovCloud (US-West)', - 'UGE1': 'AWS GovCloud (US-East)', - 'USE1': 'us-east-1', - 'USE2': 'us-east-2', - 'USW1': 'us-west-1', - 'USW2': 'us-west-2', - 'AP': 'Asia Pacific', - 'AU': 'Australia', - 'CA': 'Canada', + "APE1": "ap-east-1", + "APN1": "ap-northeast-1", + "APN2": "ap-northeast-2", + "APN3": "ap-northeast-3", + "APS1": "ap-southeast-1", + "APS2": "ap-southeast-2", + "APS3": "ap-south-1", + "CAN1": "ca-central-1", + "CPT": "af-south-1", + "EUN1": "eu-north-1", + "EUC1": "eu-central-1", + "EU": "eu-west-1", + "EUW2": "eu-west-2", + "EUW3": "eu-west-3", + "MES1": "me-south-1", + "SAE1": "sa-east-1", + "UGW1": "AWS GovCloud (US-West)", + "UGE1": "AWS GovCloud (US-East)", + "USE1": "us-east-1", + "USE2": "us-east-2", + "USW1": "us-west-1", + "USW2": "us-west-2", + "AP": "Asia Pacific", + "AU": "Australia", + "CA": "Canada", # 'EU': 'Europe and Israel', - 'IN': 'India', - 'JP': 'Japan', - 'ME': 'Middle East', - 'SA': 'South America', - 'US': 'United States', - 'ZA': 'South Africa', + "IN": "India", + "JP": "Japan", + "ME": "Middle East", + "SA": "South America", + "US": "United States", + "ZA": "South Africa", } @@ -54,41 +54,47 @@ def __init__(self, *args, **kwargs): self.aws_s3_connector = AWSS3Connector() self.space_connector = SpaceONEConnector() - def get_data(self, options: dict, secret_data: dict, task_options: dict, schema: str = None) \ - -> Generator[dict, None, None]: + def get_data( + self, options: dict, secret_data: dict, task_options: dict, schema: str = None + ) -> Generator[dict, None, None]: + task_type = task_options.get("task_type", "identity") + self.aws_s3_connector.create_session(options, secret_data, schema) self._check_task_options(task_options) - start = task_options['start'] - account_id = task_options['account_id'] - service_account_id = task_options['service_account_id'] - database = task_options['database'] - is_sync = task_options['is_sync'] + start = task_options["start"] + account_id = task_options["account_id"] + database = task_options["database"] - if is_sync == 'false': - self._update_sync_state(options, secret_data, schema, service_account_id) + if task_type == "identity": + service_account_id = task_options["service_account_id"] + is_sync = task_options["is_sync"] + if is_sync == "false": + self._update_sync_state( + options, secret_data, schema, service_account_id + ) date_ranges = self._get_date_range(start) for date in date_ranges: - year, month = date.split('-') - path = f'SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}' + year, month = date.split("-") + path = f"SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}" response = self.aws_s3_connector.list_objects(path) - contents = response.get('Contents', []) + contents = response.get("Contents", []) for content in contents: - response_stream = self.aws_s3_connector.get_cost_data(content['Key']) + response_stream = self.aws_s3_connector.get_cost_data(content["Key"]) for results in response_stream: yield self._make_cost_data(results, account_id) - yield { - 'results': [] - } + yield {"results": []} def _update_sync_state(self, options, secret_data, schema, service_account_id): self.space_connector.init_client(options, secret_data, schema) - service_account_info = self.space_connector.get_service_account(service_account_id) - tags = service_account_info.get('tags', {}) - tags['is_sync'] = 'true' + service_account_info = self.space_connector.get_service_account( + service_account_id + ) + tags = service_account_info.get("tags", {}) + tags["is_sync"] = "true" self.space_connector.update_service_account(service_account_id, tags) def _make_cost_data(self, results, account_id): @@ -113,65 +119,63 @@ class CostSummaryItem(BaseModel): for result in results: try: - region = result['region'] or 'USE1' - service_code = result['service_code'] - usage_type = result['usage_type'] + region = result["region"] or "USE1" + service_code = result["service_code"] + usage_type = result["usage_type"] data = { - 'cost': result.get('usage_cost', 0.0) or 0.0, - 'usage_quantity': result['usage_quantity'], - 'usage_unit': None, - 'provider': 'aws', - 'region_code': _REGION_MAP.get(region, region), - 'product': service_code, - 'usage_type': usage_type, - 'billed_date': result['usage_date'], - 'additional_info': { - 'Instance Type': result['instance_type'], - 'Account ID': account_id + "cost": result.get("usage_cost", 0.0) or 0.0, + "usage_quantity": result["usage_quantity"], + "usage_unit": None, + "provider": "aws", + "region_code": _REGION_MAP.get(region, region), + "product": service_code, + "usage_type": usage_type, + "billed_date": result["usage_date"], + "additional_info": { + "Instance Type": result["instance_type"], + "Account ID": account_id, }, - 'tags': self._get_tags_from_cost_data(result) + "tags": self._get_tags_from_cost_data(result), } - if service_code == 'AWSDataTransfer': - data['usage_unit'] = 'Bytes' - if usage_type.find('-In-Bytes') > 0: - data['additional_info']['Usage Type Details'] = 'Transfer In' - elif usage_type.find('-Out-Bytes') > 0: - data['additional_info']['Usage Type Details'] = 'Transfer Out' + if service_code == "AWSDataTransfer": + data["usage_unit"] = "Bytes" + if usage_type.find("-In-Bytes") > 0: + data["additional_info"]["Usage Type Details"] = "Transfer In" + elif usage_type.find("-Out-Bytes") > 0: + data["additional_info"]["Usage Type Details"] = "Transfer Out" else: - data['additional_info']['Usage Type Details'] = 'Transfer Etc' - elif service_code == 'AmazonCloudFront': - if usage_type.find('-HTTPS') > 0: - data['usage_unit'] = 'Count' - data['additional_info']['Usage Type Details'] = 'HTTPS Requests' - elif usage_type.find('-Out-Bytes') > 0: - data['usage_unit'] = 'GB' - data['additional_info']['Usage Type Details'] = 'Transfer Out' + data["additional_info"]["Usage Type Details"] = "Transfer Etc" + elif service_code == "AmazonCloudFront": + if usage_type.find("-HTTPS") > 0: + data["usage_unit"] = "Count" + data["additional_info"]["Usage Type Details"] = "HTTPS Requests" + elif usage_type.find("-Out-Bytes") > 0: + data["usage_unit"] = "GB" + data["additional_info"]["Usage Type Details"] = "Transfer Out" else: - data['usage_unit'] = 'Count' - data['additional_info']['Usage Type Details'] = 'HTTP Requests' + data["usage_unit"] = "Count" + data["additional_info"]["Usage Type Details"] = "HTTP Requests" else: - data['additional_info']['Usage Type Details'] = None + data["additional_info"]["Usage Type Details"] = None except Exception as e: - _LOGGER.error(f'[_make_cost_data] make data error: {e}', exc_info=True) + _LOGGER.error(f"[_make_cost_data] make data error: {e}", exc_info=True) raise e costs_data.append(data) - return { - 'results': costs_data - } + return {"results": costs_data} @staticmethod def _get_tags_from_cost_data(cost_data: dict) -> dict: tags = {} - if tags_str := cost_data.get('tags'): + if tags_str := cost_data.get("tags"): try: tags_dict: dict = utils.load_json(tags_str) for key, value in tags_dict.items(): - key = key.replace('user:', '') + key = key.replace("user:", "") tags[key] = value except Exception as e: _LOGGER.debug(e) @@ -179,29 +183,33 @@ def _get_tags_from_cost_data(cost_data: dict) -> dict: return tags @staticmethod - def _check_task_options(task_options): - if 'start' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.start') + def _check_task_options(task_options: dict): + task_type = task_options.get("task_type", "identity") + + if "start" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.start") + + if "database" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.database") - if 'account_id' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.account_id') + if "is_sync" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.is_sync") - if 'service_account_id' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.service_account_id') + if task_type == "identity": - if 'database' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.database') + if "account_id" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.account_id") - if 'is_sync' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.is_sync') + if "service_account_id" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.service_account_id") @staticmethod def _get_date_range(start): date_ranges = [] - start_time = datetime.strptime(start, '%Y-%m') + start_time = datetime.strptime(start, "%Y-%m") now = datetime.utcnow() for dt in rrule.rrule(rrule.MONTHLY, dtstart=start_time, until=now): - billed_month = dt.strftime('%Y-%m') + billed_month = dt.strftime("%Y-%m") date_ranges.append(billed_month) return date_ranges diff --git a/src/plugin/manager/data_source_manager.py b/src/plugin/manager/data_source_manager.py index 1f645f0..646d697 100644 --- a/src/plugin/manager/data_source_manager.py +++ b/src/plugin/manager/data_source_manager.py @@ -1,5 +1,6 @@ import logging +from spaceone.core.error import ERROR_INVALID_PARAMETER from spaceone.core.manager import BaseManager from ..connector.aws_s3_connector import AWSS3Connector from ..connector.spaceone_connector import SpaceONEConnector @@ -9,35 +10,62 @@ class DataSourceManager(BaseManager): - @staticmethod - def init_response(options: dict) -> dict: + def init_response(self, options: dict) -> dict: + self._check_options(options) + currency: str = options.get("currency", "USD") + return { - 'metadata': { - 'currency': 'USD', - 'supported_secret_types': ['MANUAL'], - 'data_source_rules': [ + "metadata": { + "currency": currency, + "supported_secret_types": ["MANUAL"], + "data_source_rules": [ { - 'name': 'match_service_account', - 'conditions_policy': 'ALWAYS', - 'actions': { - 'match_service_account': { - 'source': 'additional_info.Account ID', - 'target': 'data.account_id' + "name": "match_service_account", + "conditions_policy": "ALWAYS", + "actions": { + "match_service_account": { + "source": "additional_info.Account ID", + "target": "data.account_id", } }, - 'options': { - 'stop_processing': True - } + "options": {"stop_processing": True}, } - ] + ], } } @staticmethod - def verify_plugin(options: dict, secret_data: dict, domain_id: str, schema: str = None) -> None: + def verify_plugin( + options: dict, secret_data: dict, domain_id: str, schema: str = None + ) -> None: space_connector = SpaceONEConnector() space_connector.init_client(options, secret_data, schema) space_connector.verify_plugin(domain_id) aws_s3_connector = AWSS3Connector() aws_s3_connector.create_session(options, secret_data, schema) + + @staticmethod + def _check_options(options: dict): + task_type = options.get("task_type", "identity") + resync_days = options.get("resync_days_from_last_synced_at", 7) + + if task_type not in ["identity", "directory"]: + raise ERROR_INVALID_PARAMETER( + task_type=task_type, + reason="task_type should be 'identity' or 'directory'", + ) + + if isinstance(resync_days, float): + resync_days = int(resync_days) + elif not isinstance(resync_days, int): + raise ERROR_INVALID_PARAMETER( + key="resync_days_from_last_synced_at", + reason="resync_days_from_last_synced_at should be integer", + ) + + if resync_days < 3 or resync_days > 27: + raise ERROR_INVALID_PARAMETER( + resync_days_from_last_synced_at=resync_days, + reason="resync_days_from_last_synced_at should be 4 ~ 26", + ) diff --git a/src/plugin/manager/job_manager.py b/src/plugin/manager/job_manager.py index 564c6ca..d98de35 100644 --- a/src/plugin/manager/job_manager.py +++ b/src/plugin/manager/job_manager.py @@ -3,10 +3,12 @@ from spaceone.core.error import * from spaceone.core.manager import BaseManager + +from ..connector.aws_s3_connector import AWSS3Connector from ..connector.spaceone_connector import SpaceONEConnector _LOGGER = logging.getLogger(__name__) -_DEFAULT_DATABASE = 'MZC' +_DEFAULT_DATABASE = "MZC" class JobManager(BaseManager): @@ -15,94 +17,159 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.space_connector = SpaceONEConnector() - def get_tasks(self, domain_id: str, options: dict, secret_data: dict, schema: str = None, start: str = None, - last_synchronized_at: datetime = None) -> dict: + def get_tasks( + self, + domain_id: str, + options: dict, + secret_data: dict, + schema: str = None, + start: str = None, + last_synchronized_at: datetime = None, + ) -> dict: tasks = [] changed = [] - start_month = self._get_start_month(start, last_synchronized_at) + _LOGGER.debug(f"[get_tasks] options: {options}") + + start_month = self._get_start_month(options, start, last_synchronized_at) self.space_connector.init_client(options, secret_data, schema) response = self.space_connector.list_projects(domain_id) - total_count = response.get('total_count') or 0 + total_count = response.get("total_count") or 0 if total_count > 0: - project_info = response['results'][0] - _LOGGER.debug(f'[get_tasks] project info: {project_info}') + project_info = response["results"][0] + _LOGGER.debug(f"[get_tasks] project info: {project_info}") - project_id = project_info['project_id'] - database = project_info.get('tags', {}).get('database', _DEFAULT_DATABASE) + project_id = project_info["project_id"] + database = project_info.get("tags", {}).get("database", _DEFAULT_DATABASE) response = self.space_connector.list_service_accounts(project_id) - for service_account_info in response.get('results', []): - service_account_tags = service_account_info.get('tags', {}) - service_account_id = service_account_info['service_account_id'] - service_account_name = service_account_info['name'] - account_id = service_account_info['data']['account_id'] - is_sync = service_account_tags.get('is_sync', 'false') - database = service_account_tags.get('database', database) - - if is_sync != 'true': - is_sync = 'false' - - _LOGGER.debug(f'[get_tasks] service_account({service_account_id}): ' - f'name={service_account_name}, account_id={account_id}') + for service_account_info in response.get("results", []): + service_account_tags = service_account_info.get("tags", {}) + service_account_id = service_account_info["service_account_id"] + service_account_name = service_account_info["name"] + account_id = service_account_info["data"]["account_id"] + is_sync = service_account_tags.get("is_sync", "false") + database = service_account_tags.get("database", database) + + if is_sync != "true": + is_sync = "false" + + _LOGGER.debug( + f"[get_tasks] service_account({service_account_id}): " + f"name={service_account_name}, account_id={account_id}" + ) task_options = { - 'is_sync': is_sync, - 'service_account_id': service_account_id, - 'service_account_name': service_account_name, - 'account_id': account_id, - 'database': database + "is_sync": is_sync, + "service_account_id": service_account_id, + "service_account_name": service_account_name, + "account_id": account_id, + "database": database, + "task_type": "identity", } - if is_sync == 'false': - first_sync_month = self._get_start_month(start) - task_options['start'] = first_sync_month + if is_sync == "false": + first_sync_month = self._get_start_month(options, start) + task_options["start"] = first_sync_month - changed.append({ - 'start': first_sync_month, - 'filter': { - 'additional_info.Account ID': account_id + changed.append( + { + "start": first_sync_month, + "filter": {"additional_info.Account ID": account_id}, } - }) + ) else: - task_options['start'] = start_month + task_options["start"] = start_month - tasks.append({'task_options': task_options}) + tasks.append({"task_options": task_options}) - changed.append({ - 'start': start_month - }) + changed.append({"start": start_month}) - _LOGGER.debug(f'[get_tasks] tasks: {tasks}') - _LOGGER.debug(f'[get_tasks] changed: {changed}') + _LOGGER.debug(f"[get_tasks] tasks: {tasks}") + _LOGGER.debug(f"[get_tasks] changed: {changed}") else: - _LOGGER.debug(f'[get_tasks] no project: tags.domain_id = {domain_id}') + _LOGGER.debug(f"[get_tasks] no project: tags.domain_id = {domain_id}") + + return {"tasks": tasks, "changed": changed} + + def get_tasks_directory_type( + self, + domain_id: str, + options: dict, + secret_data: dict, + schema: str = None, + start: str = None, + last_synchronized_at: datetime = None, + ) -> dict: + tasks = [] + changed = [] + + aws_s3_connector = AWSS3Connector() + aws_s3_connector.create_session(options, secret_data, schema) + + start_month: str = self._get_start_month(options, start, last_synchronized_at) + + database = options.get("database", _DEFAULT_DATABASE) + accounts = options.get("accounts", []) + + path = f"SPACE_ONE/billing/database={database}/" + + if not accounts: + response = aws_s3_connector.list_objects(path) + for content in response.get("Contents", []): + key = content["Key"] + account_id = key.split("/")[3].split("=")[-1] + + if account_id and not "": + accounts.append(account_id) + + accounts = list(set(accounts)) + for account_id in accounts: + task_options = { + "account_id": account_id, + "database": database, + "start": start_month, + "is_sync": "true", + "task_type": "directory", + } + tasks.append({"task_options": task_options}) + + changed.append({"start": start_month}) + + _LOGGER.debug(f"[get_tasks] tasks: {tasks}") + _LOGGER.debug(f"[get_tasks] changed: {changed}") + + return {"tasks": tasks, "changed": changed} - return { - 'tasks': tasks, - 'changed': changed - } + def _get_start_month( + self, + options: dict, + start: str = None, + last_synchronized_at: datetime = None, + ) -> str: + resync_days = options.get("resync_days_from_last_synced_at", 7) - def _get_start_month(self, start, last_synchronized_at=None): if start: start_time: datetime = self._parse_start_time(start) elif last_synchronized_at: - start_time: datetime = last_synchronized_at - timedelta(days=7) + start_time: datetime = last_synchronized_at - timedelta(days=resync_days) start_time = start_time.replace(day=1) else: start_time: datetime = datetime.utcnow() - timedelta(days=365) start_time = start_time.replace(day=1) - start_time = start_time.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None) + start_time = start_time.replace( + hour=0, minute=0, second=0, microsecond=0, tzinfo=None + ) - return start_time.strftime('%Y-%m') + return start_time.strftime("%Y-%m") @staticmethod def _parse_start_time(start_str): - date_format = '%Y-%m' + date_format = "%Y-%m" try: return datetime.strptime(start_str, date_format) except Exception as e: - raise ERROR_INVALID_PARAMETER_TYPE(key='start', type=date_format) + raise ERROR_INVALID_PARAMETER_TYPE(key="start", type=date_format)