From 682e58267b86ea687d6b6048f1a0de1d2de5abb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Cobucci?= Date: Mon, 25 Jul 2022 21:41:06 +0200 Subject: [PATCH] blah --- phpstan-baseline.neon | 81 +++++++++++++++++------- src/Client/Channel.php | 17 +++-- src/Cluster.php | 2 +- src/Producer/Record.php | 21 +----- src/Protocol/API/ApiVersionsRequest.php | 3 - src/Protocol/API/ApiVersionsResponse.php | 7 +- src/Protocol/API/MetadataResponse.php | 2 +- src/Protocol/API/Request.php | 2 +- src/Protocol/API/RequestHeaders.php | 2 +- src/Protocol/API/Response.php | 2 +- src/Protocol/Buffer.php | 2 +- src/Protocol/Schema/Parser.php | 8 +-- test.php | 11 ++-- 13 files changed, 85 insertions(+), 75 deletions(-) diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 03d25a3d..da54f3ea 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,9 +1,49 @@ parameters: ignoreErrors: - - message: "#^Parameter \\#2 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:unpack\\(\\) expects string, mixed given\\.$#" - count: 5 - path: src/Protocol/Buffer.php + message: "#^Cannot use array destructuring on array\\\\|int\\|Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\Request\\|React\\\\Promise\\\\Deferred\\|string\\>\\|int\\|React\\\\Promise\\\\Deferred\\>\\|int\\.$#" + count: 1 + path: src/Client/Channel.php + + - + message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects array\\, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/ApiVersionsResponse.php + + - + message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects array\\, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#" + count: 1 + path: src/Protocol/API/MetadataResponse.php + + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\\\|string\\> given\\.$#" + count: 1 + path: src/Protocol/API/Request.php + + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\\\|string\\> given\\.$#" + count: 1 + path: src/Protocol/API/Response.php - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\:\\:validateField\\(\\) expects array\\, mixed given\\.$#" @@ -15,6 +55,21 @@ parameters: count: 1 path: src/Protocol/Schema.php + - + message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\\\|class\\-string\\\\>, array\\ given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + + - + message: "#^Parameter \\#1 \\$fieldDefinition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parseFieldType\\(\\) expects array\\\\|class\\-string\\, mixed given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + + - + message: "#^Parameter \\#2 \\$nullable of class Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\ArrayOf constructor expects bool, mixed given\\.$#" + count: 1 + path: src/Protocol/Schema/Parser.php + - message: "#^Parameter \\#3 \\.\\.\\.\\$values of function sprintf expects bool\\|float\\|int\\|string\\|null, mixed given\\.$#" count: 1 @@ -65,11 +120,6 @@ parameters: count: 1 path: src/Protocol/Type/Int8.php - - - message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#" - count: 1 - path: src/Protocol/Type/NonNullableBytes.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#" count: 1 @@ -80,11 +130,6 @@ parameters: count: 2 path: src/Protocol/Type/NonNullableBytes.php - - - message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NonNullableString\\:\\:read\\(\\) should return string but returns mixed\\.$#" - count: 1 - path: src/Protocol/Type/NonNullableString.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#" count: 1 @@ -100,21 +145,11 @@ parameters: count: 1 path: src/Protocol/Type/NonNullableString.php - - - message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#" - count: 1 - path: src/Protocol/Type/NullableBytes.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#" count: 1 path: src/Protocol/Type/NullableBytes.php - - - message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NullableString\\:\\:read\\(\\) should return string\\|null but returns mixed\\.$#" - count: 1 - path: src/Protocol/Type/NullableString.php - - message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#" count: 1 diff --git a/src/Client/Channel.php b/src/Client/Channel.php index f17a613a..1e7073d2 100644 --- a/src/Client/Channel.php +++ b/src/Client/Channel.php @@ -3,7 +3,6 @@ namespace Lcobucci\Kafka\Client; -use Closure; use Lcobucci\Kafka\Node; use Lcobucci\Kafka\Protocol\API\Request; use Lcobucci\Kafka\Protocol\API\RequestHeaders; @@ -25,9 +24,9 @@ final class Channel { - /** @var SplPriorityQueue */ + /** @var SplPriorityQueue */ private SplPriorityQueue $processingQueue; - /** @var SplQueue */ + /** @var SplQueue */ private SplQueue $inFlightQueue; private ?ConnectionInterface $connection = null; private readonly LoopInterface $loop; @@ -36,9 +35,9 @@ public function __construct( private ConnectorInterface $connector, private LoggerInterface $logger, private Parser $schemaParser, - private Node $node + private Node $node, ) { - $this->loop = Loop::get(); + $this->loop = Loop::get(); $this->processingQueue = new SplPriorityQueue(); $this->inFlightQueue = new SplQueue(); } @@ -71,11 +70,11 @@ private function connect(): void function (Throwable $throwable): void { $this->logger->error( 'Error while connecting to node #' . $this->node->id, - ['node' => $this->node, 'exception' => $throwable] + ['node' => $this->node, 'exception' => $throwable], ); $this->loop->addTimer(1, $this->connect(...)); - } + }, ); } @@ -107,7 +106,7 @@ public function processQueue(): void $this->inFlightQueue->enqueue([$headers, $deferred]); } - $this->loop->futureTick([$this, 'processQueue']); + $this->loop->futureTick($this->processQueue(...)); } private function sendMessage(Request $request, int $correlation, string $client): RequestHeaders @@ -117,7 +116,7 @@ private function sendMessage(Request $request, int $correlation, string $client) $request->highestSupportedVersion(), $correlation, $client, - $request->responseClass()::parse(...) + $request->responseClass()::parse(...), ); $header = $headers->toBuffer($this->schemaParser); diff --git a/src/Cluster.php b/src/Cluster.php index 04cbd7fc..a86a1a0e 100644 --- a/src/Cluster.php +++ b/src/Cluster.php @@ -22,7 +22,7 @@ static function (string $server) use (&$id): Node { return new Node((string) --$id, $host, (int) $port, null); }, - explode(',', $servers) + explode(',', $servers), ); return new self(null, $brokers); diff --git a/src/Producer/Record.php b/src/Producer/Record.php index 406e4220..52128dd7 100644 --- a/src/Producer/Record.php +++ b/src/Producer/Record.php @@ -5,28 +5,9 @@ final class Record { - private string $topic; - - private mixed $value; - - private mixed $key = null; - - /** @var string[]|null */ - private ?array $headers = null; - - private ?int $partition = null; - - private ?int $timestamp = null; - /** @param string[]|null $headers */ - public function __construct(string $topic, mixed $value, mixed $key = null, ?array $headers = null, ?int $partition = null, ?int $timestamp = null) + public function __construct(private string $topic, private mixed $value, private mixed $key = null, private ?array $headers = null, private ?int $partition = null, private ?int $timestamp = null) { - $this->topic = $topic; - $this->key = $key; - $this->value = $value; - $this->headers = $headers; - $this->partition = $partition; - $this->timestamp = $timestamp; } public function topic(): string diff --git a/src/Protocol/API/ApiVersionsRequest.php b/src/Protocol/API/ApiVersionsRequest.php index e3918aac..78603a9a 100644 --- a/src/Protocol/API/ApiVersionsRequest.php +++ b/src/Protocol/API/ApiVersionsRequest.php @@ -3,9 +3,6 @@ namespace Lcobucci\Kafka\Protocol\API; -use Composer\InstalledVersions; -use PackageVersions\Versions; - use function array_key_last; final class ApiVersionsRequest extends Request diff --git a/src/Protocol/API/ApiVersionsResponse.php b/src/Protocol/API/ApiVersionsResponse.php index 38e72a79..f187dd4e 100644 --- a/src/Protocol/API/ApiVersionsResponse.php +++ b/src/Protocol/API/ApiVersionsResponse.php @@ -5,8 +5,7 @@ use Lcobucci\Kafka\Protocol\Type; -use function array_key_last; - +/** @phpstan-type ApiVersion array{api_key: int, min_version: int, max_version: int} */ final class ApiVersionsResponse extends Response { private const SCHEMA_VERSIONS = [ @@ -28,7 +27,7 @@ final class ApiVersionsResponse extends Response private const SCHEMA_V1 = self::SCHEMA_V0 + ['throttle_time_ms' => Type\Int32::class]; - /** @param list $apiVersions */ + /** @param list $apiVersions */ public function __construct(public int $errorCode, public array $apiVersions, public int $throttleTime) { } @@ -39,7 +38,7 @@ public static function fromArray(array $data): static return new self( $data['error_code'], $data['api_versions'], - $data['throttle_time_ms'] ?? 0 + $data['throttle_time_ms'] ?? 0, ); } diff --git a/src/Protocol/API/MetadataResponse.php b/src/Protocol/API/MetadataResponse.php index a466826f..d5c6a015 100644 --- a/src/Protocol/API/MetadataResponse.php +++ b/src/Protocol/API/MetadataResponse.php @@ -36,7 +36,7 @@ public static function fromArray(array $data): static return new self( $data['error_code'], $data['api_versions'], - $data['throttle_time_ms'] ?? 0 + $data['throttle_time_ms'] ?? 0, ); } diff --git a/src/Protocol/API/Request.php b/src/Protocol/API/Request.php index 6f34d7d7..48823f53 100644 --- a/src/Protocol/API/Request.php +++ b/src/Protocol/API/Request.php @@ -26,7 +26,7 @@ public function toBuffer(Parser $schemaParser, int $version): Buffer /** @return array> */ abstract public static function schemaDefinition(int $version): array; - /** @return array $data */ + /** @return array */ abstract public function asArray(int $version): array; /** @return class-string */ diff --git a/src/Protocol/API/RequestHeaders.php b/src/Protocol/API/RequestHeaders.php index 26815287..90ddd8c8 100644 --- a/src/Protocol/API/RequestHeaders.php +++ b/src/Protocol/API/RequestHeaders.php @@ -25,7 +25,7 @@ public function __construct( public int $apiVersion, public int $correlationId, public string $client, - private Closure $responseFactory + private Closure $responseFactory, ) { } diff --git a/src/Protocol/API/Response.php b/src/Protocol/API/Response.php index 4627d54f..ad73ca15 100644 --- a/src/Protocol/API/Response.php +++ b/src/Protocol/API/Response.php @@ -8,7 +8,7 @@ abstract class Response { - public static function parse(Buffer $buffer, Parser $schemaParser, int $version): self + public static function parse(Buffer $buffer, Parser $schemaParser, int $version): static { $schema = $schemaParser->parse(static::schemaDefinition($version)); diff --git a/src/Protocol/Buffer.php b/src/Protocol/Buffer.php index 759fc395..679982d8 100644 --- a/src/Protocol/Buffer.php +++ b/src/Protocol/Buffer.php @@ -140,7 +140,7 @@ public function write(string $value): void * * @throws NotEnoughBytesAllocated When trying to read from an invalid position. */ - public function read(int $length): mixed + public function read(int $length): string { $offset = $this->nextIndex($length); diff --git a/src/Protocol/Schema/Parser.php b/src/Protocol/Schema/Parser.php index b698a295..e3bc253f 100644 --- a/src/Protocol/Schema/Parser.php +++ b/src/Protocol/Schema/Parser.php @@ -11,14 +11,14 @@ final class Parser { - /** @param array> $definition */ + /** @param array|array> $definition */ public function parse(array $definition): Schema { return new Schema(...$this->parseFields($definition)); } /** - * @param array> $definition + * @param array|array> $definition * * @return iterable */ @@ -29,7 +29,7 @@ private function parseFields(array $definition): iterable } } - /** @param string|array $fieldDefinition */ + /** @param class-string|array $fieldDefinition */ private function parseFieldType(string|array $fieldDefinition): Type { if (is_string($fieldDefinition)) { @@ -39,7 +39,7 @@ private function parseFieldType(string|array $fieldDefinition): Type if (array_key_exists('_items', $fieldDefinition)) { return new Type\ArrayOf( $this->parseFieldType($fieldDefinition['_items']), - $fieldDefinition['_nullable'] ?? false + $fieldDefinition['_nullable'] ?? false, ); } diff --git a/test.php b/test.php index d4a5966f..9f9636fe 100644 --- a/test.php +++ b/test.php @@ -13,8 +13,7 @@ require 'vendor/autoload.php'; -$loop = Loop::get(); -$logger = new Logger('test-channel', [new StreamHandler('php://stderr')]); +$logger = new Logger('test-channel', [new StreamHandler(STDERR)]); $cluster = Cluster::bootstrap('localhost:9093,localhost:9094'); $channel = new Channel( @@ -24,12 +23,12 @@ $cluster->brokers[random_int(0, 1)] ); -$loop->addSignal( +Loop::addSignal( SIGINT, - $listener = static function () use (&$listener, $loop, $channel): void { + static function () use ($channel): void { $channel->disconnect(); - $loop->removeSignal(SIGINT, $listener); + Loop::stop(); } ); @@ -39,4 +38,4 @@ static function (Response $response) use ($logger): void { } ); -$loop->run(); +Loop::run();