From 8d2a300c065f6ae586abdb8ed2b3a9a9b9cd471c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Cobucci?= Date: Tue, 26 Jul 2022 00:51:50 +0200 Subject: [PATCH] Blah MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Luís Cobucci --- composer.json | 1 + composer.lock | 80 ++++++++++- docker-compose.yml | 6 +- phpstan-baseline.neon | 81 ++++++++---- src/Client/Channel.php | 161 +++++++++++++++++++++++ src/Cluster.php | 30 +++++ src/Node.php | 11 ++ src/Producer/Record.php | 43 ++++++ src/Protocol/API/ApiVersionsRequest.php | 46 +++++++ src/Protocol/API/ApiVersionsResponse.php | 50 +++++++ src/Protocol/API/MetadataRequest.php | 61 +++++++++ src/Protocol/API/MetadataResponse.php | 48 +++++++ src/Protocol/API/Request.php | 34 +++++ src/Protocol/API/RequestHeaders.php | 56 ++++++++ src/Protocol/API/Response.php | 23 ++++ src/Protocol/Buffer.php | 8 +- src/Protocol/Message.php | 8 ++ src/Protocol/Schema/Parser.php | 48 +++++++ test.php | 41 ++++++ testing.php | 119 +++++++++++++++++ 20 files changed, 924 insertions(+), 31 deletions(-) create mode 100644 src/Client/Channel.php create mode 100644 src/Cluster.php create mode 100644 src/Node.php create mode 100644 src/Producer/Record.php create mode 100644 src/Protocol/API/ApiVersionsRequest.php create mode 100644 src/Protocol/API/ApiVersionsResponse.php create mode 100644 src/Protocol/API/MetadataRequest.php create mode 100644 src/Protocol/API/MetadataResponse.php create mode 100644 src/Protocol/API/Request.php create mode 100644 src/Protocol/API/RequestHeaders.php create mode 100644 src/Protocol/API/Response.php create mode 100644 src/Protocol/Message.php create mode 100644 src/Protocol/Schema/Parser.php create mode 100644 test.php create mode 100644 testing.php diff --git a/composer.json b/composer.json index 9cd1d677..fa7001b9 100644 --- a/composer.json +++ b/composer.json @@ -22,6 +22,7 @@ "php-64bit": "^8.1", "ext-pcntl": "*", "psr/log": "^3.0", + "react/async": "^4.0", "react/socket": "^1.6" }, "require-dev": { diff --git a/composer.lock b/composer.lock index 98baa170..bb8067ca 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f6d099e62aeed7666a1cae996357fe86", + "content-hash": "a1c4404a9eece5511eac5153a2920dc1", "packages": [ { "name": "evenement/evenement", @@ -103,6 +103,84 @@ }, "time": "2021-07-14T16:46:02+00:00" }, + { + "name": "react/async", + "version": "v4.0.0", + "source": { + "type": "git", + "url": "https://github.com/reactphp/async.git", + "reference": "2aa8d89057e1059f59666e4204100636249b7be0" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/reactphp/async/zipball/2aa8d89057e1059f59666e4204100636249b7be0", + "reference": "2aa8d89057e1059f59666e4204100636249b7be0", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "react/event-loop": "^1.2", + "react/promise": "^3.0 || ^2.8 || ^1.2.1" + }, + "require-dev": { + "phpunit/phpunit": "^9.3" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions_include.php" + ], + "psr-4": { + "React\\Async\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Christian Lück", + "email": "christian@clue.engineering", + "homepage": "https://clue.engineering/" + }, + { + "name": "Cees-Jan Kiewiet", + "email": "reactphp@ceesjankiewiet.nl", + "homepage": "https://wyrihaximus.net/" + }, + { + "name": "Jan Sorgalla", + "email": "jsorgalla@gmail.com", + "homepage": "https://sorgalla.com/" + }, + { + "name": "Chris Boden", + "email": "cboden@gmail.com", + "homepage": "https://cboden.dev/" + } + ], + "description": "Async utilities and fibers for ReactPHP", + "keywords": [ + "async", + "reactphp" + ], + "support": { + "issues": "https://github.com/reactphp/async/issues", + "source": "https://github.com/reactphp/async/tree/v4.0.0" + }, + "funding": [ + { + "url": "https://github.com/WyriHaximus", + "type": "github" + }, + { + "url": "https://github.com/clue", + "type": "github" + } + ], + "time": "2022-07-11T14:21:02+00:00" + }, { "name": "react/cache", "version": "v1.1.1", diff --git a/docker-compose.yml b/docker-compose.yml index 6c951286..4ed2a765 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,7 +5,7 @@ services: - ALLOW_ANONYMOUS_LOGIN=yes kafka1: - image: bitnami/kafka:2.8.0 + image: bitnami/kafka:3.2.0 ports: - "9093:9093" environment: @@ -17,9 +17,7 @@ services: - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT kafka2: - image: bitnami/kafka:2.8.0 - profiles: - - clustered + image: bitnami/kafka:3.2.0 ports: - "9094:9094" environment: diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 03d25a3d..da54f3ea 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,9 +1,49 @@ parameters: ignoreErrors: - - message: "#^Parameter \\#2 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:unpack\\(\\) expects string, mixed given\\.$#" - count: 5 - path: src/Protocol/Buffer.php + message: "#^Cannot use array destructuring on array\\\\|int\\|Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\Request\\|React\\\\Promise\\\\Deferred\\|string\\>\\|int\\|React\\\\Promise\\\\Deferred\\>\\|int\\.$#" + count: 1 + path: src/Client/Channel.php + + - + message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects array\\, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects array\\, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\\\|string\\> given\\.$#" + count: 1 + path: src/Protocol/API/Request.php + + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\\\|string\\> given\\.$#" + count: 1 + path: src/Protocol/API/Response.php - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\:\\:validateField\\(\\) expects array\\, mixed given\\.$#" @@ -15,6 +55,21 @@ parameters: count: 1 path: src/Protocol/Schema.php + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\ given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + + - + message: "#^Parameter \\#1 \\$fieldDefinition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parseFieldType\\(\\) expects array\\\\|class\\-string\\, mixed given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + + - + message: "#^Parameter \\#2 \\$nullable of class Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\ArrayOf constructor expects bool, mixed given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + - message: "#^Parameter \\#3 \\.\\.\\.\\$values of function sprintf expects bool\\|float\\|int\\|string\\|null, mixed given\\.$#" count: 1 @@ -65,11 +120,6 @@ parameters: count: 1 path: src/Protocol/Type/Int8.php - - - message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#" - count: 1 - path: src/Protocol/Type/NonNullableBytes.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#" count: 1 @@ -80,11 +130,6 @@ parameters: count: 2 path: src/Protocol/Type/NonNullableBytes.php - - - message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NonNullableString\\:\\:read\\(\\) should return string but returns mixed\\.$#" - count: 1 - path: src/Protocol/Type/NonNullableString.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#" count: 1 @@ -100,21 +145,11 @@ parameters: count: 1 path: src/Protocol/Type/NonNullableString.php - - - message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#" - count: 1 - path: src/Protocol/Type/NullableBytes.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#" count: 1 path: src/Protocol/Type/NullableBytes.php - - - message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NullableString\\:\\:read\\(\\) should return string\\|null but returns mixed\\.$#" - count: 1 - path: src/Protocol/Type/NullableString.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#" count: 1 diff --git a/src/Client/Channel.php b/src/Client/Channel.php new file mode 100644 index 00000000..1e7073d2 --- /dev/null +++ b/src/Client/Channel.php @@ -0,0 +1,161 @@ + */ + private SplPriorityQueue $processingQueue; + /** @var SplQueue */ + private SplQueue $inFlightQueue; + private ?ConnectionInterface $connection = null; + private readonly LoopInterface $loop; + + public function __construct( + private ConnectorInterface $connector, + private LoggerInterface $logger, + private Parser $schemaParser, + private Node $node, + ) { + $this->loop = Loop::get(); + $this->processingQueue = new SplPriorityQueue(); + $this->inFlightQueue = new SplQueue(); + } + + public function send(Request $request, int $correlation, string $client): PromiseInterface + { + $this->ensureConnected(); + + $deferred = new Deferred(); + $this->processingQueue->insert([[$request, $correlation, $client], $deferred], 0); + + return $deferred->promise(); + } + + private function ensureConnected(): void + { + if ($this->connection !== null) { + return; + } + + $this->connect(); + } + + private function connect(): void + { + $this->logger->info('Opening connection to node', ['node' => $this->node]); + + $this->connector->connect($this->node->host . ':' . $this->node->port)->then( + $this->initializeConnection(...), + function (Throwable $throwable): void { + $this->logger->error( + 'Error while connecting to node #' . $this->node->id, + ['node' => $this->node, 'exception' => $throwable], + ); + + $this->loop->addTimer(1, $this->connect(...)); + }, + ); + } + + public function initializeConnection(ConnectionInterface $connection): void + { + $this->logger->info('Connection to node established', ['node' => $this->node]); + + $this->connection = $connection; + $this->connection->on('data', $this->onData(...)); + $this->connection->on('error', $this->cleanUpConnection(...)); + $this->connection->on('close', $this->cleanUpConnection(...)); + + $this->loop->futureTick($this->processQueue(...)); + } + + public function processQueue(): void + { + if (! $this->processingQueue->valid()) { + return; + } + + $this->logger->debug('Processing message queue of node', ['node' => $this->node]); + + for ($i = 0, $max = min(15, $this->processingQueue->count()); $i < $max; ++$i) { + [[$request, $correlation, $client], $deferred] = $this->processingQueue->current(); + + $this->processingQueue->next(); + $headers = $this->sendMessage($request, $correlation, $client); + $this->inFlightQueue->enqueue([$headers, $deferred]); + } + + $this->loop->futureTick($this->processQueue(...)); + } + + private function sendMessage(Request $request, int $correlation, string $client): RequestHeaders + { + $headers = new RequestHeaders( + $request->apiKey(), + $request->highestSupportedVersion(), + $correlation, + $client, + $request->responseClass()::parse(...), + ); + + $header = $headers->toBuffer($this->schemaParser); + $body = $request->toBuffer($this->schemaParser, $headers->apiVersion); + + $length = Buffer::allocate(4); + $length->writeInt($header->length() + $body->length()); + + assert($this->connection instanceof ConnectionInterface); + $this->connection->write($length->bytes() . $header->bytes() . $body->bytes()); + + return $headers; + } + + public function onData(string $data): void + { + $this->logger->debug('Message received', ['node' => $this->node]); + + [$headers, $deferred] = $this->inFlightQueue->dequeue(); + + $buffer = Buffer::fromContent($data); + $length = $buffer->readInt(); + + $deferred->resolve($headers->parseResponse(Buffer::fromContent($buffer->read($length)), $this->schemaParser)); + } + + public function disconnect(): void + { + if ($this->connection === null) { + return; + } + + $this->connection->end(); + } + + public function cleanUpConnection(): void + { + $this->logger->info('Closing connection to node', ['node' => $this->node]); + + $this->connection = null; + } +} diff --git a/src/Cluster.php b/src/Cluster.php new file mode 100644 index 00000000..a86a1a0e --- /dev/null +++ b/src/Cluster.php @@ -0,0 +1,30 @@ +topic; + } + + public function value(): mixed + { + return $this->value; + } + + public function key(): mixed + { + return $this->key; + } + + /** @return string[]|null */ + public function headers(): ?array + { + return $this->headers; + } + + public function partition(): ?int + { + return $this->partition; + } + + public function timestamp(): ?int + { + return $this->timestamp; + } +} diff --git a/src/Protocol/API/ApiVersionsRequest.php b/src/Protocol/API/ApiVersionsRequest.php new file mode 100644 index 00000000..78603a9a --- /dev/null +++ b/src/Protocol/API/ApiVersionsRequest.php @@ -0,0 +1,46 @@ + Type\Int16::class, + 'api_versions' => [ + '_items' => [ + 'api_key' => Type\Int16::class, + 'min_version' => Type\Int16::class, + 'max_version' => Type\Int16::class, + ], + ], + ]; + + private const SCHEMA_V1 = self::SCHEMA_V0 + ['throttle_time_ms' => Type\Int32::class]; + + /** @param list $apiVersions */ + public function __construct(public int $errorCode, public array $apiVersions, public int $throttleTime) + { + } + + /** @inheritdoc */ + public static function fromArray(array $data): static + { + return new self( + $data['error_code'], + $data['api_versions'], + $data['throttle_time_ms'] ?? 0, + ); + } + + /** @inheritdoc */ + public static function schemaDefinition(int $version): array + { + return self::SCHEMA_VERSIONS[$version]; + } +} diff --git a/src/Protocol/API/MetadataRequest.php b/src/Protocol/API/MetadataRequest.php new file mode 100644 index 00000000..119bee0e --- /dev/null +++ b/src/Protocol/API/MetadataRequest.php @@ -0,0 +1,61 @@ + ['_items' => Type\NonNullableString::class]], + self::SCHEMA_V1, + self::SCHEMA_V1, + self::SCHEMA_V1, + self::SCHEMA_V4, + self::SCHEMA_V4, + self::SCHEMA_V4, + self::SCHEMA_V4, + ]; + + private const SCHEMA_V1 = [ + 'topics' => ['_nullable' => true, '_items' => Type\NonNullableString::class], + ]; + + private const SCHEMA_V4 = self::SCHEMA_V1 + ['allow_auto_topic_creation' => Type\Boolean::class]; + + /** @param list|null $topics */ + public function __construct(public ?array $topics, public bool $allowTopicCreation) + { + } + + public function apiKey(): int + { + return 3; + } + + public function highestSupportedVersion(): int + { + return 7; + } + + /** @inheritdoc */ + public static function schemaDefinition(int $version): array + { + return self::SCHEMA_VERSIONS[$version]; + } + + /** @inheritdoc */ + public function asArray(int $version): array + { + return [ + 'topics' => $version === 0 && $this->topics === null ? [] : $this->topics, + 'allow_auto_topic_creation' => $this->allowTopicCreation, + ]; + } + + public function responseClass(): string + { + return MetadataResponse::class; + } +} diff --git a/src/Protocol/API/MetadataResponse.php b/src/Protocol/API/MetadataResponse.php new file mode 100644 index 00000000..d5c6a015 --- /dev/null +++ b/src/Protocol/API/MetadataResponse.php @@ -0,0 +1,48 @@ + Type\Int16::class, + 'api_versions' => [ + '_items' => [ + 'api_key' => Type\Int16::class, + 'min_version' => Type\Int16::class, + 'max_version' => Type\Int16::class, + ], + ], + ]; + private const SCHEMA_V1 = self::SCHEMA_V0 + ['throttle_time_ms' => Type\Int32::class]; + + /** @param list $apiVersions */ + public function __construct(public int $errorCode, public array $apiVersions, public int $throttleTime) + { + } + + /** @inheritdoc */ + public static function fromArray(array $data): static + { + return new self( + $data['error_code'], + $data['api_versions'], + $data['throttle_time_ms'] ?? 0, + ); + } + + /** @inheritdoc */ + public static function schemaDefinition(int $version): array + { + return self::SCHEMA_VERSIONS[$version]; + } +} diff --git a/src/Protocol/API/Request.php b/src/Protocol/API/Request.php new file mode 100644 index 00000000..48823f53 --- /dev/null +++ b/src/Protocol/API/Request.php @@ -0,0 +1,34 @@ +parse(static::schemaDefinition($version)); + $data = $this->asArray($version); + + $buffer = Buffer::allocate($schema->sizeOf($data)); + $schema->write($data, $buffer); + + return $buffer; + } + + /** @return array> */ + abstract public static function schemaDefinition(int $version): array; + + /** @return array */ + abstract public function asArray(int $version): array; + + /** @return class-string */ + abstract public function responseClass(): string; +} diff --git a/src/Protocol/API/RequestHeaders.php b/src/Protocol/API/RequestHeaders.php new file mode 100644 index 00000000..90ddd8c8 --- /dev/null +++ b/src/Protocol/API/RequestHeaders.php @@ -0,0 +1,56 @@ + Type\Int16::class, + 'api_version' => Type\Int16::class, + 'correlation_id' => Type\Int32::class, + 'client_id' => Type\NullableString::class, + ]; + + public function __construct( + public int $apiKey, + public int $apiVersion, + public int $correlationId, + public string $client, + private Closure $responseFactory, + ) { + } + + public function toBuffer(Parser $schemaParser): Buffer + { + $schema = $schemaParser->parse(self::SCHEMA); + + $data = [ + 'api_key' => $this->apiKey, + 'api_version' => $this->apiVersion, + 'correlation_id' => $this->correlationId, + 'client_id' => $this->client, + ]; + + $buffer = Buffer::allocate($schema->sizeOf($data)); + $schema->write($data, $buffer); + + return $buffer; + } + + public function parseResponse(Buffer $buffer, Parser $schemaParser): Response + { + $correlationId = $buffer->readInt(); + assert($this->correlationId === $correlationId); + + return ($this->responseFactory)($buffer, $schemaParser, $this->apiVersion); + } +} diff --git a/src/Protocol/API/Response.php b/src/Protocol/API/Response.php new file mode 100644 index 00000000..ad73ca15 --- /dev/null +++ b/src/Protocol/API/Response.php @@ -0,0 +1,23 @@ +parse(static::schemaDefinition($version)); + + return static::fromArray($schema->read($buffer)); + } + + /** @param array $data */ + abstract public static function fromArray(array $data): static; + + /** @return array> */ + abstract public static function schemaDefinition(int $version): array; +} diff --git a/src/Protocol/Buffer.php b/src/Protocol/Buffer.php index 1723f7b2..679982d8 100644 --- a/src/Protocol/Buffer.php +++ b/src/Protocol/Buffer.php @@ -29,8 +29,10 @@ final class Buffer private int $position = 0; - private function __construct(private string $bytes, private int $length) - { + private function __construct( + private string $bytes, + private readonly int $length, + ) { } /** @@ -138,7 +140,7 @@ public function write(string $value): void * * @throws NotEnoughBytesAllocated When trying to read from an invalid position. */ - public function read(int $length): mixed + public function read(int $length): string { $offset = $this->nextIndex($length); diff --git a/src/Protocol/Message.php b/src/Protocol/Message.php new file mode 100644 index 00000000..921fbea3 --- /dev/null +++ b/src/Protocol/Message.php @@ -0,0 +1,8 @@ +|array> $definition */ + public function parse(array $definition): Schema + { + return new Schema(...$this->parseFields($definition)); + } + + /** + * @param array|array> $definition + * + * @return iterable + */ + private function parseFields(array $definition): iterable + { + foreach ($definition as $name => $fieldDefinition) { + yield new Field($name, $this->parseFieldType($fieldDefinition)); + } + } + + /** @param class-string|array $fieldDefinition */ + private function parseFieldType(string|array $fieldDefinition): Type + { + if (is_string($fieldDefinition)) { + return new $fieldDefinition(); + } + + if (array_key_exists('_items', $fieldDefinition)) { + return new Type\ArrayOf( + $this->parseFieldType($fieldDefinition['_items']), + $fieldDefinition['_nullable'] ?? false, + ); + } + + return $this->parse($fieldDefinition); + } +} diff --git a/test.php b/test.php new file mode 100644 index 00000000..9f9636fe --- /dev/null +++ b/test.php @@ -0,0 +1,41 @@ +brokers[random_int(0, 1)] +); + +Loop::addSignal( + SIGINT, + static function () use ($channel): void { + $channel->disconnect(); + + Loop::stop(); + } +); + +$channel->send(new ApiVersionsRequest(), 0, 'producer-test')->then( + static function (Response $response) use ($logger): void { + $logger->debug('Response received', ['response' => $response]); + } +); + +Loop::run(); diff --git a/testing.php b/testing.php new file mode 100644 index 00000000..23920eae --- /dev/null +++ b/testing.php @@ -0,0 +1,119 @@ + [ + 'version' => 2, + 'request' => new Schema(), + 'response' => new Schema($errorCode, $apiVersions, $throttleTime), + ], +]; + +$currentId = 0; +$client = 'testing-' . bin2hex(random_bytes(5)); +$messages = [1 => API_VERSIONS]; + +$parseResponse = function (string $rawContent) use ($schemas, $responseHeader, &$messages): array { + $response = Message::fromContent($rawContent); + $response->readInt(); + $correlationId = $responseHeader->read($response)['correlation_id']; + + $apiKey = $messages[$correlationId]; + unset($messages[$correlationId]); + + $schema = $schemas[$apiKey]['response']; + assert($schema instanceof Schema); + + return [ + 'correlation_id' => $correlationId, + 'content' => $schema->read($response), + ]; +}; + + +$connector = new Connector($loop); + +$connector->connect('localhost:9093')->then( + static function (ConnectionInterface $connection) use ($parseResponse, $loop): void { + $loop->addSignal( + SIGINT, + $func = static function () use ($connection, $loop, &$func): void { + $connection->end(); + $loop->removeSignal(SIGINT, $func); + } + ); + + $connection->on('data', static function (string $chunk) use ($parseResponse, $func): void { + $response = $parseResponse($chunk); + print_r($response); + $func(); + }); + + $connection->on('error', static function (Throwable $e): void { + echo $e->getMessage(), PHP_EOL; + }); + + + $connection->write(hex2bin('0000001c')); + $connection->write(hex2bin('0012000200000001001274657374696e672d64656339303232333833')); + } +)->otherwise( + static function (Throwable $e): void { + echo $e->getMessage(), PHP_EOL; + } +); + +$loop->run(); + +//###################################################### +// +//$producer = new Producer(); +//$producer->send(); +//$producer->close();