Skip to content

Commit

Permalink
#11 properly handle posix signals in the process- and queue-manager
Browse files Browse the repository at this point in the history
  • Loading branch information
sweikenb committed Jul 20, 2024
1 parent 792d5dd commit ac24385
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ name: Build Documentation
on:
# Runs on pushes targeting the default branch
push:
branches: ["main", "cleanup-and-tests"]
branches: ["main"]

# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
Expand Down
9 changes: 8 additions & 1 deletion docs/src/features/process-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ Please note that the callback of the `wait()` method gets called BEFORE the life
- default: `true` _RECOMMENDED!_
- `$propagateSignals`
- list of signals that should be propagated to the child-processes
- default: [`SIGTERM`, `SIGHUP`, `SIGALRM`, `SIGUSR1`, `SIGUSR2`]
- default signals:
- `SIGTERM` graceful exit request by the system or user
- `SIGINT` user interrupts the execution (e.g. `ctrl` + `c` in the terminal)
- `SIGHUP` usually used to request a config reload
- `SIGALRM` usually used for timeout management
- `SIGUSR1` custom signal 1
- `SIGUSR2` custom signal 2
- please note that `SIGCHLD` can NOT be propagated due to how the process-manager internally handles this signal
- `$processFactory`
- factory instance that should be used to create the process models
- default: `Sweikenb\Library\Pcntl\Factory\ProcessFactory`
Expand Down
3 changes: 3 additions & 0 deletions docs/src/features/process-queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ $queue = new ProcessQueue($maxThreads);
for ($i = 0; $i < 100; $i++) {
$queue->addToQueue(fn() => sleep(3));
}

