From 0c258850474ce034b4805e16507b8beb8fa4bdb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Schr=C3=B6er?= <366399+sweikenb@users.noreply.github.com> Date: Sat, 23 Sep 2023 21:34:39 +0200 Subject: [PATCH] v5.0.0 Release (#8) * Add ProcessQueue feature (#7) --- CHANGELOG.md | 34 ++++++++++++++++++++++ README.md | 44 ++++++++++++++++++++++++++++- example/200_process_queue.php | 18 ++++++++++++ src/Api/ProcessManagerInterface.php | 7 +++-- src/Api/ProcessQueueInterface.php | 11 ++++++++ src/ProcessManager.php | 20 +++++++++---- src/ProcessQueue.php | 31 ++++++++++++++++++++ 7 files changed, 155 insertions(+), 10 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 example/200_process_queue.php create mode 100644 src/Api/ProcessQueueInterface.php create mode 100644 src/ProcessQueue.php diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..e55590d --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,34 @@ +# Changelog + +## Release [v5.0.0](https://github.com/sweikenb/pcntl/releases/tag/v5.0.0) + +**Features** + +- `ProcessQueue` added as more flexible alternative to `ProcessPool` but without pre-created pool workers + +**Breaking Changes** + +- The return value of the optional `ProcessManager::wait`-callback is now used to determine if the method should + continue to wait for further children to exit. If a value other than explicitly `false` is returned, it will continue + to wait. + +* * * + +## Release [v4.0.0](https://github.com/sweikenb/pcntl/releases/tag/v4.0.0) + +**Plan for future releases** + +- Introduction of a maintained changelog for each release +- From now on, only major version releases will introduce BC breaks +- Features that are about to be removed in the next major version will be marked with `@deprecated` + +**Bugfixes** + +- [#4 Bug: very fast/empty forks will exit before the pcntl_wait() can capture it](https://github.com/sweikenb/pcntl/issues/4) + +**Breaking Changes** + +- PHP v8.2 as minimum requirement +- Due to incompatibility, this library DOES NOT work if the **grpc** PHP-extension is installed +- Removal of the optional `EventDispatcher` +- Some methods of the `ProcessPool` and `ProcessManager` has been renamed diff --git a/README.md b/README.md index e490bb7..dcee494 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,21 @@ Simple and easy to use process manager for PHP based on default PCNTL and POSIX functions. +Please also have a look into the [CHANGELOG](CHANGELOG.md) for latest updates and release information . + +**License**: [MIT](LICENSE.txt) + +**Topics** + +- [Installation](#installation) +- [Basic Usage](#usage) +- [IPC Inter Process Communication](#inter-process-communication) +- [Pool Processing](#process-pool--worker-processes) +- [Queued Processing](#process-queue) +- [Examples](#basic-example) + +* * * + ## Installation You can install this library by [composer](https://getcomposer.org/): @@ -94,7 +109,34 @@ sleep(5); $pool->closePool(); ``` -## Example +## Process Queue + +In case you want to work with dynamic callbacks instead of a process-pool, which will require predefined worker, you can +use the `ProcessQueue`. This queue limits the number of threads that will be forked and handles the return-management +for you. + +```php +addToQueue(function (ChildProcessInterface $child, ParentProcessInterface $parent) use ($i) { + sleep(mt_rand(1, 3)); + fwrite(STDOUT, sprintf("Queued thread %d processed message no. %d\n", $child->getId(), $i)); + }); +} +``` + +# Basic Example ```php addToQueue(function (ChildProcessInterface $child, ParentProcessInterface $parent) use ($i) { + sleep(mt_rand(1, 3)); + fwrite(STDOUT, sprintf("Queued thread %d processed message no. %d\n", $child->getId(), $i)); + }); +} diff --git a/src/Api/ProcessManagerInterface.php b/src/Api/ProcessManagerInterface.php index ebff23c..2b36140 100644 --- a/src/Api/ProcessManagerInterface.php +++ b/src/Api/ProcessManagerInterface.php @@ -19,10 +19,11 @@ public function getMainProcess(): ParentProcessInterface; public function runProcess(callable $callback): ChildProcessInterface; /** - * Waits until all child-processes are returned and calls the optional callback for each process that returns. - * The callback will receive the return-status and the pid of the child process. - * * Only works in the parent-process. + * By default, it waits until all child-processes are returned and calls the optional callback for each of them. + * The callback will receive the return-status and the pid of the child process as arguments. + * If the callback returns FALSE (boolean), the method will not wait for further children to exit and returns early, + * any other or none value (including NULL) will continue to wait if remaining children are present. */ public function wait(?callable $callback = null): void; } diff --git a/src/Api/ProcessQueueInterface.php b/src/Api/ProcessQueueInterface.php new file mode 100644 index 0000000..f44af03 --- /dev/null +++ b/src/Api/ProcessQueueInterface.php @@ -0,0 +1,11 @@ +isChildProcess) { return; } - $handleChildExit = function (int $pid, int $status) use ($callback): void { + $handleChildExit = function (int $pid, int $status) use ($callback): bool { if ($pid > 0) { if (isset($this->childProcesses[$pid])) { unset($this->childProcesses[$pid]); @@ -168,18 +168,26 @@ public function wait(?callable $callback = null): void if (isset($this->earlyExitChildQueue[$pid])) { unset($this->earlyExitChildQueue[$pid]); } - if (null !== $callback) { - call_user_func($callback, $status, $pid); + if (null !== $callback && call_user_func($callback, $status, $pid) === false) { + return false; } } + return true; }; + + // run the callback for all early exit children no matter what + $waitForMoreToExit = true; while (!empty($this->earlyExitChildQueue)) { [$pid, $status] = current($this->earlyExitChildQueue); - $handleChildExit($pid, $status); + $waitForMoreToExit = $waitForMoreToExit && $handleChildExit($pid, $status); } - while (!empty($this->childProcesses)) { + + // only wait for the regular children if desired + while ($waitForMoreToExit && !empty($this->childProcesses)) { $pid = pcntl_wait($status); - $handleChildExit($pid, $status); + if (!$handleChildExit($pid, $status)) { + return; + } } } } diff --git a/src/ProcessQueue.php b/src/ProcessQueue.php new file mode 100644 index 0000000..718b577 --- /dev/null +++ b/src/ProcessQueue.php @@ -0,0 +1,31 @@ +processManager = $processManager ?? new ProcessManager(); + $this->maxThreads = max(1, $maxThreads); + } + + public function addToQueue(callable $callback): ChildProcessInterface + { + while ($this->threadCounter >= $this->maxThreads) { + $this->processManager->wait(fn() => --$this->threadCounter >= $this->maxThreads); + } + $this->threadCounter++; + return $this->processManager->runProcess($callback); + } +}