forked from php-enqueue/stomp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BufferedStompClient.php
116 lines (92 loc) · 2.7 KB
/
BufferedStompClient.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
<?php
namespace Enqueue\Stomp;
use Stomp\Client;
use Stomp\Transport\Frame;
class BufferedStompClient extends Client
{
/**
* [
* 'subscriptionId' => Frame[],
* ].
*
* @var array
*/
private $buffer;
/**
* @var int int
*/
private $bufferSize;
/**
* @var int
*/
private $currentBufferSize;
/**
* @param \Stomp\Network\Connection|string $broker
* @param int $bufferSize
*/
public function __construct($broker, $bufferSize = 1000)
{
parent::__construct($broker);
$this->bufferSize = $bufferSize;
$this->buffer = [];
$this->currentBufferSize = 0;
}
/**
* @return int
*/
public function getBufferSize()
{
return $this->bufferSize;
}
/**
* Timeout is in milliseconds.
*/
public function readMessageFrame(string $subscriptionId, int $timeout): ?Frame
{
// pop up frame from the buffer
if (isset($this->buffer[$subscriptionId]) && ($frame = array_shift($this->buffer[$subscriptionId]))) {
--$this->currentBufferSize;
return $frame;
}
// do nothing when buffer is full
if ($this->currentBufferSize >= $this->bufferSize) {
return null;
}
$startTime = microtime(true);
$remainingTimeout = $timeout * 1000;
while (true) {
$this->getConnection()->setReadTimeout(0, $remainingTimeout);
// there is nothing to read
if (false === $frame = $this->readFrame()) {
return null;
}
if ('MESSAGE' !== $frame->getCommand()) {
throw new \LogicException(sprintf('Unexpected frame was received: "%s"', $frame->getCommand()));
}
$headers = $frame->getHeaders();
if (false == isset($headers['subscription'])) {
throw new \LogicException('Got message frame with missing subscription header');
}
// frame belongs to another subscription
if ($headers['subscription'] !== $subscriptionId) {
$this->buffer[$headers['subscription']][] = $frame;
++$this->currentBufferSize;
$remainingTimeout -= (microtime(true) - $startTime) * 1000000;
if ($remainingTimeout <= 0) {
return null;
}
continue;
}
return $frame;
}
}
/**
* {@inheritdoc}
*/
public function disconnect($sync = false)
{
parent::disconnect($sync);
$this->buffer = [];
$this->currentBufferSize = 0;
}
}