Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trunk enhancements branch #190

Closed
wants to merge 75 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
3f704ba
Drop support pre-configured handlers, make it compatible with validat…
xepozz Dec 25, 2023
ca8a20f
Clean up after refactoring
xepozz Dec 25, 2023
101e251
Apply fixes from StyleCI
StyleCIBot Dec 25, 2023
4a31aac
Eliminate return value
xepozz Dec 25, 2023
dc2cd05
Merge remote-tracking branch 'origin/refactor-handlers' into refactor…
xepozz Dec 25, 2023
8297902
Merge branch 'master' into refactor-handlers
xepozz Jan 13, 2024
e3a598d
Merge master
xepozz Jan 13, 2024
fe95f62
Apply fixes from StyleCI
StyleCIBot Jan 13, 2024
2a65982
Fix psalm
xepozz Jan 13, 2024
da8f918
Merge remote-tracking branch 'origin/refactor-handlers' into refactor…
xepozz Jan 13, 2024
3c946d6
Remove doubled tests
xepozz Jan 13, 2024
6fe5c55
Replace inlined handler with a new envelope
xepozz Jan 13, 2024
cc15b35
Add envelope stack
xepozz Jan 13, 2024
66fc7b6
Move serializer from ampq driver and refactor
xepozz Jan 13, 2024
d70d5c1
Apply fixes from StyleCI
StyleCIBot Jan 13, 2024
86c0cf5
Apply Rector changes (CI)
xepozz Jan 13, 2024
f1b4f91
Support php80
xepozz Jan 13, 2024
3a689a2
Test envelope decodes right
xepozz Jan 13, 2024
5dd848c
Remove redundant class
xepozz Jan 13, 2024
d4f709e
Rename class
xepozz Jan 13, 2024
12eb2d1
Add default DI binding
xepozz Jan 13, 2024
f5d1915
Add test case
xepozz Jan 13, 2024
a0ecfa1
Fix error string
xepozz Jan 14, 2024
ba21c45
Fix tests
xepozz Jan 14, 2024
e0a045f
Rewrite middlewares
xepozz Jan 14, 2024
6f1ee24
Apply fixes from StyleCI
StyleCIBot Jan 14, 2024
548d3c8
Fix log message
xepozz Jan 14, 2024
84db7a4
Implement method
xepozz Jan 14, 2024
4e700c8
Merge remote-tracking branch 'origin/middlewares' into middlewares
xepozz Jan 14, 2024
8302821
Revert some consume middlewares
xepozz Jan 14, 2024
7fe6ad7
Apply fixes from StyleCI
StyleCIBot Jan 14, 2024
df89b8e
Fix psalm
xepozz Jan 14, 2024
c192519
Merge remote-tracking branch 'origin/middlewares' into middlewares
xepozz Jan 14, 2024
b6f94d1
Merge branch 'refactor-handlers' into new
xepozz Jan 14, 2024
cc82476
Merge branches
xepozz Jan 14, 2024
3f70b03
Merge branch 'middlewares' into new
xepozz Jan 14, 2024
693734c
Merge branches
xepozz Jan 14, 2024
eb06266
Apply fixes from StyleCI
StyleCIBot Jan 14, 2024
d5295db
Move middlewares
xepozz Jan 14, 2024
8b8d83d
Move envelope
xepozz Jan 14, 2024
2762662
Eliminate FailureMiddlewares
xepozz Jan 14, 2024
4d9b924
Remove queue from FailureMiddlewares
xepozz Jan 14, 2024
beaf64b
Delete mentions of FailureMiddlewares
xepozz Jan 14, 2024
cc5309b
Replace Adapter with Queue
xepozz Jan 14, 2024
92e0fcc
Apply fixes from StyleCI
StyleCIBot Jan 14, 2024
e6d200b
Decouple adapter and the queue
xepozz Jan 19, 2024
840f786
Apply fixes from StyleCI
StyleCIBot Jan 19, 2024
1f2d8b7
Move callback to events
xepozz Feb 2, 2024
992213b
Merge remote-tracking branch 'origin/new' into new
xepozz Feb 2, 2024
2697a18
Apply fixes from StyleCI
StyleCIBot Feb 2, 2024
5408f92
Use event-dispatcher instead of handlers
xepozz Feb 2, 2024
35ec5c5
Merge remote-tracking branch 'origin/new' into new
xepozz Feb 2, 2024
4163503
Apply fixes from StyleCI
StyleCIBot Feb 2, 2024
4bba2f4
Serialize message class
xepozz Feb 4, 2024
0705f4c
Add message trait
xepozz Feb 4, 2024
878da81
Fix bug
xepozz Feb 4, 2024
403710e
Remove definition
xepozz Feb 4, 2024
b4315ad
Merge remote-tracking branch 'origin/new' into new
xepozz Feb 4, 2024
2e4ae7a
Apply fixes from StyleCI
StyleCIBot Feb 4, 2024
1d1b7f9
Merge branch 'master' into new
xepozz Feb 4, 2024
997b9a5
Fix tests
xepozz Feb 4, 2024
d62d871
Fix namespaces
xepozz Feb 13, 2024
ecd7123
Remove stack
xepozz Feb 13, 2024
9f3ee0e
Adjust tests to pub-sub pattern
xepozz Feb 13, 2024
3b2262a
Apply fixes from StyleCI
StyleCIBot Feb 13, 2024
1c824e0
Apply Rector changes (CI)
xepozz Feb 13, 2024
76c605f
Apply fixes from StyleCI
StyleCIBot Feb 13, 2024
8f1bb1d
Share test stuff
xepozz Feb 14, 2024
60bbaf0
Merge remote-tracking branch 'origin/new' into new
xepozz Feb 14, 2024
2959358
Add psr/event-dispatcher
xepozz Feb 14, 2024
d0caf53
Fix psalm warnings
xepozz Feb 14, 2024
8df9810
Merge branch 'master' into new
xepozz Feb 14, 2024
4e92f9d
Dump messages with yiisoft/var-dumper
xepozz Feb 16, 2024
4165f39
Apply fixes from StyleCI
StyleCIBot Feb 16, 2024
453c219
Merge branch 'master' into new
xepozz Mar 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
/psalm.xml export-ignore
/phpunit.xml.dist export-ignore
/Makefile export-ignore
/tests export-ignore
/tests/App export-ignore
/tests/Integration export-ignore
/tests/Unit export-ignore
/tests/runtime export-ignore
/docs export-ignore

