Skip to content
This repository has been archived by the owner on Feb 14, 2024. It is now read-only.

Commit

Permalink
Merge pull request #11 from carvefx/5.2
Browse files Browse the repository at this point in the history
Control reservation timeouts when popping a new item off the queue
  • Loading branch information
tshafer committed Mar 15, 2016
2 parents 256c103 + 60ab5c9 commit 041eca6
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 7 deletions.
1 change: 1 addition & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ Sample Configuration:
'project' => 'your-project-id',
'queue' => 'your-queue-name',
'encrypt' => true,
'timeout' => 60
],
```
2 changes: 1 addition & 1 deletion src/Connectors/IronConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ public function connect(array $config)
$iron->ssl_verifypeer = $config['ssl_verifypeer'];
}

return new IronQueue($iron, $this->request, $config['queue'], $config['encrypt']);
return new IronQueue($iron, $this->request, $config['queue'], $config['encrypt'], $config['timeout']);
}
}
15 changes: 11 additions & 4 deletions src/IronQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,29 @@ class IronQueue extends Queue implements QueueContract
*/
protected $shouldEncrypt;

/**
* Number of seconds before the reservation_id times out on a newly popped message.
*
* @var int
*/
protected $timeout;

/**
* Create a new IronMQ queue instance.
*
* @param \IronMQ\IronMQ $iron
* @param \Illuminate\Http\Request $request
* @param string $default
* @param bool $shouldEncrypt
*
* @return void
* @param int $timeout
*/
public function __construct(IronMQ $iron, Request $request, $default, $shouldEncrypt = false)
public function __construct(IronMQ $iron, Request $request, $default, $shouldEncrypt = false, $timeout = 60)
{
$this->iron = $iron;
$this->request = $request;
$this->default = $default;
$this->shouldEncrypt = $shouldEncrypt;
$this->timeout = $timeout;
}

/**
Expand Down Expand Up @@ -135,7 +142,7 @@ public function pop($queue = null)
{
$queue = $this->getQueue($queue);

$job = $this->iron->reserveMessage($queue);
$job = $this->iron->reserveMessage($queue, $this->timeout);

// If we were able to pop a message off of the queue, we will need to decrypt
// the message body, as all Iron.io messages are encrypted, since the push
Expand Down
29 changes: 27 additions & 2 deletions tests/IronQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,21 @@ public function testPopProperlyPopsJobOffOfIron()
$crypt = m::mock('Illuminate\Contracts\Encryption\Encrypter');
$queue->setEncrypter($crypt);
$queue->setContainer(m::mock('Illuminate\Container\Container'));
$iron->shouldReceive('reserveMessage')->once()->with('default')->andReturn($job = m::mock('IronMQ_Message'));
$iron->shouldReceive('reserveMessage')->once()->with('default', 60)->andReturn($job = m::mock('IronMQ_Message'));
$job->body = 'foo';
$crypt->shouldReceive('decrypt')->once()->with('foo')->andReturn('foo');
$result = $queue->pop();

$this->assertInstanceOf('Collective\IronQueue\Jobs\IronJob', $result);
}

public function testPopProperlyPopsJobOffOfIronWithCustomTimeout()
{
$queue = new Collective\IronQueue\IronQueue($iron = m::mock('IronMQ\IronMQ'), m::mock('Illuminate\Http\Request'), 'default', true, 120);
$crypt = m::mock('Illuminate\Contracts\Encryption\Encrypter');
$queue->setEncrypter($crypt);
$queue->setContainer(m::mock('Illuminate\Container\Container'));
$iron->shouldReceive('reserveMessage')->once()->with('default', 120)->andReturn($job = m::mock('IronMQ_Message'));
$job->body = 'foo';
$crypt->shouldReceive('decrypt')->once()->with('foo')->andReturn('foo');
$result = $queue->pop();
Expand All @@ -89,14 +103,25 @@ public function testPopProperlyPopsJobOffOfIronWithoutEncryption()
$crypt = m::mock('Illuminate\Contracts\Encryption\Encrypter');
$queue->setEncrypter($crypt);
$queue->setContainer(m::mock('Illuminate\Container\Container'));
$iron->shouldReceive('reserveMessage')->once()->with('default')->andReturn($job = m::mock('IronMQ_Message'));
$iron->shouldReceive('reserveMessage')->once()->with('default', 60)->andReturn($job = m::mock('IronMQ_Message'));
$job->body = 'foo';
$crypt->shouldReceive('decrypt')->never();
$result = $queue->pop();

$this->assertInstanceOf('Collective\IronQueue\Jobs\IronJob', $result);
}

/**
* @expectedException IronCore\HttpException
*/
public function testDeleteJobWithExpiredReservationIdThrowsAnException()
{
$queue = new Collective\IronQueue\IronQueue($iron = m::mock('IronMQ\IronMQ'), m::mock('Illuminate\Http\Request'), 'default', false, 30);
$iron->shouldReceive('deleteMessage')->with('default', 1, 'def456')->andThrow('IronCore\HttpException', '{"msg":"Reservation has timed out"}');
// 'def456' refers to a reservation id that expired
$queue->deleteMessage('default', 1, 'def456');
}

public function testPushedJobsCanBeMarshaled()
{
$queue = $this->getMock('Collective\IronQueue\IronQueue', ['createPushedIronJob'], [$iron = m::mock('IronMQ\IronMQ'), $request = m::mock('Illuminate\Http\Request'), 'default', true]);
Expand Down

0 comments on commit 041eca6

Please sign in to comment.