How to use the bridge
The main usage for a message bus is the asynchronous handling of tasks from so called ‘workers’ or ‘handlers’ (Symfony). Therefore a value object (message) is dispatched on a bus. A worker is specified to handle this message (asynchronously or syncronously). This prevents long running jobs from blocking a response for the user, if we stick to the http context.
A message could be as simple as this:
namespace App\Worker;
class UnimportantAsyncMessage
{
private string $title;
private ?string $message;
public function __construct(string $title, ?string $message = null)
{
$this->title = $title;
$this->message = $message;
}
public function getTitle(): string
{
return $this->title;
}
public function getMessage(): ?string
{
return $this->message;
}
}
To handle that message, we also define a worker, that is specific to this message. It uses the __invoke()
method
and has to be configured correctly. See the “Configuration” section for details.
namespace App\Worker;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
final class UnimportantWorker
{
private LoggerInterface $logger;
public function __construct(ContainerInterface $container)
{
$this->logger = $container->get('MessageBusLogger');
}
public function __invoke(UnimportantAsyncMessage $message): void
{
$this->logger->info(sprintf('Working on message with title "%s"', $message->getTitle()));
// handle the message, e.g. updating the database or send an e-mail
}
}
The following example represents a handler that uses a form to gather info, which will be stuffed into an asynchronous
message. We can directly answer the user after submitting and later display calculated results. The SentStamp
will
be added by Symfony’s middleware and indicates the successful delegation to a transport. Don’t forget to pass the
MessageBusInterface
service in the handlers factory!
namespace App\Handler;
use App\Worker\ImportantSyncMessage;
use App\Worker\UnimportantAsyncMessage;
use Laminas\Diactoros\Response\HtmlResponse;
use Mezzio\Template\TemplateRendererInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
final class MessageHandler implements RequestHandlerInterface
{
private TemplateRendererInterface $template;
private MessageBusInterface $bus;
public function __construct(TemplateRendererInterface $template, MessageBusInterface $bus)
{
$this->template = $template;
$this->bus = $bus;
}
public function handle(ServerRequestInterface $request): ResponseInterface
{
$params = $request->getParsedBody();
if ('POST' === $request->getMethod() && is_array($params)) {
$title = $params['title'] ?? null;
$description = $params['message'] ?? null;
if (!empty($title)) {
$message = new UnimportantAsyncMessage($title, $description);
$envelope = $this->bus->dispatch($amqpMessage);
if (null !== $envelope->last(SentStamp::class)) {
// success
} else {
// failure
}
} else {
// required params missing
}
}
return new HtmlResponse($this->template->render('...'));
}
}
Asynchronous message consuming
Taking the example above as granted, we now have at least one un-handled message in our broker (RabbitMQ, Database or
Redis) that needs to be “consumed”. To handle this asynchronously, the package provides a vendor/bin/message-bus
binary that is based on the Symfony/Console
component. Try php vendor/bin/message-bus
to list possible commands
for the consumers.
With php vendor/bin/message-bus messenger:consume Transport::$name
you can start a consumer that will take and
handle all messages from the given transport one by one. You should also take a look at the supervisor configuration
from Symfony Docs for making the consumer
sustainable.
Using the Logger
With this package there is a Monolog service registered as ‘MessageBusLogger’ in the Psr\Container
(Laminas Service
Manager). You can use the service to write into the same log files, as the Symfony services do. Take a look on the
‘UnimportantWorker’ above for an example.