diff --git a/README.md b/README.md index ab52377..09ef7a3 100644 --- a/README.md +++ b/README.md @@ -208,11 +208,10 @@ $future = Spawn::create(function () { * - This feature is for `Coroutine` package or any third party package. * @param bool $useUv - Turn **on/off** `uv_spawn` for child subprocess operations, will use **libuv** features, * if not **true** will use `proc_open` of **symfony/process**. - * @param callable|null $channelLoop - the Event Loop routine to use in integrationMode. * * @codeCoverageIgnore */ -spawn_setup($loop, $isYield, $integrationMode, $useUv, $channelLoop); +spawn_setup($loop, $isYield, $integrationMode, $useUv); // For checking and acting on each subprocess status use: diff --git a/Spawn/Channeled.php b/Spawn/Channeled.php index d50fcee..3cf7a7c 100644 --- a/Spawn/Channeled.php +++ b/Spawn/Channeled.php @@ -5,6 +5,7 @@ namespace Async\Spawn; use Async\Spawn\Future; +use Async\Spawn\FutureInterface; use Async\Spawn\Process; use Async\Spawn\ChanneledInterface; @@ -242,8 +243,10 @@ public function send($value): void } $checkState = $future->isChanneling(); - if ($checkState) + if ($checkState) { + $future->channelState(1); $future->channelAdd(); + } \uv_write( $channelInput, @@ -255,7 +258,7 @@ function () use ($future, $checkState) { ); if ($checkState) - $future->channelTick(); + $future->channelTick($future->getChannelCount()); } elseif (null !== $value && ($this->state === 'process' || \is_resource($value))) { $this->input[] = self::validateInput(__METHOD__, $value); } elseif (null !== $value) { @@ -265,7 +268,7 @@ function () use ($future, $checkState) { } \fwrite($this->futureOutput, \serializer([$value, $messaging])); - \usleep(1500); + \usleep(2000); } } } @@ -292,16 +295,17 @@ public function recv() $checkState = $future->isChanneling(); if ($checkState) { + $future->channelState(0); $future->channelAdd(); } if ($checkState) { - $future->channelTick(); + $future->channelTick($future->getChannelCount()); $value = $future->getMessage(); while (\is_null($value)) { $future->channelAdd(); - $future->channelTick(); + $future->channelTick($future->getChannelCount()); $value = $future->getMessage(); } diff --git a/Spawn/Container.php b/Spawn/Container.php index 30a98a6..139611e 100644 --- a/Spawn/Container.php +++ b/Spawn/Container.php @@ -34,7 +34,7 @@ $output = $task(\spawn_channel()); \fflush(\STDOUT); - \usleep(1500); + \usleep(2000); \fwrite(\STDOUT, \serializer($output)); exit(0); } catch (\Throwable $exception) { diff --git a/Spawn/Core.php b/Spawn/Core.php index f5755e1..bf59d9d 100644 --- a/Spawn/Core.php +++ b/Spawn/Core.php @@ -387,6 +387,20 @@ function paralleling(?\Closure $task = null, ?string $include = null, ...$args): return Spawn::create($executable, 0, $channel, true)->displayOn(); } + /** + * Start the `Future` process and wait to terminate, and return any results. + * - This feature is for standalone mode only. + * - This feature runs **libuv** *uv_run(`loop`)* in *`UV::RUN_DEFAULT`* mode. + * + * @param FutureInterface $future + * @return mixed + * @see https://www.php.net/manual/en/parallel.run.php + */ + function paralleling_run(FutureInterface $future) + { + return $future->yielding()->next(); + } + /** * Returns an array of all `user defined` global variables, without `super globals`. * @@ -443,6 +457,10 @@ function channel_destroy() /** * Start the `Future` process and wait to terminate, and return any results. + * + * @param FutureInterface $future + * @param boolean $displayOutput + * @return mixed */ function spawn_run(FutureInterface $future, bool $displayOutput = false) { @@ -589,12 +607,11 @@ function deserialize($input) * - This feature is for `Coroutine` package or any third party package. * @param bool $useUv - Turn **on/off** `uv_spawn` for child subprocess operations, will use **libuv** features, * if not **true** will use `proc_open` of **symfony/process**. - * @param callable|null $channelLoop - the Event Loop routine to use in integrationMode. * * @codeCoverageIgnore */ - function spawn_setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true, callable $channelLoop = null): void + function spawn_setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true): void { - Spawn::setup($loop, $isYield, $integrationMode, $useUv, $channelLoop); + Spawn::setup($loop, $isYield, $integrationMode, $useUv); } } diff --git a/Spawn/Future.php b/Spawn/Future.php index e35d939..b73acf0 100644 --- a/Spawn/Future.php +++ b/Spawn/Future.php @@ -83,8 +83,6 @@ class Future implements FutureInterface /** @var callable */ protected static $channelLoop = null; - public static $channelConnected = false; - private function __construct( $process, int $id, @@ -94,8 +92,7 @@ private function __construct( \UVLoop $loop = null, bool $isYield = false, $task = null, - $channel = null, - callable $channelLoop = null + $channel = null ) { $this->timeout = $timeout; $this->process = $process; @@ -109,11 +106,17 @@ private function __construct( $this->messages = new \SplQueue(); self::$uv = $loop; self::$future[$id] = $this; - self::$channelConnected = true; - if (!empty($channelLoop)) - self::$channelLoop = $channelLoop; if ($channel instanceof Channeled) $channel->setFuture($this); + + if (self::$channelLoop === null) { + self::$channelLoop = function ($wait_count) { + if ($this->isChanneling()) + \uv_run(self::$uv, ($wait_count ? \UV::RUN_ONCE : \UV::RUN_NOWAIT)); + else + \uv_run(self::$uv); + }; + } } /** @@ -162,12 +165,11 @@ public function close() $this->signal = null; unset($this->messages); $this->messages = null; - self::$channelConnected = false; } public static function create(Process $process, int $id, int $timeout = 0, bool $isYield = false, $channel = null): FutureInterface { - return new self($process, $id, $timeout, [null, null, null], null, null, $isYield, null, $channel, self::$channelLoop); + return new self($process, $id, $timeout, [null, null, null], null, null, $isYield, null, $channel); } public static function add( @@ -179,8 +181,7 @@ public static function add( bool $isInitialized = false, int $timeout = 0, bool $isYield = false, - $channel = null, - $channelLoop = null + $channel = null ): FutureInterface { if (!$isInitialized) { [$autoload, $containerScript, $isInitialized] = Spawn::init(); @@ -276,7 +277,7 @@ public static function add( }); } - return new self($process, (int) $getId, $timeout, [$in, $out, $err], $timer, $uvLoop, $isYield, $task, $channel, $channelLoop); + return new self($process, (int) $getId, $timeout, [$in, $out, $err], $timer, $uvLoop, $isYield, $task, $channel); } public function start(): FutureInterface @@ -319,6 +320,7 @@ public function start(): FutureInterface public function restart(): FutureInterface { + $this->hasStarted = false; if ($this->process instanceof Process) { if ($this->isRunning()) $this->stop(); @@ -326,7 +328,7 @@ public function restart(): FutureInterface $process = clone $this->process; $future = $this->create($process, $this->id, $this->timeout); } else { - $future = self::add($this->task, $this->id, \PHP_BINARY, '', '', false, (int) $this->timeout, $this->isYield, null, self::$channelLoop); + $future = self::add($this->task, $this->id, \PHP_BINARY, '', '', false, (int) $this->timeout, $this->isYield, null); if ($this->isRunning()) $this->stop(); } @@ -336,7 +338,8 @@ public function restart(): FutureInterface public function run(bool $useYield = false) { - $this->start(); + if (!$this->isStarted()) + $this->start(); if ($useYield) return $this->wait(1000, true); @@ -377,7 +380,12 @@ public function getChannelState() return $this->channelState; } - public function isChanneling() + public function isChannelYield(): bool + { + return $this->isYield; + } + + public function isChanneling(): bool { return ($this->channelState !== Future::STATE[3]) && ($this->channelState !== Future::STATE[2]); } @@ -392,19 +400,28 @@ public function channelRemove() $this->uvCounter--; } - public function channelTick() + public function getChannelCount(): int { - if (!Spawn::isIntegration() || !\is_callable(self::$channelLoop)) { - if ($this->channelState === Future::STATE[0]) - \uv_run(self::$uv, ($this->uvCounter ? \UV::RUN_ONCE : \UV::RUN_NOWAIT)); - else - \uv_run(self::$uv); - } else { - // @codeCoverageIgnoreStart - $loop = self::$channelLoop; - $loop(); - // @codeCoverageIgnoreEnd - } + return $this->uvCounter; + } + + public function setChannelTick(callable $loop) + { + self::$channelLoop = $loop; + } + + public function channelTick($wait_count) + { + $loop = self::$channelLoop; + if ($this->isYield) + return $this->channelTickYield($loop, $wait_count)->next(); + + $loop($wait_count); + } + + protected function channelTickYield(callable $loop, $wait_count) + { + yield $loop($wait_count); } public function wait($waitTimer = 1000, bool $useYield = false) diff --git a/Spawn/Spawn.php b/Spawn/Spawn.php index bcbd827..6ecae83 100644 --- a/Spawn/Spawn.php +++ b/Spawn/Spawn.php @@ -157,11 +157,10 @@ public static function create( * - This feature is for `Coroutine` package or any third party package. * @param bool $useUv - Turn **on/off** `uv_spawn` for child subprocess operations, will use **libuv** features, * if not **true** will use `proc_open` of **symfony/process**. - * @param callable|null $channelLoop - the Event Loop routine to use in integrationMode. * * @codeCoverageIgnore */ - public static function setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true, callable $channelLoop = null): void + public static function setup($loop, bool $isYield = true, bool $integrationMode = true, bool $useUv = true): void { if ($loop instanceof \UVLoop) { Future::uvLoop($loop); @@ -170,8 +169,6 @@ public static function setup($loop, bool $isYield = true, bool $integrationMode self::$integrationMode = $integrationMode; self::$isYield = $isYield; self::$useUv = $useUv; - if (!empty($channelLoop)) - self::$channelLoop = $channelLoop; } /** diff --git a/tests/ChanneledTest.php b/tests/ChanneledTest.php index dda9402..a49b4fc 100644 --- a/tests/ChanneledTest.php +++ b/tests/ChanneledTest.php @@ -279,4 +279,48 @@ public function testChannelDrains() $this->expectExceptionMessageMatches('/[channel(hi) closed]/'); $chan->recv(); } + + public function testParallelingInclude() + { + $future = \paralleling(function () { + return foo(); + }, \sprintf("%s/ChannelInclude.inc", __DIR__)); + + $this->expectOutputString('OK'); + echo \paralleling_run($future); + } + + public function testChannelRecvYield() + { + $channel = Channel::make("channel"); + + paralleling(function ($channel) { + $data = $channel->recv(); + echo $data; + }, null, $channel); + + $this->expectOutputString('OK'); + $channel->send("OK"); + } + + public function testChannelSendYield() + { + $channel = Channel::make("io"); + + paralleling(function ($channel) { + $channel = Channel::open($channel); + + for ($count = 0; $count <= 10; $count++) { + $channel->send($count); + } + + $channel->send(false); + }, null, (string) $channel); + + $counter = 0; + while (($value = $channel->recv()) !== false) { + $this->assertEquals($value, $counter); + $counter++; + } + } } diff --git a/tests/SpawnTest.php b/tests/SpawnTest.php index 3541391..cda62f1 100644 --- a/tests/SpawnTest.php +++ b/tests/SpawnTest.php @@ -349,14 +349,4 @@ public function testSetGetGlobals() $this->assertIsArray($global); $this->assertEquals($global['test'], 4); } - - public function testParallelingInclude() - { - $future = \paralleling(function () { - return foo(); - }, sprintf("%s/ChannelInclude.inc", __DIR__)); - - $this->expectOutputString('OK'); - echo $future->yielding()->current(); - } }