Sometimes you want to make sure that messages in Symfony Messenger are sent to the consumer in batches rather than singly. Recently, we needed to send updated lines of text from our programs to a translation service provider via Messenger.
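Because the provider applies strict rate limiting, we cannot send these messages one at a time. Instead, we collect them in a buffer and pass them on as a single batch: either when no new message has arrived for 10 seconds, or when the buffer holds 100 messages.
Here is how this can be done: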
// Symfony Messenger Message:
/**
 * Messenger message describing one changed translation string.
 *
 * A plain data carrier: the three values are exposed as public properties.
 */
class TranslationUpdate
{
    /**
     * @param string $locale locale the translation belongs to
     * @param string $key    translation key
     * @param string $value  new text for that key
     */
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}
/**
 * Collects TranslationUpdate messages in memory and re-dispatches them as a
 * single TranslationBatch.
 *
 * The buffer is flushed when it holds BUFFER_LIMIT messages, or when
 * BUFFER_TIMER seconds have passed since the most recent message (the timer is
 * a SIGALRM scheduled with pcntl_alarm()).
 *
 * NOTE(review): the buffer lives only in this object; presumably messages the
 * worker has already acknowledged are lost if the process exits before a
 * flush — confirm whether that is acceptable.
 */
class TranslationUpdateHandler implements MessageHandlerInterface
{
    private const BUFFER_TIMER = 10; // in seconds
    private const BUFFER_LIMIT = 100;

    /** @var TranslationUpdate[] messages waiting for the next flush */
    private array $buffer = [];

    /**
     * Enables asynchronous signal delivery and registers batchBuffer() as the
     * SIGALRM handler. Requires the pcntl extension.
     *
     * @param MessageBusInterface $messageBus bus the assembled batch is dispatched to
     */
    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
        pcntl_async_signals(true);
        pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
    }

    /**
     * Buffers the message, then either flushes (limit reached) or restarts the
     * inactivity timer.
     */
    public function __invoke(TranslationUpdate $message): void
    {
        $this->buffer[] = $message;

        if (\count($this->buffer) >= self::BUFFER_LIMIT) {
            $this->batchBuffer();

            return;
        }

        // A new pcntl_alarm() call replaces the previous one, so the timer
        // restarts with every message.
        pcntl_alarm(self::BUFFER_TIMER);
    }

    /**
     * Dispatches everything currently buffered as one TranslationBatch.
     *
     * Called directly from __invoke() and as the SIGALRM handler (extra
     * arguments passed by the signal dispatcher are ignored).
     *
     * @throws \Throwable whatever the message bus throws; the buffered
     *                    messages are put back before rethrowing
     */
    private function batchBuffer(): void
    {
        // Drop any pending alarm: a size-triggered flush must not be followed
        // by a stale timer firing while the dispatch is still running.
        pcntl_alarm(0);

        // Detach the buffer before dispatching so that a re-entrant call
        // (signal arriving during dispatch) finds nothing to send instead of
        // dispatching the same messages a second time.
        $pending = $this->buffer;
        $this->buffer = [];

        if ([] === $pending) {
            return;
        }

        try {
            $this->messageBus->dispatch(new TranslationBatch($pending));
        } catch (\Throwable $exception) {
            // Keep the messages for a later attempt, ahead of anything that
            // was buffered in the meantime.
            $this->buffer = array_merge($pending, $this->buffer);

            throw $exception;
        }
    }
}
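We wait for messages arriving via Messenger and add each of them to a buffer (an array).
As long as the limit has not been reached, the timer is restarted. Once the buffer holds 100 messages, batchBuffer
is called right away.
Otherwise the timer set with pcntl_alarm eventually expires, and batchBuffer
is called from the signal handler.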
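The timer relies on PHP's PCNTL functions (a PHP extension that is available on Unix-like systems, but not on Windows). pcntl_alarm asks the operating system to deliver a SIGALRM signal to the process after the given number of seconds. When the signal arrives, the callback registered with pcntl_signal is invoked. Each new call to pcntl_alarm replaces the previous alarm, so the timer starts over with every incoming message.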
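batchBuffer
wraps the buffered messages in a new message and hands it back to Messenger (see the dispatch
call), so the actual work is not done inside the PCNTL signal handler but in a regular Messenger handler.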
/**
 * Messenger message carrying a group of translation updates that should be
 * processed together.
 */
class TranslationBatch
{
    /**
     * @param TranslationUpdate[] $notifications updates contained in this batch
     */
    public function __construct(
        private array $notifications,
    ) {
    }

    /**
     * Returns the updates in the order they were passed to the constructor.
     *
     * Without this accessor the private property could not be read by the
     * handler that consumes the batch.
     *
     * @return TranslationUpdate[]
     */
    public function getNotifications(): array
    {
        return $this->notifications;
    }
}
/**
 * Messenger handler for TranslationBatch messages.
 *
 * Placeholder: the actual batch processing is left to be filled in.
 */
class TranslationBatchHandler implements MessageHandlerInterface
{
    /**
     * @param TranslationBatch $message batch of translation updates to process
     */
    public function __invoke(TranslationBatch $message): void
    {
        // Process every message of the batch here.
    }
}
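That is all there is to it: the messages are now processed in batches using nothing but Messenger, with no cron job involved.
One caveat — the buffer exists only in the memory of the worker process. If the worker stops, the buffered messages are gone, so if that matters, keep the buffer in external storage, for example Redis.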
"Symfony Framework". - ยซ -ยป. :
โ Symfony ClickHouse ( , ClickHouse). Business Intelligence- ,
โ API,
โ .
.