Skip to content

Commit

Permalink
test, refactor to allow passing globals between Futures as `ext-paral…
Browse files Browse the repository at this point in the history
…lel`

- allowing Future and Channel to store each others instances
  • Loading branch information
TheTechsTech committed May 6, 2021
1 parent 43d6f5d commit 48062c6
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 63 deletions.
32 changes: 32 additions & 0 deletions Spawn/Channeled.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class Channeled implements ChanneledInterface
*/
protected $channel = null;
protected $process = null;
protected $paralleling = null;
protected $futureInput = \STDIN;
protected $futureOutput = \STDOUT;
protected $futureError = \STDERR;
Expand All @@ -52,6 +53,7 @@ public function __destruct()
$this->open = false;
$this->channel = null;
$this->process = null;
$this->paralleling = null;
unset($this->buffered);
$this->buffered = null;
}
Expand All @@ -61,6 +63,10 @@ public function __construct(
string $name = __FILE__,
bool $anonymous = true
) {
global $___channeled___;
if ($this->capacity !== null && $___channeled___ === 'parallel')
$capacity = $this->capacity;

if (($capacity < -1) || ($capacity == 0))
throw new \TypeError('capacity may be -1 for unlimited, or a positive integer');

Expand Down Expand Up @@ -167,6 +173,32 @@ public function setFuture($handle): ChanneledInterface
return $this;
}

/**
* Store an handle that has an `->value()` method.
*
* @param Object $handle for storing an `ext-parallel` like Future object
* @codeCoverageIgnore
*/
public function setParalleling($handle): ChanneledInterface
{
if (\is_object($handle) && \method_exists($handle, 'value')) {
$this->paralleling = $handle;
}

return $this;
}

/**
* Return an `ext-parallel` Future like object
*
* @return object|null
* @codeCoverageIgnore
*/
public function getParalleling()
{
return $this->paralleling;
}

