Skip to content

Commit

Permalink
blah
Browse files Browse the repository at this point in the history
  • Loading branch information
lcobucci committed Jul 25, 2022
1 parent e15ecaa commit 682e582
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 75 deletions.
81 changes: 58 additions & 23 deletions phpstan-baseline.neon
Original file line number Diff line number Diff line change
@@ -1,9 +1,49 @@
parameters:
ignoreErrors:
-
message: "#^Parameter \\#2 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:unpack\\(\\) expects string, mixed given\\.$#"
count: 5
path: src/Protocol/Buffer.php
message: "#^Cannot use array destructuring on array\\<int\\|string, array\\<int, array\\<int, int\\|Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\Request\\|string\\>\\|int\\|Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\Request\\|React\\\\Promise\\\\Deferred\\|string\\>\\|int\\|React\\\\Promise\\\\Deferred\\>\\|int\\.$#"
count: 1
path: src/Client/Channel.php

-
message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#"
count: 1
path: src/Protocol/API/ApiVersionsResponse.php

-
message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects array\\<int, array\\{api_key\\: int, min_version\\: int, max_version\\: int\\}\\>, mixed given\\.$#"
count: 1
path: src/Protocol/API/ApiVersionsResponse.php

-
message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\ApiVersionsResponse constructor expects int, mixed given\\.$#"
count: 1
path: src/Protocol/API/ApiVersionsResponse.php

-
message: "#^Parameter \\#1 \\$errorCode of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#"
count: 1
path: src/Protocol/API/MetadataResponse.php

-
message: "#^Parameter \\#2 \\$apiVersions of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects array\\<int, array\\{api_key\\: int, min_version\\: int, max_version\\: int\\}\\>, mixed given\\.$#"
count: 1
path: src/Protocol/API/MetadataResponse.php

-
message: "#^Parameter \\#3 \\$throttleTime of class Lcobucci\\\\Kafka\\\\Protocol\\\\API\\\\MetadataResponse constructor expects int, mixed given\\.$#"
count: 1
path: src/Protocol/API/MetadataResponse.php

-
message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\<string, array\\<string, mixed\\>\\|class\\-string\\<Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\>\\>, array\\<string, array\\<string, mixed\\>\\|string\\> given\\.$#"
count: 1
path: src/Protocol/API/Request.php

-
message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\<string, array\\<string, mixed\\>\\|class\\-string\\<Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\>\\>, array\\<string, array\\<string, mixed\\>\\|string\\> given\\.$#"
count: 1
path: src/Protocol/API/Response.php

-
message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\:\\:validateField\\(\\) expects array\\<string, mixed\\>, mixed given\\.$#"
Expand All @@ -15,6 +55,21 @@ parameters:
count: 1
path: src/Protocol/Schema.php

-
message: "#^Parameter \\#1 \\$definition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parse\\(\\) expects array\\<string, array\\<string, mixed\\>\\|class\\-string\\<Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\>\\>, array\\<string, mixed\\> given\\.$#"
count: 1
path: src/Protocol/Schema/Parser.php

-
message: "#^Parameter \\#1 \\$fieldDefinition of method Lcobucci\\\\Kafka\\\\Protocol\\\\Schema\\\\Parser\\:\\:parseFieldType\\(\\) expects array\\<string, mixed\\>\\|class\\-string\\<Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\>, mixed given\\.$#"
count: 1
path: src/Protocol/Schema/Parser.php

-
message: "#^Parameter \\#2 \\$nullable of class Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\ArrayOf constructor expects bool, mixed given\\.$#"
count: 1
path: src/Protocol/Schema/Parser.php

-
message: "#^Parameter \\#3 \\.\\.\\.\\$values of function sprintf expects bool\\|float\\|int\\|string\\|null, mixed given\\.$#"
count: 1
Expand Down Expand Up @@ -65,11 +120,6 @@ parameters:
count: 1
path: src/Protocol/Type/Int8.php

-
message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#"
count: 1
path: src/Protocol/Type/NonNullableBytes.php

-
message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#"
count: 1
Expand All @@ -80,11 +130,6 @@ parameters:
count: 2
path: src/Protocol/Type/NonNullableBytes.php

-
message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NonNullableString\\:\\:read\\(\\) should return string but returns mixed\\.$#"
count: 1
path: src/Protocol/Type/NonNullableString.php

-
message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#"
count: 1
Expand All @@ -100,21 +145,11 @@ parameters:
count: 1
path: src/Protocol/Type/NonNullableString.php

