Skip to content

Commit

Permalink
#11 proper posix signal handling and php8.3 test-support (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
sweikenb committed Jul 20, 2024
1 parent 792d5dd commit ad3b9e2
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name: Build Documentation
on:
# Runs on pushes targeting the default branch
push:
branches: ["main", "cleanup-and-tests"]
branches: ["main"]

# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/phpunit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jobs:
matrix:
php-version:
- "8.2"
- "8.3"
dependency-versions:
- "lowest"
- "highest"
Expand Down
22 changes: 21 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
# Changelog

## Release [v7.0.0](https://github.com/sweikenb/pcntl/releases/tag/v7.0.0)

**Bugfixes**

- Proper signal handling and propagation #11

**Features**

- Introduced a `wait()`-function for the `ProcessQueue` itself which should be used when working with queues instead
of using the `wait()` method of the `ProcessManager` itself.
- Adding PHP 8.3 support to phpunit test-matrix

**Breaking Changes**

- NOTE: The POSIX signal handling fix might affect the order in which callbacks will be called.
For the most part, this should not change the functionality of your application, but just to make sure nothing breaks
unexpectedly, this is the reason for the major version bump instead of just a feature-release.

* * *

## Release [v6.0.0](https://github.com/sweikenb/pcntl/releases/tag/v6.0.0)

**Bugfixes**

- Sending IPC messages will now honor the returned bytes of the written buffer correctly

- **Features**
**Features**