public function getFuture(): ?FutureInterface
{
return $this->channel;
Expand Down
63 changes: 22 additions & 41 deletions Spawn/Core.php
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ function spawn(
*/
function parallel($task, ...$argv): FutureInterface
{
global $___parallel___;
global $___paralleling;
$channel = null;
foreach ($argv as $isChannel) {
if ($isChannel instanceof ChanneledInterface) {
Expand All @@ -337,8 +337,8 @@ function parallel($task, ...$argv): FutureInterface
}

// @codeCoverageIgnoreStart
$executable = function () use ($task, $argv, $___parallel___) {
\parallel_setup($___parallel___);
$executable = function () use ($task, $argv, $___paralleling) {
\paralleling_setup(null, $___paralleling);
return $task(...$argv);
};
// @codeCoverageIgnoreEnd
Expand All @@ -361,7 +361,7 @@ function parallel($task, ...$argv): FutureInterface
*/
function paralleling(?\Closure $task = null, ?string $include = null, ...$args): FutureInterface
{
global $___parallel___;
global $___paralleling;

$channel = null;
foreach ($args as $isChannel) {
Expand All @@ -375,16 +375,16 @@ function paralleling(?\Closure $task = null, ?string $include = null, ...$args):
}

// @codeCoverageIgnoreStart
$executable = function () use ($task, $args, $include, $___parallel___) {
if (!empty($include) && \is_string($include))
require $include;

\parallel_setup($___parallel___);
$executable = function () use ($task, $args, $include, $___paralleling) {
\paralleling_setup($include, $___paralleling);
return $task(...$args);
};
// @codeCoverageIgnoreEnd
$future = Spawn::create($executable, 0, $channel, true)->displayOn();
if ($channel instanceof ChanneledInterface)
$future->setChannel($channel);

return Spawn::create($executable, 0, $channel, true)->displayOn();
return $future;
}

/**
Expand All @@ -402,39 +402,32 @@ function paralleling_run(FutureInterface $future)
}

/**
* Returns an array of all `user defined` global variables, without `super globals`.
* Returns an array of **Future** `user defined` *global* variables.
*
* @param array $vars only **get_defined_vars()** should be passed in.
* @return array
* @return array|null
*/
function get_globals(array $vars): array
function paralleling_globals(): ?array
{
$global = @\array_diff($vars, array(array()));
unset($global['argc']);
return $global;
}
global $___paralleling;

/**
* Returns an array of all `user defined` global variables, without `super globals`.
* @return array
*
* @codeCoverageIgnore
*/
function parallel_globals(): array
{
return \get_globals(get_defined_vars());
return $___paralleling;
}

/**
* Setup `user defined` global `key => value` pair to be transferred to `Future` **subprocess**.
* - Also an indicator for a `Channel` that it been started by `subprocess` Future.
* - Can `include/require` an additional **file** to execute.
* - Also an indicator for a `Channel` that it has been started by `subprocess` Future.
*
* @param string $include additional file to execute
* @param array|null $keyValue
* @return void
*/
function parallel_setup(?array $keyValue = null): void
function paralleling_setup(?string $include = null, ?array $keyValue = null): void
{
global $___channeled___;
if (!empty($include) && \is_string($include)) {
require $include;
}

if (\is_array($keyValue))
foreach ($keyValue as $key => $value)
Expand All @@ -443,18 +436,6 @@ function parallel_setup(?array $keyValue = null): void
$___channeled___ = 'parallel';
}

/**
* Destroy `All` Channel instances.
*
* @return void
*
* @codeCoverageIgnore
*/
function channel_destroy()
{
Channeled::destroy();
}

/**
* Start the `Future` process and wait to terminate, and return any results.
*
Expand Down
38 changes: 30 additions & 8 deletions Spawn/Future.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class Future implements FutureInterface
protected $out;
protected $err;
protected $timer;
protected $channelCounter = 0;

protected $output;
protected $errorOutput;
Expand Down Expand Up @@ -72,6 +71,14 @@ class Future implements FutureInterface
*/
protected $channelState = self::STATE[3];

/** @var callable */
protected static $channelLoop = null;

/** @var callable|boolean */
protected $channelOverride = false;
protected $channelInstance = null;
protected $channelCounter = 0;

/**
* @var int
*/
Expand All @@ -80,11 +87,6 @@ class Future implements FutureInterface
protected static $future = [];
protected static $uv = null;

/** @var callable */
protected static $channelLoop = null;

/** @var callable|boolean */
protected $channelOverride = false;

private function __construct(
$process,
Expand Down Expand Up @@ -168,6 +170,7 @@ public function close()
$this->signal = null;
unset($this->messages);
$this->messages = null;
$this->channelInstance = null;
}

public static function create(Process $process, int $id, int $timeout = 0, bool $isYield = false, $channel = null): FutureInterface
Expand Down Expand Up @@ -362,6 +365,16 @@ protected function yieldRun()
return yield $this->getResult();
}

public function setChannel(ChanneledInterface $handle): void
{
$this->channelInstance = $handle;
}

public function getChannel(): ChanneledInterface
{
return $this->channelInstance;
}

public function channelState(int $status): void
{
$this->channelState = self::STATE[$status];
Expand Down Expand Up @@ -656,13 +669,22 @@ public function getLast()

public function getResult()
{
global $___parallel___;
global $___paralleling;

if (!$this->finalResult) {
$this->finalResult = $this->decoded($this->lastResult);

if ($this->isFinal($this->finalResult)) {
[$this->finalResult,, $___parallel___] = $this->finalResult;
[$this->finalResult,, $___paralleling] = $this->finalResult;
if (isset($___paralleling) && \is_array($___paralleling)) {
$global = $___paralleling['GLOBALS'];
unset($___paralleling['GLOBALS']);
unset($___paralleling['_GET'], $___paralleling['_POST'], $___paralleling['_COOKIE'], $___paralleling['_FILES']);
unset($___paralleling['_ENV'], $___paralleling['_REQUEST'], $___paralleling['_SERVER'], $___paralleling['argc']);
unset($___paralleling['argv'], $___paralleling['autoload'], $___paralleling['serializedClosure']);
unset($___paralleling['__composer_autoload_files'], $___paralleling['error'], $___paralleling['task'], $___paralleling['results']);
$___paralleling = \is_array($global) ? \array_merge($global, $___paralleling) : $___paralleling;
}
}
}

Expand Down
17 changes: 16 additions & 1 deletion Spawn/FutureInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,21 @@ public function getStdio(): array;
*/
public function getSignaled(): ?int;

/**
* Store connected `Channel` instance.
*
* @param ChanneledInterface $handle
* @return void
*/
public function setChannel(ChanneledInterface $handle): void;

/**
* Return the stored connected `Channel` instance.
*
* @return ChanneledInterface
*/
public function getChannel(): ChanneledInterface;

/**
* Sets the `Channel` current state, Either `reading`, `writing`, `progressing`, `pending`.
*
Expand Down Expand Up @@ -308,7 +323,7 @@ public function channelAdd(): void;
public function channelRemove(): void;

/**
* Total **added** `send/recv` channel calls.
* Return total **added** `send/recv` channel calls.
*
* @return integer
*/
Expand Down
8 changes: 2 additions & 6 deletions Spawn/UVContainer.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@
$results = $task(\spawn_channel());

\fflush(\STDOUT);
\usleep(5);

\fwrite(\STDOUT, \serializer([$results, '___final', \parallel_globals()]));

\usleep(5);
\fflush(\STDOUT);
\usleep(500);
\fwrite(\STDOUT, \serializer([$results, '___final', $GLOBALS]));
exit(0);
} catch (\Throwable $exception) {
\fwrite(\STDERR, \serializer(new SerializableException($exception)));
Expand Down
16 changes: 14 additions & 2 deletions tests/ChanneledTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ protected function setUp(): void
if (!\function_exists('uv_loop_new'))
$this->markTestSkipped('Test skipped "uv_loop_new" missing.');

\channel_destroy();
Channel::destroy();
\spawn_setup(null, false, false, true);
}

Expand Down Expand Up @@ -290,17 +290,29 @@ public function testParallelingInclude()
echo \paralleling_run($future);
}

public function testParallelingNoInclude()
{
$future = \paralleling(function () {
echo 'foo';
}, \sprintf("%s/nope.inc", __DIR__));

$this->expectOutputRegex('/[failed to open stream: No such file or directory]/');
echo \paralleling_run($future);
}

public function testChannelRecvYield()
{
$channel = Channel::make("channel");

paralleling(function ($channel) {
$future = paralleling(function ($channel) {
$data = $channel->recv();
echo $data;
}, null, $channel);

$this->expectOutputString('OK');
$channel->send("OK");

$this->assertSame($future->getChannel(), $channel);
}

public function testChannelSendYield()
Expand Down
Loading

0 comments on commit 48062c6

Please sign in to comment.