-
message: "#^Parameter \\#1 \\$content of static method Lcobucci\\\\Kafka\\\\Protocol\\\\Buffer\\:\\:fromContent\\(\\) expects string, mixed given\\.$#"
count: 1
path: src/Protocol/Type/NullableBytes.php

-
message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardClass\\(\\) expects object\\|null, mixed given\\.$#"
count: 1
path: src/Protocol/Type/NullableBytes.php

-
message: "#^Method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\\\NullableString\\:\\:read\\(\\) should return string\\|null but returns mixed\\.$#"
count: 1
path: src/Protocol/Type/NullableString.php

-
message: "#^Parameter \\#1 \\$data of method Lcobucci\\\\Kafka\\\\Protocol\\\\Type\\:\\:guardLength\\(\\) expects string\\|null, mixed given\\.$#"
count: 1
Expand Down
17 changes: 8 additions & 9 deletions src/Client/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

namespace Lcobucci\Kafka\Client;

use Closure;
use Lcobucci\Kafka\Node;
use Lcobucci\Kafka\Protocol\API\Request;
use Lcobucci\Kafka\Protocol\API\RequestHeaders;
Expand All @@ -25,9 +24,9 @@

final class Channel
{
/** @var SplPriorityQueue<int, array{0: array{0: Request, 1: int, 2: string}, 1: Deferred}> */
/** @var SplPriorityQueue<int, array{array{Request, int, string}, Deferred}> */
private SplPriorityQueue $processingQueue;
/** @var SplQueue<array{0: RequestHeaders, 1: Deferred}> */
/** @var SplQueue<array{RequestHeaders, Deferred}> */
private SplQueue $inFlightQueue;
private ?ConnectionInterface $connection = null;
private readonly LoopInterface $loop;
Expand All @@ -36,9 +35,9 @@ public function __construct(
private ConnectorInterface $connector,
private LoggerInterface $logger,
private Parser $schemaParser,
private Node $node
private Node $node,
) {
$this->loop = Loop::get();
$this->loop = Loop::get();
$this->processingQueue = new SplPriorityQueue();
$this->inFlightQueue = new SplQueue();
}
Expand Down Expand Up @@ -71,11 +70,11 @@ private function connect(): void
function (Throwable $throwable): void {
$this->logger->error(
'Error while connecting to node #' . $this->node->id,
['node' => $this->node, 'exception' => $throwable]
['node' => $this->node, 'exception' => $throwable],
);

$this->loop->addTimer(1, $this->connect(...));
}
},
);
}

Expand Down Expand Up @@ -107,7 +106,7 @@ public function processQueue(): void
$this->inFlightQueue->enqueue([$headers, $deferred]);
}

$this->loop->futureTick([$this, 'processQueue']);
$this->loop->futureTick($this->processQueue(...));
}

private function sendMessage(Request $request, int $correlation, string $client): RequestHeaders
Expand All @@ -117,7 +116,7 @@ private function sendMessage(Request $request, int $correlation, string $client)
$request->highestSupportedVersion(),
$correlation,
$client,
$request->responseClass()::parse(...)
$request->responseClass()::parse(...),
);

$header = $headers->toBuffer($this->schemaParser);
Expand Down
2 changes: 1 addition & 1 deletion src/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static function (string $server) use (&$id): Node {

return new Node((string) --$id, $host, (int) $port, null);
},
explode(',', $servers)
explode(',', $servers),
);

return new self(null, $brokers);
Expand Down
21 changes: 1 addition & 20 deletions src/Producer/Record.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,9 @@

