Skip to content

Commit

Permalink
[sc115424] add send messages by batch to sqs sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
mtakerrabet committed Dec 4, 2023
1 parent b41265d commit d4d2e20
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,67 @@ public String sendMessage(String queueName, String messageBody, Consumer<SendMes
return messageId;
}

/**
*
* @param queueName the name of the queue
* @param messagesList the list of the messages bodies with their Ids
* @param delaySeconds the delay in seconds
* @param groupIds list of group ids with messages Ids for FIFO queues
* @return
*/
public List<String> sendMessages(String queueName, Map<String, String> messagesList, int delaySeconds, Map<String, String> groupIds) {
String queueUrl = getQueueUrl(queueName);

SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl);
List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
for (Map.Entry<String, String> messageBody : messagesList.entrySet()) {
SendMessageBatchRequestEntry.Builder entry = SendMessageBatchRequestEntry.builder()
.id(messageBody.getKey())
.messageBody(messageBody.getValue());
if (delaySeconds > 0) {
entry.delaySeconds(delaySeconds);
}
if (groupIds != null && groupIds.containsKey(messageBody.getKey())) {
entry.messageGroupId(groupIds.get(messageBody.getKey()));
}
entries.add(entry.build());
}

request.entries(entries);

List<String> messagesIds = client.sendMessageBatch(request.build()).successful().stream()
.map(SendMessageBatchResultEntry::messageId).collect(Collectors.toList());
LOGGER.debug("Messages sent (messagesIds={})", messagesIds);
return messagesIds;
}

/**
* @param queueName
* @param messagesList
* @return
*/
public List<String> sendMessages(String queueName, Map<String, String> messagesList, Consumer<SendMessageBatchRequest.Builder> messageConfiguration) {
String queueUrl = getQueueUrl(queueName);

SendMessageBatchRequest.Builder request = SendMessageBatchRequest.builder().queueUrl(queueUrl);
List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
for (Map.Entry<String, String> messageBody : messagesList.entrySet()) {
SendMessageBatchRequestEntry.Builder entry = SendMessageBatchRequestEntry.builder()
.id(messageBody.getKey())
.messageBody(messageBody.getValue());
entries.add(entry.build());
}

request.entries(entries);

messageConfiguration.accept(request);

List<String> messagesIds = client.sendMessageBatch(request.build()).successful().stream()
.map(SendMessageBatchResultEntry::messageId).collect(Collectors.toList());
LOGGER.debug("Messages sent (messagesIds={})", messagesIds);
return messagesIds;
}