// wait until the whole queue is done
$queue->wait();
```

## Settings
Expand Down
20 changes: 20 additions & 0 deletions src/Api/ProcessQueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,24 @@ interface ProcessQueueInterface
* If you specify an $output it will win over the output of the parent process.
*/
public function addToQueue(callable $callback, ?ProcessOutputInterface $output = null): ChildProcessInterface;

/**
* Handles the internal thread count and dispatches the wait call to the process-manager.
*/
public function wait(?callable $callback = null): void;

/**
* Returns the number of active threads. Might be zero if no tasks are scheduled.
*/
public function getThreadCounter(): int;

/**
* Returns the maximum number of threads to spawn. Can not be less than one.
*/
public function getMaxThreads(): int;

/**
* Returns the process-manager used for handling this queue.
*/
public function getProcessManager(): ProcessManagerInterface;
}
128 changes: 60 additions & 68 deletions src/ProcessManager.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php declare(strict_types=1);
<?php declare(strict_types=1, ticks=1);

namespace Sweikenb\Library\Pcntl;

Expand All @@ -15,11 +15,12 @@
class ProcessManager implements ProcessManagerInterface
{
const PROPAGATE_SIGNALS = [
SIGTERM,
SIGHUP,
SIGALRM,
SIGUSR1,
SIGUSR2
SIGTERM, // exit request
SIGINT, // ctrl + c
SIGHUP, // reload config
SIGALRM, // alarm
SIGUSR1, // custom 1
SIGUSR2, // custom 2
];
private ProcessFactoryInterface $processFactory;
private ProcessOutputInterface $processOutput;
Expand All @@ -29,9 +30,9 @@ class ProcessManager implements ProcessManagerInterface
*/
private array $childProcesses = [];
/**
* @var array<int, int>
* @var int[]
*/
private array $earlyExitChildQueue = [];
private array $childExitQueue = [];
private bool $isChildProcess = false;
/**
* @var array<int, callable>
Expand Down Expand Up @@ -60,54 +61,58 @@ public function __construct(
? self::PROPAGATE_SIGNALS
: $propagateSignals;

// register a signale queue for early exit children
pcntl_async_signals(false);
pcntl_signal(SIGCHLD, [$this, "childEarlyExitQueue"]);

// register the signal-handler for each signal that should be handled
foreach ($propagateSignals as $handleSignal) {
pcntl_signal(
$handleSignal,
function (int $dispatchSignal) {
foreach ($this->childProcesses as $childProcess) {
@posix_kill($childProcess->getId(), $dispatchSignal);
}
}
);
// we need to make sure we handle early child exists too, so add this signal no matter what
$propagateSignals[] = SIGCHLD;
pcntl_async_signals(false);
foreach (array_unique($propagateSignals) as $handleSignal) {
pcntl_signal($handleSignal, [$this, 'handleSignal']);
}

// prevent zombie apocalypse
register_shutdown_function(
function () use ($autoWait) {
if ($autoWait) {
$this->wait();
} else {
if (!empty($this->childProcesses)) {
foreach ($this->childProcesses as $childProcess) {
$this->processOutput->stderr(
sprintf(
"[PCNTL ProcessManager] Forcing child process exit for pid %s\n",
$childProcess->getId()
)
);
@posix_kill($childProcess->getId(), SIGKILL);
}
$this->wait();

// In case we had to force a child kill, exit with the exit code 125 (operation canceled)
exit(125);
}
}
if (!empty($this->childProcesses)) {
$this->sendSignalToChildren(
SIGKILL,
fn(ChildProcessInterface $childProcess) => $this->processOutput->stderr(
sprintf(
"[PCNTL ProcessManager] Forcing child process exit for pid %s\n",
$childProcess->getId()
)
)
);
$this->wait();
exit(1);
}
}
);
}

public function childEarlyExitQueue(): void
public function handleSignal(int $signal): void
{
if (!$this->isChildProcess) {
if ($this->isChildProcess) {
return;
}
if ($signal === SIGCHLD) {
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
$this->earlyExitChildQueue[$pid] = [$pid, pcntl_wexitstatus($status)];
$this->childExitQueue[$pid] = pcntl_wexitstatus($status);
}
} else {
$this->sendSignalToChildren($signal);
}
}

public function sendSignalToChildren(int $signal, ?callable $callback = null): void
{
foreach ($this->childProcesses as $childProcess) {
if ($callback) {
call_user_func($callback, $childProcess);
}
@posix_kill($childProcess->getId(), $signal);
}
}

Expand Down Expand Up @@ -145,6 +150,7 @@ public function runProcess(callable $callback, ?ProcessOutputInterface $output =
foreach ($this->onThreadCreated as $callback) {
call_user_func($callback, $childProcess);
}

return $childProcess;
}

Expand Down Expand Up @@ -181,49 +187,35 @@ public function wait(?callable $callback = null): void
array_unshift($callbackStack, $callback);
}

$handleChildExit = function (int $pid, int $status) use ($callbackStack): bool {
$continueWait = true;
if ($pid > 0) {
if (isset($this->childProcesses[$pid])) {
unset($this->childProcesses[$pid]);
}
if (isset($this->earlyExitChildQueue[$pid])) {
unset($this->earlyExitChildQueue[$pid]);
}
foreach ($callbackStack as $callback) {
if (call_user_func($callback, $status, $pid) === false) {
$continueWait = false;
// wait for all children to exit
while (!empty($this->childProcesses)) {
// process the exit-queue
foreach ($this->childExitQueue as $pid => $status) {
if ($pid > 0) {
unset($this->childExitQueue[$pid], $this->childProcesses[$pid]);
foreach ($callbackStack as $callback) {
if (call_user_func($callback, $status, $pid) === false) {
return;
}
}
}
}
return $continueWait;
};

// run the callback for all early exit children no matter what
$waitForMoreToExit = true;
while (!empty($this->earlyExitChildQueue)) {
[$pid, $status] = current($this->earlyExitChildQueue);
$waitForMoreToExit = $waitForMoreToExit && $handleChildExit($pid, $status);
}

// only wait for the regular children if desired
while ($waitForMoreToExit && !empty($this->childProcesses)) {
$pid = pcntl_wait($status);
if (!$handleChildExit($pid, $status)) {
return;
}
usleep(5000);
pcntl_signal_dispatch();
}
}

public function onThreadCreate(callable $callback): self
{
$this->onThreadCreated[] = $callback;

return $this;
}

public function onThreadExit(callable $callback): self
{
$this->onThreadExit[] = $callback;

return $this;
}
}
2 changes: 1 addition & 1 deletion src/ProcessOutput.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php
<?php declare(strict_types=1, ticks=1);

namespace Sweikenb\Library\Pcntl;

Expand Down
28 changes: 27 additions & 1 deletion src/ProcessQueue.php
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<?php
<?php declare(strict_types=1, ticks=1);

namespace Sweikenb\Library\Pcntl;

Expand All @@ -21,12 +21,38 @@ public function __construct(
$this->maxThreads = max(1, $maxThreads);
}

public function getThreadCounter(): int
{
return $this->threadCounter;
}

public function getMaxThreads(): int
{
return $this->maxThreads;
}

public function getProcessManager(): ProcessManagerInterface
{
return $this->processManager;
}

public function addToQueue(callable $callback, ?ProcessOutputInterface $output = null): ChildProcessInterface
{
while ($this->threadCounter >= $this->maxThreads) {
$this->processManager->wait(fn() => --$this->threadCounter >= $this->maxThreads);
}
$this->threadCounter++;

return $this->processManager->runProcess($callback, $output);
}

public function wait(?callable $callback = null): void
{
$this->processManager->wait(function () use ($callback) {
--$this->threadCounter;
if ($callback) {
call_user_func($callback);
}
});
}
}
25 changes: 15 additions & 10 deletions tests/ProcessQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,32 @@

class ProcessQueueTest extends TestCase
{
const TEST_MAX_THREADS = 4;

/**
* @covers \Sweikenb\Library\Pcntl\ProcessQueue::addToQueue
*/
public function testAddToQueue(): void
{
$active = 0;
$maxThreads = 4;

$pm = new ProcessManager();
$pm->onThreadExit(function () use ($maxThreads, &$active) {
$active--;
$this->assertLessThanOrEqual($maxThreads, $active);
$queue = new ProcessQueue(self::TEST_MAX_THREADS, $pm);

$pm->onThreadCreate(function () use ($queue) {
$this->assertLessThanOrEqual(self::TEST_MAX_THREADS, $queue->getThreadCounter());
});
$pm->onThreadExit(function () use ($queue) {
$this->assertLessThanOrEqual(self::TEST_MAX_THREADS, $queue->getThreadCounter());
});

$queue = new ProcessQueue($maxThreads, $pm);
for ($i = 0; $i < 20; $i++) {
$active++;
$queue->addToQueue(fn() => 'test');
}
$pm->wait(function () use ($maxThreads, &$active) {
$this->assertLessThanOrEqual($maxThreads, $active);

$queue->wait(function () use ($queue) {
$this->assertLessThanOrEqual(self::TEST_MAX_THREADS, $queue->getThreadCounter());
});

$this->assertSame(self::TEST_MAX_THREADS, $queue->getMaxThreads());
$this->assertSame(0, $queue->getThreadCounter());
}
}

0 comments on commit ac24385

Please sign in to comment.