When you want to apply CQRS, the command bus is your best friend. It takes an incoming command message and routes it to the responsible command handler. The advantage of using a CommandBus instead of calling command handlers directly is that you can change your model without affecting the application logic. You can work with command versions to dispatch a newer version to a new command handler and older versions to old command handlers. Your model can support different versions at the same time, which makes migrations a lot easier. Another feature a command bus can provide is automatic transaction handling. In distributed systems it can also be useful to push the command onto a queue and handle it asynchronously.
When dividing your domain logic into modules or bounded contexts you need a way to inform the outside world about events that happened in your model. An EventBus is responsible for dispatching event messages to all interested listeners. If a listener is part of another system the event may need to be sent to a remote interface.
A system based on microservices requires a lightweight communication channel. The two most widely used protocols are HTTP request-response with resource APIs and lightweight messaging. The latter is supported by prooph/service-bus out of the box, but HTTP APIs can be integrated too.
The QueryBus is responsible for routing a query message to a so-called finder. The query indicates that the producer expects a response. The finder's responsibility is to fetch data from a data source using the query parameters defined in the query message. It is up to the finder whether the data is fetched synchronously or asynchronously, so the QueryBus returns a React\Promise\Promise to the caller.
All three bus types extend the same base class Prooph\ServiceBus\MessageBus and therefore make use of an event-driven message dispatch process. Take a look at the CommandBus API. It is the same for EventBus and QueryBus, except that QueryBus::dispatch returns a promise.
class CommandBus extends MessageBus
{
    public function attach(string $eventName, callable $listener, int $priority = 0): ListenerHandler;

    public function detach(ListenerHandler $handler): void;

    /**
     * @param mixed $command
     *
     * @throws CommandDispatchException
     */
    public function dispatch($command): void;
}
The public API of a message bus is very simple. You can attach and detach plugins which are simple event listener aggregates and you can dispatch a message.
Internally a prooph message bus uses an event-driven process to dispatch messages. This offers a lot of flexibility without the need to define interfaces for messages. A message can be anything, even a string. prooph/service-bus doesn't care. But using some defaults will reduce the number of required plugins and increase performance.
But first let's take a look at the internals of a message dispatch process and the differences between the bus types.
The dispatch action event is triggered right after MessageBus::dispatch($message) is invoked.
The following default priorities are integrated:
public const PRIORITY_INITIALIZE = 400000;
public const PRIORITY_DETECT_MESSAGE_NAME = 300000;
public const PRIORITY_ROUTE = 200000;
public const PRIORITY_LOCATE_HANDLER = 100000;
public const PRIORITY_INVOKE_HANDLER = 0;
At the initialize priority the action event only contains the message. You can attach any listeners for initialization.
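To illustrate how a single event can be steered by priorities, here is a minimal sketch. It assumes a hypothetical `PrioritySketchBus` class written only for this example; it is not the prooph implementation and only reuses the priority values listed above. Listeners are attached out of order, and the priority alone decides when each one runs.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the bus internals, NOT the prooph implementation.
final class PrioritySketchBus
{
    // Priority values copied from the list above.
    public const PRIORITY_INITIALIZE = 400000;
    public const PRIORITY_DETECT_MESSAGE_NAME = 300000;
    public const PRIORITY_ROUTE = 200000;
    public const PRIORITY_LOCATE_HANDLER = 100000;
    public const PRIORITY_INVOKE_HANDLER = 0;

    /** @var array<int, array{priority: int, listener: callable}> */
    private array $listeners = [];

    public function attach(callable $listener, int $priority = 0): void
    {
        $this->listeners[] = ['priority' => $priority, 'listener' => $listener];
    }

    /**
     * Runs every listener once, highest priority first. Each listener
     * receives the shared params and returns the (possibly changed) params.
     */
    public function dispatch(array $params): array
    {
        $sorted = $this->listeners;
        usort(
            $sorted,
            static fn (array $a, array $b): int => $b['priority'] <=> $a['priority']
        );

        foreach ($sorted as $entry) {
            $params = ($entry['listener'])($params);
        }

        return $params;
    }
}

$bus = new PrioritySketchBus();

// Attached out of order on purpose: the priority decides the execution order.
$bus->attach(
    static function (array $params): array {
        $params['trace'][] = 'invoke-handler';
        return $params;
    },
    PrioritySketchBus::PRIORITY_INVOKE_HANDLER
);
$bus->attach(
    static function (array $params): array {
        $params['trace'][] = 'initialize';
        return $params;
    },
    PrioritySketchBus::PRIORITY_INITIALIZE
);
$bus->attach(
    static function (array $params): array {
        $params['trace'][] = 'route';
        return $params;
    },
    PrioritySketchBus::PRIORITY_ROUTE
);

$result = $bus->dispatch(['message' => 'register-user', 'trace' => []]);
```

In this sketch `$result['trace']` records the listeners in priority order, regardless of the order in which they were attached.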
Before a message handler can be located, the message bus needs to know how the message is named. There are two ways to provide this information. The message can implement the Prooph\Common\Messaging\HasMessageName interface. In this case the message bus picks the name directly from the message and sets it as the param message-name in the action event for later use. The detect-message-name event is not triggered. If the message does not implement the interface, the detect-message-name priority can be used to add a plugin that injects the name using ActionEvent::setParam('message-name', $messageName).
If no message-name was set by a listener, the message bus uses a fallback:
- FQCN of message in case of object
- message => message-name in case of string
- gettype($message) in all other cases
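The fallback rules above can be sketched as a small function. `fallbackMessageName` and `RegisterUser` are hypothetical names made up for this example; the function only mirrors the three documented rules and is not the library's own code.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper mirroring the documented fallback rules.
 *
 * @param mixed $message
 */
function fallbackMessageName($message): string
{
    if (is_object($message)) {
        // Objects: fully qualified class name.
        return get_class($message);
    }

    if (is_string($message)) {
        // Strings: the message itself is used as the name.
        return $message;
    }

    // Everything else (arrays, integers, ...): the PHP type name.
    return gettype($message);
}

// Hypothetical command class, declared in the global namespace.
final class RegisterUser
{
}

$names = [
    fallbackMessageName(new RegisterUser()),
    fallbackMessageName('user.registered'),
    fallbackMessageName(['id' => 1]),
];
```

Here `$names` holds the class name `RegisterUser`, the string `user.registered` and the type name `array`.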
During the route phase a plugin (typically a router) should provide the responsible message handler either in the form of a ready-to-use callable, an object or just a string. The latter should be a service id that can be passed to a service locator to get an instance of the message handler. The message handler should be set as action event param message-handler (for CommandBus and QueryBus) or event-listeners (for EventBus).
As you can see, the command and query bus work with a single message handler whereas the event bus works with multiple listeners. This is one of the most important differences. Only the event bus allows multiple message handlers per message and therefore uses a slightly different dispatch process.
After routing the message, the message bus checks if the handler was provided as a string. The locate-handler phase is then the last chance to provide an object or callable as message handler. If no plugin was able to provide one, the message bus throws an exception.
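The interplay of routing and handler location can be sketched as follows. This is a simplified illustration under stated assumptions: `resolveHandlerSketch`, the route map, the service factories and `RegisterUserHandlerSketch` are all hypothetical, and a plain array of factories stands in for a real service locator.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of the route and locate-handler steps.
 *
 * @param array<string, mixed>    $routes   message name => callable, object or service id
 * @param array<string, callable> $services service id => factory (stand-in for a service locator)
 *
 * @return object|callable
 */
function resolveHandlerSketch(string $messageName, array $routes, array $services)
{
    // Route step: look up whatever the router knows for this message name.
    $handler = $routes[$messageName] ?? null;

    // Locate step: a string is treated as a service id and resolved lazily.
    if (is_string($handler)) {
        if (! isset($services[$handler])) {
            throw new RuntimeException("Unknown service id: {$handler}");
        }
        $handler = $services[$handler]();
    }

    // Without an object or callable at this point the dispatch cannot continue.
    if (! is_object($handler) && ! is_callable($handler)) {
        throw new RuntimeException("No handler found for message: {$messageName}");
    }

    return $handler;
}

// Hypothetical invokable handler.
final class RegisterUserHandlerSketch
{
    public function __invoke(string $command): string
    {
        return 'handled ' . $command;
    }
}

$routes = ['register-user' => 'handler.register_user'];
$services = [
    'handler.register_user' => static fn (): object => new RegisterUserHandlerSketch(),
];

$handler = resolveHandlerSketch('register-user', $routes, $services);
$output = $handler('register-user');
```

Routing to a service id keeps handler construction lazy: the handler object is only created when a matching message is actually dispatched.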
With the message handler in place, it's time to invoke it with the message. Callable message handlers are invoked by the bus itself. However, the invoke-handler / invoke-finder events are always triggered.
At this stage all three bus types behave a bit differently.
- CommandBus: invokes the handler with the command message. An invoke-handler event is triggered.
- QueryBus: much the same as the command bus, but the message handler is invoked with the query message and a React\Promise\Deferred that needs to be resolved by the message handler, aka the finder. The query bus triggers an invoke-finder action event to indicate that a finder should be invoked and not a normal message handler.
- EventBus: loops over all event-listeners and triggers the locate-handler and invoke-handler action events for each message listener.
Note: The command and query bus have a mechanism to check if the command or query was handled. If not they throw an exception. The event bus does not have such a mechanism as having no listener for an event is a valid case.
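The difference described in the note can be sketched with two small functions. Both `dispatchCommandSketch` and `dispatchEventSketch` are hypothetical helpers for illustration only; they assume handlers are plain callables and leave out routing and action events entirely.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: a command needs exactly one handler.
 */
function dispatchCommandSketch(string $command, ?callable $handler): void
{
    if ($handler === null) {
        // An unhandled command is treated as an error.
        throw new RuntimeException("Command {$command} was not handled");
    }

    $handler($command);
}

/**
 * Hypothetical sketch: an event may have zero or more listeners.
 *
 * @param callable[] $listeners
 *
 * @return int number of listeners that were invoked
 */
function dispatchEventSketch(string $event, array $listeners): int
{
    foreach ($listeners as $listener) {
        $listener($event);
    }

    return count($listeners);
}

$log = [];

// No listener at all is a valid case for an event.
$invokedNone = dispatchEventSketch('user.registered', []);

// Several listeners for the same event are fine too.
$invokedTwo = dispatchEventSketch('user.registered', [
    function (string $event) use (&$log): void {
        $log[] = 'mailer:' . $event;
    },
    function (string $event) use (&$log): void {
        $log[] = 'projector:' . $event;
    },
]);

// A command without a handler fails.
try {
    dispatchCommandSketch('register-user', null);
} catch (RuntimeException $e) {
    $log[] = 'command rejected';
}
```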
The finalize action event is always triggered at the end of the process, no matter whether the process was successful or an exception was thrown. It is the ideal place to attach a monitoring plugin.
If at any time a plugin or the message bus itself throws an exception, it is caught and passed as param exception to the action event. The normal action event chain breaks and a finalize event is triggered instead. Plugins can then access the exception by getting it from the action event. A finalize plugin can unset the exception by calling ActionEvent::setParam("exception", null). When all plugins are informed about the error and none of them has unset the exception, the message bus throws a Prooph\ServiceBus\Exception\MessageDispatchException to inform the outside world about the error.
Note: The query bus has an additional priority, PRIORITY_PROMISE_REJECT, which is used to reject the promise in case of an exception during the finalize event. If you want to remove the exception with a listener, you need to add your listener with a higher priority than that.
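The exception handling around the finalize step can be sketched like this. `dispatchWithFinalizeSketch` is a hypothetical helper that models the action event params as a plain array and wraps the failure in a generic RuntimeException; the real bus uses its own action event and exception classes.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of the finalize step.
 *
 * @param callable   $process           stands in for the normal dispatch chain
 * @param callable[] $finalizeListeners each receives and returns the params array
 */
function dispatchWithFinalizeSketch(callable $process, array $finalizeListeners): void
{
    $params = ['exception' => null];

    try {
        $process();
    } catch (Throwable $e) {
        // The chain breaks; the exception is handed to the finalize listeners.
        $params['exception'] = $e;
    }

    // Finalize listeners always run, with or without an exception.
    foreach ($finalizeListeners as $listener) {
        $params = $listener($params);
    }

    // If nobody unset the exception, report it to the outside world.
    if ($params['exception'] !== null) {
        throw new RuntimeException('Message dispatch failed', 0, $params['exception']);
    }
}

$seen = [];

// A monitoring listener that only records the error.
$monitor = static function (array $params) use (&$seen): array {
    if ($params['exception'] !== null) {
        $seen[] = $params['exception']->getMessage();
    }

    return $params;
};

// A listener that unsets the exception.
$swallow = static function (array $params): array {
    $params['exception'] = null;

    return $params;
};

$failing = static function (): void {
    throw new LogicException('handler failed');
};

// The exception is unset by $swallow, so nothing is thrown here.
dispatchWithFinalizeSketch($failing, [$monitor, $swallow]);

// Without $swallow the wrapped exception reaches the caller.
$rethrown = null;
try {
    dispatchWithFinalizeSketch($failing, [$monitor]);
} catch (RuntimeException $e) {
    $rethrown = $e;
}
```

The listener order matters in this sketch: a monitoring listener that should see the error has to run before any listener that unsets it.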
There are two things to consider when upgrading from v5.
- The handle-error event is gone. If you want to have a plugin that tracks exceptions, you need to use the finalize event and check for the existence of an exception. This can look like this:
$commandBus->attach(
    CommandBus::EVENT_FINALIZE,
    function (ActionEvent $actionEvent) {
        if ($ex = $actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)) {
            // do something
        }
    }
);
The event bus has a listener exception collection mode. When the mode is active, the event bus invokes all event listeners, catches possible exceptions and pushes them to an exception collection. If an exception is caught, the event bus throws a Prooph\ServiceBus\Exception\EventListenerException at the end, which contains all caught listener exceptions.
To enable the collection mode, attach the plugin Prooph\ServiceBus\Plugin\ListenerExceptionCollectionMode. Detach the plugin to deactivate the mode again.
- There is a new dispatch event replacing all other previously existing events. It is controlled by event priorities instead. So if your previous plugin looked like this:
$commandBus->attach(
    CommandBus::EVENT_INVOKE_HANDLER,
    function (ActionEvent $actionEvent) {
        if ($ex = $actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)) {
            // do something
        }
    }
);
it now has to look like this:
$commandBus->attach(
    CommandBus::EVENT_DISPATCH,
    function (ActionEvent $actionEvent) {
        if ($ex = $actionEvent->getParam(CommandBus::EVENT_PARAM_EXCEPTION)) {
            // do something
        }
    },
    CommandBus::PRIORITY_INVOKE_HANDLER
);
- Attaching to ActionEvents
Instead of calling:
$commandBus
    ->getActionEventEmitter()
    ->attachListener(string $event, callable $listener, int $priority = 1): ListenerHandler;
It is simpler now:
$commandBus->attach(string $event, callable $listener, int $priority = 0): ListenerHandler;
- Plugins
Instead of implementing Prooph\Common\Event\ActionEventListenerAggregate, a plugin now has to implement Prooph\ServiceBus\Plugin\Plugin. The signature is:
public function attachToMessageBus(MessageBus $messageBus): void;
public function detachFromMessageBus(MessageBus $messageBus): void;
Instead of having this:
$finder->findQueryOne(QueryOne $query, Deferred $deferred = null): void;
you simply have this:
$finder->find(QueryOne $query, Deferred $deferred = null): void;
If you want to go back to the old behaviour, you can do the following things:
a)
class MyFinder
{
    public function find(Query $query, Deferred $deferred = null): void
    {
        if ($query instanceof QueryOne) {
            $this->findQueryOne($query, $deferred);
        } elseif ($query instanceof QueryTwo) {
            $this->findQueryTwo($query, $deferred);
        } else {
            throw new \InvalidArgumentException('Unknown query passed');
        }
    }
}
or b) Write a custom FinderInvokeStrategy.
Same as for FinderInvokeStrategy, the handler is only expected to have a handle(Command $command): void method.
If you need the old behaviour back, implement this in your handlers or write a custom plugin.
Same as above: The handler is only expected to have an onEvent(Event $message): void method.