final class Record
{
private string $topic;

private mixed $value;

private mixed $key = null;

/** @var string[]|null */
private ?array $headers = null;

private ?int $partition = null;

private ?int $timestamp = null;

/** @param string[]|null $headers */
public function __construct(string $topic, mixed $value, mixed $key = null, ?array $headers = null, ?int $partition = null, ?int $timestamp = null)
public function __construct(private string $topic, private mixed $value, private mixed $key = null, private ?array $headers = null, private ?int $partition = null, private ?int $timestamp = null)
{
$this->topic = $topic;
$this->key = $key;
$this->value = $value;
$this->headers = $headers;
$this->partition = $partition;
$this->timestamp = $timestamp;
}

public function topic(): string
Expand Down
3 changes: 0 additions & 3 deletions src/Protocol/API/ApiVersionsRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

namespace Lcobucci\Kafka\Protocol\API;

use Composer\InstalledVersions;
use PackageVersions\Versions;

use function array_key_last;

final class ApiVersionsRequest extends Request
Expand Down
7 changes: 3 additions & 4 deletions src/Protocol/API/ApiVersionsResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

use Lcobucci\Kafka\Protocol\Type;

use function array_key_last;

/** @phpstan-type ApiVersion array{api_key: int, min_version: int, max_version: int} */
final class ApiVersionsResponse extends Response
{
private const SCHEMA_VERSIONS = [
Expand All @@ -28,7 +27,7 @@ final class ApiVersionsResponse extends Response

private const SCHEMA_V1 = self::SCHEMA_V0 + ['throttle_time_ms' => Type\Int32::class];

/** @param list<array{api_key: int, min_version: int, max_version: int}> $apiVersions */
/** @param list<ApiVersion> $apiVersions */
public function __construct(public int $errorCode, public array $apiVersions, public int $throttleTime)
{
}
Expand All @@ -39,7 +38,7 @@ public static function fromArray(array $data): static
return new self(
$data['error_code'],
$data['api_versions'],
$data['throttle_time_ms'] ?? 0
$data['throttle_time_ms'] ?? 0,
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Protocol/API/MetadataResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static function fromArray(array $data): static
return new self(
$data['error_code'],
$data['api_versions'],
$data['throttle_time_ms'] ?? 0
$data['throttle_time_ms'] ?? 0,
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Protocol/API/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function toBuffer(Parser $schemaParser, int $version): Buffer
/** @return array<string, string|array<string, mixed>> */
abstract public static function schemaDefinition(int $version): array;

/** @return array<string, mixed> $data */
/** @return array<string, mixed> */
abstract public function asArray(int $version): array;

/** @return class-string<Response> */
Expand Down
2 changes: 1 addition & 1 deletion src/Protocol/API/RequestHeaders.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public function __construct(
public int $apiVersion,
public int $correlationId,
public string $client,
private Closure $responseFactory
private Closure $responseFactory,
) {
}

Expand Down
2 changes: 1 addition & 1 deletion src/Protocol/API/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

abstract class Response
{
public static function parse(Buffer $buffer, Parser $schemaParser, int $version): self
public static function parse(Buffer $buffer, Parser $schemaParser, int $version): static
{
$schema = $schemaParser->parse(static::schemaDefinition($version));

Expand Down
2 changes: 1 addition & 1 deletion src/Protocol/Buffer.php
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public function write(string $value): void
*
* @throws NotEnoughBytesAllocated When trying to read from an invalid position.
*/
public function read(int $length): mixed
public function read(int $length): string
{
$offset = $this->nextIndex($length);

Expand Down
8 changes: 4 additions & 4 deletions src/Protocol/Schema/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@

final class Parser
{
/** @param array<string, string|array<string, mixed>> $definition */
/** @param array<string, class-string<Type>|array<string, mixed>> $definition */
public function parse(array $definition): Schema
{
return new Schema(...$this->parseFields($definition));
}

/**
* @param array<string, string|array<string, mixed>> $definition
* @param array<string, class-string<Type>|array<string, mixed>> $definition
*
* @return iterable<Field>
*/
Expand All @@ -29,7 +29,7 @@ private function parseFields(array $definition): iterable
}
}

/** @param string|array<string, mixed> $fieldDefinition */
/** @param class-string<Type>|array<string, mixed> $fieldDefinition */
private function parseFieldType(string|array $fieldDefinition): Type
{
if (is_string($fieldDefinition)) {
Expand All @@ -39,7 +39,7 @@ private function parseFieldType(string|array $fieldDefinition): Type
if (array_key_exists('_items', $fieldDefinition)) {
return new Type\ArrayOf(
$this->parseFieldType($fieldDefinition['_items']),
$fieldDefinition['_nullable'] ?? false
$fieldDefinition['_nullable'] ?? false,
);
}

Expand Down
11 changes: 5 additions & 6 deletions test.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

require 'vendor/autoload.php';

$loop = Loop::get();
$logger = new Logger('test-channel', [new StreamHandler('php://stderr')]);
$logger = new Logger('test-channel', [new StreamHandler(STDERR)]);

$cluster = Cluster::bootstrap('localhost:9093,localhost:9094');
$channel = new Channel(
Expand All @@ -24,12 +23,12 @@
$cluster->brokers[random_int(0, 1)]
);

$loop->addSignal(
Loop::addSignal(
SIGINT,
$listener = static function () use (&$listener, $loop, $channel): void {
static function () use ($channel): void {
$channel->disconnect();

$loop->removeSignal(SIGINT, $listener);
Loop::stop();
}
);

Expand All @@ -39,4 +38,4 @@ static function (Response $response) use ($logger): void {
}
);

$loop->run();
Loop::run();

0 comments on commit 682e582

Please sign in to comment.