Skip to content

Commit

Permalink
Merge pull request #30 from joblocal/feature/ignore-unlisted-messages…
Browse files Browse the repository at this point in the history
…-from-topic

skip job creation for messages you're not listening to
  • Loading branch information
vixone authored Nov 16, 2021
2 parents a21e08d + a9e36e1 commit cfcdc03
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
33 changes: 25 additions & 8 deletions src/Queue/SqsSnsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,31 @@ public function pop($queue = null)
]);

if (is_array($response['Messages']) && count($response['Messages']) > 0) {
return new SqsSnsJob(
$this->container,
$this->sqs,
$response['Messages'][0],
$this->connectionName,
$queue,
$this->routes
);
if ($this->routeExists($response['Messages'][0])) {
return new SqsSnsJob(
$this->container,
$this->sqs,
$response['Messages'][0],
$this->connectionName,
$queue,
$this->routes
);
}
}
}

/**
* Check if subject exist within the routes.
* This skips creating a job for messages from
* topics that publish multiple different messages.
*
* @param array $message
* @return bool
*/
protected function routeExists(array $message)
{
$body = json_decode($message['Body'], true);

return isset($body['Subject']) && array_key_exists($body['Subject'], $this->routes);
}
}
27 changes: 24 additions & 3 deletions tests/SqsSnsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,17 @@ public function testWillCallReceiveMessage()

public function testWillPopMessageOffQueue()
{
$body = json_encode(
[
'MessageId' => 'bc065409-fe1b-59c2-b17c-0e056cd19d5d',
'TopicArn' => 'arn:aws:sns',
'Subject' => 'Subject#action',
'Message' => '',
]
);

$message = [
'Body' => 'The Body',
'Body' => $body,
];

$this->sqsClient->method('receiveMessage')->willReturn([
Expand All @@ -69,12 +78,24 @@ public function testWillPopMessageOffQueue()
],
]);

$queue = new SqsSnsQueue($this->sqsClient, 'default_queue');
$queue = new SqsSnsQueue($this->sqsClient, 'default_queue', '', [
"Subject#action" => '\\Job',
]);

$queue->setContainer($this->createMock(\Illuminate\Container\Container::class));

$job = $queue->pop();
$expectedRawBody = [
'uuid' => 'bc065409-fe1b-59c2-b17c-0e056cd19d5d',
'displayName' => '\\Job',
'job' => 'Illuminate\Queue\CallQueuedHandler@call',
'data' => [
'commandName' => '\\Job',
'command' => 'N;',
],
];

$this->assertInstanceOf(SqsSnsJob::class, $job);
$this->assertEquals($message['Body'], $job->getRawBody());
$this->assertEquals(json_encode($expectedRawBody), $job->getRawBody());
}
}

0 comments on commit cfcdc03

Please sign in to comment.