Skip to content

Commit

Permalink
updates, have FutureInterface to include Channel api, allow channel…
Browse files Browse the repository at this point in the history
…Tick override on channel instances
  • Loading branch information
TheTechsTech committed May 3, 2021
1 parent cab147e commit 43d6f5d
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 40 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This package is part of our [symplely/coroutine](https://symplely.github.io/coro

To learn more about **libuv** features read the online tutorial [book](https://nikhilm.github.io/uvbook/index.html).

The terminology in this version **3x** was changed to be inline with [`ext-parallel`](https://www.php.net/manual/en/book.parallel.php) extension usage, and to behave as a `Thread`, but without many of the extension's limitations.
The terminology in this version **3x** was changed to be inline with [`ext-parallel`](https://www.php.net/manual/en/book.parallel.php) extension usage, and to behave as a `Thread`, but without many of that library extension's limitations.

The `Channeled` and `Future` classes are both designed in a way to be extend from to create your own **implementation** of a `Parallel` based library. Currently `libuv` will be required to get full benefits of the implementation.

Expand Down Expand Up @@ -71,8 +71,10 @@ include 'vendor/autoload.php';

use Async\Spawn\Spawn;

// Shows output by default and Channel instance is extracted for args.
// Shows output by default and Channel instance is extracted from args.
$future = \parallel($function, ...$args)
// Shows output by default, turns on yield usage, can include additional file, and the Channel instance is extracted from args.
$future = \paralleling($function, $includeFile, ...$args)
// Or Does not show output by default and channel instance has to be explicitly passed ins.
$future = \spawn($function, $timeout, $channel)
// Or
Expand All @@ -87,6 +89,8 @@ $future = Spawn::create(function () use ($thing) {

\spawn_run($future);
// Or
\paralleling_run($future);
// Or
$future->run();

// Second option can be used to set to display child output, default is false
Expand Down
88 changes: 56 additions & 32 deletions Spawn/Future.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Future implements FutureInterface
protected $out;
protected $err;
protected $timer;
protected $uvCounter = 0;
protected $channelCounter = 0;

protected $output;
protected $errorOutput;
Expand Down Expand Up @@ -83,6 +83,9 @@ class Future implements FutureInterface
/** @var callable */
protected static $channelLoop = null;

/** @var callable|boolean */
protected $channelOverride = false;

private function __construct(
$process,
int $id,
Expand Down Expand Up @@ -359,66 +362,73 @@ protected function yieldRun()
return yield $this->getResult();
}

/**
* Sets the `Channel` current state, Either `reading`, `writing`, `progressing`, `pending`.
*
* @param integer $status 0 - `reading`, 1 - `writing`, 2 - `progressing`, 3 - `pending`.
* @return void
*/
public function channelState(int $status)
public function channelState(int $status): void
{
$this->channelState = self::STATE[$status];
}

/**
* Return current `Channel` state, Either `reading`, `writing`, `progressing`, `pending`.
*
* @return string
*/
public function getChannelState()
public function getChannelState(): string
{
return $this->channelState;
}

public function isChannelYield(): bool
{
return $this->isYield;
}

public function isChanneling(): bool
{
return ($this->channelState !== Future::STATE[3]) && ($this->channelState !== Future::STATE[2]);
}

public function channelAdd()
public function channelAdd(): void
{
$this->uvCounter++;
$this->channelCounter++;
}

public function channelRemove()
public function channelRemove(): void
{
$this->uvCounter--;
$this->channelCounter--;
}

public function getChannelCount(): int
{
return $this->uvCounter;
return $this->channelCounter;
}

public function setChannelTick(callable $loop)
/**
* @codeCoverageIgnore
*/
public static function setChannelTick(callable $loop): void
{
self::$channelLoop = $loop;
}

/**
* @codeCoverageIgnore
*/
public function channelOverrideTick($looper = null): void
{
if (!empty($looper) && \is_callable($this->channelOverride))
$this->channelOverride = $looper;
else
$this->channelOverride = true;
}

public function channelTick($wait_count)
{
$loop = self::$channelLoop;
if ($this->isYield)
if ($this->isYield && !$this->channelOverride) {
return $this->channelTickYield($loop, $wait_count)->next();
} elseif ($this->channelOverride && \is_callable($this->channelOverride)) {
// @codeCoverageIgnoreStart
$loop = $this->channelOverride;
return $loop($wait_count);
// @codeCoverageIgnoreEnd
}

$loop($wait_count);
}

/**
* @codeCoverageIgnore
*/
protected function channelTickYield(callable $loop, $wait_count)
{
yield $loop($wait_count);
Expand Down Expand Up @@ -514,6 +524,14 @@ public function isStarted(): bool
return $this->hasStarted;
}

/**
* @codeCoverageIgnore
*/
public function isYield(): bool
{
return $this->isYield;
}

/**
* Check if input is a `Channeled` **message** from a `Future`.
*
Expand Down Expand Up @@ -680,6 +698,14 @@ public function getSignaled(): ?int
return $this->signal;
}

/**
* @codeCoverageIgnore
*/
public function getThen(): array
{
return $this->successCallbacks;
}

/**
* @codeCoverageIgnore
*/
Expand Down Expand Up @@ -775,14 +801,12 @@ public function triggerProgress(string $type, $buffer)
$liveOutput = $this->lastResult = $buffer;
}

if (\count($this->progressCallbacks) > 0) {
if ((\count($this->progressCallbacks) > 0) && \is_string($liveOutput) && !\is_base64($liveOutput)) {
foreach ($this->progressCallbacks as $progressCallback) {
if (\is_string($liveOutput) && !\is_base64($liveOutput)) {
if ($this->getChannelState() === Future::STATE[3])
$this->channelState(2);
if ($this->getChannelState() === Future::STATE[3])
$this->channelState(2);

$progressCallback($type, $liveOutput);
}
$progressCallback($type, $liveOutput);
}
}
}
Expand Down
96 changes: 90 additions & 6 deletions Spawn/FutureInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Async\Spawn;

use Async\Spawn\Process;
use Generator;

interface FutureInterface
{
Expand Down Expand Up @@ -63,17 +64,24 @@ public function close();
*/
public function wait($waitTimer = 1000, bool $useYield = false);

/**
* Return the handlers to be called when the `Future` process is successful.
*
* @return array
*/
public function getThen(): array;

/**
* Add handlers to be called when the `Future` process is successful, erred or progressing in real time.
*
* @param callable $doneCallback
* @param callable $thenCallback
* @param callable $failCallback
* @param callable $progressCallback
*
* @return FutureInterface
*/
public function then(
callable $doneCallback,
callable $thenCallback,
callable $failCallback = null,
callable $progressCallback = null
): FutureInterface;
Expand All @@ -90,8 +98,9 @@ public function then(
public function signal(int $signal, callable $signalCallback): FutureInterface;

/**
* Add handlers to be called when the `Future` process progressing, it's producing output.
* This can be use as a IPC handler for real time interaction.
* Add handlers to be called when the `Future` process **events** progressing, it's producing output.
* - The events are in relations to `stdin`, `stdout`, and `stderr` output.
* - This can be use as a IPC handler for real time interaction.
*
* The callback will receive **output type** either(`out` or `err`),
* and **the output** in real-time.
Expand Down Expand Up @@ -217,6 +226,13 @@ public function isSuccessful(): bool;
*/
public function isStarted(): bool;

/**
* Check if `Future` is in `yield` integration mode.
*
* @return boolean
*/
public function isYield(): bool;

/**
* Set `Future` process to display output of child process.
*
Expand Down Expand Up @@ -256,9 +272,77 @@ public function getStdio(): array;
public function getSignaled(): ?int;

/**
* Call the progress callbacks on the child subprocess output in real time.
* Sets the `Channel` current state, Either `reading`, `writing`, `progressing`, `pending`.
*
* @param integer $status 0 - `reading`, 1 - `writing`, 2 - `progressing`, 3 - `pending`.
* @return void
*/
public function channelState(int $status): void;

/**
* Return current `Channel` state, Either `reading`, `writing`, `progressing`, `pending`.
*
* @return string
*/
public function getChannelState(): string;

/**
* Check if `channel` currently in a `send/recv` state.
*
* @return boolean
*/
public function isChanneling(): bool;

/**
* **Add** a `send/recv` channel call.
*
* @return void
*/
public function channelAdd(): void;

/**
* **Remove** a `send/recv` channel call.
*
* @return void
*/
public function channelRemove(): void;

/**
* Total **added** `send/recv` channel calls.
*
* @return integer
*/
public function getChannelCount(): int;

/**
* Set the global callable routine for `Channel` blocking event loop for `send/recv` calls.
*
* @param callable $loop
* @return void
*/
public static function setChannelTick(callable $loop): void;

/**
* Auto sets `true`, to bypass `channelTick`, or override `channelTick` routine with another `$looper`.
*
* @param callable|null $looper sets a `Channel` instance specific event loop for `send/recv`.
* @return void
*/
public function channelOverrideTick($looper = null): void;

/**
* Execute `Channel` blocking event loop for `send/recv` calls.
*
* @param int $wait_count `added` channel calls to wait for.
* @return void
*/
public function channelTick($wait_count);

/**
* Call the progress callbacks on the child `Future` process **events** in real time.
* - The events are in relations to `stdin`, `stdout`, and `stderr`.
*
* @param string $type
* @param string $type - Either `ERR`, or `OUT`
* @param string $buffer
*
* @return void
Expand Down

0 comments on commit 43d6f5d

Please sign in to comment.