Skip to content

Commit

Permalink
added option to define init tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
matthi4s committed Dec 11, 2023
1 parent 39d8539 commit 1d54523
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 1 deletion.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,24 @@ $taskmaster->autoDetectWorkers(4, false);
$taskmaster->autoDetectWorkers(4, true, false);
```

### Init tasks

You can define tasks that are executed on every worker instance before the first task is executed.
This is helpful to run some initial setup or (in case of the [`ForkWorker`](src/Environment/Fork/ForkWorker.php))
to clear some variables that are inherited from the parent process, e.g. database connections.

```php
// init tasks are always provided by a task factory
$taskmaster->setDefaultInitTaskFactory(new InitTaskFactory());

// but taskmaster can create task factories automatically by cloning or instancing a task
$taskmaster->setDefaultInitTask(new InitTask());
$taskmaster->setDefaultInitTask(InitTask::class);

// you can also define a task factory for a specific worker
$worker->setInitTaskFactory(new InitTaskFactory());
```

## Running tasks

After writing your tasks, creating them and defining the workers, you can start running the tasks.
Expand Down
35 changes: 35 additions & 0 deletions src/Task/CloneTaskFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Aternos\Taskmaster\Task;

/**
* Class CloneTaskFactory
*
* The CloneTaskFactory creates clones of a task.
*
* @package Aternos\Taskmaster\Task
*/
class CloneTaskFactory extends TaskFactory
{
protected int $count = 0;

/**
* @param TaskInterface $task
* @param int|null $limit
*/
public function __construct(protected TaskInterface $task, protected ?int $limit = null)
{
}

/**
* @inheritDoc
*/
public function createNextTask(?string $group): ?TaskInterface
{
if ($this->limit !== null && $this->count >= $this->limit) {
return null;
}
$this->count++;
return clone $this->task;
}
}
37 changes: 37 additions & 0 deletions src/Task/InstanceTaskFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Aternos\Taskmaster\Task;

use Aternos\Taskmaster\Task\TaskFactory;

/**
* Class InstanceTaskFactory
*
* The InstanceTaskFactory creates instances of a task class.
*
* @package Aternos\Taskmaster\Task
*/
class InstanceTaskFactory extends TaskFactory
{
protected int $count = 0;

/**
* @param class-string<TaskInterface> $taskClass
* @param int|null $limit
*/
public function __construct(protected string $taskClass, protected ?int $limit = null)
{
}

/**
* @inheritDoc
*/
public function createNextTask(?string $group): ?TaskInterface
{
if ($this->limit !== null && $this->count >= $this->limit) {
return null;
}
$this->count++;
return new $this->taskClass();
}
}
42 changes: 42 additions & 0 deletions src/Taskmaster.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Aternos\Taskmaster\Proxy\ProcessProxy;
use Aternos\Taskmaster\Proxy\ProxyInterface;
use Aternos\Taskmaster\Proxy\ProxyStatus;
use Aternos\Taskmaster\Task\CloneTaskFactory;
use Aternos\Taskmaster\Task\InstanceTaskFactory;
use Aternos\Taskmaster\Task\TaskFactoryInterface;
use Aternos\Taskmaster\Task\TaskInterface;
use Aternos\Taskmaster\Worker\SocketWorkerInterface;
Expand Down Expand Up @@ -64,6 +66,7 @@ class Taskmaster

protected TaskmasterOptions $options;
protected float $defaultTaskTimeout = 0;
protected ?TaskFactoryInterface $initTaskFactory = null;

/**
* Taskmaster constructor
Expand Down Expand Up @@ -389,6 +392,7 @@ public function addWorker(WorkerInterface $worker): static
}

$worker->setOptionsIfNecessary($this->options);
$worker->setInitTaskFactoryIfNecessary($this->initTaskFactory);
$this->workers[] = $worker;
return $this;
}
Expand Down Expand Up @@ -561,6 +565,44 @@ public function setDefaultTaskTimeout(float $timeout): static
return $this;
}

/**
* Set the default init task factory for all workers
*
* The init task factory produces tasks that are executed once as first task on every worker instance
* to initialize the worker.
*
* You can also set the init task factory for each worker individually with {@link WorkerInterface::setInitTaskFactory()}.
*
* @param TaskFactoryInterface|null $initTaskFactory
* @return $this
*/
public function setDefaultInitTaskFactory(?TaskFactoryInterface $initTaskFactory): static
{
$this->initTaskFactory = $initTaskFactory;
return $this;
}

/**
* Set the default init task for all workers
*
* The init task is executed once as first task on every worker instance.
* This automatically creates a task factory that produces the init tasks.
*
* If you pass an object, the task will be cloned for each worker instance, if you pass
* a class name, a new instance of the class will be created for each worker instance.
*
* @param TaskInterface|class-string<TaskInterface> $task
* @return void
*/
public function setDefaultInitTask(TaskInterface|string $task): void
{
if (is_string($task)) {
$this->initTaskFactory = new InstanceTaskFactory($task);
} else {
$this->initTaskFactory = new CloneTaskFactory($task);
}
}

/**
* Get the current {@link TaskmasterOptions} instance
*
Expand Down
33 changes: 32 additions & 1 deletion src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Aternos\Taskmaster\Proxy\ProxiedSocket;
use Aternos\Taskmaster\Proxy\ProxyInterface;
use Aternos\Taskmaster\Proxy\ProxyStatus;
use Aternos\Taskmaster\Task\TaskFactoryInterface;
use Aternos\Taskmaster\Task\TaskInterface;
use Aternos\Taskmaster\TaskmasterOptions;
use Aternos\Taskmaster\Worker\Instance\ProxyableWorkerInstanceInterface;
Expand Down Expand Up @@ -32,6 +33,8 @@ abstract class Worker implements WorkerInterface
protected ?ProxyInterface $proxy = null;
protected bool $instanceStarted = false;
protected ?TaskInterface $queuedTask = null;
protected ?TaskFactoryInterface $initTaskFactory = null;
protected ?TaskInterface $initTask = null;

/**
* @inheritDoc
Expand All @@ -53,6 +56,26 @@ public function setOptionsIfNecessary(TaskmasterOptions $options): static
return $this;
}

/**
* @inheritDoc
*/
public function setInitTaskFactory(?TaskFactoryInterface $initTaskFactory): static
{
$this->initTaskFactory = $initTaskFactory;
return $this;
}

/**
* @inheritDoc
*/
public function setInitTaskFactoryIfNecessary(?TaskFactoryInterface $initTaskFactory): static
{
if ($this->initTaskFactory === null) {
$this->setInitTaskFactory($initTaskFactory);
}
return $this;
}

/**
* Get the current instance or create a new one if necessary
*
Expand Down Expand Up @@ -86,6 +109,11 @@ protected function startInstance(): void
{
$this->instanceStarted = true;
$instance = $this->getInstance();

if ($this->initTaskFactory) {
$this->initTask = $this->initTaskFactory->createNextTask(null);
}

if (!$this->proxy) {
$instance->init()->start();
return;
Expand Down Expand Up @@ -115,7 +143,10 @@ public function update(): static
}
$instance->update();
if ($instance->getStatus() === WorkerInstanceStatus::IDLE) {
if ($this->queuedTask) {
if ($this->initTask) {
$instance->runTask($this->initTask);
$this->initTask = null;
} elseif ($this->queuedTask) {
$instance->runTask($this->queuedTask);
$this->queuedTask = null;
} else {
Expand Down
23 changes: 23 additions & 0 deletions src/Worker/WorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Aternos\Taskmaster\Proxy\ProxyInterface;
use Aternos\Taskmaster\Runtime\RuntimeInterface;
use Aternos\Taskmaster\Task\TaskFactoryInterface;
use Aternos\Taskmaster\Task\TaskInterface;
use Aternos\Taskmaster\Taskmaster;
use Aternos\Taskmaster\TaskmasterOptions;
Expand Down Expand Up @@ -62,6 +63,28 @@ public function setOptions(TaskmasterOptions $options): static;
*/
public function setOptionsIfNecessary(TaskmasterOptions $options): static;

/**
* Set the init task factory for the worker
*
* Init tasks are executed once as first task on every worker instance to initialize the worker.
*
* @param TaskFactoryInterface|null $initTaskFactory
* @return $this
*/
public function setInitTaskFactory(?TaskFactoryInterface $initTaskFactory): static;

/**
* Set the init task factory for the worker if necessary
*
* If the init task factory is already set, it will not be overwritten.
* This is called by {@link Taskmaster::addWorker()}.
* If you want to set a different init task factory, call this before adding the worker.
*
* @param TaskFactoryInterface|null $initTaskFactory
* @return $this
*/
public function setInitTaskFactoryIfNecessary(?TaskFactoryInterface $initTaskFactory): static;

/**
* Update the worker and its instance
*
Expand Down
18 changes: 18 additions & 0 deletions test/Integration/AsyncWorkerTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Aternos\Taskmaster\Exception\TaskTimeoutException;
use Aternos\Taskmaster\Taskmaster;
use Aternos\Taskmaster\Test\Util\Task\EmptyTask;
use Aternos\Taskmaster\Test\Util\Task\InitTask;
use Aternos\Taskmaster\Test\Util\Task\InitValidateTask;
use Aternos\Taskmaster\Test\Util\Task\InterruptableSleepTask;
use Aternos\Taskmaster\Test\Util\Task\SleepTask;
use Aternos\Taskmaster\Test\Util\Task\SyncTask;
Expand Down Expand Up @@ -82,4 +84,20 @@ public function testRecoverAfterTimeout(): void
}
$this->assertEquals(6, $counter);
}

public function testInitTask(): void
{
$this->taskmaster = new Taskmaster();
$this->taskmaster->setDefaultInitTask(InitTask::class);
$this->taskmaster->addWorkers($this->createWorker(), 3);

$this->addTasks(new InitValidateTask(), 3);
$counter = 0;
foreach ($this->taskmaster->waitAndHandleTasks() as $task) {
$this->assertNull($task->getError());
$this->assertTrue($task->getResult());
$counter++;
}
$this->assertEquals(3, $counter);
}
}
20 changes: 20 additions & 0 deletions test/Util/Task/InitTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Aternos\Taskmaster\Test\Util\Task;

use Aternos\Taskmaster\Task\OnChild;
use Aternos\Taskmaster\Task\Task;

class InitTask extends Task
{
public static bool $initialized = false;

/**
* @inheritDoc
*/
#[OnChild]
public function run(): void
{
static::$initialized = true;
}
}
18 changes: 18 additions & 0 deletions test/Util/Task/InitValidateTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Aternos\Taskmaster\Test\Util\Task;

use Aternos\Taskmaster\Task\OnChild;
use Aternos\Taskmaster\Task\Task;

class InitValidateTask extends Task
{
/**
* @inheritDoc
*/
#[OnChild]
public function run(): bool
{
return InitTask::$initialized;
}
}

0 comments on commit 1d54523

Please sign in to comment.