From 652123931c1d5c6c877f31a7af70047af322e27f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Schr=C3=B6er?= Date: Sat, 3 Aug 2024 18:27:41 +0200 Subject: [PATCH] WIP add PoolManager --- src/Api/ProcessManagerInterface.php | 8 ++-- src/PoolManager.php | 70 +++++++++++++++++++++++++++++ src/ProcessManager.php | 10 +++-- 3 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 src/PoolManager.php diff --git a/src/Api/ProcessManagerInterface.php b/src/Api/ProcessManagerInterface.php index ab94483..3c96c6e 100644 --- a/src/Api/ProcessManagerInterface.php +++ b/src/Api/ProcessManagerInterface.php @@ -22,12 +22,12 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output = /** * By default, it waits until all child-processes are returned and calls the optional callback for each of them. * The callback will receive the return-status and the pid of the child process as arguments. - * If the callback returns FALSE (boolean), the method will not wait for further children to exit and returns early, - * any other or none value (including NULL) will continue to wait if remaining children are present. - * The callback will be executed before any other registered callback. + * If the callback returns FALSE (boolean) or $block is set to FALSE (boolean), the method will not wait for further + * children to exit and returns early, any other or none value (including NULL) will continue to wait if remaining + * children are present. The callback will be executed before any other registered callback. * Only works in the parent-process. */ - public function wait(?callable $callback = null): void; + public function wait(?callable $callback = null, bool $block = true): void; /** * Registers a callback for when a child process gets created. Multiple callbacks can be registered. diff --git a/src/PoolManager.php b/src/PoolManager.php new file mode 100644 index 0000000..2936802 --- /dev/null +++ b/src/PoolManager.php @@ -0,0 +1,70 @@ +pm = $processManager ?? new ProcessManager(); + + // register/override interrupt handler for the main-process + pcntl_signal(SIGINT, [$this, 'handleInterrupt']); + + // register events + $this->pm->onThreadCreate(function (ChildProcessInterface $process) { + $this->processes[$process->getId()] = $process; + }); + $this->pm->onThreadExit(function (int $status, int $pid) { + unset($this->processes[$pid]); + }); + } + + public function handleInterrupt(): void + { + $this->interrupted = true; + $this->pm->sendSignalToChildren(SIGTERM); + } + + public function execute(int $poolSize, callable $mainLoop, callable $processLoop, ?float $killTimeout = null): int + { + while (!$this->interrupted) { + // ensure we have enough threads + $this->pm->wait(block: false); + $missing = $poolSize - count($this->processes); + for ($i = 0; $i < $missing; $i++) { + $this->pm->runProcess($processLoop); + } + + // call the main-loop + $this->pm->wait(block: false); + call_user_func($mainLoop, $this->processes, $this->pm); + } + + // wait for children to exit after sending the SIGTERM by the SIGINT handler + // send a SIGKILL to forcefully terminate the child after the kill-timeout is reached + $status = 0; + $waitStart = microtime(true); + while (!empty($this->processes)) { + if ((microtime(true) - $waitStart) >= ($killTimeout ?? self::DEFAULT_KILL_TIMEOUT)) { + $this->pm->sendSignalToChildren(SIGKILL); + $status = 1; + break; + } + $this->pm->unblock(); + } + + // final wait until all children exited after the KILL + $this->pm->wait(); + + return $status; + } +} diff --git a/src/ProcessManager.php b/src/ProcessManager.php index 4fe626d..dde777b 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -176,7 +176,7 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output = } } - public function wait(?callable $callback = null): void + public function wait(?callable $callback = null, bool $block = true): void { if ($this->isChildProcess) { return; @@ -188,14 +188,15 @@ public function wait(?callable $callback = null): void } // wait for all children to exit - while (!empty($this->childProcesses)) { + $wait = true; + while ($wait && !empty($this->childProcesses)) { // process the exit-queue foreach ($this->childExitQueue as $pid => $status) { if ($pid > 0) { unset($this->childExitQueue[$pid], $this->childProcesses[$pid]); foreach ($callbackStack as $callback) { if (call_user_func($callback, $status, $pid) === false) { - return; + $block = false; } } } @@ -203,6 +204,9 @@ public function wait(?callable $callback = null): void // unblock the system and dispatch queued signals $this->unblock(); + + // continue to wait? + $wait = $block; } }