Skip to content

Commit

Permalink
send socket messages in chunks, test larger messages
Browse files Browse the repository at this point in the history
  • Loading branch information
matthi4s committed Jan 26, 2024
1 parent d3a854b commit 0cf1e57
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 7 deletions.
41 changes: 34 additions & 7 deletions src/Communication/Socket/Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aternos\Taskmaster\Communication\MessageInterface;
use Aternos\Taskmaster\Communication\Socket\Exception\SocketReadException;
use Aternos\Taskmaster\Communication\Socket\Exception\SocketWriteException;
use Aternos\Taskmaster\Taskmaster;
use Generator;

/**
Expand All @@ -22,6 +23,8 @@ class Socket implements SocketInterface, SelectableSocketInterface
*/
protected mixed $socket;

protected string $receiveBuffer = "";

/**
* @param resource|Socket $socket
*/
Expand Down Expand Up @@ -50,6 +53,13 @@ public function receiveMessages(): Generator
{
foreach ($this->receiveRaw() as $data) {
yield unserialize($data);
$error = error_get_last();
if ($error && str_starts_with($error["message"], "unserialize(): Error at offset")) {
fwrite(STDERR, getmypid() . ": Unserialize error: " . $error["message"] . PHP_EOL);
fwrite(STDERR, getmypid() . ": Unserialize length: " . strlen($data) . PHP_EOL);
fwrite(STDERR, getmypid() . ": Unserialize data: " . $data . PHP_EOL);
error_clear_last();
}
}
}

Expand All @@ -62,10 +72,23 @@ public function receiveRaw(): Generator
if (!is_resource($this->socket) || feof($this->socket)) {
throw new SocketReadException("Could not read from socket.");
}
$result = fgets($this->socket);
$result = $this->receiveBuffer;
do {
$chunk = fgets($this->socket, 10_001);
if ($chunk === false || strlen($chunk) === 0) {
break;
}

$result .= $chunk;
} while (!str_ends_with($result, PHP_EOL));
if (!$result) {
break;
}
if (!str_ends_with($result, PHP_EOL)) {
$this->receiveBuffer = $result;
break;
}
$this->receiveBuffer = "";
$decoded = base64_decode($result);
yield $decoded;
} while (true);
Expand Down Expand Up @@ -102,18 +125,22 @@ public function sendRaw(string $data): bool
}
$data = base64_encode($data);
$data .= PHP_EOL;
$total = 0;
$expected = strlen($data);
$current = 0;
$total = strlen($data);
do {
if (!is_resource($this->socket) || feof($this->socket)) {
throw new SocketWriteException("Could not write to socket.");
}
$result = @fwrite($this->socket, $data);
if ($result === false || $result === 0) {
$chunk = substr($data, $current, 10_000);
$result = @fwrite($this->socket, $chunk);
if ($result === false) {
throw new SocketWriteException("Could not write to socket.");
}
$total += $result;
} while ($total < $expected);
if ($result === 0) {
usleep(Taskmaster::SOCKET_WAIT_TIME);
}
$current += $result;
} while ($current < $total);
return true;
}

Expand Down
9 changes: 9 additions & 0 deletions test/Integration/WorkerTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Aternos\Taskmaster\Test\Util\Task\CallbackTask;
use Aternos\Taskmaster\Test\Util\Task\ChildExceptionTask;
use Aternos\Taskmaster\Test\Util\Task\EmptyTask;
use Aternos\Taskmaster\Test\Util\Task\LargeTask;
use Aternos\Taskmaster\Test\Util\Task\ParentExceptionTask;
use Aternos\Taskmaster\Test\Util\Task\SynchronizedFieldTask;
use Aternos\Taskmaster\Test\Util\Task\UnsynchronizedFieldTask;
Expand Down Expand Up @@ -68,6 +69,14 @@ public function testGetTaskResult(): void
$this->assertEquals(3, $task->getResult());
}

public function testRunLargeTask(): void
{
$task = new LargeTask(1_000_000);
$this->taskmaster->runTask($task);
$this->taskmaster->wait();
$this->assertEquals(1_000_000, strlen($task->getResult()));
}

public function testGetTaskResultFromPromise(): void
{
$task = new AdditionTask(1, 2);
Expand Down
25 changes: 25 additions & 0 deletions test/Util/Task/LargeTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Aternos\Taskmaster\Test\Util\Task;

use Aternos\Taskmaster\Task\OnBoth;
use Aternos\Taskmaster\Task\OnChild;
use Aternos\Taskmaster\Task\Task;

class LargeTask extends Task
{
#[OnBoth] protected string $data;

/**
* @param int $length
*/
public function __construct(int $length = 100_000)
{
$this->data = str_repeat("T", $length);
}

#[OnChild] public function run()
{
return $this->data;
}
}

0 comments on commit 0cf1e57

Please sign in to comment.