diff --git a/README.md b/README.md index 9a06386..f788221 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,8 @@ brew upgrade dlq-utils - Invoke a function using messages from a queue - Move or copy messages from one queue to another +- Delete messages from a queue based on a regular expression - Template a message before sending it to a queue or invoking a function -- (soon) Filter messages before sending them to a queue or invoking a function - Save messages from a queue to a text file - Read lines from a text file and send them as messages to a queue @@ -34,28 +34,35 @@ It's necessary to specify the environment variable `AWS_PROFILE` with the [named Invoke an AWS Lambda function with all messages from an Amazon SQS queue, being able to transform them before invoking the function. ```shell -AWS_PROFILE=configured-profile dlq-utils queue-to-lambda --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --function-name "some-lambda-function" +AWS_PROFILE=configured-profile dlq-utils queue-to-lambda -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "some-lambda-function" ``` #### `file-to-queue` Read a text file to send each line as a message to an Amazon SQS queue. ```shell -AWS_PROFILE=configured-profile dlq-utils file-to-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt" +AWS_PROFILE=configured-profile dlq-utils file-to-queue -s "/Users/myuser/Documents/some-file.txt" -d "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" ``` #### `queue-to-file` Consume all messages from an Amazon SQS queue to save them in a text file. ```shell -AWS_PROFILE=configured-profile dlq-utils queue-to-file --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt" +AWS_PROFILE=configured-profile dlq-utils queue-to-file -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "/Users/myuser/Documents/some-file.txt" ``` #### `queue-to-queue` Move all messages from an Amazon SQS queue to another one, being able to transform them. ```shell -AWS_PROFILE=configured-profile dlq-utils queue-to-queue --source-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" --dest-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue" +AWS_PROFILE=configured-profile dlq-utils queue-to-queue -s "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" -d "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue" +``` + +#### `purge-queue` +Purge a queue conditionally based on a regular expression tested on the message body. + +```shell +AWS_PROFILE=configured-profile dlq-utils purge-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --regex ".foo" ``` For full documentation run `dlq-utils help [command]`. @@ -74,12 +81,15 @@ Next, you need to run the command below inside the repository folder to locally npx link . ``` -After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. +After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. To execute commands use the prefix `npx` and do not forget the parameter `--endpoint-url`: + +```shell +npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" -d "some-lambda-function" --endpoint-url "http://localhost:9324" +``` ## Roadmap Here you will find a list of features I want to include in the project: -- ✨ Add the ability to filter out messages with a regex - 🔧 Add tooling to facilitate local testing - 🔧 Add hot reload to automatically rebuild the project and improve the development experience diff --git a/src/index.js b/src/index.js index 91963f1..97cf3e5 100755 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,7 @@ import fileToQueue from "./file-to-queue"; import queueToFile from "./queue-to-file"; import queueToLambda from "./queue-to-lambda"; import queueToQueue from "./queue-to-queue"; +import purgeQueue from "./purge-queue"; const program = new Command(); @@ -53,8 +54,8 @@ program "Consume all messages from a queue (without deleting) to save them in a text file. " + "If the file already exists it will be overwritten" ) - .requiredOption("-s --file ", "The full name of the text file where the messages should be saved") - .requiredOption("-d --queue-url ", "The URL of the queue that contains the messages") + .requiredOption("-s --queue-url ", "The URL of the queue that contains the messages") + .requiredOption("-d --file ", "The full name of the text file where the messages should be saved") .option( "--endpoint-url ", "Just like in aws-cli commands, this is only required when using a local version of SQS" @@ -78,4 +79,15 @@ program ) .action(queueToQueue); +program + .command("purge-queue") + .description("Clear queue conditionally based on a regular expression") + .requiredOption("-r --regex ", "The regex to select the messages that should be deleted") + .requiredOption("-q --queue-url ", "The URL of the queue that contains the messages") + .option( + "--endpoint-url ", + "Just like in aws-cli commands, this is only required when using a local version of SQS and Lambda (e.g. LocalStack)" + ) + .action(purgeQueue); + program.parse(); diff --git a/src/purge-queue.js b/src/purge-queue.js new file mode 100644 index 0000000..f4a6fb3 --- /dev/null +++ b/src/purge-queue.js @@ -0,0 +1,29 @@ +import logger from "./logger"; +import { SQSClient } from "@aws-sdk/client-sqs"; +import { consumeMessages } from "./sqs-consumer"; +import resourceValidator from "./resource-validator"; + +export default async ({ regex: condition, queueUrl, endpointUrl: endpoint }) => { + const sqsClient = new SQSClient({ endpoint }); + + if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) { + return; + } + const regex = new RegExp(condition); + + let totalMessages = 0; + let totalMessagesDeleted = 0; + const messageConsumer = async (message) => { + totalMessages++; + const messageBody = message.body; + if (regex.test(messageBody)) { + totalMessagesDeleted++; + return true; + } + return false; + }; + await consumeMessages(sqsClient, queueUrl, messageConsumer); + logger.success( + `Finished purge-queue successfully. ${totalMessagesDeleted} of ${totalMessages} messages were deleted` + ); +}; diff --git a/src/queue-to-file.js b/src/queue-to-file.js index 9ba269e..d173b8a 100644 --- a/src/queue-to-file.js +++ b/src/queue-to-file.js @@ -7,7 +7,7 @@ import resourceValidator from "./resource-validator"; export default async ({ file, queueUrl, endpointUrl: endpoint }) => { const sqsClient = new SQSClient({ endpoint }); - if (!(await isQueueValid(sqsClient, queueUrl))) { + if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) { return; } @@ -23,8 +23,3 @@ export default async ({ file, queueUrl, endpointUrl: endpoint }) => { await consumeMessages(sqsClient, queueUrl, messageConsumer); logger.success(`Finished queue-to-file successfully. ${totalMessagesSaved} messages saved to file`); }; - -async function isQueueValid(sqsClient, queueUrl) { - const resourcesToValidate = [{ type: "queue", value: queueUrl }]; - return await resourceValidator.validate(resourcesToValidate, sqsClient); -} diff --git a/src/resource-validator.js b/src/resource-validator.js index 9dc498a..f987767 100644 --- a/src/resource-validator.js +++ b/src/resource-validator.js @@ -57,7 +57,7 @@ async function validateQueue(sqsClient, queueUrl) { if (await isExistingQueue(sqsClient, queueName)) { return true; } - logger.error("(ERROR) Some of the specified queues do not exist or are not accessible"); + logger.error(`(ERROR) Queue ${queueUrl} does not exist or is not accessible`); return false; } @@ -76,4 +76,4 @@ function mapPermissionToFsMode(permissionFlag) { throw new Error("Unknown file permission"); } -export default { validate }; +export default { validate, validateQueue }; diff --git a/test/integration/purge-queue.test.js b/test/integration/purge-queue.test.js new file mode 100644 index 0000000..8ed3528 --- /dev/null +++ b/test/integration/purge-queue.test.js @@ -0,0 +1,44 @@ +import purgeQueue from "purge-queue"; + +const QUEUE_NAME = 'events-queue'; + +let sqsContainer; +let sqsClient; + +beforeAll(async () => { + sqsContainer = await setUpSqsService(); + sqsClient = createSqsClient(); + + await createQueue(sqsClient, QUEUE_NAME); +}); + +afterAll(async () => { + await sqsContainer.stop(); +}); + +it('should consume messages from queue and delete only those that match the regex', async () => { + const queueUrl = getQueueUrl(QUEUE_NAME); + const messages = await sendTestMessages(sqsClient, QUEUE_NAME); + + const messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2); + const messagesThatShouldBeKept = messages.filter(message => !(message.slice(-1) % 2)); + + await purgeQueue({ + endpointUrl: SQS_ENDPOINT_URL, + regex: `^(${messagesThatShouldBeDeleted.join("|")})$`, + queueUrl + }); + + await waitVisibilityTimeout(); + await assertQueueContainsMessages(sqsClient, QUEUE_NAME, messagesThatShouldBeKept); +}); + +it('should not throw exception when queue does not exist', async () => { + const queueUrl = getQueueUrl("nonexistent"); + + await purgeQueue({ + endpointUrl: SQS_ENDPOINT_URL, + regex: ".", + queueUrl + }); +}); diff --git a/test/purge-queue.test.js b/test/purge-queue.test.js new file mode 100644 index 0000000..43eb03a --- /dev/null +++ b/test/purge-queue.test.js @@ -0,0 +1,65 @@ +import purgeQueue from "purge-queue"; +import { consumeMessages } from "sqs-consumer"; +import resourceValidator from "resource-validator"; +jest.mock('sqs-consumer', () => ({ + consumeMessages: jest.fn() +})); +jest.mock('resource-validator', () => ({ + validateQueue: jest.fn() +})); + +it('should not consume messages when queue is not valid', async () => { + const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; + + resourceValidator.validateQueue.mockReturnValueOnce(false); + + await purgeQueue({ condition: ".", queueUrl }); + + expect(consumeMessages.mock.calls.length).toBe(0); +}); + +it('should use consumer that returns true when message matches condition', async () => { + const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; + const endpointUrl = 'http://localhost:4566'; + + resourceValidator.validateQueue.mockReturnValueOnce(true); + + await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl }); + + expect(consumeMessages.mock.calls.length).toBe(1); + expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl); + + const createdSqsClient = consumeMessages.mock.calls[0][0]; + const resolvedEndpoint = await createdSqsClient.config.endpoint(); + + expect(resolvedEndpoint.protocol).toBe('http:'); + expect(resolvedEndpoint.hostname).toBe('localhost'); + expect(resolvedEndpoint.port).toBe(4566); + + const messageBody = '{ "field": "value", "other": 2 }'; + const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody }); + expect(shouldDeleteMessage).toBe(true); +}); + +it('should use consumer that returns false when message does not match condition', async () => { + const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; + const endpointUrl = 'http://localhost:4566'; + + resourceValidator.validateQueue.mockReturnValueOnce(true); + + await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl }); + + expect(consumeMessages.mock.calls.length).toBe(1); + expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl); + + const createdSqsClient = consumeMessages.mock.calls[0][0]; + const resolvedEndpoint = await createdSqsClient.config.endpoint(); + + expect(resolvedEndpoint.protocol).toBe('http:'); + expect(resolvedEndpoint.hostname).toBe('localhost'); + expect(resolvedEndpoint.port).toBe(4566); + + const messageBody = '{ "field": "potato", "other": 2 }'; + const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody }); + expect(shouldDeleteMessage).toBe(false); +}); diff --git a/test/queue-to-file.test.js b/test/queue-to-file.test.js index 98a5b44..e1c9e3d 100644 --- a/test/queue-to-file.test.js +++ b/test/queue-to-file.test.js @@ -10,7 +10,7 @@ jest.mock('fs', () => ({ createWriteStream: jest.fn() })); jest.mock('resource-validator', () => ({ - validate: jest.fn() + validateQueue: jest.fn() })); it('should consume messages from queue and save them in file', async () => { @@ -20,7 +20,7 @@ it('should consume messages from queue and save them in file', async () => { const lineWriter = { write: jest.fn() }; fs.createWriteStream.mockReturnValueOnce(lineWriter); - resourceValidator.validate.mockReturnValueOnce(true); + resourceValidator.validateQueue.mockReturnValueOnce(true); await queueToFile({ queueUrl, file, endpointUrl }); @@ -46,7 +46,7 @@ it('should create consumer that returns false', async () => { const lineWriter = { write: jest.fn() }; fs.createWriteStream.mockReturnValueOnce(lineWriter); - resourceValidator.validate.mockReturnValueOnce(true); + resourceValidator.validateQueue.mockReturnValueOnce(true); await queueToFile({ queueUrl, file }); @@ -60,7 +60,7 @@ it('should not consume messages when queue is not valid', async () => { const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue'; const file = 'path/filename.csv'; - resourceValidator.validate.mockReturnValueOnce(false); + resourceValidator.validateQueue.mockReturnValueOnce(false); await queueToFile({ queueUrl, file }); diff --git a/test/resource-validator.test.js b/test/resource-validator.test.js index df3a3e4..c8ca565 100644 --- a/test/resource-validator.test.js +++ b/test/resource-validator.test.js @@ -144,3 +144,31 @@ describe('validate', () => { expect(isExistingFunction.mock.calls.length).toBe(0); }); }); + +describe('validateQueue', () => { + it('should extract queue name from queue URL and return true when queue exists', async () => { + const sqsClient = { send: jest.fn() }; + isExistingQueue.mockReturnValueOnce(true); + + const valid = await resourceValidator.validateQueue( + sqsClient, + "https://sqs.us-east-1.amazonaws.com/00000000/test-queue" + ); + + expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue"); + expect(valid).toBe(true); + }); + + it('should extract queue name from queue URL and return false when queue does not exist', async () => { + const sqsClient = { send: jest.fn() }; + isExistingQueue.mockReturnValueOnce(false); + + const valid = await resourceValidator.validateQueue( + sqsClient, + "https://sqs.us-east-1.amazonaws.com/00000000/test-queue" + ); + + expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue"); + expect(valid).toBe(false); + }); +});