diff --git a/.travis.yml b/.travis.yml index 130250b..0a88ea2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,7 +25,8 @@ before_script: sudo service rabbitmq-server start; fi - if [ "$STOMP_PROVIDER" = 'activemq' ]; then - sudo apt install activemq tree; + sudo apt update; + sudo apt install activemq; sudo cp tests/utils/activemq.xml /etc/activemq/instances-available/main/; sudo ln -s /etc/activemq/instances-available/main /etc/activemq/instances-enabled/main; sudo service activemq start; diff --git a/composer.json b/composer.json index 202c286..9abd311 100644 --- a/composer.json +++ b/composer.json @@ -6,8 +6,8 @@ "require": { "php": ">=5.4", "evenement/evenement": "~2.0", - "react/socket": "0.4.*", - "react/promise": "~2.0" + "react/socket": "^1.1", + "react/promise": "^2.2.1" }, "autoload": { "psr-4": { "React\\Stomp\\": "src" } diff --git a/composer.lock b/composer.lock index 09dcf27..34c5509 100644 --- a/composer.lock +++ b/composer.lock @@ -1,10 +1,10 @@ { "_readme": [ "This file locks the dependencies of your project to a known state", - "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", + "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "ff2121cc38a06173915aa056aa3b7619", + "content-hash": "f45a878cedbacf3be6ea97a8c0a636e5", "packages": [ { "name": "evenement/evenement", @@ -54,30 +54,115 @@ ], "time": "2017-07-17T17:39:19+00:00" }, + { + "name": "react/cache", + "version": "v0.5.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/cache.git", + "reference": "7d7da7fb7574d471904ba357b39bbf110ccdbf66" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/cache/zipball/7d7da7fb7574d471904ba357b39bbf110ccdbf66", + "reference": "7d7da7fb7574d471904ba357b39bbf110ccdbf66", + "shasum": "" + }, + "require": { + "php": ">=5.3.0", + "react/promise": "~2.0|~1.1" + }, + "require-dev": { + "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" + }, + "type": "library", + "autoload": { + "psr-4": { + "React\\Cache\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Async, Promise-based cache interface for ReactPHP", + "keywords": [ + "cache", + "caching", + "promise", + "reactphp" + ], + "time": "2018-06-25T12:52:40+00:00" + }, + { + "name": "react/dns", + "version": "v0.4.16", + "source": { + "type": "git", + "url": "https://github.com/reactphp/dns.git", + "reference": "0a0bedfec72b38406413c6ea01e1c015bd0bf72b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/dns/zipball/0a0bedfec72b38406413c6ea01e1c015bd0bf72b", + "reference": "0a0bedfec72b38406413c6ea01e1c015bd0bf72b", + "shasum": "" + }, + "require": { + "php": ">=5.3.0", + "react/cache": "^0.5 || ^0.4 || ^0.3", + "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5", + "react/promise": "^2.1 || ^1.2.1", + "react/promise-timer": "^1.2", + "react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.5" + }, + "require-dev": { + "clue/block-react": "^1.2", + "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" + }, + "type": "library", + "autoload": { + "psr-4": { + "React\\Dns\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "Async DNS resolver for ReactPHP", + "keywords": [ + "async", + "dns", + "dns-resolver", + "reactphp" + ], + "time": "2018-11-11T11:21:13+00:00" + }, { "name": "react/event-loop", - "version": "v0.4.3", + "version": "v1.1.0", "source": { "type": "git", "url": "https://github.com/reactphp/event-loop.git", - "reference": "8bde03488ee897dc6bb3d91e4e17c353f9c5252f" + "reference": "a0ecac955c67b57c40fe4a1b88a7cca1b58c982d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/reactphp/event-loop/zipball/8bde03488ee897dc6bb3d91e4e17c353f9c5252f", - "reference": "8bde03488ee897dc6bb3d91e4e17c353f9c5252f", + "url": "https://api.github.com/repos/reactphp/event-loop/zipball/a0ecac955c67b57c40fe4a1b88a7cca1b58c982d", + "reference": "a0ecac955c67b57c40fe4a1b88a7cca1b58c982d", "shasum": "" }, "require": { - "php": ">=5.4.0" + "php": ">=5.3.0" }, "require-dev": { - "phpunit/phpunit": "~4.8" + "phpunit/phpunit": "^7.0 || ^6.4 || ^5.7 || ^4.8.35" }, "suggest": { - "ext-event": "~1.0", - "ext-libev": "*", - "ext-libevent": ">=0.1.0" + "ext-event": "~1.0 for ExtEventLoop", + "ext-pcntl": "For signal handling support when using the StreamSelectLoop", + "ext-uv": "* for ExtUvLoop" }, "type": "library", "autoload": { @@ -89,25 +174,25 @@ "license": [ "MIT" ], - "description": "Event loop abstraction layer that libraries can use for evented I/O.", + "description": "ReactPHP's core reactor event loop that libraries can use for evented I/O.", "keywords": [ "asynchronous", "event-loop" ], - "time": "2017-04-27T10:56:23+00:00" + "time": "2019-02-07T16:19:49+00:00" }, { "name": "react/promise", - "version": "v2.5.1", + "version": "v2.7.1", "source": { "type": "git", "url": "https://github.com/reactphp/promise.git", - "reference": "62785ae604c8d69725d693eb370e1d67e94c4053" + "reference": "31ffa96f8d2ed0341a57848cbb84d88b89dd664d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/reactphp/promise/zipball/62785ae604c8d69725d693eb370e1d67e94c4053", - "reference": "62785ae604c8d69725d693eb370e1d67e94c4053", + "url": "https://api.github.com/repos/reactphp/promise/zipball/31ffa96f8d2ed0341a57848cbb84d88b89dd664d", + "reference": "31ffa96f8d2ed0341a57848cbb84d88b89dd664d", "shasum": "" }, "require": { @@ -140,33 +225,87 @@ "promise", "promises" ], - "time": "2017-03-25T12:08:31+00:00" + "time": "2019-01-07T21:25:54+00:00" + }, + { + "name": "react/promise-timer", + "version": "v1.5.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/promise-timer.git", + "reference": "a11206938ca2394dc7bb368f5da25cd4533fa603" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/promise-timer/zipball/a11206938ca2394dc7bb368f5da25cd4533fa603", + "reference": "a11206938ca2394dc7bb368f5da25cd4533fa603", + "shasum": "" + }, + "require": { + "php": ">=5.3", + "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5", + "react/promise": "^2.7.0 || ^1.2.1" + }, + "require-dev": { + "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" + }, + "type": "library", + "autoload": { + "psr-4": { + "React\\Promise\\Timer\\": "src/" + }, + "files": [ + "src/functions.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Christian Lück", + "email": "christian@lueck.tv" + } + ], + "description": "A trivial implementation of timeouts for Promises, built on top of ReactPHP.", + "homepage": "https://github.com/reactphp/promise-timer", + "keywords": [ + "async", + "event-loop", + "promise", + "reactphp", + "timeout", + "timer" + ], + "time": "2018-06-13T16:45:37+00:00" }, { "name": "react/socket", - "version": "v0.4.6", + "version": "v1.2.0", "source": { "type": "git", "url": "https://github.com/reactphp/socket.git", - "reference": "cf074e53c974df52388ebd09710a9018894745d2" + "reference": "23b7372bb25cea934f6124f5bdac34e30161959e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/reactphp/socket/zipball/cf074e53c974df52388ebd09710a9018894745d2", - "reference": "cf074e53c974df52388ebd09710a9018894745d2", + "url": "https://api.github.com/repos/reactphp/socket/zipball/23b7372bb25cea934f6124f5bdac34e30161959e", + "reference": "23b7372bb25cea934f6124f5bdac34e30161959e", "shasum": "" }, "require": { - "evenement/evenement": "~2.0|~1.0", + "evenement/evenement": "^3.0 || ^2.0 || ^1.0", "php": ">=5.3.0", - "react/event-loop": "0.4.*|0.3.*", - "react/promise": "^2.0 || ^1.1", - "react/stream": "^0.4.5" + "react/dns": "^0.4.13", + "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5", + "react/promise": "^2.6.0 || ^1.2.1", + "react/promise-timer": "^1.4.0", + "react/stream": "^1.1" }, "require-dev": { - "clue/block-react": "^1.1", - "phpunit/phpunit": "~4.8", - "react/socket-client": "^0.5.1" + "clue/block-react": "^1.2", + "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" }, "type": "library", "autoload": { @@ -178,38 +317,38 @@ "license": [ "MIT" ], - "description": "Async, streaming plaintext TCP/IP and secure TLS socket server for React PHP", + "description": "Async, streaming plaintext TCP/IP and secure TLS socket server and client connections for ReactPHP", "keywords": [ - "Socket" + "Connection", + "Socket", + "async", + "reactphp", + "stream" ], - "time": "2017-01-26T09:23:38+00:00" + "time": "2019-01-07T14:10:13+00:00" }, { "name": "react/stream", - "version": "v0.4.6", + "version": "v1.1.0", "source": { "type": "git", "url": "https://github.com/reactphp/stream.git", - "reference": "44dc7f51ea48624110136b535b9ba44fd7d0c1ee" + "reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/reactphp/stream/zipball/44dc7f51ea48624110136b535b9ba44fd7d0c1ee", - "reference": "44dc7f51ea48624110136b535b9ba44fd7d0c1ee", + "url": "https://api.github.com/repos/reactphp/stream/zipball/50426855f7a77ddf43b9266c22320df5bf6c6ce6", + "reference": "50426855f7a77ddf43b9266c22320df5bf6c6ce6", "shasum": "" }, "require": { - "evenement/evenement": "^2.0|^1.0", - "php": ">=5.3.8" + "evenement/evenement": "^3.0 || ^2.0 || ^1.0", + "php": ">=5.3.8", + "react/event-loop": "^1.0 || ^0.5 || ^0.4 || ^0.3.5" }, "require-dev": { "clue/stream-filter": "~1.2", - "react/event-loop": "^0.4|^0.3", - "react/promise": "^2.0|^1.0" - }, - "suggest": { - "react/event-loop": "^0.4", - "react/promise": "^2.0" + "phpunit/phpunit": "^6.4 || ^5.7 || ^4.8.35" }, "type": "library", "autoload": { @@ -221,12 +360,18 @@ "license": [ "MIT" ], - "description": "Basic readable and writable stream interfaces that support piping.", + "description": "Event-driven readable and writable streams for non-blocking I/O in ReactPHP", "keywords": [ + "event-driven", + "io", + "non-blocking", "pipe", - "stream" + "reactphp", + "readable", + "stream", + "writable" ], - "time": "2017-01-25T14:44:14+00:00" + "time": "2019-01-01T16:15:09+00:00" } ], "packages-dev": [ @@ -328,16 +473,16 @@ }, { "name": "phpdocumentor/reflection-common", - "version": "1.0", + "version": "1.0.1", "source": { "type": "git", "url": "https://github.com/phpDocumentor/ReflectionCommon.git", - "reference": "144c307535e82c8fdcaacbcfc1d6d8eeb896687c" + "reference": "21bdeb5f65d7ebf9f43b1b25d404f87deab5bfb6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpDocumentor/ReflectionCommon/zipball/144c307535e82c8fdcaacbcfc1d6d8eeb896687c", - "reference": "144c307535e82c8fdcaacbcfc1d6d8eeb896687c", + "url": "https://api.github.com/repos/phpDocumentor/ReflectionCommon/zipball/21bdeb5f65d7ebf9f43b1b25d404f87deab5bfb6", + "reference": "21bdeb5f65d7ebf9f43b1b25d404f87deab5bfb6", "shasum": "" }, "require": { @@ -378,7 +523,7 @@ "reflection", "static analysis" ], - "time": "2015-12-27T11:43:31+00:00" + "time": "2017-09-11T18:02:19+00:00" }, { "name": "phpdocumentor/reflection-docblock", @@ -474,33 +619,33 @@ }, { "name": "phpspec/prophecy", - "version": "v1.7.0", + "version": "1.8.0", "source": { "type": "git", "url": "https://github.com/phpspec/prophecy.git", - "reference": "93d39f1f7f9326d746203c7c056f300f7f126073" + "reference": "4ba436b55987b4bf311cb7c6ba82aa528aac0a06" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpspec/prophecy/zipball/93d39f1f7f9326d746203c7c056f300f7f126073", - "reference": "93d39f1f7f9326d746203c7c056f300f7f126073", + "url": "https://api.github.com/repos/phpspec/prophecy/zipball/4ba436b55987b4bf311cb7c6ba82aa528aac0a06", + "reference": "4ba436b55987b4bf311cb7c6ba82aa528aac0a06", "shasum": "" }, "require": { "doctrine/instantiator": "^1.0.2", "php": "^5.3|^7.0", - "phpdocumentor/reflection-docblock": "^2.0|^3.0.2", - "sebastian/comparator": "^1.1|^2.0", + "phpdocumentor/reflection-docblock": "^2.0|^3.0.2|^4.0", + "sebastian/comparator": "^1.1|^2.0|^3.0", "sebastian/recursion-context": "^1.0|^2.0|^3.0" }, "require-dev": { "phpspec/phpspec": "^2.5|^3.2", - "phpunit/phpunit": "^4.8 || ^5.6.5" + "phpunit/phpunit": "^4.8.35 || ^5.7 || ^6.5 || ^7.1" }, "type": "library", "extra": { "branch-alias": { - "dev-master": "1.6.x-dev" + "dev-master": "1.8.x-dev" } }, "autoload": { @@ -533,7 +678,7 @@ "spy", "stub" ], - "time": "2017-03-02T20:05:34+00:00" + "time": "2018-08-05T17:53:17+00:00" }, { "name": "phpunit/php-code-coverage", @@ -600,16 +745,16 @@ }, { "name": "phpunit/php-file-iterator", - "version": "1.4.2", + "version": "1.4.5", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-file-iterator.git", - "reference": "3cc8f69b3028d0f96a9078e6295d86e9bf019be5" + "reference": "730b01bc3e867237eaac355e06a36b85dd93a8b4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/3cc8f69b3028d0f96a9078e6295d86e9bf019be5", - "reference": "3cc8f69b3028d0f96a9078e6295d86e9bf019be5", + "url": "https://api.github.com/repos/sebastianbergmann/php-file-iterator/zipball/730b01bc3e867237eaac355e06a36b85dd93a8b4", + "reference": "730b01bc3e867237eaac355e06a36b85dd93a8b4", "shasum": "" }, "require": { @@ -643,7 +788,7 @@ "filesystem", "iterator" ], - "time": "2016-10-03T07:40:28+00:00" + "time": "2017-11-27T13:52:08+00:00" }, { "name": "phpunit/php-text-template", @@ -1438,25 +1583,87 @@ "homepage": "https://github.com/sebastianbergmann/version", "time": "2016-10-03T07:35:21+00:00" }, + { + "name": "symfony/polyfill-ctype", + "version": "v1.10.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/polyfill-ctype.git", + "reference": "e3d826245268269cd66f8326bd8bc066687b4a19" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/e3d826245268269cd66f8326bd8bc066687b4a19", + "reference": "e3d826245268269cd66f8326bd8bc066687b4a19", + "shasum": "" + }, + "require": { + "php": ">=5.3.3" + }, + "suggest": { + "ext-ctype": "For best performance" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.9-dev" + } + }, + "autoload": { + "psr-4": { + "Symfony\\Polyfill\\Ctype\\": "" + }, + "files": [ + "bootstrap.php" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + }, + { + "name": "Gert de Pagter", + "email": "BackEndTea@gmail.com" + } + ], + "description": "Symfony polyfill for ctype functions", + "homepage": "https://symfony.com", + "keywords": [ + "compatibility", + "ctype", + "polyfill", + "portable" + ], + "time": "2018-08-06T14:22:27+00:00" + }, { "name": "symfony/yaml", - "version": "v3.3.5", + "version": "v3.4.22", "source": { "type": "git", "url": "https://github.com/symfony/yaml.git", - "reference": "1f93a8d19b8241617f5074a123e282575b821df8" + "reference": "ba11776e9e6c15ad5759a07bffb15899bac75c2d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/yaml/zipball/1f93a8d19b8241617f5074a123e282575b821df8", - "reference": "1f93a8d19b8241617f5074a123e282575b821df8", + "url": "https://api.github.com/repos/symfony/yaml/zipball/ba11776e9e6c15ad5759a07bffb15899bac75c2d", + "reference": "ba11776e9e6c15ad5759a07bffb15899bac75c2d", "shasum": "" }, "require": { - "php": ">=5.5.9" + "php": "^5.5.9|>=7.0.8", + "symfony/polyfill-ctype": "~1.8" + }, + "conflict": { + "symfony/console": "<3.4" }, "require-dev": { - "symfony/console": "~2.8|~3.0" + "symfony/console": "~3.4|~4.0" }, "suggest": { "symfony/console": "For validating YAML files using the lint command" @@ -1464,7 +1671,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "3.3-dev" + "dev-master": "3.4-dev" } }, "autoload": { @@ -1491,24 +1698,25 @@ ], "description": "Symfony Yaml Component", "homepage": "https://symfony.com", - "time": "2017-06-15T12:58:50+00:00" + "time": "2019-01-16T10:59:17+00:00" }, { "name": "webmozart/assert", - "version": "1.2.0", + "version": "1.4.0", "source": { "type": "git", "url": "https://github.com/webmozart/assert.git", - "reference": "2db61e59ff05fe5126d152bd0655c9ea113e550f" + "reference": "83e253c8e0be5b0257b881e1827274667c5c17a9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/webmozart/assert/zipball/2db61e59ff05fe5126d152bd0655c9ea113e550f", - "reference": "2db61e59ff05fe5126d152bd0655c9ea113e550f", + "url": "https://api.github.com/repos/webmozart/assert/zipball/83e253c8e0be5b0257b881e1827274667c5c17a9", + "reference": "83e253c8e0be5b0257b881e1827274667c5c17a9", "shasum": "" }, "require": { - "php": "^5.3.3 || ^7.0" + "php": "^5.3.3 || ^7.0", + "symfony/polyfill-ctype": "^1.8" }, "require-dev": { "phpunit/phpunit": "^4.6", @@ -1541,7 +1749,7 @@ "check", "validate" ], - "time": "2016-11-23T20:04:58+00:00" + "time": "2018-12-25T11:19:39+00:00" } ], "aliases": [], diff --git a/src/Client.php b/src/Client.php index 073d87c..717b7ed 100644 --- a/src/Client.php +++ b/src/Client.php @@ -4,20 +4,19 @@ use Evenement\EventEmitter; use React\Promise\Deferred; -use React\Promise\PromiseInterface; -use React\Stomp\Client\IncomingPackageProcessor; -use React\Stomp\Client\OutgoingPackageCreator; use React\Stomp\Client\State; -use React\Stomp\Client\Command\CommandInterface; -use React\Stomp\Client\Command\CloseCommand; -use React\Stomp\Client\Command\ConnectionEstablishedCommand; -use React\Stomp\Client\Command\NullCommand; -use React\Stomp\Exception\ProcessingException; -use React\Stomp\Exception\ConnectionException; -use React\Stomp\Io\InputStreamInterface; -use React\Stomp\Io\OutputStreamInterface; use React\Stomp\Protocol\Frame; use React\EventLoop\LoopInterface; +use React\Stream\ReadableStreamInterface; +use React\Stream\WritableStreamInterface; +use React\Stomp\Client\Command\NullCommand; +use React\Stomp\Client\Command\CloseCommand; +use React\Stomp\Client\OutgoingPackageCreator; +use React\Stomp\Exception\ConnectionException; +use React\Stomp\Exception\ProcessingException; +use React\Stomp\Client\Command\CommandInterface; +use React\Stomp\Client\IncomingPackageProcessor; +use React\Stomp\Client\Command\ConnectionEstablishedCommand; /** * @event connect @@ -39,7 +38,7 @@ class Client extends EventEmitter /** @var PromiseInterface */ private $connectPromise; - public function __construct(LoopInterface $loop, InputStreamInterface $input, OutputStreamInterface $output, array $options) + public function __construct(LoopInterface $loop, WritableStreamInterface $input, ReadableStreamInterface $output, array $options) { $this->loop = $loop; $state = new State(); @@ -61,7 +60,7 @@ public function connect($timeout = 5) return $this->connectPromise; } - $this->connectionStatus = 'connecting'; + $this->setConnectionStatus('connecting'); $deferred = $this->connectDeferred = new Deferred(); $client = $this; @@ -73,7 +72,7 @@ public function connect($timeout = 5) }); $this->on('connect', function ($client) use ($timer, $deferred) { - $timer->cancel(); + $this->loop->cancelTimer($timer); $deferred->resolve($client); }); @@ -82,7 +81,7 @@ public function connect($timeout = 5) $this->options['login'], $this->options['passcode'] ); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); return $this->connectPromise = $deferred->promise()->then(function () use ($client) { $client->setConnectionStatus('connected'); @@ -90,10 +89,15 @@ public function connect($timeout = 5) }); } + private function sendFrameToOutput(Frame $frame) + { + $this->output->emit('data', array($frame)); + } + public function send($destination, $body, array $headers = array()) { $frame = $this->packageCreator->send($destination, $body, $headers); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); } public function subscribe($destination, $callback, array $headers = array()) @@ -112,7 +116,7 @@ public function subscribeWithAck($destination, $ack, $callback, array $headers = private function doSubscription($destination, $callback, $ack, array $headers) { $frame = $this->packageCreator->subscribe($destination, $ack, $headers); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); $subscriptionId = $frame->getHeader('id'); @@ -125,7 +129,7 @@ private function doSubscription($destination, $callback, $ack, array $headers) public function unsubscribe($subscriptionId, array $headers = array()) { $frame = $this->packageCreator->unsubscribe($subscriptionId, $headers); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); unset($this->acknowledgements[$subscriptionId]); unset($this->subscriptions[$subscriptionId]); @@ -134,24 +138,24 @@ public function unsubscribe($subscriptionId, array $headers = array()) public function ack($subscriptionId, $messageId, array $headers = array()) { $frame = $this->packageCreator->ack($subscriptionId, $messageId, $headers); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); } public function nack($subscriptionId, $messageId, array $headers = array()) { $frame = $this->packageCreator->nack($subscriptionId, $messageId, $headers); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); } public function disconnect() { $receipt = $this->generateReceiptId(); $frame = $this->packageCreator->disconnect($receipt); - $this->output->sendFrame($frame); + $this->sendFrameToOutput($frame); $this->connectDeferred = null; $this->connectPromise = null; - $this->connectionStatus = 'not-connected'; + $this->setConnectionStatus('not-connected'); } public function resetConnectDeferred() @@ -163,6 +167,7 @@ public function resetConnectDeferred() public function handleFrameEvent(Frame $frame) { try { + $this->emit('frame', array($frame)); $this->processFrame($frame); } catch (ProcessingException $e) { $this->emit('error', array($e)); @@ -171,7 +176,7 @@ public function handleFrameEvent(Frame $frame) $this->connectDeferred->reject($e); $this->connectDeferred = null; $this->connectPromise = null; - $this->connectionStatus = 'not-connected'; + $this->setConnectionStatus('not-connected'); } } } @@ -185,7 +190,7 @@ public function handleCloseEvent() { $this->connectDeferred = null; $this->connectPromise = null; - $this->connectionStatus = 'not-connected'; + $this->setConnectionStatus('not-connected'); $this->emit('close'); } @@ -261,11 +266,12 @@ public function isConnected() public function setConnectionStatus($status) { $this->connectionStatus = $status; + + $this->emit('connection-status', array($this->connectionStatus)); } public function generateReceiptId() { return mt_rand(); } - } diff --git a/src/Factory.php b/src/Factory.php index d805661..e08cbc7 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -2,12 +2,13 @@ namespace React\Stomp; +use React\Socket\Connection; +use React\Stomp\FrameBuffer; +use React\Stream\ThroughStream; use React\EventLoop\LoopInterface; +use React\Stream\ReadableResourceStream; +use React\Stream\WritableResourceStream; use React\Stomp\Exception\ConnectionException; -use React\Stomp\Io\InputStream; -use React\Stomp\Io\OutputStream; -use React\Stomp\Protocol\Parser; -use React\Socket\Connection; class Factory { @@ -26,23 +27,34 @@ public function __construct(LoopInterface $loop) $this->loop = $loop; } - public function createClient(array $options = array()) + public function createClient(array $options = array(), $silent = false) { $options = array_merge($this->defaultOptions, $options); - $conn = $this->createConnection($options); + $connection = $this->createConnection($options); + + $frameBuffer = new FrameBuffer; + $input = new WritableResourceStream(fopen('php://stdout', 'w'), $this->loop); + $output = new ReadableResourceStream(fopen('php://stdin', 'r'), $this->loop); - $parser = new Parser(); - $input = new InputStream($parser); - $conn->pipe($input); + $output->pipe($connection); + if (! $silent) { + $connection->pipe($input); + } + + $connection->pipe(new ThroughStream(function ($data) use ($input, $frameBuffer) { + $frames = $frameBuffer->addToBuffer($data)->pullFrames(); - $output = new OutputStream($this->loop); - $output->pipe($conn); + foreach ($frames as $frame) { + $input->emit('frame', array($frame)); + } + })); - $conn->on('error', function ($e) use ($input) { - $input->emit('error', array($e)); + $connection->on('error', function ($error) use ($input) { + $input->emit('error', array($error)); }); - $conn->on('close', function () use ($input) { + + $connection->on('close', function () use ($input) { $input->emit('close'); }); @@ -58,8 +70,8 @@ public function createConnection($options) throw new ConnectionException($message, $errno); } - $conn = new Connection($fd, $this->loop); + $connection = new Connection($fd, $this->loop); - return $conn; + return $connection; } } diff --git a/src/FrameBuffer.php b/src/FrameBuffer.php new file mode 100644 index 0000000..aba8633 --- /dev/null +++ b/src/FrameBuffer.php @@ -0,0 +1,34 @@ +parser = new Parser(); + } + + public function addToBuffer($data) + { + $this->buffer .= $data; + + return $this; + } + + public function pullFrames() + { + $data = $this->buffer; + + list($frames, $data) = $this->parser->parse($data); + + $this->buffer = $data; + + return $frames; + } +} diff --git a/src/Io/InputStream.php b/src/Io/InputStream.php deleted file mode 100644 index 907c206..0000000 --- a/src/Io/InputStream.php +++ /dev/null @@ -1,35 +0,0 @@ -on('frame', function ($frame) { -// lulz -// }); -// $conn->pipe($input); - -class InputStream extends WritableStream implements InputStreamInterface -{ - private $buffer = ''; - private $parser; - - public function __construct(Parser $parser) - { - $this->parser = $parser; - } - - public function write($data) - { - $data = $this->buffer.$data; - list($frames, $data) = $this->parser->parse($data); - $this->buffer = $data; - - foreach ($frames as $frame) { - $this->emit('frame', array($frame)); - } - } -} diff --git a/src/Io/InputStreamInterface.php b/src/Io/InputStreamInterface.php deleted file mode 100644 index aca410d..0000000 --- a/src/Io/InputStreamInterface.php +++ /dev/null @@ -1,13 +0,0 @@ -pipe($conn); -// $output->sendFrame($frame); - -class OutputStream extends ReadableStream implements OutputStreamInterface -{ - private $loop; - private $paused = false; - private $bufferedFrames = array(); - - public function __construct(LoopInterface $loop) - { - $this->loop = $loop; - } - - public function sendFrame(Frame $frame) - { - if ($this->paused) { - $this->bufferedFrames[] = $frame; - return; - } - - $data = (string) $frame; - $this->emit('data', array($data)); - } - - public function pause() - { - $this->paused = true; - } - - public function resume() - { - $this->paused = false; - - $this->loop->addTimer(0.001, array($this, 'sendBufferedFrames')); - } - - public function sendBufferedFrames() - { - if ($this->paused) { - return; - } - - while ($frame = array_shift($this->bufferedFrames)) { - $this->sendFrame($frame); - - if ($this->paused) { - return; - } - } - } -} diff --git a/src/Io/OutputStreamInterface.php b/src/Io/OutputStreamInterface.php deleted file mode 100644 index 5bd46ff..0000000 --- a/src/Io/OutputStreamInterface.php +++ /dev/null @@ -1,17 +0,0 @@ -createInputStreamMock(); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->once()) - ->method('sendFrame') - ->with($this->frameIsEqual(new Frame('CONNECT', array('accept-version' => '1.1', 'host' => 'localhost')))); + $emitted = false; + $input = $this->createInputStream(); + $output = $this->createOutputStream(); + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('CONNECT', array('accept-version' => '1.1', 'host' => 'localhost')); + $this->assertEquals((string) $expected, (string) $frame); + }); $client = new Client($this->createLoopMock(), $input, $output, array('vhost' => 'localhost')); $client->connect(); + + $this->assertTrue($emitted); } /** @test */ public function connectShouldReturnPromise() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMock(), $input, $output, array('vhost' => 'localhost')); $promise = $client->connect(); @@ -43,16 +48,16 @@ public function connectShouldReturnPromise() */ public function connectShouldRejectMissingHostOrVhost() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMock(), $input, $output, array()); } /** @test */ public function connectTwiceShouldReturnTheSamePromise() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMock(), $input, $output, array('vhost' => 'localhost')); $promise1 = $client->connect(); @@ -66,24 +71,21 @@ public function connectTwiceShouldReturnTheSamePromise() /** @test */ public function disconnectThenConnectShouldReturnNewPromise() { - $input = $this->createInputStreamMock(); - - $connectFrame = new Frame('CONNECT', array('accept-version' => '1.1', 'host' => 'localhost')); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->at(0)) - ->method('sendFrame') - ->with($this->frameIsEqual($connectFrame)); - $output - ->expects($this->at(2)) - ->method('sendFrame') - ->with($this->frameIsEqual($connectFrame)); + $emitted = false; + $input = $this->createInputStream(); + $output = $this->createOutputStream(); + $output->on('connect', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('CONNECT', array('accept-version' => '1.1', 'host' => 'localhost')); + $this->assertEquals((string) $expected, (string) $frame); + }); $client = new Client($this->createLoopMock(), $input, $output, array('vhost' => 'localhost')); $promise1 = $client->connect(); + $this->assertFalse($emitted); // Deferred $client->disconnect(); $promise2 = $client->connect(); + $this->assertFalse($emitted); // Deferred $this->assertInstanceOf('React\Promise\PromiseInterface', $promise1); $this->assertInstanceOf('React\Promise\PromiseInterface', $promise2); @@ -93,22 +95,22 @@ public function disconnectThenConnectShouldReturnNewPromise() /** @test */ public function itShouldEmitConnectAfterHandshake() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMockWithConnectionTimer(), $input, $output, array('vhost' => 'localhost')); $client->on('connect', $this->expectCallableOnce()); $client->connect(); $frame = new Frame('CONNECTED', array('session' => '1234', 'server' => 'React/alpha')); - $input->emit('frame', array($frame)); + $input->emit('frame', [$frame]); } /** @test */ public function itShouldRejectPromiseIfConnectionFails() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMock(), $input, $output, array('vhost' => 'localhost')); $client->connect() @@ -121,8 +123,8 @@ public function itShouldRejectPromiseIfConnectionFails() /** @test */ public function itShouldRejectPromiseIfConnectionTimeout() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $loop = $this->createLoopMock(); $timeout = 30; @@ -148,8 +150,8 @@ public function itShouldRejectPromiseIfConnectionTimeout() /** @test */ public function timeoutThenConnectShouldReturnANewPromise() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $loop = $this->createLoopMock(); $timeout = 30; @@ -178,8 +180,8 @@ public function timeoutThenConnectShouldReturnANewPromise() /** @test */ public function itShouldCancelTimerOnConnection() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $timeout = 30; $signature = uniqid('signature'); @@ -197,8 +199,8 @@ public function itShouldCancelTimerOnConnection() /** @test */ public function itShouldNotBeConnectedAfterConstructor() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMock(), $input, $output, array('vhost' => 'localhost')); @@ -208,8 +210,8 @@ public function itShouldNotBeConnectedAfterConstructor() /** @test */ public function itShouldThrowAnExceptionOnConnectedFrameOutsideWindow() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $loop = $this->createLoopMock(); $errors = array(); @@ -228,8 +230,8 @@ public function itShouldThrowAnExceptionOnConnectedFrameOutsideWindow() /** @test */ public function itShouldBeConnectedWhenThePromiseIsResolved() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMockWithConnectionTimer(), $input, $output, array('vhost' => 'localhost')); $client->connect(); @@ -243,8 +245,8 @@ public function itShouldBeConnectedWhenThePromiseIsResolved() /** @test */ public function itShouldNotBeConnectedAfterDisconnection() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMockWithConnectionTimer(), $input, $output, array('vhost' => 'localhost')); $client->connect(); @@ -259,8 +261,8 @@ public function itShouldNotBeConnectedAfterDisconnection() /** @test */ public function itShouldResolveConnectPromiseAfterHandshake() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = new Client($this->createLoopMockWithConnectionTimer(), $input, $output, array('vhost' => 'localhost')); $client->on('connect', $this->expectCallableOnce()); @@ -275,8 +277,8 @@ public function itShouldResolveConnectPromiseAfterHandshake() /** @test */ public function itShouldChangeToConnectedStateWhenReceivingConnectedResponse() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); } @@ -284,18 +286,19 @@ public function itShouldChangeToConnectedStateWhenReceivingConnectedResponse() /** @test */ public function sendShouldSend() { - $input = $this->createInputStreamMock(); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('SEND', array('destination' => '/foo', 'content-length' => '5', 'content-type' => 'text/plain'), 'hello') - )); - + $emitted = false; + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('SEND', array('destination' => '/foo', 'content-length' => '5', 'content-type' => 'text/plain'), 'hello'); + $this->assertEquals((string) $expected, (string) $frame); + }); + $client->send('/foo', 'hello'); + + $this->assertTrue($emitted); } /** @@ -304,20 +307,20 @@ public function sendShouldSend() */ public function subscribeWithAckMustIncludeAValidAckMethod($ack) { + $emitted = false; + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $callback = $this->createCallableMock(); - - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('SUBSCRIBE', array('id' => 0, 'destination' => '/foo', 'ack' => $ack)) - )); - $client = $this->getConnectedClient($input, $output); + $output->on('data', function ($frame) use (&$emitted, $ack) { + $emitted = true; + $expected = new Frame('SUBSCRIBE', array('id' => 0, 'destination' => '/foo', 'ack' => $ack)); + $this->assertEquals((string) $expected, (string) $frame); + }); + $client->subscribeWithAck('/foo', $ack, $callback); + + $this->assertTrue($emitted); } public function provideAvailableAckMethods() @@ -331,43 +334,44 @@ public function provideAvailableAckMethods() /** @test */ public function subscribeHasUniqueIdHeader() { + $expectedId = 0; + $emitted = false; + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $callback = $this->createCallableMock(); - - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameHasHeader('id', 0)); - - $output - ->expects($this->at(2)) - ->method('sendFrame') - ->with($this->frameHasHeader('id', 1)); - $client = $this->getConnectedClient($input, $output); + $output->on('data', function ($frame) use (&$emitted, &$expectedId) { + $emitted = true; + $this->assertEquals($expectedId, $frame->getHeader('id')); + }); + + $expectedId = 0; $client->subscribe('/foo', $callback); + $this->assertTrue($emitted); + + $expectedId = 1; + $emitted = false; $client->subscribe('/bar', $callback); + $this->assertTrue($emitted); } /** @test */ public function subscribeCanEmbedCustomHeader() { + $emitted = false; + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $callback = $this->createCallableMock(); - - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('SUBSCRIBE', array('foo' => 'bar', 'id' => 0, 'destination' => '/foo', 'ack' => 'auto')) - )); - $client = $this->getConnectedClient($input, $output); + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('SUBSCRIBE', array('foo' => 'bar', 'id' => 0, 'destination' => '/foo', 'ack' => 'auto')); + $this->assertEquals((string) $expected, (string) $frame); + }); + $client->subscribe('/foo', $callback, array('foo' => 'bar')); + + $this->assertTrue($emitted); } /** @@ -378,8 +382,8 @@ public function subscribeWithAckDoesNotWorkWithAutoAckMode() { $callback = $this->createCallableMock(); - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $client->subscribeWithAck('/foo', 'auto', $callback); @@ -401,8 +405,8 @@ public function subscribeWithAckCallbackShouldHaveAckResolverArgument($ack) $capturedResolver = $resolver; })); - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $subscriptionId = $client->subscribeWithAck('/foo', $ack, $callback); @@ -438,15 +442,8 @@ public function acknowledgeWithAckResolverArgumentShouldSendAckFrame() $capturedResolver = $resolver; })); - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - - $output - ->expects($this->at(2)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('ACK', array('subscription' => 0, 'message-id' => 54321)) - )); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $subscriptionId = $client->subscribeWithAck('/foo', 'client', $callback); @@ -458,7 +455,15 @@ public function acknowledgeWithAckResolverArgumentShouldSendAckFrame() ); $input->emit('frame', array($responseFrame)); + $emitted = false; + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('ACK', array('subscription' => 0, 'message-id' => 54321)); + $this->assertEquals((string) $expected, (string) $frame); + }); + $capturedResolver->ack(); + $this->assertTrue($emitted); } /** @test */ @@ -474,15 +479,8 @@ public function negativeAcknowledgeWithAckResolverArgumentShouldSendNackFrame() $capturedResolver = $resolver; })); - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - - $output - ->expects($this->at(2)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('NACK', array('subscription' => 0, 'message-id' => 54321)) - )); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $subscriptionId = $client->subscribeWithAck('/foo', 'client', $callback); @@ -494,7 +492,15 @@ public function negativeAcknowledgeWithAckResolverArgumentShouldSendNackFrame() ); $input->emit('frame', array($responseFrame)); + $emitted = false; + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('NACK', array('subscription' => 0, 'message-id' => 54321)); + $this->assertEquals((string) $expected, (string) $frame); + }); + $capturedResolver->nack(); + $this->assertTrue($emitted); } /** @test */ @@ -510,8 +516,8 @@ public function messagesShouldGetRoutedToSubscriptions() $capturedFrame = $frame; })); - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $subscriptionId = $client->subscribe('/foo', $callback); @@ -557,76 +563,76 @@ public function callbackShouldNotBeCalledAfterUnsubscribe($data) /** @test */ public function ackShouldSendAckFrame() { - $input = $this->createInputStreamMock(); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('ACK', array('subscription' => 12345, 'message-id' => 54321)) - )); - + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); + + $emitted = false; + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('ACK', array('subscription' => 12345, 'message-id' => 54321)); + $this->assertEquals((string) $expected, (string) $frame); + }); $client->ack(12345, 54321); + $this->assertTrue($emitted); } /** @test */ public function ackShouldSendAckFrameWithCustomHeaders() { - $input = $this->createInputStreamMock(); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('ACK', array('foo' => 'bar', 'subscription' => 12345, 'message-id' => 54321)) - )); - + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); + + $emitted = false; + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('ACK', array('foo' => 'bar', 'subscription' => 12345, 'message-id' => 54321)); + $this->assertEquals((string) $expected, (string) $frame); + }); $client->ack(12345, 54321, array('foo' => 'bar')); + $this->assertTrue($emitted); } /** @test */ public function nackShouldSendNackFrame() { - $input = $this->createInputStreamMock(); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('NACK', array('subscription' => 12345, 'message-id' => 54321)) - )); - + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); + + $emitted = false; + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('NACK', array('subscription' => 12345, 'message-id' => 54321)); + $this->assertEquals((string) $expected, (string) $frame); + }); $client->nack(12345, 54321); + $this->assertTrue($emitted); } /** @test */ public function nackShouldSendNackFrameWithCustomHeaders() { - $input = $this->createInputStreamMock(); - - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); - $output - ->expects($this->at(1)) - ->method('sendFrame') - ->with($this->frameIsEqual( - new Frame('NACK', array('foo' => 'bar', 'subscription' => 12345, 'message-id' => 54321)) - )); - + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); + + $emitted = false; + $output->on('data', function ($frame) use (&$emitted) { + $emitted = true; + $expected = new Frame('NACK', array('foo' => 'bar', 'subscription' => 12345, 'message-id' => 54321)); + $this->assertEquals((string) $expected, (string) $frame); + }); $client->nack(12345, 54321, array('foo' => 'bar')); + $this->assertTrue($emitted); } /** @test */ public function disconnectShouldGracefullyDisconnect() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getMockBuilder('React\Stomp\Client') ->setConstructorArgs(array($this->createLoopMock(), $input, $output, array('vhost' => 'localhost'))) @@ -639,18 +645,19 @@ public function disconnectShouldGracefullyDisconnect() $input->emit('frame', array(new Frame('CONNECTED'))); $client->disconnect(); - $output - ->expects($this->once()) - ->method('close'); - + $emitted = false; + $output->on('close', function () use (&$emitted) { + $emitted = true; + }); $input->emit('frame', array(new Frame('RECEIPT', array('receipt-id' => '1234')))); + $this->assertTrue($emitted); } /** @test */ public function processingErrorShouldResultInClientError() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $client->on('error', $this->expectCallableOnce()); @@ -661,8 +668,8 @@ public function processingErrorShouldResultInClientError() /** @test */ public function inputErrorShouldResultInClientError() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $client->on('error', $this->expectCallableOnce()); @@ -674,8 +681,8 @@ public function inputErrorShouldResultInClientError() /** @test */ public function inputCloseShouldResultInClientClose() { - $input = $this->createInputStreamMock(); - $output = $this->createMock('React\Stomp\Io\OutputStreamInterface'); + $input = $this->createInputStream(); + $output = $this->createOutputStream(); $client = $this->getConnectedClient($input, $output); $client->on('close', $this->expectCallableOnce()); @@ -683,7 +690,7 @@ public function inputCloseShouldResultInClientClose() $input->emit('close'); } - private function getConnectedClient(InputStreamInterface $input, OutputStreamInterface $output) + private function getConnectedClient(WritableResourceStream $input, ReadableResourceStream $output) { $client = new Client($this->createLoopMockWithConnectionTimer(), $input, $output, array('vhost' => 'localhost')); $client->connect(); @@ -695,7 +702,10 @@ private function getConnectedClient(InputStreamInterface $input, OutputStreamInt private function createInputStreamMock() { return $this->getMockBuilder('React\Stomp\Io\InputStream') - ->setConstructorArgs(array($this->createMock('React\Stomp\Protocol\Parser'))) + ->setConstructorArgs([ + fopen('php://temp', 'r+'), + $this->createMock('React\EventLoop\StreamSelectLoop') + ]) ->setMethods(array('isWritable', 'write', 'end', 'close')) ->getMock(); } @@ -709,9 +719,13 @@ private function createLoopMockWithConnectionTimer() { $loop = $this->createLoopMock(); - $timer = $this->createMock('React\EventLoop\Timer\TimerInterface'); - $timer->expects($this->once()) - ->method('cancel'); + $timer = $this->createMock('React\EventLoop\TimerInterface'); + + // $timer->expects($this->once()) + // ->method('cancel'); + $loop->expects($this->once()) + ->method('cancelTimer') + ->with($this->equalTo($timer)); $loop->expects($this->once()) ->method('addTimer') @@ -719,4 +733,14 @@ private function createLoopMockWithConnectionTimer() return $loop; } + + private function createInputStream() + { + return new WritableResourceStream(fopen('php://temp', 'w+'), $this->createLoopMock()); + } + + private function createOutputStream() + { + return new ReadableResourceStream(fopen('php://temp', 'r+'), $this->createLoopMock()); + } } diff --git a/tests/React/Tests/Stomp/Constraint/FrameHasHeader.php b/tests/React/Tests/Stomp/Constraint/FrameHasHeader.php index 4bd150c..c12f4c5 100644 --- a/tests/React/Tests/Stomp/Constraint/FrameHasHeader.php +++ b/tests/React/Tests/Stomp/Constraint/FrameHasHeader.php @@ -3,9 +3,9 @@ namespace React\Tests\Stomp\Constraint; use React\Stomp\Protocol\Frame; -use PHPUnit_Framework_Constraint as Constraint; +use PHPUnit\Framework\Constraint\Constraint as TestContraint; -class FrameHasHeader extends Constraint +class FrameHasHeader extends TestContraint { protected $name; protected $value; diff --git a/tests/React/Tests/Stomp/Constraint/FrameIsEqual.php b/tests/React/Tests/Stomp/Constraint/FrameIsEqual.php index c0af1e6..bc2dc58 100644 --- a/tests/React/Tests/Stomp/Constraint/FrameIsEqual.php +++ b/tests/React/Tests/Stomp/Constraint/FrameIsEqual.php @@ -3,9 +3,9 @@ namespace React\Tests\Stomp\Constraint; use React\Stomp\Protocol\Frame; -use PHPUnit_Framework_Constraint as Constraint; +use PHPUnit\Framework\Constraint\Constraint as TestContraint; -class FrameIsEqual extends Constraint +class FrameIsEqual extends TestContraint { protected $frame; diff --git a/tests/React/Tests/Stomp/FrameBufferTest.php b/tests/React/Tests/Stomp/FrameBufferTest.php new file mode 100644 index 0000000..0f0ba3b --- /dev/null +++ b/tests/React/Tests/Stomp/FrameBufferTest.php @@ -0,0 +1,38 @@ + '1.1', 'host' => 'localhost')); + $incompleteFrame = str_replace("\x00", '', $frame); + + $frameBuffer->addToBuffer($incompleteFrame); + + $this->assertEquals(0, count($frameBuffer->pullFrames())); + } + + /** @test */ + public function onlyCompleteFramesArePulled() + { + $frameBuffer = new FrameBuffer; + $frame = new Frame('CONNECT', array('accept-version' => '1.1', 'host' => 'localhost')); + $incompleteFrame = str_replace("\x00", '', $frame); + + $frameBuffer->addToBuffer((string) $frame); + $frameBuffer->addToBuffer($incompleteFrame); + + $this->assertEquals(1, count($frameBuffer->pullFrames())); + $this->assertEquals(0, count($frameBuffer->pullFrames())); + $frameBuffer->addToBuffer("\x00"); + $this->assertEquals(1, count($frameBuffer->pullFrames())); + $this->assertEquals(0, count($frameBuffer->pullFrames())); + } +} diff --git a/tests/React/Tests/Stomp/Io/InputStreamTest.php b/tests/React/Tests/Stomp/Io/InputStreamTest.php deleted file mode 100644 index cdb5b71..0000000 --- a/tests/React/Tests/Stomp/Io/InputStreamTest.php +++ /dev/null @@ -1,107 +0,0 @@ -assertTrue($input->isWritable()); - } - - /** @test */ - public function incompleteWriteShouldNotEmitFrame() - { - $input = new InputStream(new Parser()); - $input->on('frame', $this->expectCallableNever()); - - $input->write("FOO\n\n"); - } - - /** @test */ - public function singleFrameWriteShouldEmitExactlyOneFrame() - { - $callback = $this->createCallableMock(); - $callback - ->expects($this->once()) - ->method('__invoke') - ->with($this->frameIsEqual(new Frame('CONNECTED'))); - - $input = new InputStream(new Parser()); - $input->on('frame', $callback); - - $input->write("CONNECTED\n\n\x00"); - } - - /** @test */ - public function manySegmentedFramesWrittenShouldEmitThoseFrames() - { - $callback = $this->createCallableMock(); - $callback - ->expects($this->at(0)) - ->method('__invoke') - ->with($this->frameIsEqual(new Frame('CONNECTED'))); - $callback - ->expects($this->at(1)) - ->method('__invoke') - ->with($this->frameIsEqual(new Frame('MESSAGE', array(), 'Body'))); - - $input = new InputStream(new Parser()); - $input->on('frame', $callback); - - $input->write("CONNECTED\n\n"); - $input->write("\x00"); - $input->write("MESSAGE\n\n"); - $input->write("Body\x00"); - $input->write("MESSAGE"); - } - - /** @test */ - public function endShouldWriteGivenDataThenClose() - { - $callback = $this->createCallableMock(); - $callback - ->expects($this->at(0)) - ->method('__invoke') - ->with($this->frameIsEqual(new Frame('CONNECTED'))); - - $input = new InputStream(new Parser()); - $input->on('frame', $callback); - $input->on('close', $this->expectCallableOnce()); - - $input->write("CONNECTED\n\n"); - $input->end("\x00"); - - $this->assertFalse($input->isWritable()); - } - - /** @test */ - public function closeShouldClose() - { - $input = new InputStream(new Parser()); - $input->on('frame', $this->expectCallableNever()); - $input->on('close', $this->expectCallableOnce()); - - $input->close(); - - $this->assertFalse($input->isWritable()); - } - - /** @test */ - public function writingAfterCloseShouldDoNothing() - { - $input = new InputStream(new Parser()); - $input->close(); - - $input->write('whoops'); - } -} diff --git a/tests/React/Tests/Stomp/Io/OutputStreamTest.php b/tests/React/Tests/Stomp/Io/OutputStreamTest.php deleted file mode 100644 index 153d32e..0000000 --- a/tests/React/Tests/Stomp/Io/OutputStreamTest.php +++ /dev/null @@ -1,71 +0,0 @@ -loop = $this->createMock('React\EventLoop\LoopInterface'); - $this->loop - ->expects($this->any()) - ->method('addTimer') - ->will($this->returnCallback(function ($seconds, $callback) { - call_user_func($callback); - })); - } - - /** @test */ - public function itShouldBeReadableByDefault() - { - $output = new OutputStream($this->loop); - - $this->assertTrue($output->isReadable()); - } - - /** @test */ - public function sendFrameShouldDumpAndEmitFrameData() - { - $frame = new Frame('CONNECT'); - - $callback = $this->createCallableMock(); - $callback - ->expects($this->once()) - ->method('__invoke') - ->with($this->frameIsEqual($frame)); - - $output = new OutputStream($this->loop); - $output->on('data', $callback); - $output->sendFrame($frame); - } - - /** @test */ - public function pausedStreamShouldQueueFrames() - { - $frame = new Frame('CONNECT'); - - $output = new OutputStream($this->loop); - $output->pause(); - - $output->on('data', $this->expectCallableNever()); - $output->sendFrame($frame); - $output->removeAllListeners(); - - $output->on('data', $this->expectCallableOnce()); - $output->resume(); - } - - /** @test */ - public function closeShouldMakeStreamUnreadable() - { - $output = new OutputStream($this->loop); - $output->close(); - - $this->assertFalse($output->isReadable()); - } -}