diff --git a/src/Queue/SqsSnsQueue.php b/src/Queue/SqsSnsQueue.php index 5e30f62..a3caf26 100644 --- a/src/Queue/SqsSnsQueue.php +++ b/src/Queue/SqsSnsQueue.php @@ -47,14 +47,31 @@ public function pop($queue = null) ]); if (is_array($response['Messages']) && count($response['Messages']) > 0) { - return new SqsSnsJob( - $this->container, - $this->sqs, - $response['Messages'][0], - $this->connectionName, - $queue, - $this->routes - ); + if ($this->routeExists($response['Messages'][0])) { + return new SqsSnsJob( + $this->container, + $this->sqs, + $response['Messages'][0], + $this->connectionName, + $queue, + $this->routes + ); + } } } + + /** + * Check if subject exist within the routes. + * This skips creating a job for messages from + * topics that publish multiple different messages. + * + * @param array $message + * @return bool + */ + protected function routeExists(array $message) + { + $body = json_decode($message['Body'], true); + + return isset($body['Subject']) && array_key_exists($body['Subject'], $this->routes); + } } diff --git a/tests/SqsSnsQueueTest.php b/tests/SqsSnsQueueTest.php index 1dac699..306d824 100644 --- a/tests/SqsSnsQueueTest.php +++ b/tests/SqsSnsQueueTest.php @@ -59,8 +59,17 @@ public function testWillCallReceiveMessage() public function testWillPopMessageOffQueue() { + $body = json_encode( + [ + 'MessageId' => 'bc065409-fe1b-59c2-b17c-0e056cd19d5d', + 'TopicArn' => 'arn:aws:sns', + 'Subject' => 'Subject#action', + 'Message' => '', + ] + ); + $message = [ - 'Body' => 'The Body', + 'Body' => $body, ]; $this->sqsClient->method('receiveMessage')->willReturn([ @@ -69,12 +78,24 @@ public function testWillPopMessageOffQueue() ], ]); - $queue = new SqsSnsQueue($this->sqsClient, 'default_queue'); + $queue = new SqsSnsQueue($this->sqsClient, 'default_queue', '', [ + "Subject#action" => '\\Job', + ]); + $queue->setContainer($this->createMock(\Illuminate\Container\Container::class)); $job = $queue->pop(); + $expectedRawBody = [ + 'uuid' => 'bc065409-fe1b-59c2-b17c-0e056cd19d5d', + 'displayName' => '\\Job', + 'job' => 'Illuminate\Queue\CallQueuedHandler@call', + 'data' => [ + 'commandName' => '\\Job', + 'command' => 'N;', + ], + ]; $this->assertInstanceOf(SqsSnsJob::class, $job); - $this->assertEquals($message['Body'], $job->getRawBody()); + $this->assertEquals(json_encode($expectedRawBody), $job->getRawBody()); } }