Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add purge-queue command #41

Merged
merged 9 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ brew upgrade dlq-utils

- Invoke a function using messages from a queue
- Move or copy messages from one queue to another
- Delete messages from a queue based on a regular expression
- Template a message before sending it to a queue or invoking a function
- (soon) Filter messages before sending them to a queue or invoking a function
- Save messages from a queue to a text file
- Read lines from a text file and send them as messages to a queue

Expand All @@ -34,28 +34,35 @@ It's necessary to specify the environment variable `AWS_PROFILE` with the [named
Invoke an AWS Lambda function with all messages from an Amazon SQS queue, being able to transform them before invoking the function.

```shell
AWS_PROFILE=configured-profile dlq-utils queue-to-lambda --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --function-name "some-lambda-function"
AWS_PROFILE=configured-profile dlq-utils queue-to-lambda -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "some-lambda-function"
```

#### `file-to-queue`
Read a text file to send each line as a message to an Amazon SQS queue.

```shell
AWS_PROFILE=configured-profile dlq-utils file-to-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt"
AWS_PROFILE=configured-profile dlq-utils file-to-queue -s "/Users/myuser/Documents/some-file.txt" -d "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue"
```

#### `queue-to-file`
Consume all messages from an Amazon SQS queue to save them in a text file.

```shell
AWS_PROFILE=configured-profile dlq-utils queue-to-file --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --file "/Users/myuser/Documents/some-file.txt"
AWS_PROFILE=configured-profile dlq-utils queue-to-file -s "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" -d "/Users/myuser/Documents/some-file.txt"
```

#### `queue-to-queue`
Move all messages from an Amazon SQS queue to another one, being able to transform them.

```shell
AWS_PROFILE=configured-profile dlq-utils queue-to-queue --source-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" --dest-queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue"
AWS_PROFILE=configured-profile dlq-utils queue-to-queue -s "https://sqs.us-east-1.amazonaws.com/000000000000/source-queue" -d "https://sqs.us-east-1.amazonaws.com/000000000000/dest-queue"
```

#### `purge-queue`
Purge a queue conditionally based on a regular expression tested on the message body.

```shell
AWS_PROFILE=configured-profile dlq-utils purge-queue --queue-url "https://sqs.us-east-1.amazonaws.com/000000000000/some-queue" --regex ".foo"
```
For full documentation run `dlq-utils help [command]`.

