From 1f26ad4618fbe008842e93a3d325536681dda5ac Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Sat, 26 Aug 2023 10:51:32 -0300 Subject: [PATCH 1/9] Refactoring queue validation --- src/queue-to-file.js | 7 +------ src/resource-validator.js | 4 ++-- test/queue-to-file.test.js | 8 ++++---- test/resource-validator.test.js | 28 ++++++++++++++++++++++++++++ 4 files changed, 35 insertions(+), 12 deletions(-) diff --git a/src/queue-to-file.js b/src/queue-to-file.js index 9ba269e..d173b8a 100644 --- a/src/queue-to-file.js +++ b/src/queue-to-file.js @@ -7,7 +7,7 @@ import resourceValidator from "./resource-validator"; export default async ({ file, queueUrl, endpointUrl: endpoint }) => { const sqsClient = new SQSClient({ endpoint }); - if (!(await isQueueValid(sqsClient, queueUrl))) { + if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) { return; } @@ -23,8 +23,3 @@ export default async ({ file, queueUrl, endpointUrl: endpoint }) => { await consumeMessages(sqsClient, queueUrl, messageConsumer); logger.success(`Finished queue-to-file successfully. ${totalMessagesSaved} messages saved to file`); }; - -async function isQueueValid(sqsClient, queueUrl) { - const resourcesToValidate = [{ type: "queue", value: queueUrl }]; - return await resourceValidator.validate(resourcesToValidate, sqsClient); -} diff --git a/src/resource-validator.js b/src/resource-validator.js index 9dc498a..f987767 100644 --- a/src/resource-validator.js +++ b/src/resource-validator.js @@ -57,7 +57,7 @@ async function validateQueue(sqsClient, queueUrl) { if (await isExistingQueue(sqsClient, queueName)) { return true; } - logger.error("(ERROR) Some of the specified queues do not exist or are not accessible"); + logger.error(`(ERROR) Queue ${queueUrl} does not exist or is not accessible`); return false; } @@ -76,4 +76,4 @@ function mapPermissionToFsMode(permissionFlag) { throw new Error("Unknown file permission"); } -export default { validate }; +export default { validate, validateQueue }; diff --git a/test/queue-to-file.test.js b/test/queue-to-file.test.js index 98a5b44..e1c9e3d 100644 --- a/test/queue-to-file.test.js +++ b/test/queue-to-file.test.js @@ -10,7 +10,7 @@ jest.mock('fs', () => ({ createWriteStream: jest.fn() })); jest.mock('resource-validator', () => ({ - validate: jest.fn() + validateQueue: jest.fn() })); it('should consume messages from queue and save them in file', async () => { @@ -20,7 +20,7 @@ it('should consume messages from queue and save them in file', async () => { const lineWriter = { write: jest.fn() }; fs.createWriteStream.mockReturnValueOnce(lineWriter); - resourceValidator.validate.mockReturnValueOnce(true); + resourceValidator.validateQueue.mockReturnValueOnce(true); await queueToFile({ queueUrl, file, endpointUrl }); @@ -46,7 +46,7 @@ it('should create consumer that returns false', async () => { const lineWriter = { write: jest.fn() }; fs.createWriteStream.mockReturnValueOnce(lineWriter); - resourceValidator.validate.mockReturnValueOnce(true); + resourceValidator.validateQueue.mockReturnValueOnce(true); await queueToFile({ queueUrl, file }); @@ -60,7 +60,7 @@ it('should not consume messages when queue is not valid', async () => { const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; const file = 'path/filename.csv'; - resourceValidator.validate.mockReturnValueOnce(false); + resourceValidator.validateQueue.mockReturnValueOnce(false); await queueToFile({ queueUrl, file }); diff --git a/test/resource-validator.test.js b/test/resource-validator.test.js index df3a3e4..c8ca565 100644 --- a/test/resource-validator.test.js +++ b/test/resource-validator.test.js @@ -144,3 +144,31 @@ describe('validate', () => { expect(isExistingFunction.mock.calls.length).toBe(0); }); }); + +describe('validateQueue', () => { + it('should extract queue name from queue URL and return true when queue exists', async () => { + const sqsClient = { send: jest.fn() }; + isExistingQueue.mockReturnValueOnce(true); + + const valid = await resourceValidator.validateQueue( + sqsClient, + "https://sqs.us-east-1.amazonaws.com/00000000/test-queue" + ); + + expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue"); + expect(valid).toBe(true); + }); + + it('should extract queue name from queue URL and return false when queue does not exist', async () => { + const sqsClient = { send: jest.fn() }; + isExistingQueue.mockReturnValueOnce(false); + + const valid = await resourceValidator.validateQueue( + sqsClient, + "https://sqs.us-east-1.amazonaws.com/00000000/test-queue" + ); + + expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue"); + expect(valid).toBe(false); + }); +}); From 31ae10df1213b910f1f427f92bcf793e7910aac9 Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 11:53:59 -0300 Subject: [PATCH 2/9] Add purge-queue command --- README.md | 24 +++++++--- src/index.js | 12 +++++ src/purge-queue.js | 27 ++++++++++++ test/integration/purge-queue.test.js | 44 +++++++++++++++++++ test/purge-queue.test.js | 65 ++++++++++++++++++++++++++++ 5 files changed, 165 insertions(+), 7 deletions(-) create mode 100644 src/purge-queue.js create mode 100644 test/integration/purge-queue.test.js create mode 100644 test/purge-queue.test.js diff --git a/README.md b/README.md index 9a06386..46b49a4 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,8 @@ brew upgrade dlq-utils - Invoke a function using messages from a queue - Move or copy messages from one queue to another +- Delete messages from a queue based on a regular expression - Template a message before sending it to a queue or invoking a function -- (soon) Filter messages before sending them to a queue or invoking a function - Save messages from a queue to a text file - Read lines from a text file and send them as messages to a queue @@ -34,28 +34,35 @@ It's necessary to specify the environment variable `AWS_PROFILE` with the [named Invoke an AWS Lambda function with all messages from an Amazon SQS queue, being able to transform them before invoking the function. ```shell -AWS_PROFILE=configured-profile dlq-utils queue-to-lambda --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --function-name "some-lambda-function" +AWS_PROFILE=configured-profile dlq-utils queue-to-lambda -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "some-lambda-function" ``` #### `file-to-queue` Read a text file to send each line as a message to an Amazon SQS queue. ```shell -AWS_PROFILE=configured-profile dlq-utils file-to-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt" +AWS_PROFILE=configured-profile dlq-utils file-to-queue -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "/Users/myuser/Documents/some-file.txt" ``` #### `queue-to-file` Consume all messages from an Amazon SQS queue to save them in a text file. ```shell -AWS_PROFILE=configured-profile dlq-utils queue-to-file --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt" +AWS_PROFILE=configured-profile dlq-utils queue-to-file -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "/Users/myuser/Documents/some-file.txt" ``` #### `queue-to-queue` Move all messages from an Amazon SQS queue to another one, being able to transform them. ```shell -AWS_PROFILE=configured-profile dlq-utils queue-to-queue --source-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" --dest-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue" +AWS_PROFILE=configured-profile dlq-utils queue-to-queue -s "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" -d "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue" +``` + +#### `purge-queue` +Purge a queue conditionally based on a regular expression tested on the message body. + +```shell +AWS_PROFILE=configured-profile dlq-utils purge-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --regex ".foo" ``` For full documentation run `dlq-utils help [command]`. @@ -74,12 +81,15 @@ Next, you need to run the command below inside the repository folder to locally npx link . ``` -After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. +After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. To execute commands use the prefix `npx` and do not forget the parameter `--endpoint-url`, for example: + +```shell +npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" -d "some-lambda-function" --endpoint-url "http://localhost:9324" +``` ## Roadmap Here you will find a list of features I want to include in the project: -- ✨ Add the ability to filter out messages with a regex - 🔧 Add tooling to facilitate local testing - 🔧 Add hot reload to automatically rebuild the project and improve the development experience diff --git a/src/index.js b/src/index.js index 91963f1..c3950d4 100755 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,7 @@ import fileToQueue from "./file-to-queue"; import queueToFile from "./queue-to-file"; import queueToLambda from "./queue-to-lambda"; import queueToQueue from "./queue-to-queue"; +import purgeQueue from "./purge-queue"; const program = new Command(); @@ -78,4 +79,15 @@ program ) .action(queueToQueue); +program + .command("purge-queue") + .description("Clear queue conditionally based on a regular expression") + .requiredOption("-r --regex ", "The regex to select the messages that should be deleted") + .requiredOption("-q --queue-url ", "The URL of the queue that contains the messages") + .option( + "--endpoint-url ", + "Just like in aws-cli commands, this is only required when using a local version of SQS and Lambda (e.g. LocalStack)" + ) + .action(purgeQueue); + program.parse(); diff --git a/src/purge-queue.js b/src/purge-queue.js new file mode 100644 index 0000000..004db29 --- /dev/null +++ b/src/purge-queue.js @@ -0,0 +1,27 @@ +import logger from "./logger"; +import { SQSClient } from "@aws-sdk/client-sqs"; +import { consumeMessages } from "./sqs-consumer"; +import resourceValidator from "./resource-validator"; + +export default async ({ regex: condition, queueUrl, endpointUrl: endpoint }) => { + const sqsClient = new SQSClient({ endpoint }); + + if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) { + return; + } + const regex = new RegExp(condition); + + let totalMessages = 0; + let totalMessagesDeleted = 0; + const messageConsumer = async (message) => { + totalMessages++; + const messageBody = message.body; + if (regex.test(messageBody)) { + totalMessagesDeleted++; + return true; + } + return false; + }; + await consumeMessages(sqsClient, queueUrl, messageConsumer); + logger.success(`Finished purge-queue successfully. ${totalMessagesDeleted} of ${totalMessages} messages were deleted`); +}; diff --git a/test/integration/purge-queue.test.js b/test/integration/purge-queue.test.js new file mode 100644 index 0000000..8101cf5 --- /dev/null +++ b/test/integration/purge-queue.test.js @@ -0,0 +1,44 @@ +import purgeQueue from "purge-queue"; + +const QUEUE_NAME = 'events-queue'; + +let sqsContainer; +let sqsClient; + +beforeAll(async () => { + sqsContainer = await setUpSqsService(); + sqsClient = createSqsClient(); + + await createQueue(sqsClient, QUEUE_NAME); +}); + +afterAll(async () => { + await sqsContainer.stop(); +}); + +it('should consume messages from queue and delete only those that match the regex', async () => { + const queueUrl = getQueueUrl(QUEUE_NAME); + + const messages = await sendTestMessages(sqsClient, QUEUE_NAME); + + messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2); + messagesThatShouldBeKept = messages.filter(message => ! message.slice(-1) % 2); + + await purgeQueue({ + endpointUrl: SQS_ENDPOINT_URL, + regex: `^(${messagesThatShouldBeDeleted.join(" | ")})$`, + queueUrl + }); + + await assertQueueContainsMessages(sqsClient, QUEUE_NAME, messagesThatShouldBeKept); +}); + +it('should not throw exception when queue does not exist', async () => { + const queueUrl = getQueueUrl("nonexistent"); + + await purgeQueue({ + endpointUrl: SQS_ENDPOINT_URL, + regex: ".", + queueUrl + }); +}); diff --git a/test/purge-queue.test.js b/test/purge-queue.test.js new file mode 100644 index 0000000..43eb03a --- /dev/null +++ b/test/purge-queue.test.js @@ -0,0 +1,65 @@ +import purgeQueue from "purge-queue"; +import { consumeMessages } from "sqs-consumer"; +import resourceValidator from "resource-validator"; +jest.mock('sqs-consumer', () => ({ + consumeMessages: jest.fn() +})); +jest.mock('resource-validator', () => ({ + validateQueue: jest.fn() +})); + +it('should not consume messages when queue is not valid', async () => { + const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; + + resourceValidator.validateQueue.mockReturnValueOnce(false); + + await purgeQueue({ condition: ".", queueUrl }); + + expect(consumeMessages.mock.calls.length).toBe(0); +}); + +it('should use consumer that returns true when message matches condition', async () => { + const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; + const endpointUrl = 'http://localhost:4566'; + + resourceValidator.validateQueue.mockReturnValueOnce(true); + + await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl }); + + expect(consumeMessages.mock.calls.length).toBe(1); + expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl); + + const createdSqsClient = consumeMessages.mock.calls[0][0]; + const resolvedEndpoint = await createdSqsClient.config.endpoint(); + + expect(resolvedEndpoint.protocol).toBe('http:'); + expect(resolvedEndpoint.hostname).toBe('localhost'); + expect(resolvedEndpoint.port).toBe(4566); + + const messageBody = '{ "field": "value", "other": 2 }'; + const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody }); + expect(shouldDeleteMessage).toBe(true); +}); + +it('should use consumer that returns false when message does not match condition', async () => { + const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; + const endpointUrl = 'http://localhost:4566'; + + resourceValidator.validateQueue.mockReturnValueOnce(true); + + await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl }); + + expect(consumeMessages.mock.calls.length).toBe(1); + expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl); + + const createdSqsClient = consumeMessages.mock.calls[0][0]; + const resolvedEndpoint = await createdSqsClient.config.endpoint(); + + expect(resolvedEndpoint.protocol).toBe('http:'); + expect(resolvedEndpoint.hostname).toBe('localhost'); + expect(resolvedEndpoint.port).toBe(4566); + + const messageBody = '{ "field": "potato", "other": 2 }'; + const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody }); + expect(shouldDeleteMessage).toBe(false); +}); From a736d996f5d7501d8ad1355edcd775385ce2a293 Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 12:01:03 -0300 Subject: [PATCH 3/9] Fix --- src/purge-queue.js | 4 +++- test/integration/purge-queue.test.js | 5 ++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/purge-queue.js b/src/purge-queue.js index 004db29..f4a6fb3 100644 --- a/src/purge-queue.js +++ b/src/purge-queue.js @@ -23,5 +23,7 @@ export default async ({ regex: condition, queueUrl, endpointUrl: endpoint }) => return false; }; await consumeMessages(sqsClient, queueUrl, messageConsumer); - logger.success(`Finished purge-queue successfully. ${totalMessagesDeleted} of ${totalMessages} messages were deleted`); + logger.success( + `Finished purge-queue successfully. ${totalMessagesDeleted} of ${totalMessages} messages were deleted` + ); }; diff --git a/test/integration/purge-queue.test.js b/test/integration/purge-queue.test.js index 8101cf5..1cfc617 100644 --- a/test/integration/purge-queue.test.js +++ b/test/integration/purge-queue.test.js @@ -18,11 +18,10 @@ afterAll(async () => { it('should consume messages from queue and delete only those that match the regex', async () => { const queueUrl = getQueueUrl(QUEUE_NAME); - const messages = await sendTestMessages(sqsClient, QUEUE_NAME); - messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2); - messagesThatShouldBeKept = messages.filter(message => ! message.slice(-1) % 2); + const messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2); + const messagesThatShouldBeKept = messages.filter(message => ! message.slice(-1) % 2); await purgeQueue({ endpointUrl: SQS_ENDPOINT_URL, From 796aef992f7f9ebcedbf60549408197c2fc5c66d Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 12:05:35 -0300 Subject: [PATCH 4/9] Fix --- test/integration/purge-queue.test.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/integration/purge-queue.test.js b/test/integration/purge-queue.test.js index 1cfc617..ef48a2b 100644 --- a/test/integration/purge-queue.test.js +++ b/test/integration/purge-queue.test.js @@ -21,7 +21,7 @@ it('should consume messages from queue and delete only those that match the rege const messages = await sendTestMessages(sqsClient, QUEUE_NAME); const messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2); - const messagesThatShouldBeKept = messages.filter(message => ! message.slice(-1) % 2); + const messagesThatShouldBeKept = messages.filter(message => !(message.slice(-1) % 2)); await purgeQueue({ endpointUrl: SQS_ENDPOINT_URL, @@ -29,6 +29,7 @@ it('should consume messages from queue and delete only those that match the rege queueUrl }); + await waitVisibilityTimeout(); await assertQueueContainsMessages(sqsClient, QUEUE_NAME, messagesThatShouldBeKept); }); From 1fcb5a0e2f996f1f8edaf4f69ec925766e94febd Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 13:37:22 -0300 Subject: [PATCH 5/9] Fix --- test/integration/purge-queue.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/purge-queue.test.js b/test/integration/purge-queue.test.js index ef48a2b..8ed3528 100644 --- a/test/integration/purge-queue.test.js +++ b/test/integration/purge-queue.test.js @@ -25,7 +25,7 @@ it('should consume messages from queue and delete only those that match the rege await purgeQueue({ endpointUrl: SQS_ENDPOINT_URL, - regex: `^(${messagesThatShouldBeDeleted.join(" | ")})$`, + regex: `^(${messagesThatShouldBeDeleted.join("|")})$`, queueUrl }); From 45ded47c3cd5188a20ec49bddb6e290a221cbe7c Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 18:12:25 -0300 Subject: [PATCH 6/9] Improve README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 46b49a4..2aee407 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ Next, you need to run the command below inside the repository folder to locally npx link . ``` -After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. To execute commands use the prefix `npx` and do not forget the parameter `--endpoint-url`, for example: +After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. To execute commands use the prefix `npx` and do not forget the parameter `--endpoint-url`: ```shell npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" -d "some-lambda-function" --endpoint-url "http://localhost:9324" @@ -91,5 +91,5 @@ npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" Here you will find a list of features I want to include in the project: -- 🔧 Add tooling to facilitate local testing +- 🔧 Add tooling to facilitate local testing (RODAR LAMBDA LOCAL: https://github.com/lambci/docker-lambda; https://docs.localstack.cloud/user-guide/aws/lambda/) - 🔧 Add hot reload to automatically rebuild the project and improve the development experience From 362b09d8492a6a3cdb2f7d9ce107f4b100e56612 Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 18:14:55 -0300 Subject: [PATCH 7/9] Fix queue-to-file option alias --- src/index.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/index.js b/src/index.js index c3950d4..97cf3e5 100755 --- a/src/index.js +++ b/src/index.js @@ -54,8 +54,8 @@ program "Consume all messages from a queue (without deleting) to save them in a text file. " + "If the file already exists it will be overwritten" ) - .requiredOption("-s --file ", "The full name of the text file where the messages should be saved") - .requiredOption("-d --queue-url ", "The URL of the queue that contains the messages") + .requiredOption("-s --queue-url ", "The URL of the queue that contains the messages") + .requiredOption("-d --file ", "The full name of the text file where the messages should be saved") .option( "--endpoint-url ", "Just like in aws-cli commands, this is only required when using a local version of SQS" From 01d1d5d843c906766c60307ac049c6292bc341ad Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 18:16:00 -0300 Subject: [PATCH 8/9] Fix README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2aee407..5d6909a 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ AWS_PROFILE=configured-profile dlq-utils queue-to-lambda -s "https://sqs.us-east Read a text file to send each line as a message to an Amazon SQS queue. ```shell -AWS_PROFILE=configured-profile dlq-utils file-to-queue -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "/Users/myuser/Documents/some-file.txt" +AWS_PROFILE=configured-profile dlq-utils file-to-queue -s "/Users/myuser/Documents/some-file.txt" -d "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" ``` #### `queue-to-file` From 0cc2d327a031e65479d35e0b8ad19e9b8b9ca0b1 Mon Sep 17 00:00:00 2001 From: leoaugustov Date: Wed, 14 Feb 2024 18:24:48 -0300 Subject: [PATCH 9/9] Fix README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5d6909a..f788221 100644 --- a/README.md +++ b/README.md @@ -91,5 +91,5 @@ npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" Here you will find a list of features I want to include in the project: -- 🔧 Add tooling to facilitate local testing (RODAR LAMBDA LOCAL: https://github.com/lambci/docker-lambda; https://docs.localstack.cloud/user-guide/aws/lambda/) +- 🔧 Add tooling to facilitate local testing - 🔧 Add hot reload to automatically rebuild the project and improve the development experience