Skip to content

Commit

Permalink
feat: create function to send batch message (#84)
Browse files Browse the repository at this point in the history
* feat: create function to send batch message

* refactor: change publish_message_batch and add docstring

* refactor: change publish_message_batch and add tests

* fix: fix publish message batch and add test

* refactor: refactor function send_batch_message code

* feat: add tests for failed cases

* feat: add tests for attributes

* feat: add tests for entries

* fix: handle empty body

* fix: add function build_message_attributes and test

* fix: remove improve send_message and tests

* fix: remove get_attributes
  • Loading branch information
gabriel-f-santos authored Apr 19, 2023
1 parent 27ae219 commit c63780e
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 1 deletion.
49 changes: 49 additions & 0 deletions serpens/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from datetime import datetime
from functools import wraps
from json.decoder import JSONDecodeError
import numbers
from typing import Any, Dict, Union
from uuid import uuid4

import boto3

Expand All @@ -15,6 +17,53 @@
logger = logging.getLogger(__name__)


def build_message_attributes(attributes):
message_attributes = {}

for key, value in attributes.items():
if isinstance(value, str):
attributes = {"StringValue": value, "DataType": "String"}
elif isinstance(value, numbers.Number):
attributes = {"StringValue": value, "DataType": "Number"}
elif isinstance(value, bytes):
attributes = {"BinaryValue": value, "DataType": "Binary"}
else:
raise ValueError(f"Invalid data type for attribute {value}")
message_attributes[key] = attributes
return message_attributes


def publish_message_batch(queue_url, messages, message_group_id=None):
client = boto3.client("sqs")
entries = []

params = {"QueueUrl": queue_url}

if queue_url.endswith(".fifo"):
params["MessageGroupId"] = message_group_id
params["MessageDeduplicationId"] = message_group_id

for message in messages:
message_attributes = {}

body = message["body"] or {}
if not isinstance(body, str):
body = json.dumps(body, cls=SchemaEncoder)

entry = {"Id": str(uuid4()), "MessageBody": body}

message_attributes = build_message_attributes(message.get("attributes", {}))

if message_attributes:
entry["MessageAttributes"] = message_attributes

entries.append(entry)

params["Entries"] = entries

return client.send_message_batch(**params)


def publish_message(queue_url, body, message_group_id=None):
client = boto3.client("sqs")

Expand Down
139 changes: 138 additions & 1 deletion tests/test_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import unittest
from datetime import datetime
from unittest.mock import patch
from uuid import uuid4

import sqs
from sqs import Record
from sqs import Record, build_message_attributes


class TestPublishMessage(unittest.TestCase):
Expand Down Expand Up @@ -184,3 +185,139 @@ def test_create_record_with_body_as_str(self):
record = Record(data)

self.assertEqual(record.body, data["body"])


class TestPublishMessageBatch(unittest.TestCase):
def setUp(self) -> None:
self.patch_boto3 = patch("sqs.boto3")
self.mock_boto3 = self.patch_boto3.start()
self.response = {
"Successful": [],
"Failed": [],
}

self.messages = [
{
"body": "message 1",
"attributes": {
"key1": "value1",
"key2": 123,
"key3": b"binary data",
},
},
{
"body": "message 2",
"attributes": {
"key1": "value2",
"key2": "123",
"key3": 123456,
},
},
]

self.queue_url = "test.fifo"

def tearDown(self) -> None:
self.patch_boto3.stop()

def test_publish_message_succeeded(self):
response = self.response

for message in self.messages:
uuid = str(uuid4)
response["Successful"].append(
{
"Id": uuid,
"MessageId": uuid,
"MD5OfMessageBody": message["body"],
"MD5OfMessageAttributes": message["attributes"],
"MD5OfMessageSystemAttributes": "",
"SequenceNumber": "",
}
)

expected_entries = [
{
"MessageBody": "message 1",
"MessageAttributes": {
"key1": {"StringValue": "value1", "DataType": "String"},
"key2": {"StringValue": 123, "DataType": "Number"},
"key3": {"BinaryValue": b"binary data", "DataType": "Binary"},
},
},
{
"MessageBody": "message 2",
"MessageAttributes": {
"key1": {"StringValue": "value2", "DataType": "String"},
"key2": {"StringValue": "123", "DataType": "String"},
"key3": {"StringValue": 123456, "DataType": "Number"},
},
},
]

mock_publish_message_batch = self.mock_boto3.client.return_value.send_message_batch
mock_publish_message_batch.return_value = response

response = sqs.publish_message_batch(self.queue_url, self.messages)

call_entries = mock_publish_message_batch.call_args.kwargs["Entries"]

for entry in call_entries:
del entry["Id"]

self.assertEqual(mock_publish_message_batch.call_count, 1)
self.assertEqual(len(call_entries), 2)
self.assertEqual(response["Failed"], [])
self.assertListEqual(call_entries, expected_entries)

def test_publish_message_fail(self):
response = self.response

for message in self.messages:
uuid = str(uuid4())
response["Failed"].append(
{
"Id": uuid,
"MessageId": uuid,
"MD5OfMessageBody": message["body"],
"MD5OfMessageAttributes": message["attributes"],
"MD5OfMessageSystemAttributes": "",
"SequenceNumber": "",
}
)

mock_publish_message_batch = self.mock_boto3.client.return_value.send_message_batch
mock_publish_message_batch.return_value = response

response = sqs.publish_message_batch(self.queue_url, self.messages)

self.assertEqual(mock_publish_message_batch.call_count, 1)
self.assertEqual(len(response["Failed"]), 2)


class TestBuildAttributesFunction(unittest.TestCase):
def test_build_message_attributes(self):
attributes = {
"String": "this is a string",
"Number": 123,
"Binary": b"this is a byte",
}
expected_message_attributes = {
"String": {"StringValue": "this is a string", "DataType": "String"},
"Number": {"StringValue": 123, "DataType": "Number"},
"Binary": {"BinaryValue": b"this is a byte", "DataType": "Binary"},
}
message_Attributes = build_message_attributes(attributes)

self.assertDictEqual(expected_message_attributes, message_Attributes)

def test_build_attributes_exception(self):
value = datetime.now()
attributes = {
"Date": value,
}

message = f"Invalid data type for attribute {value}"

with self.assertRaisesRegex(ValueError, message):
build_message_attributes(attributes)

0 comments on commit c63780e

Please sign in to comment.