[MessageOutbox] Outbox relay design #215
-
We've been using The docs say:
However, the We can easily provide our own implementation for our use case, sure, but I was wondering if this wouldn't be useful for the package itself. Untested implementation<?php
namespace EventSauce\MessageOutbox;
use EventSauce\BackOff\BackOffStrategy;
use EventSauce\BackOff\ExponentialBackOffStrategy;
use EventSauce\EventSourcing\Message;
use EventSauce\EventSourcing\MessageConsumer;
use EventSauce\EventSourcing\MessageDispatcher;
use Throwable;
use Traversable;
use function count;
final readonly class OutboxRelay
{
public function __construct(
private OutboxRepository $repository,
private MessageDispatcher $dispatcher,
private BackOffStrategy $backOff = new ExponentialBackOffStrategy(100000, 25),
private RelayCommitStrategy $commitStrategy = new MarkMessagesConsumedOnCommit(),
) {
}
public function publishBatch(int $batchSize, int $commitSize = 1): int
{
$numberPublished = 0;
$messages = $this->repository->retrieveBatch($batchSize);
foreach ($this->batchByCommitSize($messages, $commitSize) as $batch) {
$numberPublished += $this->dispatchMessages($batch);
}
return $numberPublished;
}
/**
* @param Traversable<Message>
* @return iterable<list<Message>>
*/
private function batchByCommitSize(Traversable $messages, int $size): iterable
{
$batch = [];
foreach ($messages as $message) {
$batch[] = $message;
if (count($batch) === $size) {
yield $batch;
$batch = [];
}
}
if (count($batch) > 0) {
yield $batch;
}
}
/** @param list<Message> $messages */
private function dispatchMessages(array $messages): int
{
$tries = 0;
start_relay:
try {
$tries++;
$this->dispatcher->dispatch(...$messages);
} catch (Throwable $throwable) {
$this->backOff->backOff($tries, $throwable);
goto start_relay;
}
$this->commitStrategy->commitMessages($this->repository, ...$messages);
return count($messages);
}
} Can you please confirm if our assumptions around batching are correct and if the implementation shared is something useful? We can turn it into a PR 👍 |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Hi @lcobucci, this sounds like a very much worthwhile addition. Shall we introduce this as a |
Beta Was this translation helpful? Give feedback.
Hi @lcobucci, this sounds like a very much worthwhile addition. Shall we introduce this as a
BatchedOutboxRelay
? That way we can introduce it in a non-breaking way. The old implementation can then stay in case people use/prefer it for their own case. Alternatively, we can rename the existing toRelayOutboxThroughConsumer
and call the new oneRelayOutboxThroughDispatcher
and create a class that extends the RelayThroughConsumer for backward compatibility. I'm OK either way, do you have any preference or ideas about this?