diff --git a/README.md b/README.md index aaa0ed3..cc85a86 100644 --- a/README.md +++ b/README.md @@ -553,6 +553,24 @@ $taskmaster->autoDetectWorkers(4, false); $taskmaster->autoDetectWorkers(4, true, false); ``` +### Init tasks + +You can define tasks that are executed on every worker instance before the first task is executed. +This is helpful to run some initial setup or (in case of the [`ForkWorker`](src/Environment/Fork/ForkWorker.php)) +to clear some variables that are inherited from the parent process, e.g. database connections. + +```php +// init tasks are always provided by a task factory +$taskmaster->setDefaultInitTaskFactory(new InitTaskFactory()); + +// but taskmaster can create task factories automatically by cloning or instancing a task +$taskmaster->setDefaultInitTask(new InitTask()); +$taskmaster->setDefaultInitTask(InitTask::class); + +// you can also define a task factory for a specific worker +$worker->setInitTaskFactory(new InitTaskFactory()); +``` + ## Running tasks After writing your tasks, creating them and defining the workers, you can start running the tasks. diff --git a/src/Task/CloneTaskFactory.php b/src/Task/CloneTaskFactory.php new file mode 100644 index 0000000..a62511b --- /dev/null +++ b/src/Task/CloneTaskFactory.php @@ -0,0 +1,35 @@ +limit !== null && $this->count >= $this->limit) { + return null; + } + $this->count++; + return clone $this->task; + } +} \ No newline at end of file diff --git a/src/Task/InstanceTaskFactory.php b/src/Task/InstanceTaskFactory.php new file mode 100644 index 0000000..b0f3ba5 --- /dev/null +++ b/src/Task/InstanceTaskFactory.php @@ -0,0 +1,37 @@ + $taskClass + * @param int|null $limit + */ + public function __construct(protected string $taskClass, protected ?int $limit = null) + { + } + + /** + * @inheritDoc + */ + public function createNextTask(?string $group): ?TaskInterface + { + if ($this->limit !== null && $this->count >= $this->limit) { + return null; + } + $this->count++; + return new $this->taskClass(); + } +} \ No newline at end of file diff --git a/src/Taskmaster.php b/src/Taskmaster.php index 1ee5fd1..4b84da2 100644 --- a/src/Taskmaster.php +++ b/src/Taskmaster.php @@ -12,6 +12,8 @@ use Aternos\Taskmaster\Proxy\ProcessProxy; use Aternos\Taskmaster\Proxy\ProxyInterface; use Aternos\Taskmaster\Proxy\ProxyStatus; +use Aternos\Taskmaster\Task\CloneTaskFactory; +use Aternos\Taskmaster\Task\InstanceTaskFactory; use Aternos\Taskmaster\Task\TaskFactoryInterface; use Aternos\Taskmaster\Task\TaskInterface; use Aternos\Taskmaster\Worker\SocketWorkerInterface; @@ -64,6 +66,7 @@ class Taskmaster protected TaskmasterOptions $options; protected float $defaultTaskTimeout = 0; + protected ?TaskFactoryInterface $initTaskFactory = null; /** * Taskmaster constructor @@ -389,6 +392,7 @@ public function addWorker(WorkerInterface $worker): static } $worker->setOptionsIfNecessary($this->options); + $worker->setInitTaskFactoryIfNecessary($this->initTaskFactory); $this->workers[] = $worker; return $this; } @@ -561,6 +565,44 @@ public function setDefaultTaskTimeout(float $timeout): static return $this; } + /** + * Set the default init task factory for all workers + * + * The init task factory produces tasks that are executed once as first task on every worker instance + * to initialize the worker. + * + * You can also set the init task factory for each worker individually with {@link WorkerInterface::setInitTaskFactory()}. + * + * @param TaskFactoryInterface|null $initTaskFactory + * @return $this + */ + public function setDefaultInitTaskFactory(?TaskFactoryInterface $initTaskFactory): static + { + $this->initTaskFactory = $initTaskFactory; + return $this; + } + + /** + * Set the default init task for all workers + * + * The init task is executed once as first task on every worker instance. + * This automatically creates a task factory that produces the init tasks. + * + * If you pass an object, the task will be cloned for each worker instance, if you pass + * a class name, a new instance of the class will be created for each worker instance. + * + * @param TaskInterface|class-string $task + * @return void + */ + public function setDefaultInitTask(TaskInterface|string $task): void + { + if (is_string($task)) { + $this->initTaskFactory = new InstanceTaskFactory($task); + } else { + $this->initTaskFactory = new CloneTaskFactory($task); + } + } + /** * Get the current {@link TaskmasterOptions} instance * diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index ef8171a..e358903 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -5,6 +5,7 @@ use Aternos\Taskmaster\Proxy\ProxiedSocket; use Aternos\Taskmaster\Proxy\ProxyInterface; use Aternos\Taskmaster\Proxy\ProxyStatus; +use Aternos\Taskmaster\Task\TaskFactoryInterface; use Aternos\Taskmaster\Task\TaskInterface; use Aternos\Taskmaster\TaskmasterOptions; use Aternos\Taskmaster\Worker\Instance\ProxyableWorkerInstanceInterface; @@ -32,6 +33,8 @@ abstract class Worker implements WorkerInterface protected ?ProxyInterface $proxy = null; protected bool $instanceStarted = false; protected ?TaskInterface $queuedTask = null; + protected ?TaskFactoryInterface $initTaskFactory = null; + protected ?TaskInterface $initTask = null; /** * @inheritDoc @@ -53,6 +56,26 @@ public function setOptionsIfNecessary(TaskmasterOptions $options): static return $this; } + /** + * @inheritDoc + */ + public function setInitTaskFactory(?TaskFactoryInterface $initTaskFactory): static + { + $this->initTaskFactory = $initTaskFactory; + return $this; + } + + /** + * @inheritDoc + */ + public function setInitTaskFactoryIfNecessary(?TaskFactoryInterface $initTaskFactory): static + { + if ($this->initTaskFactory === null) { + $this->setInitTaskFactory($initTaskFactory); + } + return $this; + } + /** * Get the current instance or create a new one if necessary * @@ -86,6 +109,11 @@ protected function startInstance(): void { $this->instanceStarted = true; $instance = $this->getInstance(); + + if ($this->initTaskFactory) { + $this->initTask = $this->initTaskFactory->createNextTask(null); + } + if (!$this->proxy) { $instance->init()->start(); return; @@ -115,7 +143,10 @@ public function update(): static } $instance->update(); if ($instance->getStatus() === WorkerInstanceStatus::IDLE) { - if ($this->queuedTask) { + if ($this->initTask) { + $instance->runTask($this->initTask); + $this->initTask = null; + } elseif ($this->queuedTask) { $instance->runTask($this->queuedTask); $this->queuedTask = null; } else { diff --git a/src/Worker/WorkerInterface.php b/src/Worker/WorkerInterface.php index 5b7346c..7894db1 100644 --- a/src/Worker/WorkerInterface.php +++ b/src/Worker/WorkerInterface.php @@ -4,6 +4,7 @@ use Aternos\Taskmaster\Proxy\ProxyInterface; use Aternos\Taskmaster\Runtime\RuntimeInterface; +use Aternos\Taskmaster\Task\TaskFactoryInterface; use Aternos\Taskmaster\Task\TaskInterface; use Aternos\Taskmaster\Taskmaster; use Aternos\Taskmaster\TaskmasterOptions; @@ -62,6 +63,28 @@ public function setOptions(TaskmasterOptions $options): static; */ public function setOptionsIfNecessary(TaskmasterOptions $options): static; + /** + * Set the init task factory for the worker + * + * Init tasks are executed once as first task on every worker instance to initialize the worker. + * + * @param TaskFactoryInterface|null $initTaskFactory + * @return $this + */ + public function setInitTaskFactory(?TaskFactoryInterface $initTaskFactory): static; + + /** + * Set the init task factory for the worker if necessary + * + * If the init task factory is already set, it will not be overwritten. + * This is called by {@link Taskmaster::addWorker()}. + * If you want to set a different init task factory, call this before adding the worker. + * + * @param TaskFactoryInterface|null $initTaskFactory + * @return $this + */ + public function setInitTaskFactoryIfNecessary(?TaskFactoryInterface $initTaskFactory): static; + /** * Update the worker and its instance * diff --git a/test/Integration/AsyncWorkerTestCase.php b/test/Integration/AsyncWorkerTestCase.php index 25981ae..46155a2 100644 --- a/test/Integration/AsyncWorkerTestCase.php +++ b/test/Integration/AsyncWorkerTestCase.php @@ -6,6 +6,8 @@ use Aternos\Taskmaster\Exception\TaskTimeoutException; use Aternos\Taskmaster\Taskmaster; use Aternos\Taskmaster\Test\Util\Task\EmptyTask; +use Aternos\Taskmaster\Test\Util\Task\InitTask; +use Aternos\Taskmaster\Test\Util\Task\InitValidateTask; use Aternos\Taskmaster\Test\Util\Task\InterruptableSleepTask; use Aternos\Taskmaster\Test\Util\Task\SleepTask; use Aternos\Taskmaster\Test\Util\Task\SyncTask; @@ -82,4 +84,20 @@ public function testRecoverAfterTimeout(): void } $this->assertEquals(6, $counter); } + + public function testInitTask(): void + { + $this->taskmaster = new Taskmaster(); + $this->taskmaster->setDefaultInitTask(InitTask::class); + $this->taskmaster->addWorkers($this->createWorker(), 3); + + $this->addTasks(new InitValidateTask(), 3); + $counter = 0; + foreach ($this->taskmaster->waitAndHandleTasks() as $task) { + $this->assertNull($task->getError()); + $this->assertTrue($task->getResult()); + $counter++; + } + $this->assertEquals(3, $counter); + } } \ No newline at end of file diff --git a/test/Util/Task/InitTask.php b/test/Util/Task/InitTask.php new file mode 100644 index 0000000..8e5b318 --- /dev/null +++ b/test/Util/Task/InitTask.php @@ -0,0 +1,20 @@ +