Expand All @@ -74,12 +81,15 @@ Next, you need to run the command below inside the repository folder to locally
npx link .
```

After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior.
After that, every time you make a change in the code base you need to rebuild the project to update the CLI behavior. To execute commands use the prefix `npx` and do not forget the parameter `--endpoint-url`:

```shell
npx dlq-utils queue-to-lambda -s "http://localhost:9324/000000000000/some-queue" -d "some-lambda-function" --endpoint-url "http://localhost:9324"
```

## Roadmap

Here you will find a list of features I want to include in the project:

- ✨ Add the ability to filter out messages with a regex
- 🔧 Add tooling to facilitate local testing
- 🔧 Add hot reload to automatically rebuild the project and improve the development experience
16 changes: 14 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import fileToQueue from "./file-to-queue";
import queueToFile from "./queue-to-file";
import queueToLambda from "./queue-to-lambda";
import queueToQueue from "./queue-to-queue";
import purgeQueue from "./purge-queue";

const program = new Command();

Expand Down Expand Up @@ -53,8 +54,8 @@ program
"Consume all messages from a queue (without deleting) to save them in a text file. " +
"If the file already exists it will be overwritten"
)
.requiredOption("-s --file <string>", "The full name of the text file where the messages should be saved")
.requiredOption("-d --queue-url <string>", "The URL of the queue that contains the messages")
.requiredOption("-s --queue-url <string>", "The URL of the queue that contains the messages")
.requiredOption("-d --file <string>", "The full name of the text file where the messages should be saved")
.option(
"--endpoint-url <string>",
"Just like in aws-cli commands, this is only required when using a local version of SQS"
Expand All @@ -78,4 +79,15 @@ program
)
.action(queueToQueue);

program
.command("purge-queue")
.description("Clear queue conditionally based on a regular expression")
.requiredOption("-r --regex <string>", "The regex to select the messages that should be deleted")
.requiredOption("-q --queue-url <string>", "The URL of the queue that contains the messages")
.option(
"--endpoint-url <string>",
"Just like in aws-cli commands, this is only required when using a local version of SQS and Lambda (e.g. LocalStack)"
)
.action(purgeQueue);

program.parse();
29 changes: 29 additions & 0 deletions src/purge-queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logger from "./logger";
import { SQSClient } from "@aws-sdk/client-sqs";
import { consumeMessages } from "./sqs-consumer";
import resourceValidator from "./resource-validator";

export default async ({ regex: condition, queueUrl, endpointUrl: endpoint }) => {
const sqsClient = new SQSClient({ endpoint });

if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) {
return;
}
const regex = new RegExp(condition);

let totalMessages = 0;
let totalMessagesDeleted = 0;
const messageConsumer = async (message) => {
totalMessages++;
const messageBody = message.body;
if (regex.test(messageBody)) {
totalMessagesDeleted++;
return true;
}
return false;
};
await consumeMessages(sqsClient, queueUrl, messageConsumer);
logger.success(
`Finished purge-queue successfully. ${totalMessagesDeleted} of ${totalMessages} messages were deleted`
);
};
7 changes: 1 addition & 6 deletions src/queue-to-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import resourceValidator from "./resource-validator";
export default async ({ file, queueUrl, endpointUrl: endpoint }) => {
const sqsClient = new SQSClient({ endpoint });

if (!(await isQueueValid(sqsClient, queueUrl))) {
if (!(await resourceValidator.validateQueue(sqsClient, queueUrl))) {
return;
}

Expand All @@ -23,8 +23,3 @@ export default async ({ file, queueUrl, endpointUrl: endpoint }) => {
await consumeMessages(sqsClient, queueUrl, messageConsumer);
logger.success(`Finished queue-to-file successfully. ${totalMessagesSaved} messages saved to file`);
};

async function isQueueValid(sqsClient, queueUrl) {
const resourcesToValidate = [{ type: "queue", value: queueUrl }];
return await resourceValidator.validate(resourcesToValidate, sqsClient);
}
4 changes: 2 additions & 2 deletions src/resource-validator.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async function validateQueue(sqsClient, queueUrl) {
if (await isExistingQueue(sqsClient, queueName)) {
return true;
}
logger.error("(ERROR) Some of the specified queues do not exist or are not accessible");
logger.error(`(ERROR) Queue ${queueUrl} does not exist or is not accessible`);
return false;
}

Expand All @@ -76,4 +76,4 @@ function mapPermissionToFsMode(permissionFlag) {
throw new Error("Unknown file permission");
}

export default { validate };
export default { validate, validateQueue };
44 changes: 44 additions & 0 deletions test/integration/purge-queue.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import purgeQueue from "purge-queue";

const QUEUE_NAME = 'events-queue';

let sqsContainer;
let sqsClient;

beforeAll(async () => {
sqsContainer = await setUpSqsService();
sqsClient = createSqsClient();

await createQueue(sqsClient, QUEUE_NAME);
});

afterAll(async () => {
await sqsContainer.stop();
});

it('should consume messages from queue and delete only those that match the regex', async () => {
const queueUrl = getQueueUrl(QUEUE_NAME);
const messages = await sendTestMessages(sqsClient, QUEUE_NAME);

const messagesThatShouldBeDeleted = messages.filter(message => message.slice(-1) % 2);
const messagesThatShouldBeKept = messages.filter(message => !(message.slice(-1) % 2));

await purgeQueue({
endpointUrl: SQS_ENDPOINT_URL,
regex: `^(${messagesThatShouldBeDeleted.join("|")})$`,
queueUrl
});

await waitVisibilityTimeout();
await assertQueueContainsMessages(sqsClient, QUEUE_NAME, messagesThatShouldBeKept);
});

it('should not throw exception when queue does not exist', async () => {
const queueUrl = getQueueUrl("nonexistent");

await purgeQueue({
endpointUrl: SQS_ENDPOINT_URL,
regex: ".",
queueUrl
});
});
65 changes: 65 additions & 0 deletions test/purge-queue.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import purgeQueue from "purge-queue";
import { consumeMessages } from "sqs-consumer";
import resourceValidator from "resource-validator";
jest.mock('sqs-consumer', () => ({
consumeMessages: jest.fn()
}));
jest.mock('resource-validator', () => ({
validateQueue: jest.fn()
}));

it('should not consume messages when queue is not valid', async () => {
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';

resourceValidator.validateQueue.mockReturnValueOnce(false);

await purgeQueue({ condition: ".", queueUrl });

expect(consumeMessages.mock.calls.length).toBe(0);
});

it('should use consumer that returns true when message matches condition', async () => {
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
const endpointUrl = 'http://localhost:4566';

resourceValidator.validateQueue.mockReturnValueOnce(true);

await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl });

expect(consumeMessages.mock.calls.length).toBe(1);
expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl);

const createdSqsClient = consumeMessages.mock.calls[0][0];
const resolvedEndpoint = await createdSqsClient.config.endpoint();

expect(resolvedEndpoint.protocol).toBe('http:');
expect(resolvedEndpoint.hostname).toBe('localhost');
expect(resolvedEndpoint.port).toBe(4566);

const messageBody = '{ "field": "value", "other": 2 }';
const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody });
expect(shouldDeleteMessage).toBe(true);
});

it('should use consumer that returns false when message does not match condition', async () => {
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
const endpointUrl = 'http://localhost:4566';

resourceValidator.validateQueue.mockReturnValueOnce(true);

await purgeQueue({ regex: "\"field\":\\s*\"value\"", queueUrl, endpointUrl });

expect(consumeMessages.mock.calls.length).toBe(1);
expect(consumeMessages.mock.calls[0][1]).toEqual(queueUrl);

const createdSqsClient = consumeMessages.mock.calls[0][0];
const resolvedEndpoint = await createdSqsClient.config.endpoint();

expect(resolvedEndpoint.protocol).toBe('http:');
expect(resolvedEndpoint.hostname).toBe('localhost');
expect(resolvedEndpoint.port).toBe(4566);

const messageBody = '{ "field": "potato", "other": 2 }';
const shouldDeleteMessage = await consumeMessages.mock.calls[0][2]({ body: messageBody });
expect(shouldDeleteMessage).toBe(false);
});
8 changes: 4 additions & 4 deletions test/queue-to-file.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jest.mock('fs', () => ({
createWriteStream: jest.fn()
}));
jest.mock('resource-validator', () => ({
validate: jest.fn()
validateQueue: jest.fn()
}));

it('should consume messages from queue and save them in file', async () => {
Expand All @@ -20,7 +20,7 @@ it('should consume messages from queue and save them in file', async () => {
const lineWriter = { write: jest.fn() };

fs.createWriteStream.mockReturnValueOnce(lineWriter);
resourceValidator.validate.mockReturnValueOnce(true);
resourceValidator.validateQueue.mockReturnValueOnce(true);

await queueToFile({ queueUrl, file, endpointUrl });

Expand All @@ -46,7 +46,7 @@ it('should create consumer that returns false', async () => {
const lineWriter = { write: jest.fn() };

fs.createWriteStream.mockReturnValueOnce(lineWriter);
resourceValidator.validate.mockReturnValueOnce(true);
resourceValidator.validateQueue.mockReturnValueOnce(true);

await queueToFile({ queueUrl, file });

Expand All @@ -60,7 +60,7 @@ it('should not consume messages when queue is not valid', async () => {
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/00000000/test-queue';
const file = 'path/filename.csv';

resourceValidator.validate.mockReturnValueOnce(false);
resourceValidator.validateQueue.mockReturnValueOnce(false);

await queueToFile({ queueUrl, file });

Expand Down
28 changes: 28 additions & 0 deletions test/resource-validator.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,31 @@ describe('validate', () => {
expect(isExistingFunction.mock.calls.length).toBe(0);
});
});

describe('validateQueue', () => {
it('should extract queue name from queue URL and return true when queue exists', async () => {
const sqsClient = { send: jest.fn() };
isExistingQueue.mockReturnValueOnce(true);

const valid = await resourceValidator.validateQueue(
sqsClient,
"https://sqs.us-east-1.amazonaws.com/00000000/test-queue"
);

expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue");
expect(valid).toBe(true);
});

it('should extract queue name from queue URL and return false when queue does not exist', async () => {
const sqsClient = { send: jest.fn() };
isExistingQueue.mockReturnValueOnce(false);

const valid = await resourceValidator.validateQueue(
sqsClient,
"https://sqs.us-east-1.amazonaws.com/00000000/test-queue"
);

expect(isExistingQueue).toBeCalledWith(sqsClient, "test-queue");
expect(valid).toBe(false);
});
});
Loading