Skip to content

Commit

Permalink
corrections, refactor for better yield usage
Browse files Browse the repository at this point in the history
- remove `spawn_setup` channel option to `setChannelTick` method
- refactor to allow channel `send/recv` run yield
- doc-block updates
- additional methods for tracking channels
  • Loading branch information
TheTechsTech committed May 3, 2021
1 parent 61c0e48 commit cab147e
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 52 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,10 @@ $future = Spawn::create(function () {
* - This feature is for `Coroutine` package or any third party package.
* @param bool $useUv - Turn **on/off** `uv_spawn` for child subprocess operations, will use **libuv** features,
* if not **true** will use `proc_open` of **symfony/process**.
* @param callable|null $channelLoop - the Event Loop routine to use in integrationMode.
*
* @codeCoverageIgnore
*/
spawn_setup($loop, $isYield, $integrationMode, $useUv, $channelLoop);
spawn_setup($loop, $isYield, $integrationMode, $useUv);

// For checking and acting on each subprocess status use:

Expand Down
14 changes: 9 additions & 5 deletions Spawn/Channeled.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Async\Spawn;

use Async\Spawn\Future;
use Async\Spawn\FutureInterface;
use Async\Spawn\Process;
use Async\Spawn\ChanneledInterface;

Expand Down Expand Up @@ -242,8 +243,10 @@ public function send($value): void
}

$checkState = $future->isChanneling();
if ($checkState)
if ($checkState) {
$future->channelState(1);
$future->channelAdd();
}

\uv_write(
$channelInput,
Expand All @@ -255,7 +258,7 @@ function () use ($future, $checkState) {
);

if ($checkState)
$future->channelTick();
$future->channelTick($future->getChannelCount());
} elseif (null !== $value && ($this->state === 'process' || \is_resource($value))) {
$this->input[] = self::validateInput(__METHOD__, $value);
} elseif (null !== $value) {
Expand All @@ -265,7 +268,7 @@ function () use ($future, $checkState) {
}

\fwrite($this->futureOutput, \serializer([$value, $messaging]));
\usleep(1500);
\usleep(2000);
}
}
}
Expand All @@ -292,16 +295,17 @@ public function recv()

$checkState = $future->isChanneling();
if ($checkState) {
$future->channelState(0);
$future->channelAdd();
}

if ($checkState) {
$future->channelTick();
$future->channelTick($future->getChannelCount());
$value = $future->getMessage();

while (\is_null($value)) {
$future->channelAdd();
$future->channelTick();
$future->channelTick($future->getChannelCount());
$value = $future->getMessage();
}

Expand Down
2 changes: 1 addition & 1 deletion Spawn/Container.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
$output = $task(\spawn_channel());

\fflush(\STDOUT);
\usleep(1500);
\usleep(2000);
\fwrite(\STDOUT, \serializer($output));
exit(0);
} catch (\Throwable $exception) {
Expand Down
23 changes: 20 additions & 3 deletions Spawn/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,20 @@ function paralleling(?\Closure $task = null, ?string $include = null, ...$args):
return Spawn::create($executable, 0, $channel, true)->displayOn();
}

/**
* Start the `Future` process and wait to terminate, and return any results.
* - This feature is for standalone mode only.
* - This feature runs **libuv** *uv_run(`loop`)* in *`UV::RUN_DEFAULT`* mode.
*
* @param FutureInterface $future
* @return mixed
* @see https://www.php.net/manual/en/parallel.run.php
*/
function paralleling_run(FutureInterface $future)
{
return $future->yielding()->next();
}

