Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support creating a FIFO queue if using FIFO SNS event #3357

Merged
merged 2 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions samtranslator/model/eventsources/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
from samtranslator.model.eventsources.pull import SQS
from samtranslator.model.exceptions import InvalidDocumentException, InvalidEventException, InvalidResourceException
from samtranslator.model.intrinsics import fnGetAtt, fnSub, is_intrinsic, make_conditional, make_shorthand, ref
from samtranslator.model.intrinsics import (
fnGetAtt,
fnSub,
get_logical_id_from_intrinsic,
is_intrinsic,
make_conditional,
make_shorthand,
ref,
)
from samtranslator.model.iot import IotTopicRule
from samtranslator.model.lambda_ import LambdaPermission
from samtranslator.model.s3 import S3Bucket
Expand Down Expand Up @@ -517,6 +525,8 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
if not function:
raise TypeError("Missing required keyword argument: function")

intrinsics_resolver: IntrinsicsResolver = kwargs["intrinsics_resolver"]

# SNS -> Lambda
if not self.SqsSubscription:
subscription = self._inject_subscription(
Expand All @@ -534,7 +544,11 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
# SNS -> SQS(Create New) -> Lambda
if isinstance(self.SqsSubscription, bool):
resources = [] # type: ignore[var-annotated]
queue = self._inject_sqs_queue(function) # type: ignore[no-untyped-call]

fifo_topic = self._check_fifo_topic(
get_logical_id_from_intrinsic(self.Topic), kwargs.get("original_template"), intrinsics_resolver
)
queue = self._inject_sqs_queue(function, fifo_topic) # type: ignore[no-untyped-call]
queue_arn = queue.get_runtime_attr("arn")
queue_url = queue.get_runtime_attr("queue_url")

Expand Down Expand Up @@ -591,6 +605,19 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def]
resources.append(subscription)
return resources

def _check_fifo_topic(
self,
topic_id: Optional[str],
template: Optional[Dict[str, Any]],
intrinsics_resolver: IntrinsicsResolver,
) -> bool:
if not topic_id or not template:
return False

resources = template.get("Resources", {})
properties = resources.get(topic_id, {}).get("Properties", {})
return intrinsics_resolver.resolve_parameter_refs(properties.get("FifoTopic", False)) # type: ignore[no-any-return]

