diff --git a/Dockerfile b/Dockerfile index b15aee0..da8b826 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN pip install --upgrade pip && \ pip install --upgrade -r ${PKG_DIR}/pip_requirements.txt ARG CACHEBUST=1 -RUN pip install --upgrade --pre spaceone-core spaceone-api +RUN pip install --upgrade --pre spaceone-core==1.12.24 spaceone-api==1.12.19 COPY src ${SRC_DIR} WORKDIR ${SRC_DIR} diff --git a/pkg/pip_requirements.txt b/pkg/pip_requirements.txt index d6adef9..5b451b9 100644 --- a/pkg/pip_requirements.txt +++ b/pkg/pip_requirements.txt @@ -1,8 +1,8 @@ -spaceone-core -spaceone-api -schematics -requests -boto3 -pyarrow -pandas +spaceone-core==1.12.24 +spaceone-api==1.12.19 +schematics==2.1.1 +requests==2.31.0 +boto3==1.26.157 +pyarrow==12.0.1 +pandas==2.0.2 diff --git a/src/setup.py b/src/setup.py index 090baf5..5709eec 100644 --- a/src/setup.py +++ b/src/setup.py @@ -16,28 +16,28 @@ from setuptools import setup, find_packages -with open('VERSION', 'r') as f: +with open("VERSION", "r") as f: VERSION = f.read().strip() f.close() setup( - name='plugin-aws-hyperbilling-cost-datasource', + name="plugin-aws-hyperbilling-cost-datasource", version=VERSION, - description='Data source plugin for AWS HyperBilling', - long_description='', - url='https://www.spaceone.dev/', - author='MEGAZONE SpaceONE Team', - author_email='admin@spaceone.dev', - license='Apache License 2.0', + description="Data source plugin for AWS HyperBilling", + long_description="", + url="https://www.spaceone.dev/", + author="MEGAZONE SpaceONE Team", + author_email="admin@spaceone.dev", + license="Apache License 2.0", packages=find_packages(), install_requires=[ - 'spaceone-core', - 'spaceone-api', - 'schematics', - 'requests', - 'boto3', - 'pyarrow', - 'pandas' + "spaceone-core==1.12.24", + "spaceone-api==1.12.19", + "schematics==2.1.1", + "requests==2.31.0", + "boto3==1.26.157", + "pyarrow==12.0.1", + "pandas==2.0.2", ], zip_safe=False, ) diff --git 
a/src/spaceone/cost_analysis/manager/cost_manager.py b/src/spaceone/cost_analysis/manager/cost_manager.py index d05af9a..0a73b56 100644 --- a/src/spaceone/cost_analysis/manager/cost_manager.py +++ b/src/spaceone/cost_analysis/manager/cost_manager.py @@ -12,38 +12,38 @@ _LOGGER = logging.getLogger(__name__) _REGION_MAP = { - 'APE1': 'ap-east-1', - 'APN1': 'ap-northeast-1', - 'APN2': 'ap-northeast-2', - 'APN3': 'ap-northeast-3', - 'APS1': 'ap-southeast-1', - 'APS2': 'ap-southeast-2', - 'APS3': 'ap-south-1', - 'CAN1': 'ca-central-1', - 'CPT': 'af-south-1', - 'EUN1': 'eu-north-1', - 'EUC1': 'eu-central-1', - 'EU': 'eu-west-1', - 'EUW2': 'eu-west-2', - 'EUW3': 'eu-west-3', - 'MES1': 'me-south-1', - 'SAE1': 'sa-east-1', - 'UGW1': 'AWS GovCloud (US-West)', - 'UGE1': 'AWS GovCloud (US-East)', - 'USE1': 'us-east-1', - 'USE2': 'us-east-2', - 'USW1': 'us-west-1', - 'USW2': 'us-west-2', - 'AP': 'Asia Pacific', - 'AU': 'Australia', - 'CA': 'Canada', + "APE1": "ap-east-1", + "APN1": "ap-northeast-1", + "APN2": "ap-northeast-2", + "APN3": "ap-northeast-3", + "APS1": "ap-southeast-1", + "APS2": "ap-southeast-2", + "APS3": "ap-south-1", + "CAN1": "ca-central-1", + "CPT": "af-south-1", + "EUN1": "eu-north-1", + "EUC1": "eu-central-1", + "EU": "eu-west-1", + "EUW2": "eu-west-2", + "EUW3": "eu-west-3", + "MES1": "me-south-1", + "SAE1": "sa-east-1", + "UGW1": "AWS GovCloud (US-West)", + "UGE1": "AWS GovCloud (US-East)", + "USE1": "us-east-1", + "USE2": "us-east-2", + "USW1": "us-west-1", + "USW2": "us-west-2", + "AP": "Asia Pacific", + "AU": "Australia", + "CA": "Canada", # 'EU': 'Europe and Israel', - 'IN': 'India', - 'JP': 'Japan', - 'ME': 'Middle East', - 'SA': 'South America', - 'US': 'United States', - 'ZA': 'South Africa', + "IN": "India", + "JP": "Japan", + "ME": "Middle East", + "SA": "South America", + "US": "United States", + "ZA": "South Africa", } @@ -51,31 +51,41 @@ class CostManager(BaseManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - 
self.aws_s3_connector: AWSS3Connector = self.locator.get_connector('AWSS3Connector') - self.space_connector: SpaceONEConnector = self.locator.get_connector('SpaceONEConnector') + self.aws_s3_connector: AWSS3Connector = self.locator.get_connector( + "AWSS3Connector" + ) + self.space_connector: SpaceONEConnector = self.locator.get_connector( + "SpaceONEConnector" + ) def get_data(self, options, secret_data, schema, task_options): + task_type = options.get("task_type", "identity") + self.aws_s3_connector.create_session(options, secret_data, schema) - self._check_task_options(task_options) + self._check_task_options(task_options, task_type) - start = task_options['start'] - account_id = task_options['account_id'] - service_account_id = task_options['service_account_id'] - database = task_options['database'] - is_sync = task_options['is_sync'] + start = task_options["start"] + database = task_options["database"] + account_id = task_options["account_id"] - if is_sync == 'false': - self._update_sync_state(options, secret_data, schema, service_account_id) + if task_type == "identity": + service_account_id = task_options["service_account_id"] + is_sync = task_options["is_sync"] + if is_sync == "false": + self._update_sync_state( + options, secret_data, schema, service_account_id + ) date_ranges = self._get_date_range(start) for date in date_ranges: - year, month = date.split('-') - path = f'SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}' + year, month = date.split("-") + + path = f"SPACE_ONE/billing/database={database}/account_id={account_id}/year={year}/month={month}" response = self.aws_s3_connector.list_objects(path) - contents = response.get('Contents', []) + contents = response.get("Contents", []) for content in contents: - response_stream = self.aws_s3_connector.get_cost_data(content['Key']) + response_stream = self.aws_s3_connector.get_cost_data(content["Key"]) for results in response_stream: yield 
self._make_cost_data(results, account_id) @@ -83,9 +93,11 @@ def get_data(self, options, secret_data, schema, task_options): def _update_sync_state(self, options, secret_data, schema, service_account_id): self.space_connector.init_client(options, secret_data, schema) - service_account_info = self.space_connector.get_service_account(service_account_id) - tags = service_account_info.get('tags', {}) - tags['is_sync'] = 'true' + service_account_info = self.space_connector.get_service_account( + service_account_id + ) + tags = service_account_info.get("tags", {}) + tags["is_sync"] = "true" self.space_connector.update_service_account(service_account_id, tags) def _make_cost_data(self, results, account_id): @@ -110,47 +122,47 @@ class CostSummaryItem(BaseModel): for result in results: try: - region = result['region'] or 'USE1' + region = result["region"] or "USE1" data = { - 'cost': result['usage_cost'], - 'currency': 'USD', - 'usage_quantity': result['usage_quantity'], - 'provider': 'aws', - 'region_code': _REGION_MAP.get(region, region), - 'product': result['service_code'], - 'account': account_id, - 'usage_type': result['usage_type'], - 'usage_unit': None, - 'billed_at': datetime.strptime(result['usage_date'], '%Y-%m-%d'), - 'additional_info': { - 'Instance Type': self._parse_usage_type(result) + "cost": result["usage_cost"], + "currency": "USD", + "usage_quantity": result["usage_quantity"], + "provider": "aws", + "region_code": _REGION_MAP.get(region, region), + "product": result["service_code"], + "account": account_id, + "usage_type": result["usage_type"], + "usage_unit": None, + "billed_at": datetime.strptime(result["usage_date"], "%Y-%m-%d"), + "additional_info": { + "Instance Type": self._parse_usage_type(result) }, - 'tags': self._get_tags_from_cost_data(result) + "tags": self._get_tags_from_cost_data(result), } - tag_application = result.get('tag_application') - tag_environment = result.get('tag_environment') - tag_name = result.get('tag_name') - tag_role = 
result.get('tag_role') - tag_service = result.get('tag_service') + tag_application = result.get("tag_application") + tag_environment = result.get("tag_environment") + tag_name = result.get("tag_name") + tag_role = result.get("tag_role") + tag_service = result.get("tag_service") if tag_application: - data['tags']['Application'] = tag_application + data["tags"]["Application"] = tag_application if tag_environment: - data['tags']['Environment'] = tag_environment + data["tags"]["Environment"] = tag_environment if tag_name: - data['tags']['Name'] = tag_name + data["tags"]["Name"] = tag_name if tag_role: - data['tags']['Role'] = tag_role + data["tags"]["Role"] = tag_role if tag_service: - data['tags']['Service'] = tag_service + data["tags"]["Service"] = tag_service except Exception as e: - _LOGGER.error(f'[_make_cost_data] make data error: {e}', exc_info=True) + _LOGGER.error(f"[_make_cost_data] make data error: {e}", exc_info=True) raise e costs_data.append(data) @@ -167,78 +179,80 @@ class CostSummaryItem(BaseModel): def _get_tags_from_cost_data(cost_data: dict) -> dict: tags = {} - if tags_str := cost_data.get('tags'): + if tags_str := cost_data.get("tags"): try: tags_dict: dict = utils.load_json(tags_str) for key, value in tags_dict.items(): - key = key.replace('user:', '') + key = key.replace("user:", "") tags[key] = value except Exception as e: _LOGGER.debug(e) - if tag_application := cost_data.get('tag_application'): - tags['Application'] = tag_application + if tag_application := cost_data.get("tag_application"): + tags["Application"] = tag_application - if tag_environment := cost_data.get('tag_environment'): - tags['Environment'] = tag_environment + if tag_environment := cost_data.get("tag_environment"): + tags["Environment"] = tag_environment - if tag_name := cost_data.get('tag_name'): - tags['Name'] = tag_name + if tag_name := cost_data.get("tag_name"): + tags["Name"] = tag_name - if tag_role := cost_data.get('tag_role'): - tags['Role'] = tag_role + if tag_role 
:= cost_data.get("tag_role"): + tags["Role"] = tag_role - if tag_service := cost_data.get('tag_service'): - tags['Service'] = tag_service + if tag_service := cost_data.get("tag_service"): + tags["Service"] = tag_service return tags @staticmethod def _parse_usage_type(cost_info): - service_code = cost_info['service_code'] - usage_type = cost_info['usage_type'] - - if service_code == 'AWSDataTransfer': - if usage_type.find('-In-Bytes') > 0: - return 'data-transfer.in' - elif usage_type.find('-Out-Bytes') > 0: - return 'data-transfer.out' + service_code = cost_info["service_code"] + usage_type = cost_info["usage_type"] + + if service_code == "AWSDataTransfer": + if usage_type.find("-In-Bytes") > 0: + return "data-transfer.in" + elif usage_type.find("-Out-Bytes") > 0: + return "data-transfer.out" else: - return 'data-transfer.etc' - elif service_code == 'AmazonCloudFront': - if usage_type.find('-HTTPS') > 0: - return 'requests.https' - elif usage_type.find('-Out-Bytes') > 0: - return 'data-transfer.out' + return "data-transfer.etc" + elif service_code == "AmazonCloudFront": + if usage_type.find("-HTTPS") > 0: + return "requests.https" + elif usage_type.find("-Out-Bytes") > 0: + return "data-transfer.out" else: - return 'requests.http' + return "requests.http" else: - return cost_info['instance_type'] + return cost_info["instance_type"] @staticmethod - def _check_task_options(task_options): - if 'start' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.start') + def _check_task_options(task_options, task_type): + + if "start" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.start") - if 'account_id' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.account_id') + if "database" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.database") - if 'service_account_id' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.service_account_id') + if task_type == "identity": 
+ if "account_id" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.account_id") - if 'database' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.database') + if "service_account_id" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.service_account_id") - if 'is_sync' not in task_options: - raise ERROR_REQUIRED_PARAMETER(key='task_options.is_sync') + if "is_sync" not in task_options: + raise ERROR_REQUIRED_PARAMETER(key="task_options.is_sync") @staticmethod def _get_date_range(start): date_ranges = [] - start_time = datetime.strptime(start, '%Y-%m-%d') + start_time = datetime.strptime(start, "%Y-%m-%d") now = datetime.utcnow() for dt in rrule.rrule(rrule.MONTHLY, dtstart=start_time, until=now): - billed_month = dt.strftime('%Y-%m') + billed_month = dt.strftime("%Y-%m") date_ranges.append(billed_month) return date_ranges diff --git a/src/spaceone/cost_analysis/manager/data_source_manager.py b/src/spaceone/cost_analysis/manager/data_source_manager.py index 26c350f..cfc8dce 100644 --- a/src/spaceone/cost_analysis/manager/data_source_manager.py +++ b/src/spaceone/cost_analysis/manager/data_source_manager.py @@ -15,14 +15,16 @@ def init_response(options): plugin_metadata = PluginMetadata() plugin_metadata.validate() - return { - 'metadata': plugin_metadata.to_primitive() - } + return {"metadata": plugin_metadata.to_primitive()} def verify_plugin(self, options, secret_data, schema): - space_connector: SpaceONEConnector = self.locator.get_connector('SpaceONEConnector') - space_connector.init_client(options, secret_data, schema) - space_connector.verify_plugin() - - aws_s3_connector: AWSS3Connector = self.locator.get_connector('AWSS3Connector') + task_type = options.get("task_type", "identity") + if task_type == "identity": + space_connector: SpaceONEConnector = self.locator.get_connector( + "SpaceONEConnector" + ) + space_connector.init_client(options, secret_data, schema) + space_connector.verify_plugin() 
+ + aws_s3_connector: AWSS3Connector = self.locator.get_connector("AWSS3Connector") aws_s3_connector.create_session(options, secret_data, schema) diff --git a/src/spaceone/cost_analysis/manager/job_manager.py b/src/spaceone/cost_analysis/manager/job_manager.py index 2e947d4..2a7f9b8 100644 --- a/src/spaceone/cost_analysis/manager/job_manager.py +++ b/src/spaceone/cost_analysis/manager/job_manager.py @@ -2,91 +2,139 @@ from datetime import datetime, timedelta from spaceone.core.manager import BaseManager + +from spaceone.cost_analysis.connector import AWSS3Connector from spaceone.cost_analysis.connector.spaceone_connector import SpaceONEConnector from spaceone.cost_analysis.model.job_model import Tasks _LOGGER = logging.getLogger(__name__) -_DEFAULT_DATABASE = 'MZC' +_DEFAULT_DATABASE = "MZC" class JobManager(BaseManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.space_connector: SpaceONEConnector = self.locator.get_connector('SpaceONEConnector') - - def get_tasks(self, options, secret_data, schema, start, last_synchronized_at, domain_id): + self.aws_s3_connector: AWSS3Connector = self.locator.get_connector( + "AWSS3Connector" + ) + self.space_connector: SpaceONEConnector = self.locator.get_connector( + "SpaceONEConnector" + ) + + def get_tasks( + self, options, secret_data, schema, start, last_synchronized_at, domain_id + ): tasks = [] changed = [] start_time = self._get_start_time(start, last_synchronized_at) - start_date = start_time.strftime('%Y-%m-%d') + start_date = start_time.strftime("%Y-%m-%d") changed_time = start_time self.space_connector.init_client(options, secret_data, schema) response = self.space_connector.list_projects(domain_id) - total_count = response.get('total_count') or 0 + total_count = response.get("total_count") or 0 if total_count > 0: - project_info = response['results'][0] - _LOGGER.debug(f'[get_tasks] project info: {project_info}') - - project_id = project_info['project_id'] - database = 
project_info.get('tags', {}).get('database', _DEFAULT_DATABASE) + project_info = response["results"][0] + _LOGGER.debug(f"[get_tasks] project info: {project_info}") + project_id = project_info["project_id"] + database = project_info.get("tags", {}).get("database", _DEFAULT_DATABASE) response = self.space_connector.list_service_accounts(project_id) - for service_account_info in response.get('results', []): - service_account_id = service_account_info['service_account_id'] - service_account_name = service_account_info['name'] - account_id = service_account_info['data']['account_id'] - is_sync = service_account_info['tags'].get('is_sync', 'false') - database = service_account_info['tags'].get('database', database) - - if is_sync != 'true': - is_sync = 'false' - - _LOGGER.debug(f'[get_tasks] service_account({service_account_id}): name={service_account_name}, account_id={account_id}') + for service_account_info in response.get("results", []): + service_account_id = service_account_info["service_account_id"] + service_account_name = service_account_info["name"] + account_id = service_account_info["data"]["account_id"] + is_sync = service_account_info["tags"].get("is_sync", "false") + database = service_account_info["tags"].get("database", database) + + if is_sync != "true": + is_sync = "false" + + _LOGGER.debug( + f"[get_tasks] service_account({service_account_id}): name={service_account_name}, account_id={account_id}" + ) task_options = { - 'is_sync': is_sync, - 'service_account_id': service_account_id, - 'service_account_name': service_account_name, - 'account_id': account_id, - 'database': database + "is_sync": is_sync, + "service_account_id": service_account_id, + "service_account_name": service_account_name, + "account_id": account_id, + "database": database, } - if is_sync == 'false': + if is_sync == "false": first_sync_time = self._get_start_time(start) - task_options['start'] = first_sync_time.strftime('%Y-%m-%d') - - changed.append({ - 'start': first_sync_time, - 
'filter': { - 'account': account_id - } - }) + task_options["start"] = first_sync_time.strftime("%Y-%m-%d") + + changed.append( + {"start": first_sync_time, "filter": {"account": account_id}} + ) else: - task_options['start'] = start_date + task_options["start"] = start_date - tasks.append({'task_options': task_options}) + tasks.append({"task_options": task_options}) - changed.append({ - 'start': changed_time - }) + changed.append({"start": changed_time}) - _LOGGER.debug(f'[get_tasks] tasks: {tasks}') - _LOGGER.debug(f'[get_tasks] changed: {changed}') + _LOGGER.debug(f"[get_tasks] tasks: {tasks}") + _LOGGER.debug(f"[get_tasks] changed: {changed}") - tasks = Tasks({'tasks': tasks, 'changed': changed}) + tasks = Tasks({"tasks": tasks, "changed": changed}) tasks.validate() return tasks.to_primitive() else: - _LOGGER.debug(f'[get_tasks] no project: tags.domain_id = {domain_id}') + _LOGGER.debug(f"[get_tasks] no project: tags.domain_id = {domain_id}") + + return {"tasks": tasks, "changed": changed} + + def get_tasks_directory_type( + self, options, secret_data, schema, start, last_synchronized_at, domain_id + ): + tasks = [] + changed = [] + + self.aws_s3_connector.create_session(options, secret_data, schema) + + start_time = self._get_start_time(start, last_synchronized_at) + start_date = start_time.strftime("%Y-%m-%d") + changed_time = start_time + + database = options.get("database", _DEFAULT_DATABASE) + accounts = options.get("accounts", []) + + path = f"SPACE_ONE/billing/database={database}/" + + if not accounts: + response = self.aws_s3_connector.list_objects(path) + for content in response.get("Contents", []): + key = content["Key"] + account_id = key.split("/")[3].split("=")[-1] + + if account_id: + accounts.append(account_id) + + accounts = list(set(accounts)) + for account_id in accounts: + task_options = { + "account_id": account_id, + "database": database, + "start": start_date, + "is_sync": "true", + } + task_options["start"] = start_date + 
tasks.append({"task_options": task_options}) + + changed.append({"start": changed_time}) + _LOGGER.debug(f"[get_tasks] tasks: {tasks}") + _LOGGER.debug(f"[get_tasks] changed: {changed}") + + tasks = Tasks({"tasks": tasks, "changed": changed}) - return { - 'tasks': tasks, - 'changed': changed - } + tasks.validate() + return tasks.to_primitive() @staticmethod def _get_start_time(start, last_synchronized_at=None): @@ -100,6 +148,8 @@ def _get_start_time(start, last_synchronized_at=None): start_time: datetime = datetime.utcnow() - timedelta(days=365) start_time = start_time.replace(day=1) - start_time = start_time.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=None) + start_time = start_time.replace( + hour=0, minute=0, second=0, microsecond=0, tzinfo=None + ) return start_time diff --git a/src/spaceone/cost_analysis/model/job_model.py b/src/spaceone/cost_analysis/model/job_model.py index 5d783a5..1d77bf4 100644 --- a/src/spaceone/cost_analysis/model/job_model.py +++ b/src/spaceone/cost_analysis/model/job_model.py @@ -2,16 +2,16 @@ from schematics.types import ListType, IntType, DateTimeType, StringType, DictType from schematics.types.compound import ModelType -__all__ = ['Tasks'] +__all__ = ["Tasks"] class TaskOptions(Model): start = StringType(required=True) - service_account_id = StringType(required=True) - service_account_name = StringType(required=True) + service_account_id = StringType() + service_account_name = StringType() account_id = StringType(required=True) database = StringType(required=True) - is_sync = StringType(required=True, choices=('true', 'false')) + is_sync = StringType(required=True, choices=("true", "false")) class Task(Model): diff --git a/src/spaceone/cost_analysis/service/job_service.py b/src/spaceone/cost_analysis/service/job_service.py index 9bc652b..7ce2970 100644 --- a/src/spaceone/cost_analysis/service/job_service.py +++ b/src/spaceone/cost_analysis/service/job_service.py @@ -14,11 +14,13 @@ class JobService(BaseService): def 
__init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.job_mgr: JobManager = self.locator.get_manager('JobManager') + self.job_mgr: JobManager = self.locator.get_manager("JobManager") @transaction - @check_required(['options', 'secret_data']) - @change_timestamp_value(['start', 'last_synchronized_at'], timestamp_format='iso8601') + @check_required(["options", "secret_data"]) + @change_timestamp_value( + ["start", "last_synchronized_at"], timestamp_format="iso8601" + ) def get_tasks(self, params): """Get Job Tasks @@ -37,11 +39,19 @@ def get_tasks(self, params): """ - options = params['options'] - secret_data = params['secret_data'] - schema = params.get('schema') - start = params.get('start') - last_synchronized_at = params.get('last_synchronized_at') - domain_id = params['domain_id'] - - return self.job_mgr.get_tasks(options, secret_data, schema, start, last_synchronized_at, domain_id) + options = params["options"] + secret_data = params["secret_data"] + schema = params.get("schema") + start = params.get("start") + last_synchronized_at = params.get("last_synchronized_at") + domain_id = params["domain_id"] + task_type = options.get("task_type", "identity") + + if task_type == "identity": + return self.job_mgr.get_tasks( + options, secret_data, schema, start, last_synchronized_at, domain_id + ) + else: + return self.job_mgr.get_tasks_directory_type( + options, secret_data, schema, start, last_synchronized_at, domain_id + )