/**
* Returns an array of all `user defined` global variables, without `super globals`.
*
Expand Down Expand Up @@ -443,6 +457,10 @@ function channel_destroy()

/**
* Start the `Future` process and wait to terminate, and return any results.
*
* @param FutureInterface $future
* @param boolean $displayOutput
* @return mixed
*/
function spawn_run(FutureInterface $future, bool $displayOutput = false)
{
Expand Down Expand Up @@ -589,12 +607,11 @@ function deserialize($input)
* - This feature is for `Coroutine` package or any third party package.
* @param bool $useUv - Turn **on/off** `uv_spawn` for child subprocess operations, will use **libuv** features,
* if not **true** will use `proc_open` of **symfony/process**.
* @param callable|null $channelLoop - the Event Loop routine to use in integrationMode.
*
* @codeCoverageIgnore
*/
function spawn_setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true, callable $channelLoop = null): void
function spawn_setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true): void
{
Spawn::setup($loop, $isYield, $integrationMode, $useUv, $channelLoop);
Spawn::setup($loop, $isYield, $integrationMode, $useUv);
}
}
71 changes: 44 additions & 27 deletions Spawn/Future.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ class Future implements FutureInterface
/** @var callable */
protected static $channelLoop = null;

public static $channelConnected = false;

private function __construct(
$process,
int $id,
Expand All @@ -94,8 +92,7 @@ private function __construct(
\UVLoop $loop = null,
bool $isYield = false,
$task = null,
$channel = null,
callable $channelLoop = null
$channel = null
) {
$this->timeout = $timeout;
$this->process = $process;
Expand All @@ -109,11 +106,17 @@ private function __construct(
$this->messages = new \SplQueue();
self::$uv = $loop;
self::$future[$id] = $this;
self::$channelConnected = true;
if (!empty($channelLoop))
self::$channelLoop = $channelLoop;
if ($channel instanceof Channeled)
$channel->setFuture($this);

if (self::$channelLoop === null) {
self::$channelLoop = function ($wait_count) {
if ($this->isChanneling())
\uv_run(self::$uv, ($wait_count ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
else
\uv_run(self::$uv);
};
}
}

/**
Expand Down Expand Up @@ -162,12 +165,11 @@ public function close()
$this->signal = null;
unset($this->messages);
$this->messages = null;
self::$channelConnected = false;
}

public static function create(Process $process, int $id, int $timeout = 0, bool $isYield = false, $channel = null): FutureInterface
{
return new self($process, $id, $timeout, [null, null, null], null, null, $isYield, null, $channel, self::$channelLoop);
return new self($process, $id, $timeout, [null, null, null], null, null, $isYield, null, $channel);
}

public static function add(
Expand All @@ -179,8 +181,7 @@ public static function add(
bool $isInitialized = false,
int $timeout = 0,
bool $isYield = false,
$channel = null,
$channelLoop = null
$channel = null
): FutureInterface {
if (!$isInitialized) {
[$autoload, $containerScript, $isInitialized] = Spawn::init();
Expand Down Expand Up @@ -276,7 +277,7 @@ public static function add(
});
}

return new self($process, (int) $getId, $timeout, [$in, $out, $err], $timer, $uvLoop, $isYield, $task, $channel, $channelLoop);
return new self($process, (int) $getId, $timeout, [$in, $out, $err], $timer, $uvLoop, $isYield, $task, $channel);
}

public function start(): FutureInterface
Expand Down Expand Up @@ -319,14 +320,15 @@ public function start(): FutureInterface

public function restart(): FutureInterface
{
$this->hasStarted = false;
if ($this->process instanceof Process) {
if ($this->isRunning())
$this->stop();

$process = clone $this->process;
$future = $this->create($process, $this->id, $this->timeout);
} else {
$future = self::add($this->task, $this->id, \PHP_BINARY, '', '', false, (int) $this->timeout, $this->isYield, null, self::$channelLoop);
$future = self::add($this->task, $this->id, \PHP_BINARY, '', '', false, (int) $this->timeout, $this->isYield, null);
if ($this->isRunning())
$this->stop();
}
Expand All @@ -336,7 +338,8 @@ public function restart(): FutureInterface

public function run(bool $useYield = false)
{
$this->start();
if (!$this->isStarted())
$this->start();

if ($useYield)
return $this->wait(1000, true);
Expand Down Expand Up @@ -377,7 +380,12 @@ public function getChannelState()
return $this->channelState;
}

public function isChanneling()
public function isChannelYield(): bool
{
return $this->isYield;
}

public function isChanneling(): bool
{
return ($this->channelState !== Future::STATE[3]) && ($this->channelState !== Future::STATE[2]);
}
Expand All @@ -392,19 +400,28 @@ public function channelRemove()
$this->uvCounter--;
}

public function channelTick()
public function getChannelCount(): int
{
if (!Spawn::isIntegration() || !\is_callable(self::$channelLoop)) {
if ($this->channelState === Future::STATE[0])
\uv_run(self::$uv, ($this->uvCounter ? \UV::RUN_ONCE : \UV::RUN_NOWAIT));
else
\uv_run(self::$uv);
} else {
// @codeCoverageIgnoreStart
$loop = self::$channelLoop;
$loop();
// @codeCoverageIgnoreEnd
}
return $this->uvCounter;
}

public function setChannelTick(callable $loop)
{
self::$channelLoop = $loop;
}

public function channelTick($wait_count)
{
$loop = self::$channelLoop;
if ($this->isYield)
return $this->channelTickYield($loop, $wait_count)->next();

$loop($wait_count);
}

protected function channelTickYield(callable $loop, $wait_count)
{
yield $loop($wait_count);
}

public function wait($waitTimer = 1000, bool $useYield = false)
Expand Down
5 changes: 1 addition & 4 deletions Spawn/Spawn.php
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,10 @@ public static function create(
* - This feature is for `Coroutine` package or any third party package.
* @param bool $useUv - Turn **on/off** `uv_spawn` for child subprocess operations, will use **libuv** features,
* if not **true** will use `proc_open` of **symfony/process**.
* @param callable|null $channelLoop - the Event Loop routine to use in integrationMode.
*
* @codeCoverageIgnore
*/
public static function setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true, callable $channelLoop = null): void
public static function setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true): void
{
if ($loop instanceof \UVLoop) {
Future::uvLoop($loop);
Expand All @@ -170,8 +169,6 @@ public static function setup($loop, bool $isYield = true, bool $integrationMode
self::$integrationMode = $integrationMode;
self::$isYield = $isYield;
self::$useUv = $useUv;
if (!empty($channelLoop))
self::$channelLoop = $channelLoop;
}

/**
Expand Down
44 changes: 44 additions & 0 deletions tests/ChanneledTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,48 @@ public function testChannelDrains()
$this->expectExceptionMessageMatches('/[channel(hi) closed]/');
$chan->recv();
}

public function testParallelingInclude()
{
$future = \paralleling(function () {
return foo();
}, \sprintf("%s/ChannelInclude.inc", __DIR__));

$this->expectOutputString('OK');
echo \paralleling_run($future);
}

public function testChannelRecvYield()
{
$channel = Channel::make("channel");

paralleling(function ($channel) {
$data = $channel->recv();
echo $data;
}, null, $channel);

$this->expectOutputString('OK');
$channel->send("OK");
}

public function testChannelSendYield()
{
$channel = Channel::make("io");

paralleling(function ($channel) {
$channel = Channel::open($channel);

for ($count = 0; $count <= 10; $count++) {
$channel->send($count);
}

$channel->send(false);
}, null, (string) $channel);

$counter = 0;
while (($value = $channel->recv()) !== false) {
$this->assertEquals($value, $counter);
$counter++;
}
}
}
10 changes: 0 additions & 10 deletions tests/SpawnTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,4 @@ public function testSetGetGlobals()
$this->assertIsArray($global);
$this->assertEquals($global['test'], 4);
}

public function testParallelingInclude()
{
$future = \paralleling(function () {
return foo();
}, sprintf("%s/ChannelInclude.inc", __DIR__));

$this->expectOutputString('OK');
echo $future->yielding()->current();
}
}

0 comments on commit cab147e

Please sign in to comment.