diff --git a/Pipfile b/Pipfile index 24a31acc2..13498db62 100644 --- a/Pipfile +++ b/Pipfile @@ -25,7 +25,6 @@ boto3 = ">=1.17.28" durationpy = "*" hjson = "*" jmespath = "*" -kappa = "==0.6.0" pip = ">=9.0.1" # Workaround until tests are updated to work with 'placebo' 0.10 # Move to 'dev-packages' when unpinned diff --git a/zappa/kappa/__init__.py b/zappa/kappa/__init__.py new file mode 100644 index 000000000..790a50b15 --- /dev/null +++ b/zappa/kappa/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = '0.6.0' diff --git a/zappa/kappa/awsclient.py b/zappa/kappa/awsclient.py new file mode 100644 index 000000000..b086e5cc0 --- /dev/null +++ b/zappa/kappa/awsclient.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import jmespath +import boto3 + + +LOG = logging.getLogger(__name__) + +_session_cache = {} + + +class AWSClient(object): + + def __init__(self, service_name, session): + self._service_name = service_name + self._session = session + self.client = self._create_client() + + @property + def service_name(self): + return self._service_name + + @property + def session(self): + return self._session + + @property + def region_name(self): + return self.client.meta.region_name + + def _create_client(self): + client = self._session.client(self._service_name) + return client + + def call(self, op_name, query=None, **kwargs): + """ + Make a request to a method in this client. The response data is + returned from this call as native Python data structures. + + This method differs from just calling the client method directly + in the following ways: + + * It automatically handles the pagination rather than + relying on a separate pagination method call. + * You can pass an optional jmespath query and this query + will be applied to the data returned from the low-level + call. This allows you to tailor the returned data to be + exactly what you want. + + :type op_name: str + :param op_name: The name of the request you wish to make. + + :type query: str + :param query: A jmespath query that will be applied to the + data returned by the operation prior to returning + it to the user. + + :type kwargs: keyword arguments + :param kwargs: Additional keyword arguments you want to pass + to the method when making the request. + """ + LOG.debug(kwargs) + if query: + query = jmespath.compile(query) + if self.client.can_paginate(op_name): + paginator = self.client.get_paginator(op_name) + results = paginator.paginate(**kwargs) + data = results.build_full_result() + else: + op = getattr(self.client, op_name) + data = op(**kwargs) + if query: + data = query.search(data) + return data + + +def create_session(profile_name, region_name): + global _session_cache + session_key = '{}:{}'.format(profile_name, region_name) + if session_key not in _session_cache: + session = boto3.session.Session( + region_name=region_name, profile_name=profile_name) + _session_cache[session_key] = session + return _session_cache[session_key] + + +def create_client(service_name, session): + return AWSClient(service_name, session) diff --git a/zappa/kappa/context.py b/zappa/kappa/context.py new file mode 100644 index 000000000..6557f357a --- /dev/null +++ b/zappa/kappa/context.py @@ -0,0 +1,304 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import yaml +import time +import os +import shutil +import sys + +import kappa.function +import kappa.restapi +import kappa.event_source.dynamodb_stream +import kappa.event_source.kinesis +import kappa.event_source.s3 +import kappa.event_source.sns +import kappa.event_source.cloudwatch +import kappa.policy +import kappa.role +import kappa.awsclient + +import placebo + +LOG = logging.getLogger(__name__) + +DebugFmtString = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' +InfoFmtString = '-> %(message)s' + + +class Context(object): + + def __init__(self, config_file, environment=None, + debug=False, recording_path=None): + if debug: + self.set_logger('kappa', logging.DEBUG) + else: + self.set_logger('kappa', logging.INFO) + self._load_cache() + self.config = yaml.load(config_file) + self.environment = environment + + if self.environment not in self.config.get('environments', {}): + message = 'Invalid environment {0} specified'.format( + self.environment) + LOG.error(message) + sys.exit(1) + + profile = self.config['environments'][self.environment]['profile'] + region = self.config['environments'][self.environment]['region'] + self.session = kappa.awsclient.create_session(profile, region) + if recording_path: + self.pill = placebo.attach(self.session, recording_path) + self.pill.record() + self.policy = kappa.policy.Policy( + self, self.config['environments'][self.environment]) + self.role = kappa.role.Role( + self, self.config['environments'][self.environment]) + self.function = kappa.function.Function( + self, self.config['lambda']) + if 'restapi' in self.config: + self.restapi = kappa.restapi.RestApi( + self, self.config['restapi']) + else: + self.restapi = None + self.event_sources = [] + self._create_event_sources() + + def _load_cache(self): + self.cache = {} + if os.path.isdir('.kappa'): + cache_file = os.path.join('.kappa', 'cache') + if os.path.isfile(cache_file): + with open(cache_file, 'r') as fp: + self.cache = yaml.load(fp) + + def _delete_cache(self): + if os.path.isdir('.kappa'): + shutil.rmtree('.kappa') + self.cache = {} + + def _save_cache(self): + if not os.path.isdir('.kappa'): + os.mkdir('.kappa') + cache_file = os.path.join('.kappa', 'cache') + with open(cache_file, 'w') as fp: + yaml.dump(self.cache, fp) + + def get_cache_value(self, key): + ret = self.cache.setdefault(self.environment, dict()).get(key) + if ret is not None: + return ret.decode('utf8') + return ret + + def set_cache_value(self, key, value): + self.cache.setdefault( + self.environment, dict())[key] = value.encode('utf-8') + self._save_cache() + + @property + def name(self): + return self.config.get('name', os.path.basename(os.getcwd())) + + @property + def profile(self): + return self.config['environments'][self.environment]['profile'] + + @property + def region(self): + return self.config['environments'][self.environment]['region'] + + @property + def record(self): + return self.config.get('record', False) + + @property + def lambda_config(self): + return self.config.get('lambda') + + @property + def test_dir(self): + return self.config.get('tests', '_tests') + + @property + def source_dir(self): + return self.config.get('source', '_src') + + @property + def unit_test_runner(self): + return self.config.get('unit_test_runner', + 'nosetests . ../{}/unit/'.format(self.test_dir)) + + @property + def exec_role_arn(self): + return self.role.arn + + def debug(self): + self.set_logger('kappa', logging.DEBUG) + + def set_logger(self, logger_name, level=logging.INFO): + """ + Convenience function to quickly configure full debug output + to go to the console. + """ + log = logging.getLogger(logger_name) + log.setLevel(level) + + ch = logging.StreamHandler(None) + ch.setLevel(level) + + # create formatter + if level == logging.INFO: + formatter = logging.Formatter(InfoFmtString) + else: + formatter = logging.Formatter(DebugFmtString) + + # add formatter to ch + ch.setFormatter(formatter) + + # add ch to logger + log.addHandler(ch) + + def _create_event_sources(self): + env_cfg = self.config['environments'][self.environment] + if 'event_sources' not in env_cfg: + return + + event_sources = env_cfg.get('event_sources', {}) + if not event_sources: + return + + event_source_map = { + 'dynamodb': kappa.event_source.dynamodb_stream.DynamoDBStreamEventSource, + 'kinesis': kappa.event_source.kinesis.KinesisEventSource, + 's3': kappa.event_source.s3.S3EventSource, + 'sns': kappa.event_source.sns.SNSEventSource, + 'events': kappa.event_source.cloudwatch.CloudWatchEventSource + } + for event_source_cfg in event_sources: + _, _, svc, _ = event_source_cfg['arn'].split(':', 3) + event_source = event_source_map.get(svc, None) + if not event_source: + raise ValueError('Unknown event source: {0}'.format( + event_source_cfg['arn'])) + self.event_sources.append( + event_source(self, event_source_cfg)) + + def add_event_sources(self): + for event_source in self.event_sources: + event_source.add(self.function) + + def update_event_sources(self): + for event_source in self.event_sources: + event_source.update(self.function) + + def list_event_sources(self): + event_sources = [] + for event_source in self.event_sources: + event_sources.append({ + 'arn': event_source.arn, + 'starting_position': event_source.starting_position, + 'batch_size': event_source.batch_size, + 'enabled': event_source.enabled + }) + return event_sources + + def enable_event_sources(self): + for event_source in self.event_sources: + event_source.enable(self.function) + + def disable_event_sources(self): + for event_source in self.event_sources: + event_source.disable(self.function) + + def create(self): + if self.policy: + self.policy.create() + if self.role: + self.role.create() + # There is a consistency problem here. + # If you don't wait for a bit, the function.create call + # will fail because the policy has not been attached to the role. + LOG.debug('Waiting for policy/role propagation') + time.sleep(5) + self.function.create() + self.add_event_sources() + + def deploy(self): + if self.policy: + self.policy.deploy() + if self.role: + self.role.create() + self.function.deploy() + if self.restapi: + self.restapi.deploy() + self.add_event_sources() + + def invoke(self, data): + return self.function.invoke(data) + + def unit_tests(self): + # run any unit tests + unit_test_path = os.path.join(self.test_dir, 'unit') + if os.path.exists(unit_test_path): + os.chdir(self.source_dir) + print('running unit tests') + pipe = os.popen(self.unit_test_runner, 'r') + print(pipe.read()) + + def test(self): + return self.unit_tests() + + def dryrun(self): + return self.function.dryrun() + + def invoke_async(self): + return self.function.invoke_async() + + def tail(self): + return self.function.tail() + + def delete(self): + for event_source in self.event_sources: + event_source.remove(self.function) + self.function.log.delete() + self.function.delete() + if self.restapi: + self.restapi.delete() + time.sleep(5) + if self.role: + self.role.delete() + time.sleep(5) + if self.policy: + self.policy.delete() + self._delete_cache() + + def status(self): + status = {} + if self.policy: + status['policy'] = self.policy.status() + else: + status['policy'] = None + if self.role: + status['role'] = self.role.status() + else: + status['role'] = None + status['function'] = self.function.status() + status['event_sources'] = [] + if self.event_sources: + for event_source in self.event_sources: + status['event_sources'].append( + event_source.status(self.function)) + return status diff --git a/zappa/kappa/event_source/__init__.py b/zappa/kappa/event_source/__init__.py new file mode 100644 index 000000000..da1e4eba5 --- /dev/null +++ b/zappa/kappa/event_source/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/zappa/kappa/event_source/base.py b/zappa/kappa/event_source/base.py new file mode 100644 index 000000000..4ceb12da8 --- /dev/null +++ b/zappa/kappa/event_source/base.py @@ -0,0 +1,37 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class EventSource(object): + + def __init__(self, context, config): + self._context = context + self._config = config + + @property + def arn(self): + return self._config['arn'] + + @property + def starting_position(self): + return self._config.get('starting_position', 'LATEST') + + @property + def batch_size(self): + return self._config.get('batch_size', 100) + + @property + def enabled(self): + return self._config.get('enabled', False) diff --git a/zappa/kappa/event_source/cloudwatch.py b/zappa/kappa/event_source/cloudwatch.py new file mode 100644 index 000000000..bc87ae28c --- /dev/null +++ b/zappa/kappa/event_source/cloudwatch.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kappa.event_source.base +import logging +import uuid + +LOG = logging.getLogger(__name__) + + +class CloudWatchEventSource(kappa.event_source.base.EventSource): + + def __init__(self, context, config): + super(CloudWatchEventSource, self).__init__(context, config) + self._events = kappa.awsclient.create_client('events', context.session) + self._lambda = kappa.awsclient.create_client('lambda', context.session) + self._name = config['arn'].split('/')[-1] + self._context = context + self._config = config + + def get_rule(self): + response = self._events.call('list_rules', NamePrefix=self._name) + LOG.debug(response) + if 'Rules' in response: + for r in response['Rules']: + if r['Name'] == self._name: + return r + return None + + def add(self, function): + kwargs = { + 'Name': self._name, + 'State': 'ENABLED' if self.enabled else 'DISABLED' + } + if 'schedule' in self._config: + kwargs['ScheduleExpression'] = self._config['schedule'] + if 'pattern' in self._config: + kwargs['EventPattern'] = self._config['pattern'] + if 'description' in self._config: + kwargs['Description'] = self._config['description'] + if 'role_arn' in self._config: + kwargs['RoleArn'] = self._config['role_arn'] + try: + response = self._events.call('put_rule', **kwargs) + LOG.debug(response) + self._config['arn'] = response['RuleArn'] + response = self._lambda.call('add_permission', + FunctionName=function.name, + StatementId=str(uuid.uuid4()), + Action='lambda:InvokeFunction', + Principal='events.amazonaws.com', + SourceArn=response['RuleArn']) + LOG.debug(response) + response = self._events.call('put_targets', + Rule=self._name, + Targets=[{ + 'Id': function.name, + 'Arn': function.arn + }]) + LOG.debug(response) + except Exception: + LOG.exception('Unable to put CloudWatch event source') + + def update(self, function): + self.add(function) + + def remove(self, function): + LOG.debug('removing CloudWatch event source') + try: + rule = self.get_rule() + if rule: + response = self._events.call('remove_targets', + Rule=self._name, + Ids=[function.name]) + LOG.debug(response) + response = self._events.call('delete_rule', + Name=self._name) + LOG.debug(response) + except Exception: + LOG.exception('Unable to remove CloudWatch event source %s', self._name) + + def status(self, function): + LOG.debug('status for CloudWatch event for %s', function.name) + return self._to_status(self.get_rule()) + + def enable(self, function): + if self.get_rule(): + self._events.call('enable_rule', Name=self._name) + + def disable(self, function): + if self.get_rule(): + self._events.call('disable_rule', Name=self._name) + + def _to_status(self, rule): + if rule: + return { + 'EventSourceArn': rule['Arn'], + 'State': rule['State'] + } + return None diff --git a/zappa/kappa/event_source/dynamodb_stream.py b/zappa/kappa/event_source/dynamodb_stream.py new file mode 100644 index 000000000..c7d80aa74 --- /dev/null +++ b/zappa/kappa/event_source/dynamodb_stream.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kappa.event_source.kinesis + + +class DynamoDBStreamEventSource(kappa.event_source.kinesis.KinesisEventSource): + + pass diff --git a/zappa/kappa/event_source/kinesis.py b/zappa/kappa/event_source/kinesis.py new file mode 100644 index 000000000..ab85aa068 --- /dev/null +++ b/zappa/kappa/event_source/kinesis.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import botocore.exceptions +import kappa.event_source.base +import logging + +LOG = logging.getLogger(__name__) + + +class KinesisEventSource(kappa.event_source.base.EventSource): + + def __init__(self, context, config): + super(KinesisEventSource, self).__init__(context, config) + self._lambda = kappa.awsclient.create_client( + 'lambda', context.session) + + def _get_uuid(self, function): + uuid = None + response = self._lambda.call( + 'list_event_source_mappings', + FunctionName=function.name, + EventSourceArn=self.arn) + LOG.debug(response) + if len(response['EventSourceMappings']) > 0: + uuid = response['EventSourceMappings'][0]['UUID'] + return uuid + + def add(self, function): + try: + response = self._lambda.call( + 'create_event_source_mapping', + FunctionName=function.name, + EventSourceArn=self.arn, + BatchSize=self.batch_size, + StartingPosition=self.starting_position, + Enabled=self.enabled + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to add event source') + + def enable(self, function): + self._config['enabled'] = True + try: + response = self._lambda.call( + 'update_event_source_mapping', + UUID=self._get_uuid(function), + Enabled=self.enabled + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to enable event source') + + def disable(self, function): + self._config['enabled'] = False + try: + response = self._lambda.call( + 'update_event_source_mapping', + FunctionName=function.name, + Enabled=self.enabled + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to disable event source') + + def update(self, function): + response = None + uuid = self._get_uuid(function) + if uuid: + try: + response = self._lambda.call( + 'update_event_source_mapping', + BatchSize=self.batch_size, + Enabled=self.enabled, + FunctionName=function.arn) + LOG.debug(response) + except Exception: + LOG.exception('Unable to update event source') + + def remove(self, function): + response = None + uuid = self._get_uuid(function) + if uuid: + response = self._lambda.call( + 'delete_event_source_mapping', + UUID=uuid) + LOG.debug(response) + return response + + def status(self, function): + response = None + LOG.debug('getting status for event source %s', self.arn) + uuid = self._get_uuid(function) + if uuid: + try: + response = self._lambda.call( + 'get_event_source_mapping', + UUID=self._get_uuid(function)) + LOG.debug(response) + except botocore.exceptions.ClientError: + LOG.debug('event source %s does not exist', self.arn) + response = None + else: + LOG.debug('No UUID for event source %s', self.arn) + return response diff --git a/zappa/kappa/event_source/s3.py b/zappa/kappa/event_source/s3.py new file mode 100644 index 000000000..9e5d527fc --- /dev/null +++ b/zappa/kappa/event_source/s3.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kappa.event_source.base +import logging + +LOG = logging.getLogger(__name__) + + +class S3EventSource(kappa.event_source.base.EventSource): + + def __init__(self, context, config): + super(S3EventSource, self).__init__(context, config) + self._s3 = kappa.awsclient.create_client('s3', context.session) + + def _make_notification_id(self, function_name): + return 'Kappa-%s-notification' % function_name + + def _get_bucket_name(self): + return self.arn.split(':')[-1] + + def add(self, function): + notification_spec = { + 'LambdaFunctionConfigurations': [ + { + 'Id': self._make_notification_id(function.name), + 'Events': [e for e in self._config['events']], + 'LambdaFunctionArn': '%s:%s' % (function.arn, function._context.environment), + } + ] + } + + # Add S3 key filters + if 'key_filters' in self._config: + filters_spec = { 'Key' : { 'FilterRules' : [] } } + for filter in self._config['key_filters']: + if 'type' in filter and 'value' in filter and filter['type'] in ('prefix', 'suffix'): + rule = { 'Name' : filter['type'], 'Value' : filter['value'] } + filters_spec['Key']['FilterRules'].append(rule) + notification_spec['LambdaFunctionConfigurations'][0]['Filter'] = filters_spec + + try: + response = self._s3.call( + 'put_bucket_notification_configuration', + Bucket=self._get_bucket_name(), + NotificationConfiguration=notification_spec) + LOG.debug(response) + except Exception as exc: + LOG.debug(exc.response) + LOG.exception('Unable to add S3 event source') + + enable = add + + def update(self, function): + self.add(function) + + def remove(self, function): + LOG.debug('removing s3 notification') + response = self._s3.call( + 'get_bucket_notification', + Bucket=self._get_bucket_name()) + LOG.debug(response) + if 'CloudFunctionConfiguration' in response: + fn_arn = response['CloudFunctionConfiguration']['CloudFunction'] + if fn_arn == function.arn: + del response['CloudFunctionConfiguration'] + del response['ResponseMetadata'] + response = self._s3.call( + 'put_bucket_notification', + Bucket=self._get_bucket_name(), + NotificationConfiguration=response) + LOG.debug(response) + + disable = remove + + def status(self, function): + LOG.debug('status for s3 notification for %s', function.name) + response = self._s3.call( + 'get_bucket_notification', + Bucket=self._get_bucket_name()) + LOG.debug(response) + if 'CloudFunctionConfiguration' not in response: + return None + return { + 'EventSourceArn': response['CloudFunctionConfiguration']['CloudFunction'], + 'State': 'Enabled' + } diff --git a/zappa/kappa/event_source/sns.py b/zappa/kappa/event_source/sns.py new file mode 100644 index 000000000..8b965ffbd --- /dev/null +++ b/zappa/kappa/event_source/sns.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kappa.awsclient +import kappa.event_source.base +import logging + +LOG = logging.getLogger(__name__) + + +class SNSEventSource(kappa.event_source.base.EventSource): + + def __init__(self, context, config): + super(SNSEventSource, self).__init__(context, config) + self._sns = kappa.awsclient.create_client('sns', context.session) + + def _make_notification_id(self, function_name): + return 'Kappa-%s-notification' % function_name + + def exists(self, function): + try: + response = self._sns.call( + 'list_subscriptions_by_topic', + TopicArn=self.arn) + LOG.debug(response) + for subscription in response['Subscriptions']: + if subscription['Endpoint'] == function.arn: + return subscription + return None + except Exception: + LOG.exception('Unable to find event source %s', self.arn) + + def add(self, function): + try: + response = self._sns.call( + 'subscribe', + TopicArn=self.arn, Protocol='lambda', + Endpoint=function.arn) + LOG.debug(response) + except Exception: + LOG.exception('Unable to add SNS event source') + + enable = add + + def update(self, function): + self.add(function) + + def remove(self, function): + LOG.debug('removing SNS event source') + try: + subscription = self.exists(function) + if subscription: + response = self._sns.call( + 'unsubscribe', + SubscriptionArn=subscription['SubscriptionArn']) + LOG.debug(response) + except Exception: + LOG.exception('Unable to remove event source %s', self.arn) + + disable = remove + + def status(self, function): + LOG.debug('status for SNS notification for %s', function.name) + status = self.exists(function) + if status: + status['EventSourceArn'] = status['TopicArn'] + return status diff --git a/zappa/kappa/function.py b/zappa/kappa/function.py new file mode 100644 index 000000000..294093009 --- /dev/null +++ b/zappa/kappa/function.py @@ -0,0 +1,520 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import logging +import os +import shutil +import time +import uuid +import zipfile + +from botocore.exceptions import ClientError + +import kappa.awsclient +import kappa.log + +LOG = logging.getLogger(__name__) + + +class Function(object): + + excluded_dirs = ['boto3', 'botocore', 'concurrent', 'dateutil', + 'docutils', 'futures', 'jmespath', 'python_dateutil'] + excluded_files = ['.gitignore'] + + def __init__(self, context, config): + self._context = context + self._config = config + self._lambda_client = kappa.awsclient.create_client( + 'lambda', context.session) + self._response = None + self._log = None + + @property + def name(self): + return self._context.name + + @property + def runtime(self): + return self._config['runtime'] + + @property + def handler(self): + return self._config['handler'] + + @property + def dependencies(self): + return self._config.get('dependencies', list()) + + @property + def description(self): + return self._config['description'] + + @property + def timeout(self): + return self._config['timeout'] + + @property + def memory_size(self): + return self._config['memory_size'] + + @property + def vpc_config(self): + vpc_config = {} + if 'vpc_config' in self._config: + if 'security_group_ids' in self._config['vpc_config']: + sgids = self._config['vpc_config']['security_group_ids'] + vpc_config['SecurityGroupIds'] = sgids + if 'subnet_ids' in self._config['vpc_config']: + snids = self._config['vpc_config']['subnet_ids'] + vpc_config['SubnetIds'] = snids + return vpc_config + + @property + def zipfile_name(self): + return '{}.zip'.format(self._context.name) + + @property + def tests(self): + return self._config.get('tests', '_tests') + + @property + def permissions(self): + return self._config.get('permissions', list()) + + @property + def log(self): + if self._log is None: + log_group_name = '/aws/lambda/%s' % self.name + self._log = kappa.log.Log(self._context, log_group_name) + return self._log + + @property + def code_sha_256(self): + return self._get_response_configuration('CodeSha256') + + @property + def arn(self): + return self._get_response_configuration('FunctionArn') + + @property + def alias_arn(self): + return self.arn + ':{}'.format(self._context.environment) + + @property + def repository_type(self): + return self._get_response_code('RepositoryType') + + @property + def location(self): + return self._get_response_code('Location') + + @property + def version(self): + return self._get_response_configuration('Version') + + @property + def deployment_uri(self): + return 'https://{}.execute-api.{}.amazonaws.com/{}'.format( + self.api_id, self._apigateway_client.region_name, + self._context.environment) + + def _get_response(self): + if self._response is None: + try: + self._response = self._lambda_client.call( + 'get_function', + FunctionName=self.name) + LOG.debug(self._response) + except Exception: + LOG.debug('Unable to find ARN for function: %s', self.name) + return self._response + + def _get_response_configuration(self, key, default=None): + value = None + response = self._get_response() + if response: + if 'Configuration' in response: + value = response['Configuration'].get(key, default) + return value + + def _get_response_code(self, key, default=None): + value = None + response = self._get_response + if response: + if 'Configuration' in response: + value = response['Configuration'].get(key, default) + return value + + def _check_function_md5(self): + # Zip up the source code and then compute the MD5 of that. + # If the MD5 does not match the cached MD5, the function has + # changed and needs to be updated so return True. + changed = True + self._copy_config_file() + files = [] + self.dependencies + [self._context.source_dir] + self.zip_lambda_function(self.zipfile_name, files) + m = hashlib.md5() + with open(self.zipfile_name, 'rb') as fp: + m.update(fp.read()) + zip_md5 = m.hexdigest() + cached_md5 = self._context.get_cache_value('zip_md5') + LOG.debug('zip_md5: %s', zip_md5) + LOG.debug('cached md5: %s', cached_md5) + if zip_md5 != cached_md5: + self._context.set_cache_value('zip_md5', zip_md5) + else: + changed = False + LOG.info('function unchanged') + return changed + + def _check_config_md5(self): + # Compute the MD5 of all of the components of the configuration. + # If the MD5 does not match the cached MD5, the configuration has + # changed and needs to be updated so return True. + m = hashlib.md5() + m.update(self.description.encode('utf-8')) + m.update(self.handler.encode('utf-8')) + m.update(str(self.memory_size).encode('utf-8')) + m.update(self._context.exec_role_arn.encode('utf-8')) + m.update(str(self.timeout).encode('utf-8')) + m.update(str(self.vpc_config).encode('utf-8')) + config_md5 = m.hexdigest() + cached_md5 = self._context.get_cache_value('config_md5') + LOG.debug('config_md5: %s', config_md5) + LOG.debug('cached_md5: %s', cached_md5) + if config_md5 != cached_md5: + self._context.set_cache_value('config_md5', config_md5) + changed = True + else: + changed = False + return changed + + def _copy_config_file(self): + config_name = '{}_config.json'.format(self._context.environment) + config_path = os.path.join(self._context.source_dir, config_name) + if os.path.exists(config_path): + dest_path = os.path.join(self._context.source_dir, 'config.json') + LOG.debug('copy %s to %s', config_path, dest_path) + shutil.copy2(config_path, dest_path) + + def _zip_lambda_dir(self, zipfile_name, lambda_dir): + LOG.debug('_zip_lambda_dir: lambda_dir=%s', lambda_dir) + LOG.debug('zipfile_name=%s', zipfile_name) + relroot = os.path.abspath(lambda_dir) + with zipfile.ZipFile(zipfile_name, 'a', + compression=zipfile.ZIP_DEFLATED) as zf: + for root, subdirs, files in os.walk(lambda_dir): + excluded_dirs = [] + for subdir in subdirs: + for excluded in self.excluded_dirs: + if subdir.startswith(excluded): + excluded_dirs.append(subdir) + for excluded in excluded_dirs: + subdirs.remove(excluded) + + try: + dir_path = os.path.relpath(root, relroot) + dir_path = os.path.normpath( + os.path.splitdrive(dir_path)[1] + ) + while dir_path[0] in (os.sep, os.altsep): + dir_path = dir_path[1:] + dir_path += '/' + zf.getinfo(dir_path) + except KeyError: + zf.write(root, dir_path) + + for filename in files: + if filename not in self.excluded_files: + filepath = os.path.join(root, filename) + if os.path.isfile(filepath): + arcname = os.path.join( + os.path.relpath(root, relroot), filename) + try: + zf.getinfo(arcname) + except KeyError: + zf.write(filepath, arcname) + + def _zip_lambda_file(self, zipfile_name, lambda_file): + LOG.debug('_zip_lambda_file: lambda_file=%s', lambda_file) + LOG.debug('zipfile_name=%s', zipfile_name) + with zipfile.ZipFile(zipfile_name, 'a', + compression=zipfile.ZIP_DEFLATED) as zf: + try: + zf.getinfo(lambda_file) + except KeyError: + zf.write(lambda_file) + + def zip_lambda_function(self, zipfile_name, files): + try: + os.remove(zipfile_name) + except OSError: + pass + for f in files: + LOG.debug('adding file %s', f) + if os.path.isdir(f): + self._zip_lambda_dir(zipfile_name, f) + else: + self._zip_lambda_file(zipfile_name, f) + + def exists(self): + return self._get_response() + + def tail(self, attempt=0): + try: + LOG.debug('tailing function: %s', self.name) + return self.log.tail() + except Exception as e: + if attempt > 10: + return e + + time.sleep(attempt) + return self.tail(attempt + 1) + + def list_aliases(self): + LOG.info('listing aliases of %s', self.name) + try: + response = self._lambda_client.call( + 'list_aliases', + FunctionName=self.name) + LOG.debug(response) + except Exception: + LOG.exception('Unable to list aliases') + return response.get('Versions', list()) + + def find_latest_version(self): + # Find the current (latest) version by version number + # First find the SHA256 of $LATEST + versions = self.list_versions() + for v in versions: + if v['Version'] == '$LATEST': + latest_sha256 = v['CodeSha256'] + break + for v in versions: + if v['Version'] != '$LATEST': + if v['CodeSha256'] == latest_sha256: + version = v['Version'] + break + return version + + def create_alias(self, name, description, version=None): + if not version: + version = self.find_latest_version() + try: + LOG.debug('creating alias %s=%s', name, version) + response = self._lambda_client.call( + 'create_alias', + FunctionName=self.name, + Description=description, + FunctionVersion=version, + Name=name) + LOG.debug(response) + except Exception: + LOG.exception('Unable to create alias') + + def update_alias(self, name, description, version=None): + # Find the current (latest) version by version number + # First find the SHA256 of $LATEST + if not version: + version = self.find_latest_version() + try: + LOG.debug('updating alias %s=%s', name, version) + response = self._lambda_client.call( + 'update_alias', + FunctionName=self.name, + Description=description, + FunctionVersion=version, + Name=name) + LOG.debug(response) + except Exception: + LOG.exception('Unable to update alias') + + def add_permission(self, action, principal, + source_arn=None, source_account=None): + try: + kwargs = { + 'FunctionName': self.name, + 'Qualifier': self._context.environment, + 'StatementId': str(uuid.uuid4()), + 'Action': action, + 'Principal': principal} + if source_arn: + kwargs['SourceArn'] = source_arn + if source_account: + kwargs['SourceAccount'] = source_account + response = self._lambda_client.call( + 'add_permission', **kwargs) + LOG.debug(response) + except Exception: + LOG.exception('Unable to add permission') + + def add_permissions(self): + if self.permissions: + time.sleep(5) + for permission in self.permissions: + self.add_permission( + permission['action'], + permission['principal'], + permission.get('source_arn'), + permission.get('source_account')) + + def create(self): + LOG.info('creating function %s', self.name) + self._check_function_md5() + self._check_config_md5() + # There is a consistency problem here. + # Sometimes the role is not ready to be used by the function. + ready = False + while not ready: + with open(self.zipfile_name, 'rb') as fp: + exec_role = self._context.exec_role_arn + LOG.debug('exec_role=%s', exec_role) + try: + zipdata = fp.read() + response = self._lambda_client.call( + 'create_function', + FunctionName=self.name, + Code={'ZipFile': zipdata}, + Runtime=self.runtime, + Role=exec_role, + Handler=self.handler, + Description=self.description, + Timeout=self.timeout, + MemorySize=self.memory_size, + VpcConfig=self.vpc_config, + Publish=True) + LOG.debug(response) + description = 'For stage {}'.format( + self._context.environment) + self.create_alias(self._context.environment, description) + ready = True + except ClientError as e: + if 'InvalidParameterValueException' in str(e): + LOG.debug('Role is not ready, waiting') + time.sleep(2) + else: + LOG.debug(str(e)) + ready = True + except Exception: + LOG.exception('Unable to upload zip file') + ready = True + self.add_permissions() + + def update(self): + LOG.info('updating function %s', self.name) + if self._check_function_md5(): + self._response = None + with open(self.zipfile_name, 'rb') as fp: + try: + LOG.info('uploading new function zipfile %s', + self.zipfile_name) + zipdata = fp.read() + response = self._lambda_client.call( + 'update_function_code', + FunctionName=self.name, + ZipFile=zipdata, + Publish=True) + LOG.debug(response) + self.update_alias( + self._context.environment, + 'For the {} stage'.format(self._context.environment)) + except Exception: + LOG.exception('unable to update zip file') + + def update_configuration(self): + if self._check_config_md5(): + self._response = None + LOG.info('updating configuration for %s', self.name) + exec_role = self._context.exec_role_arn + LOG.debug('exec_role=%s', exec_role) + try: + response = self._lambda_client.call( + 'update_function_configuration', + FunctionName=self.name, + VpcConfig=self.vpc_config, + Role=exec_role, + Handler=self.handler, + Description=self.description, + Timeout=self.timeout, + MemorySize=self.memory_size) + LOG.debug(response) + except Exception: + LOG.exception('unable to update function configuration') + else: + LOG.info('function configuration has not changed') + + def deploy(self): + if self.exists(): + self.update_configuration() + return self.update() + return self.create() + + def list_versions(self): + try: + response = self._lambda_client.call( + 'list_versions_by_function', + FunctionName=self.name) + LOG.debug(response) + except Exception: + LOG.exception('Unable to list versions') + return response['Versions'] + + def tag(self, name, description): + self.create_alias(name, description) + + def delete(self): + LOG.info('deleting function %s', self.name) + response = None + try: + response = self._lambda_client.call( + 'delete_function', + FunctionName=self.name) + LOG.debug(response) + except ClientError: + LOG.debug('function %s: not found', self.name) + return response + + def status(self): + try: + response = self._lambda_client.call( + 'get_function', + FunctionName=self.name) + LOG.debug(response) + except ClientError: + LOG.debug('function %s not found', self.name) + response = None + return response + + def _invoke(self, data, invocation_type): + LOG.debug('invoke %s as %s', self.name, invocation_type) + response = self._lambda_client.call( + 'invoke', + FunctionName=self.name, + InvocationType=invocation_type, + LogType='Tail', + Payload=data) + LOG.debug(response) + return response + + def invoke(self, test_data=None): + return self._invoke(test_data, 'RequestResponse') + + def invoke_async(self, test_data=None): + return self._invoke(test_data, 'Event') + + def dryrun(self, test_data=None): + return self._invoke(test_data, 'DryRun') diff --git a/zappa/kappa/log.py b/zappa/kappa/log.py new file mode 100644 index 000000000..8bd19d1bc --- /dev/null +++ b/zappa/kappa/log.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from botocore.exceptions import ClientError + +import kappa.awsclient + +LOG = logging.getLogger(__name__) + + +class Log(object): + + def __init__(self, context, log_group_name): + self._context = context + self.log_group_name = log_group_name + self._log_client = kappa.awsclient.create_client( + 'logs', context.session) + + def _check_for_log_group(self): + LOG.debug('checking for log group') + response = self._log_client.call('describe_log_groups') + log_group_names = [lg['logGroupName'] for lg in response['logGroups']] + return self.log_group_name in log_group_names + + def streams(self): + LOG.debug('getting streams for log group: %s', self.log_group_name) + if not self._check_for_log_group(): + LOG.info( + 'log group %s has not been created yet', self.log_group_name) + return [] + response = self._log_client.call( + 'describe_log_streams', + logGroupName=self.log_group_name) + LOG.debug(response) + return response['logStreams'] + + def tail(self): + LOG.debug('tailing log group: %s', self.log_group_name) + if not self._check_for_log_group(): + LOG.info( + 'log group %s has not been created yet', self.log_group_name) + return [] + latest = None + streams = self.streams() + for stream in streams: + if not latest: + latest = stream + elif stream['lastEventTimestamp'] > latest['lastEventTimestamp']: + latest = stream + response = self._log_client.call( + 'get_log_events', + logGroupName=self.log_group_name, + logStreamName=latest['logStreamName']) + LOG.debug(response) + return response['events'] + + def delete(self): + try: + response = self._log_client.call( + 'delete_log_group', + logGroupName=self.log_group_name) + LOG.debug(response) + except ClientError: + LOG.debug('unable to delete log group') diff --git a/zappa/kappa/policy.py b/zappa/kappa/policy.py new file mode 100644 index 000000000..a314c5324 --- /dev/null +++ b/zappa/kappa/policy.py @@ -0,0 +1,189 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import json +import hashlib + +import kappa.awsclient + +LOG = logging.getLogger(__name__) + + +class Policy(object): + + _path_prefix = '/kappa/' + + def __init__(self, context, config): + self.context = context + self.config = config + self._iam_client = kappa.awsclient.create_client( + 'iam', self.context.session) + self._arn = self.config['policy'].get('arn', None) + + @property + def name(self): + return '{}_{}'.format(self.context.name, self.context.environment) + + @property + def description(self): + return 'A kappa policy to control access to {} resources'.format( + self.context.environment) + + def document(self): + if ('resources' not in self.config['policy'] and + 'statements' not in self.config['policy']): + return None + document = {'Version': '2012-10-17'} + statements = [] + document['Statement'] = statements + for resource in self.config['policy'].get('resources', []): + arn = resource['arn'] + _, _, service, _ = arn.split(':', 3) + statement = {"Effect": "Allow", + "Resource": resource['arn']} + actions = [] + for action in resource['actions']: + actions.append("{}:{}".format(service, action)) + statement['Action'] = actions + statements.append(statement) + for statement in self.config['policy'].get('statements', []): + statements.append(statement) + return json.dumps(document, indent=2, sort_keys=True) + + @property + def arn(self): + if self._arn is None: + policy = self.exists() + if policy: + self._arn = policy.get('Arn', None) + return self._arn + + def _find_all_policies(self): + try: + response = self._iam_client.call( + 'list_policies', PathPrefix=self._path_prefix) + except Exception: + LOG.exception('Error listing policies') + response = {} + return response.get('Policies', list()) + + def _list_versions(self): + try: + response = self._iam_client.call( + 'list_policy_versions', + PolicyArn=self.arn) + except Exception: + LOG.exception('Error listing policy versions') + response = {} + return response.get('Versions', list()) + + def exists(self): + for policy in self._find_all_policies(): + if policy['PolicyName'] == self.name: + return policy + return None + + def _add_policy_version(self): + document = self.document() + if not document: + LOG.debug('not a custom policy, no need to version it') + return + versions = self._list_versions() + if len(versions) == 5: + try: + response = self._iam_client.call( + 'delete_policy_version', + PolicyArn=self.arn, + VersionId=versions[-1]['VersionId']) + except Exception: + LOG.exception('Unable to delete policy version') + # update policy with a new version here + try: + response = self._iam_client.call( + 'create_policy_version', + PolicyArn=self.arn, + PolicyDocument=document, + SetAsDefault=True) + LOG.debug(response) + except Exception: + LOG.exception('Error creating new Policy version') + + def _check_md5(self, document): + m = hashlib.md5() + m.update(document.encode('utf-8')) + policy_md5 = m.hexdigest() + cached_md5 = self.context.get_cache_value('policy_md5') + LOG.debug('policy_md5: %s', policy_md5) + LOG.debug('cached md5: %s', cached_md5) + if policy_md5 != cached_md5: + self.context.set_cache_value('policy_md5', policy_md5) + return True + return False + + def deploy(self): + LOG.info('deploying policy %s', self.name) + document = self.document() + if not document: + LOG.info('not a custom policy, no need to create it') + return + policy = self.exists() + if policy: + if self._check_md5(document): + self._add_policy_version() + else: + LOG.info('policy unchanged') + else: + # create a new policy + self._check_md5(document) + try: + response = self._iam_client.call( + 'create_policy', + Path=self._path_prefix, PolicyName=self.name, + PolicyDocument=document, + Description=self.description) + LOG.debug(response) + except Exception: + LOG.exception('Error creating Policy') + + def delete(self): + response = None + # Only delete the policy if it has a document associated with it. + # This indicates that it was a custom policy created by kappa. + document = self.document() + if self.arn and document: + LOG.info('deleting policy %s', self.name) + LOG.info('deleting all policy versions for %s', self.name) + versions = self._list_versions() + for version in versions: + LOG.debug('deleting version %s', version['VersionId']) + if not version['IsDefaultVersion']: + try: + response = self._iam_client.call( + 'delete_policy_version', + PolicyArn=self.arn, + VersionId=version['VersionId']) + except Exception: + LOG.exception('Unable to delete policy version %s', + version['VersionId']) + LOG.debug('now delete policy') + response = self._iam_client.call( + 'delete_policy', PolicyArn=self.arn) + LOG.debug(response) + return response + + def status(self): + LOG.debug('getting status for policy %s', self.name) + return self.exists() diff --git a/zappa/kappa/restapi.py b/zappa/kappa/restapi.py new file mode 100644 index 000000000..fcf1f710b --- /dev/null +++ b/zappa/kappa/restapi.py @@ -0,0 +1,269 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from botocore.exceptions import ClientError + +import kappa.awsclient +import kappa.log + +LOG = logging.getLogger(__name__) + + +class RestApi(object): + + def __init__(self, context, config): + self._context = context + self._config = config + self._apigateway_client = kappa.awsclient.create_client( + 'apigateway', context.session) + self._api = None + self._resources = None + self._resource = None + + @property + def arn(self): + _, _, _, region, account, _ = self._context.function.arn.split(':', 5) + arn = 'arn:aws:execute-api:{}:{}:{}/*/*/{}'.format( + region, account, self.api_id, self.resource_name) + return arn + + @property + def api_name(self): + return self._config['name'] + + @property + def description(self): + return self._config['description'] + + @property + def resource_name(self): + return self._config['resource']['name'] + + @property + def parent_resource(self): + return self._config['resource']['parent'] + + @property + def full_path(self): + parts = self.parent_resource.split('/') + parts.append(self.resource_name) + return '/'.join(parts) + + @property + def api_id(self): + api = self._get_api() + return api.get('id') + + @property + def resource_id(self): + resources = self._get_resources() + return resources.get(self.full_path).get('id') + + def _get_api(self): + if self._api is None: + try: + response = self._apigateway_client.call( + 'get_rest_apis') + LOG.debug(response) + for item in response['items']: + if item['name'] == self.api_name: + self._api = item + except Exception: + LOG.exception('Error finding restapi') + return self._api + + def _get_resources(self): + if self._resources is None: + try: + response = self._apigateway_client.call( + 'get_resources', + restApiId=self.api_id) + LOG.debug(response) + self._resources = {} + for item in response['items']: + self._resources[item['path']] = item + except Exception: + LOG.exception('Unable to find resources for: %s', + self.api_name) + return self._resources + + def create_restapi(self): + if not self.api_exists(): + LOG.info('creating restapi %s', self.api_name) + try: + response = self._apigateway_client.call( + 'create_rest_api', + name=self.api_name, + description=self.description) + LOG.debug(response) + except Exception: + LOG.exception('Unable to create new restapi') + + def create_resource_path(self): + path = self.full_path + parts = path.split('/') + resources = self._get_resources() + parent = None + build_path = [] + for part in parts: + LOG.debug('part=%s', part) + build_path.append(part) + LOG.debug('build_path=%s', build_path) + full_path = '/'.join(build_path) + LOG.debug('full_path=%s', full_path) + if full_path is '': + parent = resources['/'] + else: + if full_path not in resources and parent: + try: + response = self._apigateway_client.call( + 'create_resource', + restApiId=self.api_id, + parentId=parent['id'], + pathPart=part) + LOG.debug(response) + resources[full_path] = response + except Exception: + LOG.exception('Unable to create new resource') + parent = resources[full_path] + self._item = resources[path] + + def create_method(self, method, config): + LOG.info('creating method: %s', method) + try: + response = self._apigateway_client.call( + 'put_method', + restApiId=self.api_id, + resourceId=self.resource_id, + httpMethod=method, + authorizationType=config.get('authorization_type'), + apiKeyRequired=config.get('apikey_required', False) + ) + LOG.debug(response) + LOG.debug('now create integration') + uri = 'arn:aws:apigateway:{}:'.format( + self._apigateway_client.region_name) + uri += 'lambda:path/2015-03-31/functions/' + uri += self._context.function.arn + uri += ':${stageVariables.environment}/invocations' + LOG.debug(uri) + response = self._apigateway_client.call( + 'put_integration', + restApiId=self.api_id, + resourceId=self.resource_id, + httpMethod=method, + integrationHttpMethod=method, + type='AWS', + uri=uri + ) + except Exception: + LOG.exception('Unable to create integration: %s', method) + + def create_deployment(self): + LOG.info('creating a deployment for %s to stage: %s', + self.api_name, self._context.environment) + try: + response = self._apigateway_client.call( + 'create_deployment', + restApiId=self.api_id, + stageName=self._context.environment + ) + LOG.debug(response) + LOG.info('Now deployed to: %s', self.deployment_uri) + except Exception: + LOG.exception('Unable to create a deployment') + + def create_methods(self): + resource_config = self._config['resource'] + for method in resource_config.get('methods', dict()): + if not self.method_exists(method): + method_config = resource_config['methods'][method] + self.create_method(method, method_config) + + def api_exists(self): + return self._get_api() + + def resource_exists(self): + resources = self._get_resources() + return resources.get(self.full_path) + + def method_exists(self, method): + exists = False + resource = self.resource_exists() + if resource: + methods = resource.get('resourceMethods') + if methods: + for method_name in methods: + if method_name == method: + exists = True + return exists + + def find_parent_resource_id(self): + parent_id = None + resources = self._get_resources() + for item in resources: + if item['path'] == self.parent: + parent_id = item['id'] + return parent_id + + def api_update(self): + LOG.info('updating restapi %s', self.api_name) + + def resource_update(self): + LOG.info('updating resource %s', self.full_path) + + def add_permission(self): + LOG.info('Adding permission for APIGateway to call function') + self._context.function.add_permission( + action='lambda:InvokeFunction', + principal='apigateway.amazonaws.com', + source_arn=self.arn) + + def deploy(self): + if self.api_exists(): + self.api_update() + else: + self.create_restapi() + if self.resource_exists(): + self.resource_update() + else: + self.create_resource_path() + self.create_methods() + self.add_permission() + + def delete(self): + LOG.info('deleting resource %s', self.resource_name) + try: + response = self._apigateway_client.call( + 'delete_resource', + restApiId=self.api_id, + resourceId=self.resource_id) + LOG.debug(response) + except ClientError: + LOG.exception('Unable to delete resource %s', self.resource_name) + return response + + def status(self): + try: + response = self._apigateway_client.call( + 'delete_', + FunctionName=self.name) + LOG.debug(response) + except ClientError: + LOG.exception('function %s not found', self.name) + response = None + return response diff --git a/zappa/kappa/role.py b/zappa/kappa/role.py new file mode 100644 index 000000000..0051108ca --- /dev/null +++ b/zappa/kappa/role.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from botocore.exceptions import ClientError + +import kappa.awsclient + +LOG = logging.getLogger(__name__) + + +AssumeRolePolicyDocument = """{ + "Version" : "2012-10-17", + "Statement": [ { + "Effect": "Allow", + "Principal": { + "Service": [ "lambda.amazonaws.com" ] + }, + "Action": [ "sts:AssumeRole" ] + } ] +}""" + + +class Role(object): + + Path = '/kappa/' + + def __init__(self, context, config, iam_client=None): + self._context = context + self._config = config + self._iam_client = iam_client + if not iam_client: + self._iam_client = kappa.awsclient.create_client( + 'iam', context.session) + self._arn = None + + @property + def name(self): + return '{}_{}'.format(self._context.name, self._context.environment) + + @property + def arn(self): + if self._arn is None: + role = self._get_role() + if role: + self._arn = role['Arn'] + else: + LOG.debug('Unable to find ARN for role: %s', self.name) + return self._arn + + def _get_role(self): + try: + response = self._iam_client.call('get_role', RoleName=self.name) + if response and 'Role' in response: + response = response['Role'] + return response + except ClientError as e: + if e.response['Error']['Code'] != 'NoSuchEntity': + LOG.exception('Error getting role') + except Exception: + LOG.exception('Error getting role') + return None + + def exists(self): + return self._get_role() is not None + + def create(self): + LOG.info('creating role %s', self.name) + role = self.exists() + if not role: + try: + response = self._iam_client.call( + 'create_role', + Path=self.Path, RoleName=self.name, + AssumeRolePolicyDocument=AssumeRolePolicyDocument) + LOG.debug(response) + if self._context.policy: + LOG.debug('attaching policy %s', self._context.policy.arn) + response = self._iam_client.call( + 'attach_role_policy', + RoleName=self.name, + PolicyArn=self._context.policy.arn) + LOG.debug(response) + except ClientError: + LOG.exception('Error creating Role') + else: + LOG.info('role already exists') + + def delete(self): + response = None + if not self.exists(): + LOG.debug('role %s does not exist - skipping delete', self.name) + return response + + LOG.debug('deleting role %s', self.name) + try: + LOG.debug('First detach the policy from the role') + policy_arn = self._context.policy.arn + if policy_arn: + response = self._iam_client.call( + 'detach_role_policy', + RoleName=self.name, PolicyArn=policy_arn) + LOG.debug(response) + response = self._iam_client.call( + 'delete_role', RoleName=self.name) + LOG.debug(response) + except ClientError: + LOG.exception('role %s not found', self.name) + return response + + def status(self): + LOG.debug('getting status for role %s', self.name) + role = self._get_role() + if not role: + LOG.debug('role %s not found', self.name) + return role diff --git a/zappa/kappa/scripts/__init__.py b/zappa/kappa/scripts/__init__.py new file mode 100644 index 000000000..da1e4eba5 --- /dev/null +++ b/zappa/kappa/scripts/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/zappa/kappa/scripts/cli.py b/zappa/kappa/scripts/cli.py new file mode 100644 index 000000000..965105f7d --- /dev/null +++ b/zappa/kappa/scripts/cli.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (c) 2014, 2015 Mitch Garnaat http://garnaat.org/ +# +# Licensed under the Apache License, Version 2.0 (the "License"). You +# may not use this file except in compliance with the License. A copy of +# the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is +# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific +# language governing permissions and limitations under the License. + +from datetime import datetime +import base64 + +import click + +from kappa.context import Context + +pass_ctx = click.make_pass_decorator(Context) + + +@click.group() +@click.option( + '--config', + default='kappa.yml', + type=click.File('rb'), + envvar='KAPPA_CONFIG', + help='Name of config file (default is kappa.yml)' +) +@click.option( + '--debug/--no-debug', + default=False, + help='Turn on debugging output' +) +@click.option( + '--env', + default='dev', + help='Specify which environment to work with (default dev)' +) +@click.option( + '--record-path', + type=click.Path(exists=True, file_okay=False, writable=True), + help='Uses placebo to record AWS responses to this path' +) +@click.pass_context +def cli(ctx, config=None, debug=False, env=None, record_path=None): + ctx.obj = Context(config, env, debug, record_path) + + +@cli.command() +@pass_ctx +def deploy(ctx): + """Deploy the Lambda function and any policies and roles required""" + click.echo('deploying') + ctx.deploy() + click.echo('done') + + +@cli.command() +@click.argument('data_file', type=click.File('r')) +@pass_ctx +def invoke(ctx, data_file): + """Invoke the command synchronously""" + click.echo('invoking') + response = ctx.invoke(data_file.read()) + log_data = base64.b64decode(response['LogResult']) + click.echo(log_data) + click.echo('Response:') + click.echo(response['Payload'].read()) + click.echo('done') + + +@cli.command() +@pass_ctx +def test(ctx): + """Test the command synchronously""" + click.echo('testing') + ctx.test() + click.echo('done') + + +@cli.command() +@pass_ctx +def tail(ctx): + """Show the last 10 lines of the log file""" + click.echo('tailing logs') + for e in ctx.tail()[-10:]: + ts = datetime.utcfromtimestamp(e['timestamp'] // 1000).isoformat() + click.echo("{}: {}".format(ts, e['message'])) + click.echo('done') + + +@cli.command() +@pass_ctx +def status(ctx): + """Print a status of this Lambda function""" + status = ctx.status() + click.echo(click.style('Policy', bold=True)) + if status['policy']: + line = ' {} ({})'.format( + status['policy']['PolicyName'], + status['policy']['Arn']) + click.echo(click.style(line, fg='green')) + click.echo(click.style('Role', bold=True)) + if status['role']: + line = ' {} ({})'.format( + status['role']['RoleName'], + status['role']['Arn']) + click.echo(click.style(line, fg='green')) + click.echo(click.style('Function', bold=True)) + if status['function']: + line = ' {} ({})'.format( + status['function']['Configuration']['FunctionName'], + status['function']['Configuration']['FunctionArn']) + click.echo(click.style(line, fg='green')) + else: + click.echo(click.style(' None', fg='green')) + click.echo(click.style('Event Sources', bold=True)) + if status['event_sources']: + for event_source in status['event_sources']: + if event_source: + arn = event_source.get('EventSourceArn') + state = event_source.get('State', 'Enabled') + line = ' {}: {}'.format(arn, state) + click.echo(click.style(line, fg='green')) + else: + click.echo(click.style(' None', fg='green')) + + +@cli.command() +@pass_ctx +def delete(ctx): + """Delete the Lambda function and related policies and roles""" + click.echo('deleting') + ctx.delete() + click.echo('done') + + +@cli.command() +@click.argument('command', + type=click.Choice(['list', 'enable', 'disable'])) +@pass_ctx +def event_sources(ctx, command): + """List, enable, and disable event sources specified in the config file""" + if command == 'list': + click.echo('listing event sources') + event_sources = ctx.list_event_sources() + for es in event_sources: + click.echo('arn: {}'.format(es['arn'])) + click.echo('starting position: {}'.format(es['starting_position'])) + click.echo('batch size: {}'.format(es['batch_size'])) + click.echo('enabled: {}'.format(es['enabled'])) + click.echo('done') + elif command == 'enable': + click.echo('enabling event sources') + ctx.enable_event_sources() + click.echo('done') + elif command == 'disable': + click.echo('disabling event sources') + ctx.disable_event_sources() + click.echo('done')