# Avoid merge conflicts in CHANGELOG
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ composer.lock

# PHPUnit
.phpunit.result.cache
.phpunit.cache
coverage.html
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ $data = [
'url' => $url,
'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);
$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data);
```

Then you should push it to the queue:
Expand All @@ -80,7 +80,10 @@ $queue->push($message);
Its handler may look like the following:

```php
class FileDownloader
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageHandlerInterface;

class FileDownloader implements MessageHandlerInterface
{
private string $absolutePath;

Expand All @@ -89,21 +92,21 @@ class FileDownloader
$this->absolutePath = $absolutePath;
}

public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): MessageInterface
{
$fileName = $downloadMessage->getData()['destinationFile'];
$fileName = $message->getData()['destinationFile'];
$path = "$this->absolutePath/$fileName";
file_put_contents($path, file_get_contents($downloadMessage->getData()['url']));
file_put_contents($path, file_get_contents($message->getData()['url']));

return $message;
}
}
```

The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`:

```php
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
$handlers, // Here it is
$logger,
$injector,
$container
Expand Down
11 changes: 9 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
"require": {
"php": "^8.1",
"psr/container": "^1.0|^2.0",
"psr/event-dispatcher": "^1.0",
"psr/log": "^2.0|^3.0",
"symfony/console": "^5.4|^6.0",
"yiisoft/definitions": "^1.0|^2.0|^3.0",
"yiisoft/event-dispatcher": "^1.1",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/injector": "^1.0"
"yiisoft/injector": "^1.0",
"yiisoft/var-dumper": "^1.7",
"yiisoft/yii-event": "^2.1"
},
"require-dev": {
"maglnet/composer-require-checker": "^4.7",
Expand All @@ -49,7 +53,8 @@
},
"autoload": {
"psr-4": {
"Yiisoft\\Queue\\": "src"
"Yiisoft\\Queue\\": "src",
"Yiisoft\\Queue\\Tests\\Shared\\": "tests/Shared"
}
},
"autoload-dev": {
Expand All @@ -66,6 +71,8 @@
},
"config-plugin": {
"di": "di.php",
"events-web": "events-web.php",
"events-console": "events-console.php",
"params": "params.php"
}
},
Expand Down
71 changes: 44 additions & 27 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,77 @@
declare(strict_types=1);

use Psr\Container\ContainerInterface;
use Yiisoft\Config\ConfigInterface;
use Yiisoft\Definitions\Reference;
use Yiisoft\EventDispatcher\Dispatcher\Dispatcher;
use Yiisoft\EventDispatcher\Provider\Provider;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Adapter\SynchronousAdapter;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Cli\SimpleLoop;
use Yiisoft\Queue\Command\ListenAllCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailure;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailureInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\MiddlewareDispatcher;
use Yiisoft\Queue\Middleware\MiddlewareFactory;
use Yiisoft\Queue\Middleware\MiddlewareFactoryInterface;
use Yiisoft\Queue\Command\ListenAllCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\Worker as QueueWorker;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Yii\Event\ListenerCollectionFactory;

/* @var array $params */

return [
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
'__construct()' => [
'eventDispatcher' => Reference::to('queue.dispatcher'),
],

Check warning on line 38 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L36-L38

Added lines #L36 - L38 were not covered by tests
],
WorkerInterface::class => QueueWorker::class,
LoopInterface::class => static function (ContainerInterface $container): LoopInterface {
return extension_loaded('pcntl')
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
return $container->get(
extension_loaded('pcntl')
? SignalLoop::class
: SimpleLoop::class
);
},
'queue.middlewareDispatcher.push' => static function (Injector $injector) use ($params) {
return $injector->make(
MiddlewareDispatcher::class,
['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']]
);

Check warning on line 52 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L42-L52

Added lines #L42 - L52 were not covered by tests
},
Queue::class => [
'__construct()' => [
'adapter' => Reference::to(AdapterInterface::class),
'pushMiddlewareDispatcher' => Reference::to('queue.middlewareDispatcher.push'),
],
],

Check warning on line 59 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L54-L59

Added lines #L54 - L59 were not covered by tests
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
'__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']],
],
AdapterInterface::class => SynchronousAdapter::class,

Check warning on line 64 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L64

Added line #L64 was not covered by tests

QueueInterface::class => Queue::class,
MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class,
MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class,
MiddlewareFactoryFailureInterface::class => MiddlewareFactoryFailure::class,
PushMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-push']],
],
ConsumeMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-consume']],
],
FailureMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-fail']],
],
MessageSerializerInterface::class => JsonMessageSerializer::class,
MiddlewareFactoryInterface::class => MiddlewareFactory::class,

Check warning on line 68 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L68

Added line #L68 was not covered by tests

'queue.dispatcher' => static function (ConfigInterface $config, ListenerCollectionFactory $factory) {
$listeners = $factory->create($config->get('queue'));

Check warning on line 71 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L70-L71

Added lines #L70 - L71 were not covered by tests

$provider = new Provider($listeners);

Check warning on line 73 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L73

Added line #L73 was not covered by tests

return new Dispatcher($provider);
},

Check warning on line 76 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L75-L76

Added lines #L75 - L76 were not covered by tests
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
Expand Down
20 changes: 20 additions & 0 deletions config/events-console.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

use Psr\Container\ContainerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Adapter\SynchronousAdapter;
use Yiisoft\Queue\QueueInterface;

/* @var array $params */

return [
\Yiisoft\Yii\Console\Event\ApplicationShutdown::class => [
function (AdapterInterface $adapter, ContainerInterface $container) {
if ($adapter instanceof SynchronousAdapter) {
$container->get(QueueInterface::class)->run(0);

Check warning on line 16 in config/events-console.php

View check run for this annotation

Codecov / codecov/patch

config/events-console.php#L12-L16

Added lines #L12 - L16 were not covered by tests
}
},
],
];

Check warning on line 20 in config/events-console.php

View check run for this annotation

Codecov / codecov/patch

config/events-console.php#L18-L20

Added lines #L18 - L20 were not covered by tests
20 changes: 20 additions & 0 deletions config/events-web.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

declare(strict_types=1);

use Psr\Container\ContainerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Adapter\SynchronousAdapter;
use Yiisoft\Queue\QueueInterface;

/* @var array $params */

return [
\Yiisoft\Yii\Http\Event\AfterRequest::class => [
function (AdapterInterface $adapter, ContainerInterface $container) {
if ($adapter instanceof SynchronousAdapter) {
$container->get(QueueInterface::class)->run(0);

Check warning on line 16 in config/events-web.php

View check run for this annotation

Codecov / codecov/patch

config/events-web.php#L12-L16

Added lines #L12 - L16 were not covered by tests
}
},
],
];

Check warning on line 20 in config/events-web.php

View check run for this annotation

Codecov / codecov/patch

config/events-web.php#L18-L20

Added lines #L18 - L20 were not covered by tests
1 change: 0 additions & 1 deletion config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
],
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
],
Expand Down
7 changes: 7 additions & 0 deletions config/queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

declare(strict_types=1);

/* @var array $params */

return [];

Check warning on line 7 in config/queue.php

View check run for this annotation

Codecov / codecov/patch

config/queue.php#L7

Added line #L7 was not covered by tests
30 changes: 11 additions & 19 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
use InvalidArgumentException;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Message\IdEnvelope;

final class SynchronousAdapter implements AdapterInterface
Expand All @@ -18,26 +17,18 @@ final class SynchronousAdapter implements AdapterInterface
private int $current = 0;

public function __construct(
private WorkerInterface $worker,
private QueueInterface $queue,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
private MessageSerializerInterface $messageSerializer,
private string $channel = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
) {
}

public function __destruct()
{
$this->runExisting(function (MessageInterface $message): bool {
$this->worker->process($message, $this->queue);

return true;
});
}

public function runExisting(callable $handlerCallback): void
{
$result = true;
while (isset($this->messages[$this->current]) && $result === true) {
$result = $handlerCallback($this->messages[$this->current]);
while ($result === true && isset($this->messages[$this->current])) {
$result = $handlerCallback(
$this->messageSerializer->unserialize($this->messages[$this->current])
);
unset($this->messages[$this->current]);
$this->current++;
}
Expand Down Expand Up @@ -65,9 +56,10 @@ public function status(string|int $id): JobStatus
public function push(MessageInterface $message): MessageInterface
{
$key = count($this->messages) + $this->current;
$this->messages[] = $message;
$newMessage = new IdEnvelope($message, $key);
$this->messages[] = $this->messageSerializer->serialize($newMessage);

return new IdEnvelope($message, $key);
return $newMessage;
}

public function subscribe(callable $handlerCallback): void
Expand Down
4 changes: 2 additions & 2 deletions src/Debug/QueueCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace Yiisoft\Queue\Debug;

use Yiisoft\Queue\Middleware\MiddlewareInterface;
use Yiisoft\Yii\Debug\Collector\CollectorTrait;
use Yiisoft\Yii\Debug\Collector\SummaryCollectorInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\QueueInterface;

final class QueueCollector implements SummaryCollectorInterface
Expand Down Expand Up @@ -53,7 +53,7 @@ public function collectStatus(string $id, JobStatus $status): void
public function collectPush(
string $channel,
MessageInterface $message,
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
string|array|callable|MiddlewareInterface ...$middlewareDefinitions,
): void {
if (!$this->isActive()) {
return;
Expand Down
9 changes: 7 additions & 2 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\Middleware\MiddlewareInterface;
use Yiisoft\Queue\QueueInterface;

final class QueueDecorator implements QueueInterface
Expand All @@ -28,7 +28,7 @@

public function push(
MessageInterface $message,
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions
string|array|callable|MiddlewareInterface ...$middlewareDefinitions
): MessageInterface {
$message = $this->queue->push($message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getChannelName(), $message, ...$middlewareDefinitions);
Expand Down Expand Up @@ -61,4 +61,9 @@
$new->queue = $this->queue->withChannelName($channel);
return $new;
}

public function getAdapter(): ?AdapterInterface

Check warning on line 65 in src/Debug/QueueDecorator.php

View check run for this annotation

Codecov / codecov/patch

src/Debug/QueueDecorator.php#L65

Added line #L65 was not covered by tests
{
return $this->queue->getAdapter();

Check warning on line 67 in src/Debug/QueueDecorator.php

View check run for this annotation

Codecov / codecov/patch

src/Debug/QueueDecorator.php#L67

Added line #L67 was not covered by tests
}
}
Loading
Loading