Skip to content

Commit

Permalink
Search job handler in HandlerRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
msmakouz committed Dec 8, 2023
1 parent 420cd3a commit d1a982e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 26 deletions.
8 changes: 7 additions & 1 deletion src/Queue/src/Interceptor/Consume/RetryPolicyInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
use Spiral\Queue\Exception\JobException;
use Spiral\Queue\Exception\RetryableExceptionInterface;
use Spiral\Queue\Exception\RetryException;
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\Options;
use Spiral\Queue\RetryPolicyInterface;

final class RetryPolicyInterceptor implements CoreInterceptorInterface
{
public function __construct(
private readonly ReaderInterface $reader
private readonly ReaderInterface $reader,
private readonly HandlerRegistryInterface $registry,
) {
}

Expand All @@ -26,6 +28,10 @@ public function process(string $controller, string $action, array $parameters, C
try {
return $core->callAction($controller, $action, $parameters);
} catch (\Throwable $e) {
if (!\class_exists($controller)) {
$controller = $this->registry->getHandler($controller)::class;
}

$policy = $this->getRetryPolicy($e, new \ReflectionClass($controller));

if ($policy === null) {
Expand Down
78 changes: 53 additions & 25 deletions src/Queue/tests/Interceptor/Consume/RetryPolicyInterceptorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@

namespace Spiral\Tests\Queue\Interceptor\Consume;

use PHPUnit\Framework\Attributes\DataProvider;
use Psr\Container\ContainerInterface;
use Spiral\Attributes\ReaderInterface;
use Spiral\Core\CoreInterface;
use Spiral\Core\FactoryInterface;
use Spiral\Queue\Attribute\RetryPolicy;
use Spiral\Queue\Exception\JobException;
use Spiral\Queue\Exception\RetryException;
use Spiral\Queue\HandlerInterface;
use Spiral\Queue\HandlerRegistryInterface;
use Spiral\Queue\Interceptor\Consume\RetryPolicyInterceptor;
use Spiral\Queue\QueueRegistry;
use Spiral\Queue\RetryPolicyInterface;
use Spiral\Tests\Queue\Exception\TestRetryException;
use Spiral\Tests\Queue\TestCase;
Expand All @@ -24,7 +30,15 @@ protected function setUp(): void
{
$this->reader = $this->createMock(ReaderInterface::class);
$this->core = $this->createMock(CoreInterface::class);
$this->interceptor = new RetryPolicyInterceptor($this->reader);

$registry = new QueueRegistry(
$this->createMock(ContainerInterface::class),
$this->createMock(FactoryInterface::class),
$this->createMock(HandlerRegistryInterface::class),
);
$registry->setHandler('foo', $this->createMock(HandlerInterface::class));

$this->interceptor = new RetryPolicyInterceptor($this->reader, $registry);
}

public function testWithoutException(): void
Expand All @@ -40,75 +54,80 @@ public function testWithoutException(): void
$this->assertSame('result', $this->interceptor->process('foo', 'bar', [], $this->core));
}

public function testWithoutRetryPolicy(): void
#[DataProvider('jobNameDataProvider')]
public function testWithoutRetryPolicy(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(null);

$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', [])
->with($name, 'bar', [])
->willThrowException(new \Exception('Something went wrong'));

$this->expectException(\Exception::class);
$this->expectExceptionMessage('Something went wrong');
$this->interceptor->process(self::class, 'bar', [], $this->core);
$this->interceptor->process($name, 'bar', [], $this->core);
}

public function testNotRetryableException(): void
#[DataProvider('jobNameDataProvider')]
public function testNotRetryableException(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(new RetryPolicy());

$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', [])
->with($name, 'bar', [])
->willThrowException(new \Exception('Something went wrong'));

$this->expectException(\Exception::class);
$this->expectExceptionMessage('Something went wrong');
$this->interceptor->process(self::class, 'bar', [], $this->core);
$this->interceptor->process($name, 'bar', [], $this->core);
}

public function testWithDefaultRetryPolicy(): void
#[DataProvider('jobNameDataProvider')]
public function testWithDefaultRetryPolicy(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(new RetryPolicy());

$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', [])
->with($name, 'bar', [])
->willThrowException(new TestRetryException());

try {
$this->interceptor->process(self::class, 'bar', [], $this->core);
$this->interceptor->process($name, 'bar', [], $this->core);
} catch (RetryException $e) {
$this->assertSame(1, $e->getOptions()->getDelay());
$this->assertSame(['attempts' => ['1']], $e->getOptions()->getHeaders());
}
}

public function testWithoutRetryPolicyAttribute(): void
#[DataProvider('jobNameDataProvider')]
public function testWithoutRetryPolicyAttribute(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(null);

$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', [])
->with($name, 'bar', [])
->willThrowException(new TestRetryException(
retryPolicy: new \Spiral\Queue\RetryPolicy(maxAttempts: 2, delay: 4)
));

try {
$this->interceptor->process(self::class, 'bar', [], $this->core);
$this->interceptor->process($name, 'bar', [], $this->core);
} catch (RetryException $e) {
$this->assertSame(4, $e->getOptions()->getDelay());
$this->assertSame(['attempts' => ['1']], $e->getOptions()->getHeaders());
}
}

public function testWithRetryPolicyInAttribute(): void
#[DataProvider('jobNameDataProvider')]
public function testWithRetryPolicyInAttribute(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(
new RetryPolicy(maxAttempts: 3, delay: 4, multiplier: 2)
Expand All @@ -117,12 +136,12 @@ public function testWithRetryPolicyInAttribute(): void
$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]])
->with($name, 'bar', ['headers' => ['attempts' => ['1']]])
->willThrowException(new TestRetryException());

try {
$this->interceptor->process(
self::class,
$name,
'bar',
['headers' => ['attempts' => ['1']]],
$this->core
Expand All @@ -133,7 +152,8 @@ public function testWithRetryPolicyInAttribute(): void
}
}

public function testWithRetryPolicyInException(): void
#[DataProvider('jobNameDataProvider')]
public function testWithRetryPolicyInException(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(
new RetryPolicy(maxAttempts: 30, delay: 400, multiplier: 25)
Expand All @@ -142,14 +162,14 @@ public function testWithRetryPolicyInException(): void
$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]])
->with($name, 'bar', ['headers' => ['attempts' => ['1']]])
->willThrowException(new TestRetryException(
retryPolicy: new \Spiral\Queue\RetryPolicy(maxAttempts: 3, delay: 4, multiplier: 2)
));

try {
$this->interceptor->process(
self::class,
$name,
'bar',
['headers' => ['attempts' => ['1']]],
$this->core
Expand All @@ -160,7 +180,8 @@ public function testWithRetryPolicyInException(): void
}
}

public function testWithCustomRetryPolicyInException(): void
#[DataProvider('jobNameDataProvider')]
public function testWithCustomRetryPolicyInException(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(
new RetryPolicy(maxAttempts: 30, delay: 400, multiplier: 25)
Expand All @@ -169,7 +190,7 @@ public function testWithCustomRetryPolicyInException(): void
$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]])
->with($name, 'bar', ['headers' => ['attempts' => ['1']]])
->willThrowException(new TestRetryException(
retryPolicy: new class implements RetryPolicyInterface {
public function isRetryable(\Throwable $exception, int $attempts = 0): bool
Expand All @@ -186,7 +207,7 @@ public function getDelay(int $attempts = 0): int

try {
$this->interceptor->process(
self::class,
$name,
'bar',
['headers' => ['attempts' => ['1']]],
$this->core
Expand All @@ -197,7 +218,8 @@ public function getDelay(int $attempts = 0): int
}
}

public function testWithRetryPolicyInExceptionInsideJobException(): void
#[DataProvider('jobNameDataProvider')]
public function testWithRetryPolicyInExceptionInsideJobException(string $name): void
{
$this->reader->expects($this->once())->method('firstClassMetadata')->willReturn(
new RetryPolicy(maxAttempts: 30, delay: 400, multiplier: 25)
Expand All @@ -206,7 +228,7 @@ public function testWithRetryPolicyInExceptionInsideJobException(): void
$this->core
->expects($this->once())
->method('callAction')
->with(self::class, 'bar', ['headers' => ['attempts' => ['1']]])
->with($name, 'bar', ['headers' => ['attempts' => ['1']]])
->willThrowException(new JobException(
previous: new TestRetryException(
retryPolicy: new \Spiral\Queue\RetryPolicy(maxAttempts: 3, delay: 4, multiplier: 2)
Expand All @@ -215,7 +237,7 @@ public function testWithRetryPolicyInExceptionInsideJobException(): void

try {
$this->interceptor->process(
self::class,
$name,
'bar',
['headers' => ['attempts' => ['1']]],
$this->core
Expand All @@ -225,4 +247,10 @@ public function testWithRetryPolicyInExceptionInsideJobException(): void
$this->assertSame(['attempts' => ['2']], $e->getOptions()->getHeaders());
}
}

public static function jobNameDataProvider(): \Traversable
{
yield [self::class];
yield ['foo'];
}
}

0 comments on commit d1a982e

Please sign in to comment.