-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
IlluminateOutboxRepository.php
117 lines (94 loc) · 3.21 KB
/
IlluminateOutboxRepository.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
<?php
namespace EventSauce\MessageOutbox\IlluminateOutbox;
use EventSauce\EventSourcing\Message;
use EventSauce\EventSourcing\Serialization\MessageSerializer;
use EventSauce\MessageOutbox\OutboxRepository;
use Illuminate\Database\ConnectionInterface;
use Traversable;
use function array_map;
use function count;
use function json_decode;
use function json_encode;
/**
 * Outbox repository backed by an Illuminate database connection.
 *
 * Messages are stored as JSON-encoded serializer payloads in the `payload`
 * column of the configured table. The code below also relies on an `id`
 * column and a boolean-like `consumed` column.
 */
class IlluminateOutboxRepository implements OutboxRepository
{
    /**
     * Header under which the row id is attached to messages yielded by
     * retrieveBatch(), so they can later be marked consumed or deleted.
     */
    public const ILLUMINATE_OUTBOX_MESSAGE_ID = '__illuminate_outbox.message_id';

    /**
     * @param ConnectionInterface $connection connection used for every query
     * @param string              $tableName  name of the outbox table
     * @param MessageSerializer   $serializer converts messages to and from payloads
     */
    public function __construct(
        private ConnectionInterface $connection,
        private string $tableName,
        private MessageSerializer $serializer
    ) {
    }

    /**
     * Stores the given messages in the outbox using a single insert call.
     *
     * Does nothing when no messages are passed.
     *
     * @throws \JsonException when a serialized message cannot be JSON-encoded
     */
    public function persist(Message ...$messages): void
    {
        if (count($messages) === 0) {
            return;
        }

        $rows = array_map(
            // JSON_THROW_ON_ERROR: fail loudly rather than inserting `false` as the payload.
            fn (Message $message): array => [
                'payload' => json_encode(
                    $this->serializer->serializeMessage($message),
                    JSON_THROW_ON_ERROR
                ),
            ],
            $messages
        );

        $this->connection->table($this->tableName)->insert($rows);
    }

    /**
     * Yields up to $batchSize unconsumed messages, oldest id first.
     *
     * Each yielded message carries its row id in the
     * ILLUMINATE_OUTBOX_MESSAGE_ID header. This method is a generator, so the
     * query is only executed once iteration starts.
     *
     * @return Traversable<Message>
     *
     * @throws \JsonException when a stored payload is not valid JSON
     */
    public function retrieveBatch(int $batchSize): Traversable
    {
        $rows = $this->connection->table($this->tableName)
            ->where('consumed', false)
            ->select()
            // Without an explicit order the database may return rows in any
            // order, which would break in-order relaying of the outbox.
            ->orderBy('id', 'asc')
            ->limit($batchSize)
            ->offset(0)
            ->get();

        foreach ($rows as $row) {
            $payload = json_decode($row->payload, true, flags: JSON_THROW_ON_ERROR);
            $message = $this->serializer->unserializePayload($payload);

            yield $message->withHeader(self::ILLUMINATE_OUTBOX_MESSAGE_ID, (int) $row->id);
        }
    }

    /**
     * Flags the rows belonging to the given messages as consumed.
     *
     * Does nothing when no messages are passed.
     */
    public function markConsumed(Message ...$messages): void
    {
        if (count($messages) === 0) {
            return;
        }

        $this->connection->table($this->tableName)
            ->whereIn('id', $this->idsFromMessages(...$messages))
            ->update(['consumed' => true]);
    }

    /**
     * Counts the rows flagged as consumed.
     */
    public function numberOfConsumedMessages(): int
    {
        return $this->connection->table($this->tableName)->where('consumed', true)->count('id');
    }

    /**
     * Counts the rows not yet flagged as consumed.
     */
    public function numberOfPendingMessages(): int
    {
        return $this->connection->table($this->tableName)->where('consumed', false)->count('id');
    }

    /**
     * Counts all rows in the outbox table, consumed or not.
     */
    public function numberOfMessages(): int
    {
        return $this->connection->table($this->tableName)->count('id');
    }

    /**
     * Deletes consumed rows, lowest ids first, limited to $amount rows.
     *
     * @return int the result reported by the delete query
     */
    public function cleanupConsumedMessages(int $amount): int
    {
        return $this->connection->table($this->tableName)
            ->where('consumed', true)
            ->orderBy('id', 'asc')
            ->limit($amount)
            ->delete();
    }

    /**
     * Reads the outbox row id from the message's ILLUMINATE_OUTBOX_MESSAGE_ID header.
     *
     * NOTE(review): the value is cast to int unconditionally; a message that
     * never came from retrieveBatch() presumably has no such header, in which
     * case the cast result would not match a real row. Confirm against
     * Message::header() semantics.
     */
    private function idFromMessage(Message $message): int
    {
        /** @var int|string $id */
        $id = $message->header(self::ILLUMINATE_OUTBOX_MESSAGE_ID);

        return (int) $id;
    }

    /**
     * Maps each message to its outbox row id (see idFromMessage()).
     *
     * @return array<int>
     */
    private function idsFromMessages(Message ...$messages): array
    {
        return array_map(
            fn (Message $message): int => $this->idFromMessage($message),
            $messages,
        );
    }

    /**
     * Removes the rows belonging to the given messages from the outbox table.
     *
     * Does nothing when no messages are passed.
     */
    public function deleteMessages(Message ...$messages): void
    {
        if (count($messages) === 0) {
            return;
        }

        $this->connection->table($this->tableName)
            ->whereIn('id', $this->idsFromMessages(...$messages))
            ->delete();
    }
}