diff --git a/.github/workflows/behat.yml b/.github/workflows/behat.yml new file mode 100644 index 0000000..6d35c7f --- /dev/null +++ b/.github/workflows/behat.yml @@ -0,0 +1,61 @@ +name: "Behat tests" + +on: + pull_request: + push: + +jobs: + phpunit: + name: "Behat tests" + + runs-on: ${{ matrix.operating-system }} + + strategy: + matrix: + dependencies: + - "locked" + php-version: + - "8.1" + operating-system: + - "ubuntu-latest" + + steps: + - name: "Checkout" + uses: "actions/checkout@v3" + + - name: "Install PHP" + uses: "shivammathur/setup-php@2.18.1" + with: + php-version: "${{ matrix.php-version }}" + ini-values: memory_limit=-1 + tools: composer:v2, cs2pr + + - name: Get composer cache directory + id: composer-cache + run: echo "::set-output name=dir::$(composer config cache-files-dir)" + + - name: "Cache dependencies" + uses: "actions/cache@v3.0.4" + with: + path: ${{ steps.composer-cache.outputs.dir }} + key: "php-${{ matrix.php-version }}-composer-${{ matrix.dependencies }}-${{ hashFiles('**/composer.lock') }}" + restore-keys: "php-${{ matrix.php-version }}-composer-${{ matrix.dependencies }}-" + + - name: "Install lowest dependencies" + if: ${{ matrix.dependencies == 'lowest' }} + run: "composer update --prefer-lowest --no-interaction --no-progress" + + - name: "Install highest dependencies" + if: ${{ matrix.dependencies == 'highest' }} + run: "composer update --no-interaction --no-progress" + + - name: "Install locked dependencies" + if: ${{ matrix.dependencies == 'locked' }} + run: "composer install --no-interaction --no-progress" + + - name: "Install development dependencies" + if: ${{ matrix.dependencies == 'development' }} + run: "composer config minimum-stability dev && composer update --no-interaction --no-progress" + + - name: "Tests" + run: "make behat" diff --git a/Makefile b/Makefile index 381ebab..f15eaab 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ PARALLELISM := $(shell nproc) .PHONY: all -all: install phpcbf phpcs phpstan phpunit infection +all: install phpcbf phpcs phpstan phpunit behat infection .PHONY: install install: vendor/composer/installed.json @@ -14,6 +14,10 @@ vendor/composer/installed.json: composer.json composer.lock phpunit: @php -d zend.assertions=1 vendor/bin/phpunit --testsuite=unit +.PHONY: behat +behat: + @php -d zend.assertions=1 vendor/bin/behat + .PHONY: infection infection: @php -d zend.assertions=1 -d xdebug.mode=coverage vendor/bin/phpunit --testsuite=unit --coverage-xml=build/coverage-xml --log-junit=build/junit.xml $(PHPUNIT_FLAGS) @@ -21,7 +25,7 @@ infection: .PHONY: phpcbf phpcbf: - @vendor/bin/phpcbf --parallel=$(PARALLELISM) + @vendor/bin/phpcbf --parallel=$(PARALLELISM) || true .PHONY: phpcs phpcs: diff --git a/behat.yml b/behat.yml new file mode 100644 index 0000000..cf75041 --- /dev/null +++ b/behat.yml @@ -0,0 +1,8 @@ +default: + gherkin: + filters: + tags: ~@wip + + suites: + acceptance: + contexts: [] diff --git a/composer.json b/composer.json index 269cc86..fa7001b 100644 --- a/composer.json +++ b/composer.json @@ -22,9 +22,11 @@ "php-64bit": "^8.1", "ext-pcntl": "*", "psr/log": "^3.0", + "react/async": "^4.0", "react/socket": "^1.6" }, "require-dev": { + "behat/behat": "^3.11", "infection/infection": "^0.26", "lcobucci/coding-standard": "^8.0", "monolog/monolog": "^3.2", diff --git a/composer.lock b/composer.lock index 6f8fb55..bb8067c 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "0dc8490785023b65152783fa7a6341fe", + "content-hash": "a1c4404a9eece5511eac5153a2920dc1", "packages": [ { "name": "evenement/evenement", @@ -103,6 +103,84 @@ }, "time": "2021-07-14T16:46:02+00:00" }, + { + "name": "react/async", + "version": "v4.0.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/async.git", + "reference": "2aa8d89057e1059f59666e4204100636249b7be0" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/async/zipball/2aa8d89057e1059f59666e4204100636249b7be0", + "reference": "2aa8d89057e1059f59666e4204100636249b7be0", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "react/event-loop": "^1.2", + "react/promise": "^3.0 || ^2.8 || ^1.2.1" + }, + "require-dev": { + "phpunit/phpunit": "^9.3" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions_include.php" + ], + "psr-4": { + "React\\Async\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Christian Lück", + "email": "christian@clue.engineering", + "homepage": "https://clue.engineering/" + }, + { + "name": "Cees-Jan Kiewiet", + "email": "reactphp@ceesjankiewiet.nl", + "homepage": "https://wyrihaximus.net/" + }, + { + "name": "Jan Sorgalla", + "email": "jsorgalla@gmail.com", + "homepage": "https://sorgalla.com/" + }, + { + "name": "Chris Boden", + "email": "cboden@gmail.com", + "homepage": "https://cboden.dev/" + } + ], + "description": "Async utilities and fibers for ReactPHP", + "keywords": [ + "async", + "reactphp" + ], + "support": { + "issues": "https://github.com/reactphp/async/issues", + "source": "https://github.com/reactphp/async/tree/v4.0.0" + }, + "funding": [ + { + "url": "https://github.com/WyriHaximus", + "type": "github" + }, + { + "url": "https://github.com/clue", + "type": "github" + } + ], + "time": "2022-07-11T14:21:02+00:00" + }, { "name": "react/cache", "version": "v1.1.1", @@ -664,6 +742,204 @@ } ], "packages-dev": [ + { + "name": "behat/behat", + "version": "v3.11.0", + "source": { + "type": "git", + "url": "https://github.com/Behat/Behat.git", + "reference": "a19c72c78eb0cdf7b7c4dabfeec9eb3a282728fc" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/Behat/Behat/zipball/a19c72c78eb0cdf7b7c4dabfeec9eb3a282728fc", + "reference": "a19c72c78eb0cdf7b7c4dabfeec9eb3a282728fc", + "shasum": "" + }, + "require": { + "behat/gherkin": "^4.9.0", + "behat/transliterator": "^1.2", + "ext-mbstring": "*", + "php": "^7.2 || ^8.0", + "psr/container": "^1.0 || ^2.0", + "symfony/config": "^4.4 || ^5.0 || ^6.0", + "symfony/console": "^4.4 || ^5.0 || ^6.0", + "symfony/dependency-injection": "^4.4 || ^5.0 || ^6.0", + "symfony/event-dispatcher": "^4.4 || ^5.0 || ^6.0", + "symfony/translation": "^4.4 || ^5.0 || ^6.0", + "symfony/yaml": "^4.4 || ^5.0 || ^6.0" + }, + "require-dev": { + "herrera-io/box": "~1.6.1", + "phpunit/phpunit": "^8.5 || ^9.0", + "symfony/process": "^4.4 || ^5.0 || ^6.0", + "vimeo/psalm": "^4.8" + }, + "suggest": { + "ext-dom": "Needed to output test results in JUnit format." + }, + "bin": [ + "bin/behat" + ], + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.x-dev" + } + }, + "autoload": { + "psr-4": { + "Behat\\Hook\\": "src/Behat/Hook/", + "Behat\\Step\\": "src/Behat/Step/", + "Behat\\Behat\\": "src/Behat/Behat/", + "Behat\\Testwork\\": "src/Behat/Testwork/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Konstantin Kudryashov", + "email": "ever.zet@gmail.com", + "homepage": "http://everzet.com" + } + ], + "description": "Scenario-oriented BDD framework for PHP", + "homepage": "http://behat.org/", + "keywords": [ + "Agile", + "BDD", + "ScenarioBDD", + "Scrum", + "StoryBDD", + "User story", + "business", + "development", + "documentation", + "examples", + "symfony", + "testing" + ], + "support": { + "issues": "https://github.com/Behat/Behat/issues", + "source": "https://github.com/Behat/Behat/tree/v3.11.0" + }, + "time": "2022-07-07T09:49:27+00:00" + }, + { + "name": "behat/gherkin", + "version": "v4.9.0", + "source": { + "type": "git", + "url": "https://github.com/Behat/Gherkin.git", + "reference": "0bc8d1e30e96183e4f36db9dc79caead300beff4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/Behat/Gherkin/zipball/0bc8d1e30e96183e4f36db9dc79caead300beff4", + "reference": "0bc8d1e30e96183e4f36db9dc79caead300beff4", + "shasum": "" + }, + "require": { + "php": "~7.2|~8.0" + }, + "require-dev": { + "cucumber/cucumber": "dev-gherkin-22.0.0", + "phpunit/phpunit": "~8|~9", + "symfony/yaml": "~3|~4|~5" + }, + "suggest": { + "symfony/yaml": "If you want to parse features, represented in YAML files" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "4.x-dev" + } + }, + "autoload": { + "psr-0": { + "Behat\\Gherkin": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Konstantin Kudryashov", + "email": "ever.zet@gmail.com", + "homepage": "http://everzet.com" + } + ], + "description": "Gherkin DSL parser for PHP", + "homepage": "http://behat.org/", + "keywords": [ + "BDD", + "Behat", + "Cucumber", + "DSL", + "gherkin", + "parser" + ], + "support": { + "issues": "https://github.com/Behat/Gherkin/issues", + "source": "https://github.com/Behat/Gherkin/tree/v4.9.0" + }, + "time": "2021-10-12T13:05:09+00:00" + }, + { + "name": "behat/transliterator", + "version": "v1.5.0", + "source": { + "type": "git", + "url": "https://github.com/Behat/Transliterator.git", + "reference": "baac5873bac3749887d28ab68e2f74db3a4408af" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/Behat/Transliterator/zipball/baac5873bac3749887d28ab68e2f74db3a4408af", + "reference": "baac5873bac3749887d28ab68e2f74db3a4408af", + "shasum": "" + }, + "require": { + "php": ">=7.2" + }, + "require-dev": { + "chuyskywalker/rolling-curl": "^3.1", + "php-yaoi/php-yaoi": "^1.0", + "phpunit/phpunit": "^8.5.25 || ^9.5.19" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Behat\\Transliterator\\": "src/Behat/Transliterator" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "Artistic-1.0" + ], + "description": "String transliterator", + "keywords": [ + "i18n", + "slug", + "transliterator" + ], + "support": { + "issues": "https://github.com/Behat/Transliterator/issues", + "source": "https://github.com/Behat/Transliterator/tree/v1.5.0" + }, + "time": "2022-03-30T09:27:43+00:00" + }, { "name": "composer/pcre", "version": "3.0.0", @@ -2826,6 +3102,56 @@ }, "time": "2021-11-05T16:47:00+00:00" }, + { + "name": "psr/event-dispatcher", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/event-dispatcher.git", + "reference": "dbefd12671e8a14ec7f180cab83036ed26714bb0" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/event-dispatcher/zipball/dbefd12671e8a14ec7f180cab83036ed26714bb0", + "reference": "dbefd12671e8a14ec7f180cab83036ed26714bb0", + "shasum": "" + }, + "require": { + "php": ">=7.2.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\EventDispatcher\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Standard interfaces for event handling.", + "keywords": [ + "events", + "psr", + "psr-14" + ], + "support": { + "issues": "https://github.com/php-fig/event-dispatcher/issues", + "source": "https://github.com/php-fig/event-dispatcher/tree/1.0.0" + }, + "time": "2019-01-08T18:20:26+00:00" + }, { "name": "sanmai/later", "version": "0.1.2", @@ -4095,55 +4421,42 @@ "time": "2022-06-18T07:21:10+00:00" }, { - "name": "symfony/console", - "version": "v6.1.2", + "name": "symfony/config", + "version": "v6.1.0", "source": { "type": "git", - "url": "https://github.com/symfony/console.git", - "reference": "7a86c1c42fbcb69b59768504c7bca1d3767760b7" + "url": "https://github.com/symfony/config.git", + "reference": "ed8d12417bcacd2d969750feb1fe1aab1c11e613" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/7a86c1c42fbcb69b59768504c7bca1d3767760b7", - "reference": "7a86c1c42fbcb69b59768504c7bca1d3767760b7", + "url": "https://api.github.com/repos/symfony/config/zipball/ed8d12417bcacd2d969750feb1fe1aab1c11e613", + "reference": "ed8d12417bcacd2d969750feb1fe1aab1c11e613", "shasum": "" }, "require": { "php": ">=8.1", "symfony/deprecation-contracts": "^2.1|^3", - "symfony/polyfill-mbstring": "~1.0", - "symfony/service-contracts": "^1.1|^2|^3", - "symfony/string": "^5.4|^6.0" + "symfony/filesystem": "^5.4|^6.0", + "symfony/polyfill-ctype": "~1.8" }, "conflict": { - "symfony/dependency-injection": "<5.4", - "symfony/dotenv": "<5.4", - "symfony/event-dispatcher": "<5.4", - "symfony/lock": "<5.4", - "symfony/process": "<5.4" - }, - "provide": { - "psr/log-implementation": "1.0|2.0|3.0" + "symfony/finder": "<5.4" }, "require-dev": { - "psr/log": "^1|^2|^3", - "symfony/config": "^5.4|^6.0", - "symfony/dependency-injection": "^5.4|^6.0", "symfony/event-dispatcher": "^5.4|^6.0", - "symfony/lock": "^5.4|^6.0", - "symfony/process": "^5.4|^6.0", - "symfony/var-dumper": "^5.4|^6.0" + "symfony/finder": "^5.4|^6.0", + "symfony/messenger": "^5.4|^6.0", + "symfony/service-contracts": "^1.1|^2|^3", + "symfony/yaml": "^5.4|^6.0" }, "suggest": { - "psr/log": "For using the console logger", - "symfony/event-dispatcher": "", - "symfony/lock": "", - "symfony/process": "" + "symfony/yaml": "To use the yaml reference dumper" }, "type": "library", "autoload": { "psr-4": { - "Symfony\\Component\\Console\\": "" + "Symfony\\Component\\Config\\": "" }, "exclude-from-classmap": [ "/Tests/" @@ -4163,16 +4476,10 @@ "homepage": "https://symfony.com/contributors" } ], - "description": "Eases the creation of beautiful and testable command line interfaces", + "description": "Helps you find, load, combine, autofill and validate configuration values of any kind", "homepage": "https://symfony.com", - "keywords": [ - "cli", - "command line", - "console", - "terminal" - ], "support": { - "source": "https://github.com/symfony/console/tree/v6.1.2" + "source": "https://github.com/symfony/config/tree/v6.1.0" }, "funding": [ { @@ -4188,10 +4495,193 @@ "type": "tidelift" } ], - "time": "2022-06-26T13:01:30+00:00" + "time": "2022-05-17T12:56:32+00:00" }, { - "name": "symfony/deprecation-contracts", + "name": "symfony/console", + "version": "v6.1.2", + "source": { + "type": "git", + "url": "https://github.com/symfony/console.git", + "reference": "7a86c1c42fbcb69b59768504c7bca1d3767760b7" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/console/zipball/7a86c1c42fbcb69b59768504c7bca1d3767760b7", + "reference": "7a86c1c42fbcb69b59768504c7bca1d3767760b7", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "symfony/deprecation-contracts": "^2.1|^3", + "symfony/polyfill-mbstring": "~1.0", + "symfony/service-contracts": "^1.1|^2|^3", + "symfony/string": "^5.4|^6.0" + }, + "conflict": { + "symfony/dependency-injection": "<5.4", + "symfony/dotenv": "<5.4", + "symfony/event-dispatcher": "<5.4", + "symfony/lock": "<5.4", + "symfony/process": "<5.4" + }, + "provide": { + "psr/log-implementation": "1.0|2.0|3.0" + }, + "require-dev": { + "psr/log": "^1|^2|^3", + "symfony/config": "^5.4|^6.0", + "symfony/dependency-injection": "^5.4|^6.0", + "symfony/event-dispatcher": "^5.4|^6.0", + "symfony/lock": "^5.4|^6.0", + "symfony/process": "^5.4|^6.0", + "symfony/var-dumper": "^5.4|^6.0" + }, + "suggest": { + "psr/log": "For using the console logger", + "symfony/event-dispatcher": "", + "symfony/lock": "", + "symfony/process": "" + }, + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\Console\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Eases the creation of beautiful and testable command line interfaces", + "homepage": "https://symfony.com", + "keywords": [ + "cli", + "command line", + "console", + "terminal" + ], + "support": { + "source": "https://github.com/symfony/console/tree/v6.1.2" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-06-26T13:01:30+00:00" + }, + { + "name": "symfony/dependency-injection", + "version": "v6.1.2", + "source": { + "type": "git", + "url": "https://github.com/symfony/dependency-injection.git", + "reference": "5635ff016a814d7984b1c4644ad28e7df546077b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/dependency-injection/zipball/5635ff016a814d7984b1c4644ad28e7df546077b", + "reference": "5635ff016a814d7984b1c4644ad28e7df546077b", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "psr/container": "^1.1|^2.0", + "symfony/deprecation-contracts": "^2.1|^3", + "symfony/service-contracts": "^1.1.6|^2.0|^3.0" + }, + "conflict": { + "ext-psr": "<1.1|>=2", + "symfony/config": "<6.1", + "symfony/finder": "<5.4", + "symfony/proxy-manager-bridge": "<5.4", + "symfony/yaml": "<5.4" + }, + "provide": { + "psr/container-implementation": "1.1|2.0", + "symfony/service-implementation": "1.1|2.0|3.0" + }, + "require-dev": { + "symfony/config": "^6.1", + "symfony/expression-language": "^5.4|^6.0", + "symfony/yaml": "^5.4|^6.0" + }, + "suggest": { + "symfony/config": "", + "symfony/expression-language": "For using expressions in service container configuration", + "symfony/finder": "For using double-star glob patterns or when GLOB_BRACE portability is required", + "symfony/proxy-manager-bridge": "Generate service proxies to lazy load them", + "symfony/yaml": "" + }, + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\DependencyInjection\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Allows you to standardize and centralize the way objects are constructed in your application", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/dependency-injection/tree/v6.1.2" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-06-26T13:01:30+00:00" + }, + { + "name": "symfony/deprecation-contracts", "version": "v3.1.1", "source": { "type": "git", @@ -4257,6 +4747,168 @@ ], "time": "2022-02-25T11:15:52+00:00" }, + { + "name": "symfony/event-dispatcher", + "version": "v6.1.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/event-dispatcher.git", + "reference": "a0449a7ad7daa0f7c0acd508259f80544ab5a347" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/event-dispatcher/zipball/a0449a7ad7daa0f7c0acd508259f80544ab5a347", + "reference": "a0449a7ad7daa0f7c0acd508259f80544ab5a347", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "symfony/event-dispatcher-contracts": "^2|^3" + }, + "conflict": { + "symfony/dependency-injection": "<5.4" + }, + "provide": { + "psr/event-dispatcher-implementation": "1.0", + "symfony/event-dispatcher-implementation": "2.0|3.0" + }, + "require-dev": { + "psr/log": "^1|^2|^3", + "symfony/config": "^5.4|^6.0", + "symfony/dependency-injection": "^5.4|^6.0", + "symfony/error-handler": "^5.4|^6.0", + "symfony/expression-language": "^5.4|^6.0", + "symfony/http-foundation": "^5.4|^6.0", + "symfony/service-contracts": "^1.1|^2|^3", + "symfony/stopwatch": "^5.4|^6.0" + }, + "suggest": { + "symfony/dependency-injection": "", + "symfony/http-kernel": "" + }, + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\EventDispatcher\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Provides tools that allow your application components to communicate with each other by dispatching events and listening to them", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/event-dispatcher/tree/v6.1.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-05-05T16:51:07+00:00" + }, + { + "name": "symfony/event-dispatcher-contracts", + "version": "v3.1.1", + "source": { + "type": "git", + "url": "https://github.com/symfony/event-dispatcher-contracts.git", + "reference": "02ff5eea2f453731cfbc6bc215e456b781480448" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/event-dispatcher-contracts/zipball/02ff5eea2f453731cfbc6bc215e456b781480448", + "reference": "02ff5eea2f453731cfbc6bc215e456b781480448", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "psr/event-dispatcher": "^1" + }, + "suggest": { + "symfony/event-dispatcher-implementation": "" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "3.1-dev" + }, + "thanks": { + "name": "symfony/contracts", + "url": "https://github.com/symfony/contracts" + } + }, + "autoload": { + "psr-4": { + "Symfony\\Contracts\\EventDispatcher\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Generic abstractions related to dispatching event", + "homepage": "https://symfony.com", + "keywords": [ + "abstractions", + "contracts", + "decoupling", + "interfaces", + "interoperability", + "standards" + ], + "support": { + "source": "https://github.com/symfony/event-dispatcher-contracts/tree/v3.1.1" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-02-25T11:15:52+00:00" + }, { "name": "symfony/filesystem", "version": "v6.1.0", @@ -4945,6 +5597,257 @@ ], "time": "2022-06-26T16:35:04+00:00" }, + { + "name": "symfony/translation", + "version": "v6.1.0", + "source": { + "type": "git", + "url": "https://github.com/symfony/translation.git", + "reference": "b254416631615bc6fe49b0a67f18658827288147" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/translation/zipball/b254416631615bc6fe49b0a67f18658827288147", + "reference": "b254416631615bc6fe49b0a67f18658827288147", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "symfony/polyfill-mbstring": "~1.0", + "symfony/translation-contracts": "^2.3|^3.0" + }, + "conflict": { + "symfony/config": "<5.4", + "symfony/console": "<5.4", + "symfony/dependency-injection": "<5.4", + "symfony/http-kernel": "<5.4", + "symfony/twig-bundle": "<5.4", + "symfony/yaml": "<5.4" + }, + "provide": { + "symfony/translation-implementation": "2.3|3.0" + }, + "require-dev": { + "psr/log": "^1|^2|^3", + "symfony/config": "^5.4|^6.0", + "symfony/console": "^5.4|^6.0", + "symfony/dependency-injection": "^5.4|^6.0", + "symfony/finder": "^5.4|^6.0", + "symfony/http-client-contracts": "^1.1|^2.0|^3.0", + "symfony/http-kernel": "^5.4|^6.0", + "symfony/intl": "^5.4|^6.0", + "symfony/polyfill-intl-icu": "^1.21", + "symfony/routing": "^5.4|^6.0", + "symfony/service-contracts": "^1.1.2|^2|^3", + "symfony/yaml": "^5.4|^6.0" + }, + "suggest": { + "psr/log-implementation": "To use logging capability in translator", + "symfony/config": "", + "symfony/yaml": "" + }, + "type": "library", + "autoload": { + "files": [ + "Resources/functions.php" + ], + "psr-4": { + "Symfony\\Component\\Translation\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Provides tools to internationalize your application", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/translation/tree/v6.1.0" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-05-11T12:12:29+00:00" + }, + { + "name": "symfony/translation-contracts", + "version": "v3.1.1", + "source": { + "type": "git", + "url": "https://github.com/symfony/translation-contracts.git", + "reference": "606be0f48e05116baef052f7f3abdb345c8e02cc" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/translation-contracts/zipball/606be0f48e05116baef052f7f3abdb345c8e02cc", + "reference": "606be0f48e05116baef052f7f3abdb345c8e02cc", + "shasum": "" + }, + "require": { + "php": ">=8.1" + }, + "suggest": { + "symfony/translation-implementation": "" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "3.1-dev" + }, + "thanks": { + "name": "symfony/contracts", + "url": "https://github.com/symfony/contracts" + } + }, + "autoload": { + "psr-4": { + "Symfony\\Contracts\\Translation\\": "" + }, + "exclude-from-classmap": [ + "/Test/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Nicolas Grekas", + "email": "p@tchwork.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Generic abstractions related to translation", + "homepage": "https://symfony.com", + "keywords": [ + "abstractions", + "contracts", + "decoupling", + "interfaces", + "interoperability", + "standards" + ], + "support": { + "source": "https://github.com/symfony/translation-contracts/tree/v3.1.1" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-06-27T17:24:16+00:00" + }, + { + "name": "symfony/yaml", + "version": "v6.1.2", + "source": { + "type": "git", + "url": "https://github.com/symfony/yaml.git", + "reference": "b01c4e7dc6a51cbf114567af04a19789fd1011fe" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/symfony/yaml/zipball/b01c4e7dc6a51cbf114567af04a19789fd1011fe", + "reference": "b01c4e7dc6a51cbf114567af04a19789fd1011fe", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "symfony/polyfill-ctype": "^1.8" + }, + "conflict": { + "symfony/console": "<5.4" + }, + "require-dev": { + "symfony/console": "^5.4|^6.0" + }, + "suggest": { + "symfony/console": "For validating YAML files using the lint command" + }, + "bin": [ + "Resources/bin/yaml-lint" + ], + "type": "library", + "autoload": { + "psr-4": { + "Symfony\\Component\\Yaml\\": "" + }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Fabien Potencier", + "email": "fabien@symfony.com" + }, + { + "name": "Symfony Community", + "homepage": "https://symfony.com/contributors" + } + ], + "description": "Loads and dumps YAML files", + "homepage": "https://symfony.com", + "support": { + "source": "https://github.com/symfony/yaml/tree/v6.1.2" + }, + "funding": [ + { + "url": "https://symfony.com/sponsor", + "type": "custom" + }, + { + "url": "https://github.com/fabpot", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", + "type": "tidelift" + } + ], + "time": "2022-06-20T12:01:07+00:00" + }, { "name": "thecodingmachine/safe", "version": "v2.2.2", diff --git a/docker-compose.yml b/docker-compose.yml index 6c95128..4ed2a76 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: - ALLOW_ANONYMOUS_LOGIN=yes kafka1: - image: bitnami/kafka:2.8.0 + image: bitnami/kafka:3.2.0 ports: - "9093:9093" environment: @@ -17,9 +17,7 @@ services: - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT kafka2: - image: bitnami/kafka:2.8.0 - profiles: - - clustered + image: bitnami/kafka:3.2.0 ports: - "9094:9094" environment: diff --git a/phpcs.xml.dist b/phpcs.xml.dist index 635847f..4051cd4 100644 --- a/phpcs.xml.dist +++ b/phpcs.xml.dist @@ -1,8 +1,10 @@ - + - diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 03d25a3..da54f3e 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,9 +1,49 @@ parameters: ignoreErrors: - - message: "#^Parameter \\#2 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:unpack\\(\\) expects string, mixed given\\.$#" - count: 5 - path: src/Protocol/Buffer.php + message: "#^Cannot use array destructuring on array\\\\|int\\|Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\Request\\|React\\\\Promise\\\\Deferred\\|string\\>\\|int\\|React\\\\Promise\\\\Deferred\\>\\|int\\.$#" + count: 1 + path: src/Client/Channel.php + + - + message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects array\\, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects array\\, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\\\|string\\> given\\.$#" + count: 1 + path: src/Protocol/API/Request.php + + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\\\|string\\> given\\.$#" + count: 1 + path: src/Protocol/API/Response.php - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\:\\:validateField\\(\\) expects array\\, mixed given\\.$#" @@ -15,6 +55,21 @@ parameters: count: 1 path: src/Protocol/Schema.php + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\ given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + + - + message: "#^Parameter \\#1 \\$fieldDefinition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parseFieldType\\(\\) expects array\\\\|class\\-string\\, mixed given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + + - + message: "#^Parameter \\#2 \\$nullable of class Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\ArrayOf constructor expects bool, mixed given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + - message: "#^Parameter \\#3 \\.\\.\\.\\$values of function sprintf expects bool\\|float\\|int\\|string\\|null, mixed given\\.$#" count: 1 @@ -65,11 +120,6 @@ parameters: count: 1 path: src/Protocol/Type/Int8.php - - - message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#" - count: 1 - path: src/Protocol/Type/NonNullableBytes.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#" count: 1 @@ -80,11 +130,6 @@ parameters: count: 2 path: src/Protocol/Type/NonNullableBytes.php - - - message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NonNullableString\\:\\:read\\(\\) should return string but returns mixed\\.$#" - count: 1 - path: src/Protocol/Type/NonNullableString.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#" count: 1 @@ -100,21 +145,11 @@ parameters: count: 1 path: src/Protocol/Type/NonNullableString.php - - - message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#" - count: 1 - path: src/Protocol/Type/NullableBytes.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#" count: 1 path: src/Protocol/Type/NullableBytes.php - - - message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NullableString\\:\\:read\\(\\) should return string\\|null but returns mixed\\.$#" - count: 1 - path: src/Protocol/Type/NullableString.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#" count: 1 diff --git a/src/Client/Channel.php b/src/Client/Channel.php new file mode 100644 index 0000000..1e7073d --- /dev/null +++ b/src/Client/Channel.php @@ -0,0 +1,161 @@ + */ + private SplPriorityQueue $processingQueue; + /** @var SplQueue */ + private SplQueue $inFlightQueue; + private ?ConnectionInterface $connection = null; + private readonly LoopInterface $loop; + + public function __construct( + private ConnectorInterface $connector, + private LoggerInterface $logger, + private Parser $schemaParser, + private Node $node, + ) { + $this->loop = Loop::get(); + $this->processingQueue = new SplPriorityQueue(); + $this->inFlightQueue = new SplQueue(); + } + + public function send(Request $request, int $correlation, string $client): PromiseInterface + { + $this->ensureConnected(); + + $deferred = new Deferred(); + $this->processingQueue->insert([[$request, $correlation, $client], $deferred], 0); + + return $deferred->promise(); + } + + private function ensureConnected(): void + { + if ($this->connection !== null) { + return; + } + + $this->connect(); + } + + private function connect(): void + { + $this->logger->info('Opening connection to node', ['node' => $this->node]); + + $this->connector->connect($this->node->host . ':' . $this->node->port)->then( + $this->initializeConnection(...), + function (Throwable $throwable): void { + $this->logger->error( + 'Error while connecting to node #' . $this->node->id, + ['node' => $this->node, 'exception' => $throwable], + ); + + $this->loop->addTimer(1, $this->connect(...)); + }, + ); + } + + public function initializeConnection(ConnectionInterface $connection): void + { + $this->logger->info('Connection to node established', ['node' => $this->node]); + + $this->connection = $connection; + $this->connection->on('data', $this->onData(...)); + $this->connection->on('error', $this->cleanUpConnection(...)); + $this->connection->on('close', $this->cleanUpConnection(...)); + + $this->loop->futureTick($this->processQueue(...)); + } + + public function processQueue(): void + { + if (! $this->processingQueue->valid()) { + return; + } + + $this->logger->debug('Processing message queue of node', ['node' => $this->node]); + + for ($i = 0, $max = min(15, $this->processingQueue->count()); $i < $max; ++$i) { + [[$request, $correlation, $client], $deferred] = $this->processingQueue->current(); + + $this->processingQueue->next(); + $headers = $this->sendMessage($request, $correlation, $client); + $this->inFlightQueue->enqueue([$headers, $deferred]); + } + + $this->loop->futureTick($this->processQueue(...)); + } + + private function sendMessage(Request $request, int $correlation, string $client): RequestHeaders + { + $headers = new RequestHeaders( + $request->apiKey(), + $request->highestSupportedVersion(), + $correlation, + $client, + $request->responseClass()::parse(...), + ); + + $header = $headers->toBuffer($this->schemaParser); + $body = $request->toBuffer($this->schemaParser, $headers->apiVersion); + + $length = Buffer::allocate(4); + $length->writeInt($header->length() + $body->length()); + + assert($this->connection instanceof ConnectionInterface); + $this->connection->write($length->bytes() . $header->bytes() . $body->bytes()); + + return $headers; + } + + public function onData(string $data): void + { + $this->logger->debug('Message received', ['node' => $this->node]); + + [$headers, $deferred] = $this->inFlightQueue->dequeue(); + + $buffer = Buffer::fromContent($data); + $length = $buffer->readInt(); + + $deferred->resolve($headers->parseResponse(Buffer::fromContent($buffer->read($length)), $this->schemaParser)); + } + + public function disconnect(): void + { + if ($this->connection === null) { + return; + } + + $this->connection->end(); + } + + public function cleanUpConnection(): void + { + $this->logger->info('Closing connection to node', ['node' => $this->node]); + + $this->connection = null; + } +} diff --git a/src/Cluster.php b/src/Cluster.php new file mode 100644 index 0000000..a86a1a0 --- /dev/null +++ b/src/Cluster.php @@ -0,0 +1,30 @@ +topic; + } + + public function value(): mixed + { + return $this->value; + } + + public function key(): mixed + { + return $this->key; + } + + /** @return string[]|null */ + public function headers(): ?array + { + return $this->headers; + } + + public function partition(): ?int + { + return $this->partition; + } + + public function timestamp(): ?int + { + return $this->timestamp; + } +} diff --git a/src/Protocol/API/ApiVersionsRequest.php b/src/Protocol/API/ApiVersionsRequest.php new file mode 100644 index 0000000..78603a9 --- /dev/null +++ b/src/Protocol/API/ApiVersionsRequest.php @@ -0,0 +1,46 @@ + Type\Int16::class, + 'api_versions' => [ + '_items' => [ + 'api_key' => Type\Int16::class, + 'min_version' => Type\Int16::class, + 'max_version' => Type\Int16::class, + ], + ], + ]; + + private const SCHEMA_V1 = self::SCHEMA_V0 + ['throttle_time_ms' => Type\Int32::class]; + + /** @param list $apiVersions */ + public function __construct(public int $errorCode, public array $apiVersions, public int $throttleTime) + { + } + + /** @inheritdoc */ + public static function fromArray(array $data): static + { + return new self( + $data['error_code'], + $data['api_versions'], + $data['throttle_time_ms'] ?? 0, + ); + } + + /** @inheritdoc */ + public static function schemaDefinition(int $version): array + { + return self::SCHEMA_VERSIONS[$version]; + } +} diff --git a/src/Protocol/API/MetadataRequest.php b/src/Protocol/API/MetadataRequest.php new file mode 100644 index 0000000..119bee0 --- /dev/null +++ b/src/Protocol/API/MetadataRequest.php @@ -0,0 +1,61 @@ + ['_items' => Type\NonNullableString::class]], + self::SCHEMA_V1, + self::SCHEMA_V1, + self::SCHEMA_V1, + self::SCHEMA_V4, + self::SCHEMA_V4, + self::SCHEMA_V4, + self::SCHEMA_V4, + ]; + + private const SCHEMA_V1 = [ + 'topics' => ['_nullable' => true, '_items' => Type\NonNullableString::class], + ]; + + private const SCHEMA_V4 = self::SCHEMA_V1 + ['allow_auto_topic_creation' => Type\Boolean::class]; + + /** @param list|null $topics */ + public function __construct(public ?array $topics, public bool $allowTopicCreation) + { + } + + public function apiKey(): int + { + return 3; + } + + public function highestSupportedVersion(): int + { + return 7; + } + + /** @inheritdoc */ + public static function schemaDefinition(int $version): array + { + return self::SCHEMA_VERSIONS[$version]; + } + + /** @inheritdoc */ + public function asArray(int $version): array + { + return [ + 'topics' => $version === 0 && $this->topics === null ? [] : $this->topics, + 'allow_auto_topic_creation' => $this->allowTopicCreation, + ]; + } + + public function responseClass(): string + { + return MetadataResponse::class; + } +} diff --git a/src/Protocol/API/MetadataResponse.php b/src/Protocol/API/MetadataResponse.php new file mode 100644 index 0000000..d5c6a01 --- /dev/null +++ b/src/Protocol/API/MetadataResponse.php @@ -0,0 +1,48 @@ + Type\Int16::class, + 'api_versions' => [ + '_items' => [ + 'api_key' => Type\Int16::class, + 'min_version' => Type\Int16::class, + 'max_version' => Type\Int16::class, + ], + ], + ]; + private const SCHEMA_V1 = self::SCHEMA_V0 + ['throttle_time_ms' => Type\Int32::class]; + + /** @param list $apiVersions */ + public function __construct(public int $errorCode, public array $apiVersions, public int $throttleTime) + { + } + + /** @inheritdoc */ + public static function fromArray(array $data): static + { + return new self( + $data['error_code'], + $data['api_versions'], + $data['throttle_time_ms'] ?? 0, + ); + } + + /** @inheritdoc */ + public static function schemaDefinition(int $version): array + { + return self::SCHEMA_VERSIONS[$version]; + } +} diff --git a/src/Protocol/API/Request.php b/src/Protocol/API/Request.php new file mode 100644 index 0000000..48823f5 --- /dev/null +++ b/src/Protocol/API/Request.php @@ -0,0 +1,34 @@ +parse(static::schemaDefinition($version)); + $data = $this->asArray($version); + + $buffer = Buffer::allocate($schema->sizeOf($data)); + $schema->write($data, $buffer); + + return $buffer; + } + + /** @return array> */ + abstract public static function schemaDefinition(int $version): array; + + /** @return array */ + abstract public function asArray(int $version): array; + + /** @return class-string */ + abstract public function responseClass(): string; +} diff --git a/src/Protocol/API/RequestHeaders.php b/src/Protocol/API/RequestHeaders.php new file mode 100644 index 0000000..90ddd8c --- /dev/null +++ b/src/Protocol/API/RequestHeaders.php @@ -0,0 +1,56 @@ + Type\Int16::class, + 'api_version' => Type\Int16::class, + 'correlation_id' => Type\Int32::class, + 'client_id' => Type\NullableString::class, + ]; + + public function __construct( + public int $apiKey, + public int $apiVersion, + public int $correlationId, + public string $client, + private Closure $responseFactory, + ) { + } + + public function toBuffer(Parser $schemaParser): Buffer + { + $schema = $schemaParser->parse(self::SCHEMA); + + $data = [ + 'api_key' => $this->apiKey, + 'api_version' => $this->apiVersion, + 'correlation_id' => $this->correlationId, + 'client_id' => $this->client, + ]; + + $buffer = Buffer::allocate($schema->sizeOf($data)); + $schema->write($data, $buffer); + + return $buffer; + } + + public function parseResponse(Buffer $buffer, Parser $schemaParser): Response + { + $correlationId = $buffer->readInt(); + assert($this->correlationId === $correlationId); + + return ($this->responseFactory)($buffer, $schemaParser, $this->apiVersion); + } +} diff --git a/src/Protocol/API/Response.php b/src/Protocol/API/Response.php new file mode 100644 index 0000000..ad73ca1 --- /dev/null +++ b/src/Protocol/API/Response.php @@ -0,0 +1,23 @@ +parse(static::schemaDefinition($version)); + + return static::fromArray($schema->read($buffer)); + } + + /** @param array $data */ + abstract public static function fromArray(array $data): static; + + /** @return array> */ + abstract public static function schemaDefinition(int $version): array; +} diff --git a/src/Protocol/Buffer.php b/src/Protocol/Buffer.php index 1723f7b..679982d 100644 --- a/src/Protocol/Buffer.php +++ b/src/Protocol/Buffer.php @@ -29,8 +29,10 @@ final class Buffer private int $position = 0; - private function __construct(private string $bytes, private int $length) - { + private function __construct( + private string $bytes, + private readonly int $length, + ) { } /** @@ -138,7 +140,7 @@ public function write(string $value): void * * @throws NotEnoughBytesAllocated When trying to read from an invalid position. */ - public function read(int $length): mixed + public function read(int $length): string { $offset = $this->nextIndex($length); diff --git a/src/Protocol/Message.php b/src/Protocol/Message.php new file mode 100644 index 0000000..921fbea --- /dev/null +++ b/src/Protocol/Message.php @@ -0,0 +1,8 @@ +|array> $definition */ + public function parse(array $definition): Schema + { + return new Schema(...$this->parseFields($definition)); + } + + /** + * @param array|array> $definition + * + * @return iterable + */ + private function parseFields(array $definition): iterable + { + foreach ($definition as $name => $fieldDefinition) { + yield new Field($name, $this->parseFieldType($fieldDefinition)); + } + } + + /** @param class-string|array $fieldDefinition */ + private function parseFieldType(string|array $fieldDefinition): Type + { + if (is_string($fieldDefinition)) { + return new $fieldDefinition(); + } + + if (array_key_exists('_items', $fieldDefinition)) { + return new Type\ArrayOf( + $this->parseFieldType($fieldDefinition['_items']), + $fieldDefinition['_nullable'] ?? false, + ); + } + + return $this->parse($fieldDefinition); + } +} diff --git a/test.php b/test.php new file mode 100644 index 0000000..9f9636f --- /dev/null +++ b/test.php @@ -0,0 +1,41 @@ +brokers[random_int(0, 1)] +); + +Loop::addSignal( + SIGINT, + static function () use ($channel): void { + $channel->disconnect(); + + Loop::stop(); + } +); + +$channel->send(new ApiVersionsRequest(), 0, 'producer-test')->then( + static function (Response $response) use ($logger): void { + $logger->debug('Response received', ['response' => $response]); + } +); + +Loop::run(); diff --git a/testing.php b/testing.php new file mode 100644 index 0000000..23920ea --- /dev/null +++ b/testing.php @@ -0,0 +1,119 @@ + [ + 'version' => 2, + 'request' => new Schema(), + 'response' => new Schema($errorCode, $apiVersions, $throttleTime), + ], +]; + +$currentId = 0; +$client = 'testing-' . bin2hex(random_bytes(5)); +$messages = [1 => API_VERSIONS]; + +$parseResponse = function (string $rawContent) use ($schemas, $responseHeader, &$messages): array { + $response = Message::fromContent($rawContent); + $response->readInt(); + $correlationId = $responseHeader->read($response)['correlation_id']; + + $apiKey = $messages[$correlationId]; + unset($messages[$correlationId]); + + $schema = $schemas[$apiKey]['response']; + assert($schema instanceof Schema); + + return [ + 'correlation_id' => $correlationId, + 'content' => $schema->read($response), + ]; +}; + + +$connector = new Connector($loop); + +$connector->connect('localhost:9093')->then( + static function (ConnectionInterface $connection) use ($parseResponse, $loop): void { + $loop->addSignal( + SIGINT, + $func = static function () use ($connection, $loop, &$func): void { + $connection->end(); + $loop->removeSignal(SIGINT, $func); + } + ); + + $connection->on('data', static function (string $chunk) use ($parseResponse, $func): void { + $response = $parseResponse($chunk); + print_r($response); + $func(); + }); + + $connection->on('error', static function (Throwable $e): void { + echo $e->getMessage(), PHP_EOL; + }); + + + $connection->write(hex2bin('0000001c')); + $connection->write(hex2bin('0012000200000001001274657374696e672d64656339303232333833')); + } +)->otherwise( + static function (Throwable $e): void { + echo $e->getMessage(), PHP_EOL; + } +); + +$loop->run(); + +//###################################################### +// +//$producer = new Producer(); +//$producer->send(); +//$producer->close();