Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reactphp ^0.7 || ^1.0 || ^1.1 compatibility #53

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
CHANGELOG
=========

* NEXT VERSION
* Bump react deps to 1.1.* (@phiamo)
* combined the in 0.6.0 existing ReadableStream and WritableStream
into the now also final InputStream and OutputStream classes
* adapted the cancelTimer API change in Client

* 0.2.0 (2014-11-29)

* Bump react deps to 0.4.* (@Easen)
Expand Down
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
"license": "MIT",
"require": {
"php": ">=5.4",
"evenement/evenement": "~2.0",
"react/socket": "0.4.*",
"evenement/evenement": "^3.0 || ^2.0 || ^1.0",
"react/socket": "^1.0 || ^0.7",
"react/event-loop": "^1.0 || ^0.5",
"react/promise": "~2.0"
},
"autoload": {
"psr-4": { "React\\Stomp\\": "src" }
},
"require-dev": {
"phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35"
"phpunit/phpunit": "^9.0 || ^8.0 || ^7.0 || ^6.4 || ^5.7 || ^4.8.35"
}
}
17 changes: 12 additions & 5 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public function __construct(LoopInterface $loop, InputStreamInterface $input, Ou
$this->input->on('frame', array($this, 'handleFrameEvent'));
$this->input->on('error', array($this, 'handleErrorEvent'));
$this->input->on('close', array($this, 'handleCloseEvent'));
$this->input->on('heart-beat', array($this, 'handleHeartbeatEvent'));
$this->output = $output;

$this->options = $this->sanatizeOptions($options);
}

Expand All @@ -65,22 +65,23 @@ public function connect($timeout = 5)

$deferred = $this->connectDeferred = new Deferred();
$client = $this;

$loop = $this->loop;
$timer = $this->loop->addTimer($timeout, function () use ($client, $deferred) {
$deferred->reject(new ConnectionException('Connection timeout'));
$client->resetConnectDeferred();
$client->setConnectionStatus('not-connected');
});

$this->on('connect', function ($client) use ($timer, $deferred) {
$timer->cancel();
$this->on('connect', function ($client) use ($timer, $deferred, $loop) {
$loop->cancelTimer($timer);
$deferred->resolve($client);
});

$frame = $this->packageCreator->connect(
$this->options['vhost'],
$this->options['login'],
$this->options['passcode']
$this->options['passcode'],
$this->options['heart-beat']
);
$this->output->sendFrame($frame);

Expand Down Expand Up @@ -190,6 +191,11 @@ public function handleCloseEvent()
$this->emit('close');
}

public function handleHeartbeatEvent()
{
$this->emit('heart-beat');
}

public function processFrame(Frame $frame)
{
$command = $this->packageProcessor->receiveFrame($frame);
Expand Down Expand Up @@ -250,6 +256,7 @@ private function sanatizeOptions($options)
'vhost' => isset($options['host']) ? $options['host'] : null,
'login' => null,
'passcode' => null,
'heart-beat' => null,
), $options);
}

Expand Down
8 changes: 7 additions & 1 deletion src/Client/OutgoingPackageCreator.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public function __construct(State $state)
$this->state = $state;
}

public function connect($host, $login = null, $passcode = null)
public function connect($host, $login = null, $passcode = null, $heartbeat = null)
{
$this->state->startConnecting();

Expand All @@ -24,6 +24,12 @@ public function connect($host, $login = null, $passcode = null)
'passcode' => (string) $passcode,
));
}
if (null !== $heartbeat) {
$headers = array_merge($headers, array(
'heart-beat' => $heartbeat,
));
}

return new Frame('CONNECT', $headers);
}

Expand Down
1 change: 1 addition & 0 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Factory
'vhost' => '/',
'login' => 'guest',
'passcode' => 'guest',
'heart-beat' => null
);

private $loop;
Expand Down
38 changes: 36 additions & 2 deletions src/Io/InputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

namespace React\Stomp\Io;

use React\Stomp\Protocol\Frame;
use React\Stomp\Protocol\Parser;
use React\Stream\WritableStream;
use Evenement\EventEmitter;
use React\Stream\WritableStreamInterface;

// $parser = new Parser();
// $input = new InputStream($parser);
Expand All @@ -12,8 +14,9 @@
// });
// $conn->pipe($input);

