diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index a5e4197..321bcee 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -7,7 +7,7 @@ name: Build Documentation on: # Runs on pushes targeting the default branch push: - branches: ["main", "cleanup-and-tests"] + branches: ["main"] # Allows you to run this workflow manually from the Actions tab workflow_dispatch: diff --git a/docs/src/features/process-manager.md b/docs/src/features/process-manager.md index 4833294..59e5380 100644 --- a/docs/src/features/process-manager.md +++ b/docs/src/features/process-manager.md @@ -133,7 +133,14 @@ Please note that the callback of the `wait()` method gets called BEFORE the life - default: `true` _RECOMMENDED!_ - `$propagateSignals` - list of signals that should be propagated to the child-processes - - default: [`SIGTERM`, `SIGHUP`, `SIGALRM`, `SIGUSR1`, `SIGUSR2`] + - default signals: + - `SIGTERM` graceful exit request by the system or user + - `SIGINT` user interrupts the execution (e.g. `ctrl` + `c` in the terminal) + - `SIGHUP` usually used to request a config reload + - `SIGALRM` usually used for timeout management + - `SIGUSR1` custom signal 1 + - `SIGUSR2` custom signal 2 + - please note that `SIGCHLD` can NOT be propagated due to how the process-manager internally handles this signal - `$processFactory` - factory instance that should be used to create the process models - default: `Sweikenb\Library\Pcntl\Factory\ProcessFactory` diff --git a/docs/src/features/process-queue.md b/docs/src/features/process-queue.md index 841fed6..f16bb53 100644 --- a/docs/src/features/process-queue.md +++ b/docs/src/features/process-queue.md @@ -18,6 +18,9 @@ $queue = new ProcessQueue($maxThreads); for ($i = 0; $i < 100; $i++) { $queue->addToQueue(fn() => sleep(3)); } + +// wait until the whole queue is done +$queue->wait(); ``` ## Settings diff --git a/src/Api/ProcessQueueInterface.php b/src/Api/ProcessQueueInterface.php index 4da313f..c485754 100644 --- a/src/Api/ProcessQueueInterface.php +++ b/src/Api/ProcessQueueInterface.php @@ -9,4 +9,24 @@ interface ProcessQueueInterface * If you specify an $output it will win over the output of the parent process. */ public function addToQueue(callable $callback, ?ProcessOutputInterface $output = null): ChildProcessInterface; + + /** + * Handles the internal thread count and dispatches the wait call to the process-manager. + */ + public function wait(?callable $callback = null): void; + + /** + * Returns the number of active threads. Might be zero if no tasks are scheduled. + */ + public function getThreadCounter(): int; + + /** + * Returns the maximum number of threads to spawn. Can not be less than one. + */ + public function getMaxThreads(): int; + + /** + * Returns the process-manager used for handling this queue. + */ + public function getProcessManager(): ProcessManagerInterface; } diff --git a/src/ProcessManager.php b/src/ProcessManager.php index 106b70e..ce94131 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -1,4 +1,4 @@ - + * @var int[] */ - private array $earlyExitChildQueue = []; + private array $childExitQueue = []; private bool $isChildProcess = false; /** * @var array @@ -60,20 +61,12 @@ public function __construct( ? self::PROPAGATE_SIGNALS : $propagateSignals; - // register a signale queue for early exit children - pcntl_async_signals(false); - pcntl_signal(SIGCHLD, [$this, "childEarlyExitQueue"]); - // register the signal-handler for each signal that should be handled - foreach ($propagateSignals as $handleSignal) { - pcntl_signal( - $handleSignal, - function (int $dispatchSignal) { - foreach ($this->childProcesses as $childProcess) { - @posix_kill($childProcess->getId(), $dispatchSignal); - } - } - ); + // we need to make sure we handle early child exists too, so add this signal no matter what + $propagateSignals[] = SIGCHLD; + pcntl_async_signals(false); + foreach (array_unique($propagateSignals) as $handleSignal) { + pcntl_signal($handleSignal, [$this, 'handleSignal']); } // prevent zombie apocalypse @@ -81,33 +74,45 @@ function (int $dispatchSignal) { function () use ($autoWait) { if ($autoWait) { $this->wait(); - } else { - if (!empty($this->childProcesses)) { - foreach ($this->childProcesses as $childProcess) { - $this->processOutput->stderr( - sprintf( - "[PCNTL ProcessManager] Forcing child process exit for pid %s\n", - $childProcess->getId() - ) - ); - @posix_kill($childProcess->getId(), SIGKILL); - } - $this->wait(); - - // In case we had to force a child kill, exit with the exit code 125 (operation canceled) - exit(125); - } + } + if (!empty($this->childProcesses)) { + $this->sendSignalToChildren( + SIGKILL, + fn(ChildProcessInterface $childProcess) => $this->processOutput->stderr( + sprintf( + "[PCNTL ProcessManager] Forcing child process exit for pid %s\n", + $childProcess->getId() + ) + ) + ); + $this->wait(); + exit(1); } } ); } - public function childEarlyExitQueue(): void + public function handleSignal(int $signal): void { - if (!$this->isChildProcess) { + if ($this->isChildProcess) { + return; + } + if ($signal === SIGCHLD) { while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) { - $this->earlyExitChildQueue[$pid] = [$pid, pcntl_wexitstatus($status)]; + $this->childExitQueue[$pid] = pcntl_wexitstatus($status); } + } else { + $this->sendSignalToChildren($signal); + } + } + + public function sendSignalToChildren(int $signal, ?callable $callback = null): void + { + foreach ($this->childProcesses as $childProcess) { + if ($callback) { + call_user_func($callback, $childProcess); + } + @posix_kill($childProcess->getId(), $signal); } } @@ -145,6 +150,7 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output = foreach ($this->onThreadCreated as $callback) { call_user_func($callback, $childProcess); } + return $childProcess; } @@ -181,49 +187,35 @@ public function wait(?callable $callback = null): void array_unshift($callbackStack, $callback); } - $handleChildExit = function (int $pid, int $status) use ($callbackStack): bool { - $continueWait = true; - if ($pid > 0) { - if (isset($this->childProcesses[$pid])) { - unset($this->childProcesses[$pid]); - } - if (isset($this->earlyExitChildQueue[$pid])) { - unset($this->earlyExitChildQueue[$pid]); - } - foreach ($callbackStack as $callback) { - if (call_user_func($callback, $status, $pid) === false) { - $continueWait = false; + // wait for all children to exit + while (!empty($this->childProcesses)) { + // process the exit-queue + foreach ($this->childExitQueue as $pid => $status) { + if ($pid > 0) { + unset($this->childExitQueue[$pid], $this->childProcesses[$pid]); + foreach ($callbackStack as $callback) { + if (call_user_func($callback, $status, $pid) === false) { + return; + } } } } - return $continueWait; - }; - - // run the callback for all early exit children no matter what - $waitForMoreToExit = true; - while (!empty($this->earlyExitChildQueue)) { - [$pid, $status] = current($this->earlyExitChildQueue); - $waitForMoreToExit = $waitForMoreToExit && $handleChildExit($pid, $status); - } - - // only wait for the regular children if desired - while ($waitForMoreToExit && !empty($this->childProcesses)) { - $pid = pcntl_wait($status); - if (!$handleChildExit($pid, $status)) { - return; - } + usleep(5000); + pcntl_signal_dispatch(); } } public function onThreadCreate(callable $callback): self { $this->onThreadCreated[] = $callback; + return $this; } public function onThreadExit(callable $callback): self { $this->onThreadExit[] = $callback; + return $this; } } diff --git a/src/ProcessOutput.php b/src/ProcessOutput.php index 0282248..d33d309 100644 --- a/src/ProcessOutput.php +++ b/src/ProcessOutput.php @@ -1,4 +1,4 @@ -maxThreads = max(1, $maxThreads); } + public function getThreadCounter(): int + { + return $this->threadCounter; + } + + public function getMaxThreads(): int + { + return $this->maxThreads; + } + + public function getProcessManager(): ProcessManagerInterface + { + return $this->processManager; + } + public function addToQueue(callable $callback, ?ProcessOutputInterface $output = null): ChildProcessInterface { while ($this->threadCounter >= $this->maxThreads) { $this->processManager->wait(fn() => --$this->threadCounter >= $this->maxThreads); } $this->threadCounter++; + return $this->processManager->runProcess($callback, $output); } + + public function wait(?callable $callback = null): void + { + $this->processManager->wait(function () use ($callback) { + --$this->threadCounter; + if ($callback) { + call_user_func($callback); + } + }); + } } diff --git a/tests/ProcessQueueTest.php b/tests/ProcessQueueTest.php index c8242fe..dd067cd 100644 --- a/tests/ProcessQueueTest.php +++ b/tests/ProcessQueueTest.php @@ -8,27 +8,32 @@ class ProcessQueueTest extends TestCase { + const TEST_MAX_THREADS = 4; + /** * @covers \Sweikenb\Library\Pcntl\ProcessQueue::addToQueue */ public function testAddToQueue(): void { - $active = 0; - $maxThreads = 4; - $pm = new ProcessManager(); - $pm->onThreadExit(function () use ($maxThreads, &$active) { - $active--; - $this->assertLessThanOrEqual($maxThreads, $active); + $queue = new ProcessQueue(self::TEST_MAX_THREADS, $pm); + + $pm->onThreadCreate(function () use ($queue) { + $this->assertLessThanOrEqual(self::TEST_MAX_THREADS, $queue->getThreadCounter()); + }); + $pm->onThreadExit(function () use ($queue) { + $this->assertLessThanOrEqual(self::TEST_MAX_THREADS, $queue->getThreadCounter()); }); - $queue = new ProcessQueue($maxThreads, $pm); for ($i = 0; $i < 20; $i++) { - $active++; $queue->addToQueue(fn() => 'test'); } - $pm->wait(function () use ($maxThreads, &$active) { - $this->assertLessThanOrEqual($maxThreads, $active); + + $queue->wait(function () use ($queue) { + $this->assertLessThanOrEqual(self::TEST_MAX_THREADS, $queue->getThreadCounter()); }); + + $this->assertSame(self::TEST_MAX_THREADS, $queue->getMaxThreads()); + $this->assertSame(0, $queue->getThreadCounter()); } }