Skip to content

Commit

Permalink
Merge pull request #4 from michalsn/feat/redis
Browse files Browse the repository at this point in the history
feat: redis
  • Loading branch information
michalsn authored Nov 20, 2023
2 parents b27a50e + ba832bc commit 0844c4a
Show file tree
Hide file tree
Showing 17 changed files with 1,152 additions and 10 deletions.
17 changes: 15 additions & 2 deletions .github/workflows/phpunit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,19 @@ on:
jobs:
main:
name: PHP ${{ matrix.php-versions }} Unit Tests
runs-on: ubuntu-latest
runs-on: ubuntu-22.04

services:
redis:
image: redis
ports:
- 6379:6379
options: >-
--health-cmd "redis-cli ping"
--health-interval=10s
--health-timeout=5s
--health-retries=3
if: "!contains(github.event.head_commit.message, '[ci skip]')"
strategy:
matrix:
Expand All @@ -36,7 +48,7 @@ jobs:
with:
php-version: ${{ matrix.php-versions }}
tools: composer, phive, phpunit
extensions: intl, json, mbstring, gd, xdebug, xml, sqlite3
extensions: intl, json, mbstring, gd, xdebug, xml, sqlite3, redis
coverage: xdebug
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
Expand Down Expand Up @@ -79,6 +91,7 @@ jobs:
coveralls:
needs: [main]
name: Coveralls Finished
if: github.repository_owner == 'michalsn'
runs-on: ubuntu-latest
steps:
- name: Upload Coveralls results
Expand Down
8 changes: 6 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "Queues for CodeIgniter 4 framework",
"license": "MIT",
"type": "library",
"keywords": ["codeigniter", "codeigniter4", "queue"],
"keywords": ["codeigniter", "codeigniter4", "queue", "database", "redis", "predis"],
"authors": [
{
"name": "michalsn",
Expand All @@ -18,7 +18,7 @@
"require-dev": {
"codeigniter4/devkit": "^1.0",
"codeigniter4/framework": "^4.4",
"rector/rector": "0.18.6"
"predis/predis": "^2.0"
},
"minimum-stability": "dev",
"prefer-stable": true,
Expand All @@ -32,6 +32,10 @@
"Tests\\": "tests"
}
},
"suggest": {
"ext-redis": "If you want to use RedisHandler",
"predis/predis": "If you want to use PredisHandler"
},
"config": {
"allow-plugins": {
"phpstan/extension-installer": true
Expand Down
27 changes: 26 additions & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Available options:
- [$defaultHandler](#defaultHandler)
- [$handlers](#handlers)
- [$database](#database)
- [$redis](#redis)
- [$predis](#predis)
- [$keepDoneJobs](#keepDoneJobs)
- [$keepFailedJobs](#keepFailedJobs)
- [$queueDefaultPriority](#queueDefaultPriority)
Expand All @@ -27,7 +29,7 @@ The default handler used by the library. Default value: `database`.

### $handlers

An array of available handlers. By now only `database` handler is implemented.
An array of available handlers. By now only `database`, `redis` and `predis` handlers are implemented.

### $database

Expand All @@ -36,6 +38,29 @@ The configuration settings for `database` handler.
* `dbGroup` - The database group to use. Default value: `default`.
* `getShared` - Weather to use shared instance. Default value: `true`.

### $redis

The configuration settings for `redis` handler. You need to have a [ext-redis](https://github.com/phpredis/phpredis) installed to use it.

* `host` - The host name or unix socket. Default value: `127.0.0.1`.
* `password` - The password. Default value: `null`.
* `port` - The port number. Default value: `6379`.
* `timeout` - The timeout for connection. Default value: `0`.
* `database` - The database number. Default value: `0`.
* `prefix` - The default key prefix. Default value: `''` (not set).

### $predis

The configuration settings for `predis` handler. You need to have [Predis](https://github.com/predis/predis) installed to use it.

* `scheme` - The scheme to use: `tcp`, `tls` or `unix`. Default value: `tcp`.
* `host` - The host name. Default value: `127.0.0.1`.
* `password` - The password. Default value: `null`.
* `port` - The port number (when `tcp`). Default value: `6379`.
* `timeout` - The timeout for connection. Default value: `5`.
* `database` - The database number. Default value: `0`.
* `prefix` - The default key prefix. Default value: `''` (not set).

### $keepDoneJobs

If the job is done, should we keep it in the table? Default value: `false`.
Expand Down
13 changes: 13 additions & 0 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@ parameters:
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Handlers\\BaseHandler::push\(\).#'
paths:
- src/Handlers/BaseHandler.php
-
message: '#Call to deprecated function random_string\(\):#'
paths:
- src/Handlers/RedisHandler.php
- src/Handlers/PredisHandler.php
-
message: '#Cannot access property \$timestamp on array\|bool\|float\|int\|object\|string.#'
paths:
- tests/_support/Database/Seeds/TestRedisQueueSeeder.php
-
message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#'
paths:
- src/Handlers/BaseHandler.php
- src/Handlers/DatabaseHandler.php
- src/Handlers/RedisHandler.php
- src/Handlers/PredisHandler.php
- src/Models/QueueJobModel.php
- tests/RedisHandlerTest.php
- tests/PredisHandlerTest.php
-
message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#'
paths:
Expand Down
29 changes: 29 additions & 0 deletions src/Config/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use CodeIgniter\Config\BaseConfig;
use Michalsn\CodeIgniterQueue\Exceptions\QueueException;
use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler;
use Michalsn\CodeIgniterQueue\Handlers\PredisHandler;
use Michalsn\CodeIgniterQueue\Handlers\RedisHandler;

class Queue extends BaseConfig
{
Expand All @@ -18,6 +20,8 @@ class Queue extends BaseConfig
*/
public array $handlers = [
'database' => DatabaseHandler::class,
'redis' => RedisHandler::class,
'predis' => PredisHandler::class,
];

/**
Expand All @@ -28,6 +32,31 @@ class Queue extends BaseConfig
'getShared' => true,
];

/**
* Redis handler config.
*/
public array $redis = [
'host' => '127.0.0.1',
'password' => null,
'port' => 6379,
'timeout' => 0,
'database' => 0,
'prefix' => '',
];

/**
* Predis handler config.
*/
public array $predis = [
'scheme' => 'tcp',
'host' => '127.0.0.1',
'password' => null,
'port' => 6379,
'timeout' => 5,
'database' => 0,
'prefix' => '',
];

/**
* Whether to keep the DONE jobs in the queue.
*/
Expand Down
10 changes: 10 additions & 0 deletions src/Exceptions/QueueException.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ public static function forIncorrectHandler(): static
return new self(lang('Queue.incorrectHandler'));
}

public static function forIncorrectQueueFormat(): static
{
return new self(lang('Queue.incorrectQueueFormat'));
}

public static function forTooLongQueueName(): static
{
return new self(lang('Queue.tooLongQueueName'));
}

public static function forIncorrectJobHandler(): static
{
return new self(lang('Queue.incorrectJobHandler'));
Expand Down
17 changes: 17 additions & 0 deletions src/Handlers/BaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ protected function logFailed(QueueJob $queueJob, Throwable $err): bool
*/
protected function validateJobAndPriority(string $queue, string $job): void
{
// Validate queue
$this->validateQueue($queue);

// Validate jobHandler.
if (! in_array($job, array_keys($this->config->jobHandlers), true)) {
throw QueueException::forIncorrectJobHandler();
Expand All @@ -150,4 +153,18 @@ protected function validateJobAndPriority(string $queue, string $job): void
throw QueueException::forIncorrectQueuePriority($this->priority, $queue);
}
}

/**
* Validate queue name.
*/
protected function validateQueue(string $queue): void
{
if (! preg_match('/^[a-z0-9_-]+$/', $queue)) {
throw QueueException::forIncorrectQueueFormat();
}

if (strlen($queue) > 64) {
throw QueueException::forTooLongQueueName();
}
}
}
147 changes: 147 additions & 0 deletions src/Handlers/PredisHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
<?php

namespace Michalsn\CodeIgniterQueue\Handlers;

use CodeIgniter\Exceptions\CriticalError;
use CodeIgniter\I18n\Time;
use Exception;
use Michalsn\CodeIgniterQueue\Config\Queue as QueueConfig;
use Michalsn\CodeIgniterQueue\Entities\QueueJob;
use Michalsn\CodeIgniterQueue\Enums\Status;
use Michalsn\CodeIgniterQueue\Interfaces\QueueInterface;
use Michalsn\CodeIgniterQueue\Payload;
use Predis\Client;
use Throwable;

class PredisHandler extends BaseHandler implements QueueInterface
{
private readonly Client $predis;

public function __construct(protected QueueConfig $config)
{
try {
$this->predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]);
$this->predis->time();
} catch (Exception $e) {
throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').');
}
}

/**
* Add job to the queue.
*/
public function push(string $queue, string $job, array $data): bool
{
$this->validateJobAndPriority($queue, $job);

helper('text');

$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'queue' => $queue,
'payload' => new Payload($job, $data),
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
'available_at' => Time::now()->timestamp,
]);

$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]);

$this->priority = null;

return $result > 0;
}

/**
* Get job from the queue.
*/
public function pop(string $queue, array $priorities): ?QueueJob
{
$tasks = [];
$now = Time::now()->timestamp;

foreach ($priorities as $priority) {
if ($tasks = $this->predis->zrangebyscore("queues:{$queue}:{$priority}", '-inf', $now, ['LIMIT' => [0, 1]])) {
if ($this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks)) {
break;
}
$tasks = [];
}
}

if (empty($tasks[0])) {
return null;
}

$queueJob = new QueueJob(json_decode((string) $tasks[0], true));

// Set the actual status as in DB.
$queueJob->status = Status::RESERVED->value;
$queueJob->syncOriginal();

$this->predis->hset("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob));

return $queueJob;
}

/**
* Schedule job for later
*/
public function later(QueueJob $queueJob, int $seconds): bool
{
$queueJob->status = Status::PENDING->value;
$queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp;

if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) {
$this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
}

return $result > 0;
}

/**
* Move job to failed table or move and delete.
*/
public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool
{
if ($keepJob) {
$this->logFailed($queueJob, $err);
}

return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
}

/**
* Change job status to DONE or delete it.
*/
public function done(QueueJob $queueJob, bool $keepJob): bool
{
if ($keepJob) {
$queueJob->status = Status::DONE->value;
$this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]);
}

return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id);
}

/**
* Delete queue jobs
*/
public function clear(?string $queue = null): bool
{
if ($queue !== null) {
if ($keys = $this->predis->keys("queues:{$queue}:*")) {
return $this->predis->del($keys) > 0;
}

return true;
}

if ($keys = $this->predis->keys('queues:*')) {
return $this->predis->del($keys) > 0;
}

return true;
}
}
Loading

0 comments on commit 0844c4a

Please sign in to comment.