class InputStream extends WritableStream implements InputStreamInterface
final class InputStream extends EventEmitter implements WritableStreamInterface, InputStreamInterface
{
protected $closed = false;
private $buffer = '';
private $parser;

Expand All @@ -24,6 +27,11 @@ public function __construct(Parser $parser)

public function write($data)
{
if ($data === "\x0a") {
$this->emit('heart-beat', [new Frame('MESSAGE\nHEART-BEAT')]);
$data = '';
}

$data = $this->buffer.$data;
list($frames, $data) = $this->parser->parse($data);
$this->buffer = $data;
Expand All @@ -32,4 +40,30 @@ public function write($data)
$this->emit('frame', array($frame));
}
}

public function end($data = null)
{
if (null !== $data) {
$this->write($data);
}

$this->close();
}

public function isWritable()
{
return !$this->closed;
}

public function close()
{
if ($this->closed) {
return;
}

$this->closed = true;
$this->emit('close');
$this->removeAllListeners();
}

}
50 changes: 39 additions & 11 deletions src/Io/OutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

use React\EventLoop\LoopInterface;
use React\Stomp\Protocol\Frame;
use React\Stream\ReadableStream;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;
use Evenement\EventEmitter;

// $output = new OutputStream();
// $output->pipe($conn);
// $output->sendFrame($frame);

class OutputStream extends ReadableStream implements OutputStreamInterface
final class OutputStream extends EventEmitter implements ReadableStreamInterface, OutputStreamInterface
{
protected $closed = false;

private $loop;
private $paused = false;
private $bufferedFrames = array();
Expand All @@ -32,6 +37,28 @@ public function sendFrame(Frame $frame)
$this->emit('data', array($data));
}



public function sendBufferedFrames()
{
if ($this->paused) {
return;
}

while ($frame = array_shift($this->bufferedFrames)) {
$this->sendFrame($frame);

if ($this->paused) {
return;
}
}
}
public function isReadable()
{
return !$this->closed;
}


public function pause()
{
$this->paused = true;
Expand All @@ -44,18 +71,19 @@ public function resume()
$this->loop->addTimer(0.001, array($this, 'sendBufferedFrames'));
}

public function sendBufferedFrames()
public function pipe(WritableStreamInterface $dest, array $options = array())
{
if ($this->paused) {
return Util::pipe($this, $dest, $options);
}

public function close()
{
if ($this->closed) {
return;
}

while ($frame = array_shift($this->bufferedFrames)) {
$this->sendFrame($frame);

if ($this->paused) {
return;
}
}
$this->closed = true;
$this->emit('close');
$this->removeAllListeners();
}
}
17 changes: 12 additions & 5 deletions tests/React/Tests/Stomp/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ public function itShouldThrowAnExceptionOnConnectedFrameOutsideWindow()
$errors = array();

$client = new Client($loop, $input, $output, array('vhost' => 'localhost'));
$client->on('error', function ($error) use (&$errors) { $errors[] = $error; });
$client->on('error', function ($error) use (&$errors) {
$errors[] = $error;
});

$frame = new Frame('CONNECTED', array('session' => '1234', 'server' => 'React/alpha'));
$input->emit('frame', array($frame));
Expand Down Expand Up @@ -537,10 +539,10 @@ public function messagesShouldGetRoutedToSubscriptions()
}

/**
* @test
* @depends messagesShouldGetRoutedToSubscriptions
* @doesNotPerformAssertions
*/
* @test
* @depends messagesShouldGetRoutedToSubscriptions
* @doesNotPerformAssertions
*/
public function callbackShouldNotBeCalledAfterUnsubscribe($data)
{
list($input, $output, $client, $capturedFrame) = $data;
Expand Down Expand Up @@ -708,7 +710,12 @@ private function createLoopMock()

private function createLoopMockWithConnectionTimer()
{
$timer = $this->createMock('React\EventLoop\TimerInterface');

$loop = $this->createLoopMock();
$loop->expects($this->once())
->method('cancelTimer')
->with($timer);

$timer = $this->getMockBuilder('React\EventLoop\Timer\TimerInterface')->getMock();
$timer->expects($this->once())
Expand Down
2 changes: 1 addition & 1 deletion tests/React/Tests/Stomp/Io/OutputStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class OutputStreamTest extends TestCase
{
public function setUp()
public function setUp(): void
{
$this->loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$this->loop
Expand Down
2 changes: 1 addition & 1 deletion tests/bootstrap.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<?php

$loader = require __DIR__.'/../vendor/autoload.php';
$loader = require __DIR__ . '/../vendor/autoload.php';
$loader->add('React\Tests', __DIR__);
$loader->add('React\Functional', __DIR__);