void assertDefaultQueueName() {
if (StringUtils.isEmpty(configuration.getQueue())) {
throw new IllegalStateException("Queue not configured");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import java.util.List;
Expand Down Expand Up @@ -288,4 +289,84 @@ default String sendMessage(String messageBody, Consumer<SendMessageRequest.Build
* @return message id
*/
String sendMessage(String queueName, String messageBody, Consumer<SendMessageRequest.Builder> messageConfiguration);

/**
* Send list of messages immediately
* @param queueName the name of the queue
* @param messagesList the list of the messages bodies
* @return the message id of the message sent
*/
default List<String> sendMessages(String queueName, Map<String, String> messagesList) {
return sendMessages(queueName, messagesList, 0);
}

/**
* Send list of messages with given delay.
* @param queueName the name of the queue
* @param messagesList the list of the messages bodies with their Ids
* @param delaySeconds the delay in seconds
* @param groupIds list of group ids with messages Ids for FIFO queues
* @return the message id of the message sent
*/
List<String> sendMessages(String queueName, Map<String, String> messagesList, int delaySeconds, Map<String, String> groupIds);

/**
* Send list of messages with given delay.
* @param queueName the name of the queue
* @param messagesList the bodies of the messages with their Ids
* @param delaySeconds the delay in seconds
* @return the message id of the message sent
*/
default List<String> sendMessages(String queueName, Map<String, String> messagesList, int delaySeconds) {
return sendMessages(queueName, messagesList, delaySeconds, null);
}

/**
* Send list of messages in default queue immediately
* @param messagesList the bodies of the messages with their Ids
* @return the message id of the message sent
*/
default List<String> sendMessages(Map<String, String> messagesList) {
return sendMessages(getDefaultQueueName(), messagesList);
}

/**
* Send message in the default queue with given delay.
* @param messagesList the bodies of the messages with their Ids
* @param delaySeconds the delay in seconds
* @return the message id of the message sent
*/
default List<String> sendMessages(Map<String, String> messagesList, int delaySeconds) {
return sendMessages(getDefaultQueueName(), messagesList, delaySeconds);
}

/**
* Send message with given delay.
* @param messagesList the bodies of the messages with their Ids
* @param delaySeconds the delay in seconds
* @param groupIds list of group ids with messages Ids for FIFO queues
* @return the message id of the message sent
*/
default List<String> sendMessages(Map<String, String> messagesList, int delaySeconds, Map<String, String> groupIds) {
return sendMessages(getDefaultQueueName(), messagesList, delaySeconds, groupIds);
}

/**
* Sends message with additional configuration into the default queue.
* @param messagesList the bodies of the messages with their Ids
* @param messageConfiguration additional configuration
* @return message id
*/
default List<String> sendMessages(Map<String, String> messagesList, Consumer<SendMessageBatchRequest.Builder> messageConfiguration) {
return sendMessages(getDefaultQueueName(), messagesList, messageConfiguration);
}

/**
* Sends message with additional configuration into the given queue.
* @param queueName name of the queue
* @param messagesList the bodies of the messages with their Ids
* @param messageConfiguration additional configuration
* @return message id
*/
List<String> sendMessages(String queueName, Map<String, String> messagesList, Consumer<SendMessageBatchRequest.Builder> messageConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import space.jasan.support.groovy.closure.ConsumerWithDelegate;

import java.util.List;
import java.util.Map;

/**
* Amazon SQS services
*
Expand Down Expand Up @@ -69,4 +72,39 @@ public static String sendMessage(
return self.sendMessage(queueName, messageBody, ConsumerWithDelegate.create(messageConfiguration));
}

/**
* Sends messages with additional configuration into the default queue.
*
* @param messagesList the list of the messages bodies with their Ids
* @param messageConfiguration additional configuration
* @return message id
*/
public static List<String> sendMessages(
SimpleQueueService self,
Map<String, String> messagesList,
@DelegatesTo(value = SendMessageRequest.Builder.class, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageRequest.Builder")
Closure<?> messageConfiguration
) {
return self.sendMessages(self.getDefaultQueueName(), messagesList, ConsumerWithDelegate.create(messageConfiguration));
}

/**
* Sends messages with additional configuration into the given queue.
*
* @param queueName name of the queue
* @param messagesList the list of the messages bodies with their Ids
* @param messageConfiguration additional configuration
* @return message id
*/
public static List<String> sendMessages(
SimpleQueueService self,
String queueName,
Map<String, String> messagesList,
@DelegatesTo(value = SendMessageRequest.Builder.class, strategy = Closure.DELEGATE_FIRST)
@ClosureParams(value = FromString.class, options = "software.amazon.awssdk.services.sqs.model.SendMessageRequest.Builder")
Closure<?> messageConfiguration
) {
return self.sendMessages(queueName, messagesList, ConsumerWithDelegate.create(messageConfiguration));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -38,6 +40,13 @@ public class SimpleQueueServiceTest {
public static final String TEST_QUEUE = "TestQueueJava";

private static final String DATA = "Hello World";
private static final Map<String, String> DATA_BATCH = new HashMap<>() {
{
put("1", "Hello World");
put("2", "It's sunny today");
put("3", "I'm happy");
}
};

// tag::setup[]
@Inject SimpleQueueService service; // <3>
Expand Down Expand Up @@ -87,4 +96,58 @@ public void testWorkingWithQueue() {
// end::delete-queue[]
}

@Test
public void testBatchMessagesWithQueue() {
// tag::new-queue[]
String queueUrl = service.createQueue(TEST_QUEUE); // <1>

assertTrue(service.listQueueUrls().contains(queueUrl)); // <2>
// end::new-queue[]

assertNotNull(queueUrl);

// tag::describe-queue[]
Map<QueueAttributeName, String> queueAttributes = service
.getQueueAttributes(TEST_QUEUE); // <1>

assertEquals("0", queueAttributes
.get(QueueAttributeName.DELAY_SECONDS)); // <2>
// end::describe-queue[]

// tag::messages[]
List<String> msgsIds = service.sendMessages(DATA_BATCH); // <1>

assertNotNull(msgsIds);
assertEquals(3, msgsIds.size());

List<Message> messages = service.receiveMessages(3);
assertEquals(3, messages.size());// <2>
Message first = messages.get(0);
Message second = messages.get(1);
Message third = messages.get(2);

assertEquals(DATA_BATCH.get("1"), first.body()); // <3>
assertEquals(msgsIds.get(0), first.messageId());
service.deleteMessage(first.receiptHandle());

assertEquals(DATA_BATCH.get("2"), second.body()); // <3>
assertEquals(msgsIds.get(1), second.messageId());
service.deleteMessage(second.receiptHandle());

assertEquals(DATA_BATCH.get("3"), third.body()); // <3>
assertEquals(msgsIds.get(2), third.messageId());
service.deleteMessage(third.receiptHandle());
// end::messages[]

List<Message> nextMessages = service.receiveMessages();

assertEquals(0, nextMessages.size());

// tag::delete-queue[]
service.deleteQueue(TEST_QUEUE); // <1>

assertFalse(service.listQueueUrls().contains(queueUrl)); // <2>
// end::delete-queue[]
}

}

0 comments on commit d4d2e20

Please sign in to comment.