Skip to content

Commit

Permalink
WIP add PoolManager
Browse files Browse the repository at this point in the history
  • Loading branch information
sweikenb committed Aug 3, 2024
1 parent d676a59 commit 6521239
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 7 deletions.
8 changes: 4 additions & 4 deletions src/Api/ProcessManagerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output =
/**
* By default, it waits until all child-processes are returned and calls the optional callback for each of them.
* The callback will receive the return-status and the pid of the child process as arguments.
* If the callback returns FALSE (boolean), the method will not wait for further children to exit and returns early,
* any other or none value (including NULL) will continue to wait if remaining children are present.
* The callback will be executed before any other registered callback.
* If the callback returns FALSE (boolean) or $block is set to FALSE (boolean), the method will not wait for further
* children to exit and returns early, any other or none value (including NULL) will continue to wait if remaining
* children are present. The callback will be executed before any other registered callback.
* Only works in the parent-process.
*/
public function wait(?callable $callback = null): void;
public function wait(?callable $callback = null, bool $block = true): void;

/**
* Registers a callback for when a child process gets created. Multiple callbacks can be registered.
Expand Down
70 changes: 70 additions & 0 deletions src/PoolManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php
namespace Sweikenb\Library\Pcntl;

use Sweikenb\Library\Pcntl\Api\ChildProcessInterface;
use Sweikenb\Library\Pcntl\Api\ProcessManagerInterface;

class PoolManager
{
const DEFAULT_KILL_TIMEOUT = 1.0;
private ProcessManagerInterface $pm;
private bool $interrupted = false;
private array $processes = [];

public function __construct(?ProcessManagerInterface $processManager = null,)
{
// create the process manager instance if not provided
$this->pm = $processManager ?? new ProcessManager();

// register/override interrupt handler for the main-process
pcntl_signal(SIGINT, [$this, 'handleInterrupt']);

// register events
$this->pm->onThreadCreate(function (ChildProcessInterface $process) {
$this->processes[$process->getId()] = $process;
});
$this->pm->onThreadExit(function (int $status, int $pid) {
unset($this->processes[$pid]);
});
}

public function handleInterrupt(): void
{
$this->interrupted = true;
$this->pm->sendSignalToChildren(SIGTERM);
}

public function execute(int $poolSize, callable $mainLoop, callable $processLoop, ?float $killTimeout = null): int
{
while (!$this->interrupted) {
// ensure we have enough threads
$this->pm->wait(block: false);
$missing = $poolSize - count($this->processes);
for ($i = 0; $i < $missing; $i++) {
$this->pm->runProcess($processLoop);
}

// call the main-loop
$this->pm->wait(block: false);
call_user_func($mainLoop, $this->processes, $this->pm);
}

// wait for children to exit after sending the SIGTERM by the SIGINT handler
// send a SIGKILL to forcefully terminate the child after the kill-timeout is reached
$status = 0;
$waitStart = microtime(true);
while (!empty($this->processes)) {
if ((microtime(true) - $waitStart) >= ($killTimeout ?? self::DEFAULT_KILL_TIMEOUT)) {
$this->pm->sendSignalToChildren(SIGKILL);
$status = 1;
break;
}
$this->pm->unblock();
}

// final wait until all children exited after the KILL
$this->pm->wait();

return $status;
}
}
10 changes: 7 additions & 3 deletions src/ProcessManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output =
}
}

public function wait(?callable $callback = null): void
public function wait(?callable $callback = null, bool $block = true): void
{
if ($this->isChildProcess) {
return;
Expand All @@ -188,21 +188,25 @@ public function wait(?callable $callback = null): void
}

// wait for all children to exit
while (!empty($this->childProcesses)) {
$wait = true;
while ($wait && !empty($this->childProcesses)) {
// process the exit-queue
foreach ($this->childExitQueue as $pid => $status) {
if ($pid > 0) {
unset($this->childExitQueue[$pid], $this->childProcesses[$pid]);
foreach ($callbackStack as $callback) {
if (call_user_func($callback, $status, $pid) === false) {
return;
$block = false;
}
}
}
}

// unblock the system and dispatch queued signals
$this->unblock();

// continue to wait?
$wait = $block;
}
}

Expand Down

0 comments on commit 6521239

Please sign in to comment.