Skip to content

Commit

Permalink
update docs, doc-blocks, test Channel buffered
Browse files Browse the repository at this point in the history
  • Loading branch information
TheTechsTech committed Apr 29, 2021
1 parent 875231a commit cfd11f3
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 46 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ This package is part of our [symplely/coroutine](https://symplely.github.io/coro

To learn more about **libuv** features read the online tutorial [book](https://nikhilm.github.io/uvbook/index.html).

The terminology in this version **3x** was changed to be inline with [`ext-parallel`](https://www.php.net/manual/en/book.parallel.php) extension usage, and to behave as a `Thread`, but without many of the extension's limitations.

The `Channeled` and `Future` classes are both designed in a way to be extend from to create your own **implementation** of a `Parallel` based library. Currently `libuv` will be required to get full benefits of the implementation.

## Installation

```cmd
Expand Down Expand Up @@ -67,6 +71,9 @@ include 'vendor/autoload.php';

use Async\Spawn\Spawn;

// Shows output by default and Channel instance is extracted for args.
$future = \parallel($function, ...$args)
// Or Does not show output by default and channel instance has to be explicitly passed ins.
$future = \spawn($function, $timeout, $channel)
// Or
$future = Spawn::create(function () use ($thing) {
Expand Down
90 changes: 54 additions & 36 deletions Spawn/Channeled.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public function __destruct()
$this->open = false;
$this->channel = null;
$this->process = null;
//unset($this->buffered);
//$this->buffered = null;
unset($this->buffered);
$this->buffered = null;
}

public function __construct(
Expand All @@ -70,7 +70,7 @@ public function __construct(

$this->type = empty($capacity) ? 'unbuffered' : 'buffered';
$this->capacity = $capacity;
//$this->buffered = new \SplQueue;
$this->buffered = new \SplQueue;

if ($anonymous) {
self::$anonymous++;
Expand Down Expand Up @@ -211,54 +211,72 @@ public static function isChannel($name): bool

public static function isMessenger($message): bool
{
return \is_array($message) && isset($message[1]) && ($message[1] === 'message' || $message[1] === 'closures');
return \is_array($message) && isset($message[1])
&& ($message[1] === '___message' || $message[1] === 'closures');
}

public function send($value): void
{
if ($this->isClosed())
$this->throwClosed(\sprintf('channel(%s) closed', $this->name));
global $___channeled___;

$messaging = 'message';
if (null !== $value && $this->process instanceof \UVProcess) {
$channelInput = $this->channel->getStdio()[0];
$future = $this->channel;
if (!$future->isStarted()) {
$future->start();
if ($future->getChannelState() === Future::STATE[3])
$future->channelState(1);
}
if (
!isset($___channeled___) && null !== $value && $this->process === null
&& !\is_resource($value) && ($this->capacity > $this->buffered->count()) && $this->type === 'buffered'
) {
$this->buffered->enqueue($value);
} else {
if ($this->isClosed())
$this->throwClosed(\sprintf('channel(%s) closed', $this->name));

$messaging = '___message';
if (null !== $value && $this->process instanceof \UVProcess) {
$channelInput = $this->channel->getStdio()[0];
$future = $this->channel;
if (!$future->isStarted()) {
$future->start();
if ($future->getChannelState() === Future::STATE[3])
$future->channelState(1);
}

$checkState = $future->isChanneling();
if ($checkState)
$future->channelAdd();
$checkState = $future->isChanneling();
if ($checkState)
$future->channelAdd();

\uv_write(
$channelInput,
\serializer([$value, $messaging]) . \EOL,
function () use ($future, $checkState) {
if ($checkState)
$future->channelRemove();
}
);

\uv_write(
$channelInput,
\serializer([$value, $messaging]) . \EOL,
function () use ($future, $checkState) {
if ($checkState)
$future->channelRemove();
if ($checkState)
$future->channelTick();
} elseif (null !== $value && ($this->state === 'process' || \is_resource($value))) {
$this->input[] = self::validateInput(__METHOD__, $value);
} elseif (null !== $value) {
if (!\is_resource($this->futureOutput)) {
$this->futureOutput = \STDOUT;
\stream_set_write_buffer($this->futureOutput, 0);
}
);

if ($checkState)
$future->channelTick();
} elseif (null !== $value && ($this->state === 'process' || \is_resource($value))) {
$this->input[] = self::validateInput(__METHOD__, $value);
} elseif (null !== $value) {
if (!\is_resource($this->futureOutput)) {
$this->futureOutput = \STDOUT;
\stream_set_write_buffer($this->futureOutput, 0);
\fwrite($this->futureOutput, \serializer([$value, $messaging]));
\usleep(1500);
}

\fwrite($this->futureOutput, \serializer([$value, $messaging]));
\usleep(1100);
}
}

public function recv()
{
global $___channeled___;

if (
!isset($___channeled___) && $this->type === 'buffered'
&& $this->process === null && !$this->buffered->isEmpty()
)
return $this->buffered->dequeue();

if ($this->isClosed())
$this->throwClosed(\sprintf('channel(%s) closed', $this->name));

Expand Down
18 changes: 18 additions & 0 deletions Spawn/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@
*
* @return FutureInterface
* @throws LogicException In case the `Future` process is already running.
* @see https://www.php.net/manual/en/parallel.run.php
*/
function spawn(
$executable,
Expand All @@ -310,6 +311,17 @@ function spawn(
return Spawn::create($executable, $timeout, $channel, $isYield);
}

/**
* Create an Future `sub/child` process **task**.
* This function exists to give same behavior as **parallel\run** of `ext-parallel` extension,
* but without any of the it limitations.
*
* @param callable $task
* @param Channeled|mixed|null ...$argv - if a `Channel` instance is passed, it wil be used to set `Future` **IPC/CSP** handler.
*
* @return FutureInterface
* @see https://www.php.net/manual/en/parallel.run.php
*/
function parallel($task, ...$argv): FutureInterface
{
global $___parallel___;
Expand Down Expand Up @@ -388,6 +400,7 @@ function spawn_channel(): Channeled
* @param SerializableClosure|string $task
*
* @return callable|object
* @see https://opis.io/closure/3.x/context.html
*
* @codeCoverageIgnore
*/
Expand All @@ -402,6 +415,7 @@ function spawn_decode(string $task)
* @param callable $task
*
* @return string
* @see https://opis.io/closure/3.x/context.html
*
* @codeCoverageIgnore
*/
Expand All @@ -423,6 +437,10 @@ function get_globals(array $vars): array
return $global;
}

/**
* Returns an array of all `user defined` global variables, without `super globals`.
* @return array
*/
function parallel_globals(): array
{
return \get_globals(get_defined_vars());
Expand Down
21 changes: 11 additions & 10 deletions tests/ChanneledTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ function ($type, $data) use ($ipc) {

\spawn_run($future);

//$this->assertSame('pingpangpong', $future->getOutput());
$this->assertSame('pingpangpong', $future->getOutput());
$this->assertSame('pong', $future->getLast());
$this->assertSame(9, \spawn_result($future));
}
Expand Down Expand Up @@ -253,7 +253,7 @@ public function testChannelDelclaredInsideObjectProperties()
$this->expectOutputString('OK');
$channel->send($foo);
}
/*

public function testChannelDrains()
{
$chan = Channel::make("hi", 10001);
Expand All @@ -265,17 +265,18 @@ public function testChannelDrains()

$chan->close();

$counter = 0;
while (($value = $chan->recv()) > -1) {
var_dump($value);
$this->assertEquals($value, $counter);
$counter++;

if ($value == $limit)
if ($value == $limit) {
break;
}
}

try {
$chan->recv();
} catch (\Error $ex) {
var_dump($ex->getMessage());
}
}*/
$this->expectException(\Error::class);
$this->expectExceptionMessageMatches('/[channel(hi) closed]/');
$chan->recv();
}
}

0 comments on commit cfd11f3

Please sign in to comment.