def _inject_subscription( # noqa: PLR0913
self,
protocol: str,
Expand Down Expand Up @@ -621,8 +648,12 @@ def _inject_subscription( # noqa: PLR0913

return subscription

def _inject_sqs_queue(self, function): # type: ignore[no-untyped-def]
return SQSQueue(self.logical_id + "Queue", attributes=function.get_passthrough_resource_attributes())
def _inject_sqs_queue(self, function, fifo_topic=False): # type: ignore[no-untyped-def]
queue = SQSQueue(self.logical_id + "Queue", attributes=function.get_passthrough_resource_attributes())

if fifo_topic:
queue.FifoQueue = fifo_topic
return queue

def _inject_sqs_event_source_mapping(self, function, role, queue_arn, batch_size=None, enabled=None): # type: ignore[no-untyped-def]
event_source = SQS(
Expand Down
5 changes: 4 additions & 1 deletion samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
kwargs["event_resources"],
intrinsics_resolver,
lambda_alias=lambda_alias,
original_template=kwargs.get("original_template"),
)
except InvalidEventException as e:
raise InvalidResourceException(self.logical_id, e.message) from e
Expand Down Expand Up @@ -775,13 +776,14 @@ def order_events(event: Tuple[str, Any]) -> Any:
return logical_id
return event_dict.get("Properties", {}).get("Path", logical_id)

def _generate_event_resources(
def _generate_event_resources( # noqa: PLR0913
GavinZZ marked this conversation as resolved.
Show resolved Hide resolved
self,
lambda_function: LambdaFunction,
execution_role: Optional[IAMRole],
event_resources: Any,
intrinsics_resolver: IntrinsicsResolver,
lambda_alias: Optional[LambdaAlias] = None,
original_template: Optional[Dict[str, Any]] = None,
) -> List[Any]:
"""Generates and returns the resources associated with this function's events.

Expand Down Expand Up @@ -811,6 +813,7 @@ def _generate_event_resources(
"function": lambda_alias or lambda_function,
"role": execution_role,
"intrinsics_resolver": intrinsics_resolver,
"original_template": original_template,
}

for name, resource in event_resources[logical_id].items():
Expand Down
8 changes: 7 additions & 1 deletion samtranslator/model/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,22 @@

from samtranslator.model import GeneratedProperty, PropertyType, Resource
from samtranslator.model.intrinsics import fnGetAtt, ref
from samtranslator.model.types import PassThrough


class SQSQueue(Resource):
resource_type = "AWS::SQS::Queue"
property_types: Dict[str, PropertyType] = {"Tags": GeneratedProperty()}
property_types: Dict[str, PropertyType] = {
"FifoQueue": GeneratedProperty(),
"Tags": GeneratedProperty(),
}
runtime_attrs = {
"queue_url": lambda self: ref(self.logical_id),
"arn": lambda self: fnGetAtt(self.logical_id, "Arn"),
}

FifoQueue: PassThrough


class SQSQueuePolicy(Resource):
resource_type = "AWS::SQS::QueuePolicy"
Expand Down
14 changes: 8 additions & 6 deletions tests/model/eventsources/test_sns_event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ def setUp(self):
self.function.get_passthrough_resource_attributes = Mock()
self.function.get_passthrough_resource_attributes.return_value = {}

self.kwargs = {"function": self.function, "intrinsics_resolver": Mock()}

def test_to_cloudformation_returns_permission_and_subscription_resources(self):
resources = self.sns_event_source.to_cloudformation(function=self.function)
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[0].resource_type, "AWS::Lambda::Permission")
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
Expand All @@ -37,7 +39,7 @@ def test_to_cloudformation_passes_the_region(self):
region = "us-west-2"
self.sns_event_source.Region = region

resources = self.sns_event_source.to_cloudformation(function=self.function)
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
subscription = resources[1]
Expand All @@ -51,7 +53,7 @@ def test_to_cloudformation_passes_the_filter_policy(self):
}
self.sns_event_source.FilterPolicy = filterPolicy

resources = self.sns_event_source.to_cloudformation(function=self.function)
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
subscription = resources[1]
Expand All @@ -61,7 +63,7 @@ def test_to_cloudformation_passes_the_filter_policy_scope(self):
filterPolicyScope = "MessageAttributes"
self.sns_event_source.FilterPolicyScope = filterPolicyScope

resources = self.sns_event_source.to_cloudformation(function=self.function)
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
subscription = resources[1]
Expand All @@ -71,7 +73,7 @@ def test_to_cloudformation_passes_the_redrive_policy(self):
redrive_policy = {"deadLetterTargetArn": "arn:aws:sqs:us-east-2:123456789012:MyDeadLetterQueue"}
self.sns_event_source.RedrivePolicy = redrive_policy

resources = self.sns_event_source.to_cloudformation(function=self.function)
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
subscription = resources[1]
Expand All @@ -89,7 +91,7 @@ def test_to_cloudformation_when_sqs_subscription_disable(self):
sqsSubscription = False
self.sns_event_source.SqsSubscription = sqsSubscription

resources = self.sns_event_source.to_cloudformation(function=self.function)
resources = self.sns_event_source.to_cloudformation(**self.kwargs)
self.assertEqual(len(resources), 2)
self.assertEqual(resources[0].resource_type, "AWS::Lambda::Permission")
self.assertEqual(resources[1].resource_type, "AWS::SNS::Subscription")
Expand Down
36 changes: 36 additions & 0 deletions tests/translator/input/function_with_fifo_topic_event.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
Transform: AWS::Serverless-2016-10-31
Description: SNS Fifo
Globals:
Function:
Timeout: 3

Resources:
MyFifoTopic:
Type: AWS::SNS::Topic
Properties:
ContentBasedDeduplication: true
FifoTopic: true
TopicName: myFifoTopic.fifo

HelloWorldFunction:
Type: AWS::Serverless::Function
Properties:
InlineCode: |
exports.handler = async (event, context, callback) => {
return {
statusCode: 200,
body: 'Success'
}
}
Handler: index.handler
Runtime: nodejs16.x
Events:
FifoTrigger:
Type: SNS
Properties:
SqsSubscription: true
Topic: !Ref MyFifoTopic
Metadata:
DockerTag: nodejs12.x-v1
DockerContext: ./hello-world
Dockerfile: Dockerfile
142 changes: 142 additions & 0 deletions tests/translator/output/aws-cn/function_with_fifo_topic_event.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
{
"Description": "SNS Fifo",
"Resources": {
"HelloWorldFunction": {
"Metadata": {
"DockerContext": "./hello-world",
"DockerTag": "nodejs12.x-v1",
"Dockerfile": "Dockerfile"
},
"Properties": {
"Code": {
"ZipFile": "exports.handler = async (event, context, callback) => {\n return {\n statusCode: 200,\n body: 'Success'\n }\n}\n"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"HelloWorldFunctionRole",
"Arn"
]
},
"Runtime": "nodejs16.x",
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
],
"Timeout": 3
},
"Type": "AWS::Lambda::Function"
},
"HelloWorldFunctionFifoTrigger": {
"Properties": {
"Endpoint": {
"Fn::GetAtt": [
"HelloWorldFunctionFifoTriggerQueue",
"Arn"
]
},
"Protocol": "sqs",
"TopicArn": {
"Ref": "MyFifoTopic"
}
},
"Type": "AWS::SNS::Subscription"
},
"HelloWorldFunctionFifoTriggerEventSourceMapping": {
"Properties": {
"BatchSize": 10,
"Enabled": true,
"EventSourceArn": {
"Fn::GetAtt": [
"HelloWorldFunctionFifoTriggerQueue",
"Arn"
]
},
"FunctionName": {
"Ref": "HelloWorldFunction"
}
},
"Type": "AWS::Lambda::EventSourceMapping"
},
"HelloWorldFunctionFifoTriggerQueue": {
"Properties": {
"FifoQueue": true
},
"Type": "AWS::SQS::Queue"
},
"HelloWorldFunctionFifoTriggerQueuePolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": "sqs:SendMessage",
"Condition": {
"ArnEquals": {
"aws:SourceArn": {
"Ref": "MyFifoTopic"
}
}
},
"Effect": "Allow",
"Principal": "*",
"Resource": {
"Fn::GetAtt": [
"HelloWorldFunctionFifoTriggerQueue",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"Queues": [
{
"Ref": "HelloWorldFunctionFifoTriggerQueue"
}
]
},
"Type": "AWS::SQS::QueuePolicy"
},
"HelloWorldFunctionRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws-cn:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
],
"Tags": [
{
"Key": "lambda:createdBy",
"Value": "SAM"
}
]
},
"Type": "AWS::IAM::Role"
},
"MyFifoTopic": {
"Properties": {
"ContentBasedDeduplication": true,
"FifoTopic": true,
"TopicName": "myFifoTopic.fifo"
},
"Type": "AWS::SNS::Topic"
}
}
}
Loading