- `ProcessOutput` allows to modify the console output beside the default `STDOUT` and `STDERR`
- Unit and functional tests added using PHPUnit and GitHub actions
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
}
],
"require": {
"php": ">=8.2",
"php": "^8.2",
"ext-pcntl": "*",
"ext-posix": "*",
"ext-sockets": "*"
Expand Down
2 changes: 1 addition & 1 deletion docs/src/common-pitfalls-and-workarounds.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ $results = $connection->getReults();
$connection->close();
foreach ($results as $result) {
$pm->runProcess(function () use ($result) {
// close connection
// re-open connection
$connection = new Connection();

// TODO process data
Expand Down
9 changes: 8 additions & 1 deletion docs/src/features/process-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ Please note that the callback of the `wait()` method gets called BEFORE the life
- default: `true` _RECOMMENDED!_
- `$propagateSignals`
- list of signals that should be propagated to the child-processes
- default: [`SIGTERM`, `SIGHUP`, `SIGALRM`, `SIGUSR1`, `SIGUSR2`]
- default signals:
- `SIGTERM` graceful exit request by the system or user
- `SIGINT` user interrupts the execution (e.g. `ctrl` + `c` in the terminal)
- `SIGHUP` usually used to request a config reload
- `SIGALRM` usually used for timeout management
- `SIGUSR1` custom signal 1
- `SIGUSR2` custom signal 2
- please note that `SIGCHLD` can NOT be propagated due to how the process-manager internally handles this signal
- `$processFactory`
- factory instance that should be used to create the process models
- default: `Sweikenb\Library\Pcntl\Factory\ProcessFactory`
Expand Down
3 changes: 3 additions & 0 deletions docs/src/features/process-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ $queue = new ProcessQueue($maxThreads);
for ($i = 0; $i < 100; $i++) {
$queue->addToQueue(fn() => sleep(3));
}

// wait until the whole queue is done
$queue->wait();
```

## Settings
Expand Down
5 changes: 5 additions & 0 deletions docs/src/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ Install the latest version using [composer](https://getcomposer.org/):
composer require sweikenb/pcntl
```

## Changelog

Please consult the [CHANGELOG.md](https://github.com/sweikenb/pcntl/blob/main/CHANGELOG.md) for latest update
information.

## System Requirements

This library requires at least **PHP v8.2** with the following extensions enabled:
Expand Down
20 changes: 20 additions & 0 deletions src/Api/ProcessQueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,24 @@ interface ProcessQueueInterface
* If you specify an $output it will win over the output of the parent process.
*/
public function addToQueue(callable $callback, ?ProcessOutputInterface $output = null): ChildProcessInterface;

/**
* Handles the internal thread count and dispatches the wait call to the process-manager.
*/
public function wait(?callable $callback = null): void;

/**
* Returns the number of active threads. Might be zero if no tasks are scheduled.
*/
public function getThreadCounter(): int;

/**
* Returns the maximum number of threads to spawn. Can not be less than one.
*/
public function getMaxThreads(): int;

/**
* Returns the process-manager used for handling this queue.
*/
public function getProcessManager(): ProcessManagerInterface;
}
128 changes: 60 additions & 68 deletions src/ProcessManager.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php declare(strict_types=1);
<?php declare(strict_types=1, ticks=1);

namespace Sweikenb\Library\Pcntl;

Expand All @@ -15,11 +15,12 @@
class ProcessManager implements ProcessManagerInterface
{
const PROPAGATE_SIGNALS = [
SIGTERM,
SIGHUP,
SIGALRM,
SIGUSR1,
SIGUSR2
SIGTERM, // exit request
SIGINT, // ctrl + c
SIGHUP, // reload config
SIGALRM, // alarm
SIGUSR1, // custom 1
SIGUSR2, // custom 2
];
private ProcessFactoryInterface $processFactory;
private ProcessOutputInterface $processOutput;
Expand All @@ -29,9 +30,9 @@ class ProcessManager implements ProcessManagerInterface
*/
private array $childProcesses = [];
/**
* @var array<int, int>
* @var int[]
*/
private array $earlyExitChildQueue = [];
private array $childExitQueue = [];
private bool $isChildProcess = false;
/**
* @var array<int, callable>
Expand Down Expand Up @@ -60,54 +61,58 @@ public function __construct(
? self::PROPAGATE_SIGNALS
: $propagateSignals;

// register a signale queue for early exit children
pcntl_async_signals(false);
pcntl_signal(SIGCHLD, [$this, "childEarlyExitQueue"]);

// register the signal-handler for each signal that should be handled
foreach ($propagateSignals as $handleSignal) {
pcntl_signal(
$handleSignal,
function (int $dispatchSignal) {
foreach ($this->childProcesses as $childProcess) {
@posix_kill($childProcess->getId(), $dispatchSignal);
}
}
);
// we need to make sure we handle early child exists too, so add this signal no matter what
$propagateSignals[] = SIGCHLD;
pcntl_async_signals(false);
foreach (array_unique($propagateSignals) as $handleSignal) {
pcntl_signal($handleSignal, [$this, 'handleSignal']);
}

// prevent zombie apocalypse
register_shutdown_function(
function () use ($autoWait) {
if ($autoWait) {
$this->wait();
} else {
if (!empty($this->childProcesses)) {
foreach ($this->childProcesses as $childProcess) {
$this->processOutput->stderr(
sprintf(
"[PCNTL ProcessManager] Forcing child process exit for pid %s\n",
$childProcess->getId()
)
);
@posix_kill($childProcess->getId(), SIGKILL);
}
$this->wait();

// In case we had to force a child kill, exit with the exit code 125 (operation canceled)
exit(125);
}
}
if (!empty($this->childProcesses)) {
$this->sendSignalToChildren(
SIGKILL,
fn(ChildProcessInterface $childProcess) => $this->processOutput->stderr(
sprintf(
"[PCNTL ProcessManager] Forcing child process exit for pid %s\n",
$childProcess->getId()
)
)
);
$this->wait();
exit(1);
}
}
);
}

public function childEarlyExitQueue(): void
public function handleSignal(int $signal): void
{
if (!$this->isChildProcess) {
if ($this->isChildProcess) {
return;
}
if ($signal === SIGCHLD) {
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
$this->earlyExitChildQueue[$pid] = [$pid, pcntl_wexitstatus($status)];
$this->childExitQueue[$pid] = pcntl_wexitstatus($status);
}
} else {
$this->sendSignalToChildren($signal);
}
}

public function sendSignalToChildren(int $signal, ?callable $callback = null): void
{
foreach ($this->childProcesses as $childProcess) {
if ($callback) {
call_user_func($callback, $childProcess);
}
@posix_kill($childProcess->getId(), $signal);
}
}

Expand Down Expand Up @@ -145,6 +150,7 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output =
foreach ($this->onThreadCreated as $callback) {
call_user_func($callback, $childProcess);
}

return $childProcess;
}

Expand Down Expand Up @@ -181,49 +187,35 @@ public function wait(?callable $callback = null): void
array_unshift($callbackStack, $callback);
}

$handleChildExit = function (int $pid, int $status) use ($callbackStack): bool {
$continueWait = true;
if ($pid > 0) {
if (isset($this->childProcesses[$pid])) {
unset($this->childProcesses[$pid]);
}
if (isset($this->earlyExitChildQueue[$pid])) {
unset($this->earlyExitChildQueue[$pid]);
}
foreach ($callbackStack as $callback) {
if (call_user_func($callback, $status, $pid) === false) {
$continueWait = false;
// wait for all children to exit
while (!empty($this->childProcesses)) {
// process the exit-queue
foreach ($this->childExitQueue as $pid => $status) {
if ($pid > 0) {
unset($this->childExitQueue[$pid], $this->childProcesses[$pid]);
foreach ($callbackStack as $callback) {
if (call_user_func($callback, $status, $pid) === false) {
return;
}
}
}
}
return $continueWait;
};

// run the callback for all early exit children no matter what
$waitForMoreToExit = true;
while (!empty($this->earlyExitChildQueue)) {
[$pid, $status] = current($this->earlyExitChildQueue);
$waitForMoreToExit = $waitForMoreToExit && $handleChildExit($pid, $status);
}

// only wait for the regular children if desired
while ($waitForMoreToExit && !empty($this->childProcesses)) {
$pid = pcntl_wait($status);
if (!$handleChildExit($pid, $status)) {
return;
}
usleep(5000);
pcntl_signal_dispatch();
}
}

public function onThreadCreate(callable $callback): self
{
$this->onThreadCreated[] = $callback;

return $this;
}

public function onThreadExit(callable $callback): self
{
$this->onThreadExit[] = $callback;

return $this;
}
}
2 changes: 1 addition & 1 deletion src/ProcessOutput.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php
<?php declare(strict_types=1, ticks=1);

namespace Sweikenb\Library\Pcntl;

Expand Down
28 changes: 27 additions & 1 deletion src/ProcessQueue.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php
<?php declare(strict_types=1, ticks=1);

namespace Sweikenb\Library\Pcntl;

Expand All @@ -21,12 +21,38 @@ public function __construct(
$this->maxThreads = max(1, $maxThreads);
}

public function getThreadCounter(): int
{
return $this->threadCounter;
}

public function getMaxThreads(): int
{
return $this->maxThreads;
}

public function getProcessManager(): ProcessManagerInterface
{
return $this->processManager;
}

public function addToQueue(callable $callback, ?ProcessOutputInterface $output = null): ChildProcessInterface
{
while ($this->threadCounter >= $this->maxThreads) {
$this->processManager->wait(fn() => --$this->threadCounter >= $this->maxThreads);
}
$this->threadCounter++;

return $this->processManager->runProcess($callback, $output);
}

public function wait(?callable $callback = null): void
{
$this->processManager->wait(function () use ($callback) {
--$this->threadCounter;
if ($callback) {
call_user_func($callback);
}
});
}
}
Loading

0 comments on commit ad3b9e2

Please sign in to comment.