diff --git a/.github/workflows/phpunit.yml b/.github/workflows/phpunit.yml index b03028e..89829ba 100644 --- a/.github/workflows/phpunit.yml +++ b/.github/workflows/phpunit.yml @@ -21,7 +21,19 @@ on: jobs: main: name: PHP ${{ matrix.php-versions }} Unit Tests - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 + + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval=10s + --health-timeout=5s + --health-retries=3 + if: "!contains(github.event.head_commit.message, '[ci skip]')" strategy: matrix: @@ -36,7 +48,7 @@ jobs: with: php-version: ${{ matrix.php-versions }} tools: composer, phive, phpunit - extensions: intl, json, mbstring, gd, xdebug, xml, sqlite3 + extensions: intl, json, mbstring, gd, xdebug, xml, sqlite3, redis coverage: xdebug env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} @@ -79,6 +91,7 @@ jobs: coveralls: needs: [main] name: Coveralls Finished + if: github.repository_owner == 'michalsn' runs-on: ubuntu-latest steps: - name: Upload Coveralls results diff --git a/composer.json b/composer.json index a1c889f..2fff215 100644 --- a/composer.json +++ b/composer.json @@ -3,7 +3,7 @@ "description": "Queues for CodeIgniter 4 framework", "license": "MIT", "type": "library", - "keywords": ["codeigniter", "codeigniter4", "queue"], + "keywords": ["codeigniter", "codeigniter4", "queue", "database", "redis", "predis"], "authors": [ { "name": "michalsn", @@ -18,7 +18,7 @@ "require-dev": { "codeigniter4/devkit": "^1.0", "codeigniter4/framework": "^4.4", - "rector/rector": "0.18.6" + "predis/predis": "^2.0" }, "minimum-stability": "dev", "prefer-stable": true, @@ -32,6 +32,10 @@ "Tests\\": "tests" } }, + "suggest": { + "ext-redis": "If you want to use RedisHandler", + "predis/predis": "If you want to use PredisHandler" + }, "config": { "allow-plugins": { "phpstan/extension-installer": true diff --git a/docs/configuration.md b/docs/configuration.md index c872654..ea6b75a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -15,6 +15,8 @@ Available options: - [$defaultHandler](#defaultHandler) - [$handlers](#handlers) - [$database](#database) +- [$redis](#redis) +- [$predis](#predis) - [$keepDoneJobs](#keepDoneJobs) - [$keepFailedJobs](#keepFailedJobs) - [$queueDefaultPriority](#queueDefaultPriority) @@ -27,7 +29,7 @@ The default handler used by the library. Default value: `database`. ### $handlers -An array of available handlers. By now only `database` handler is implemented. +An array of available handlers. By now only `database`, `redis` and `predis` handlers are implemented. ### $database @@ -36,6 +38,29 @@ The configuration settings for `database` handler. * `dbGroup` - The database group to use. Default value: `default`. * `getShared` - Weather to use shared instance. Default value: `true`. +### $redis + +The configuration settings for `redis` handler. You need to have a [ext-redis](https://github.com/phpredis/phpredis) installed to use it. + +* `host` - The host name or unix socket. Default value: `127.0.0.1`. +* `password` - The password. Default value: `null`. +* `port` - The port number. Default value: `6379`. +* `timeout` - The timeout for connection. Default value: `0`. +* `database` - The database number. Default value: `0`. +* `prefix` - The default key prefix. Default value: `''` (not set). + +### $predis + +The configuration settings for `predis` handler. You need to have [Predis](https://github.com/predis/predis) installed to use it. + +* `scheme` - The scheme to use: `tcp`, `tls` or `unix`. Default value: `tcp`. +* `host` - The host name. Default value: `127.0.0.1`. +* `password` - The password. Default value: `null`. +* `port` - The port number (when `tcp`). Default value: `6379`. +* `timeout` - The timeout for connection. Default value: `5`. +* `database` - The database number. Default value: `0`. +* `prefix` - The default key prefix. Default value: `''` (not set). + ### $keepDoneJobs If the job is done, should we keep it in the table? Default value: `false`. diff --git a/phpstan.neon.dist b/phpstan.neon.dist index d1d87e5..ede8df0 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -20,12 +20,25 @@ parameters: message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Handlers\\BaseHandler::push\(\).#' paths: - src/Handlers/BaseHandler.php + - + message: '#Call to deprecated function random_string\(\):#' + paths: + - src/Handlers/RedisHandler.php + - src/Handlers/PredisHandler.php + - + message: '#Cannot access property \$timestamp on array\|bool\|float\|int\|object\|string.#' + paths: + - tests/_support/Database/Seeds/TestRedisQueueSeeder.php - message: '#Access to an undefined property CodeIgniter\\I18n\\Time::\$timestamp.#' paths: - src/Handlers/BaseHandler.php - src/Handlers/DatabaseHandler.php + - src/Handlers/RedisHandler.php + - src/Handlers/PredisHandler.php - src/Models/QueueJobModel.php + - tests/RedisHandlerTest.php + - tests/PredisHandlerTest.php - message: '#Call to an undefined method Michalsn\\CodeIgniterQueue\\Models\\QueueJobFailedModel::affectedRows\(\).#' paths: diff --git a/src/Config/Queue.php b/src/Config/Queue.php index fc8a37b..180beee 100644 --- a/src/Config/Queue.php +++ b/src/Config/Queue.php @@ -5,6 +5,8 @@ use CodeIgniter\Config\BaseConfig; use Michalsn\CodeIgniterQueue\Exceptions\QueueException; use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler; +use Michalsn\CodeIgniterQueue\Handlers\PredisHandler; +use Michalsn\CodeIgniterQueue\Handlers\RedisHandler; class Queue extends BaseConfig { @@ -18,6 +20,8 @@ class Queue extends BaseConfig */ public array $handlers = [ 'database' => DatabaseHandler::class, + 'redis' => RedisHandler::class, + 'predis' => PredisHandler::class, ]; /** @@ -28,6 +32,31 @@ class Queue extends BaseConfig 'getShared' => true, ]; + /** + * Redis handler config. + */ + public array $redis = [ + 'host' => '127.0.0.1', + 'password' => null, + 'port' => 6379, + 'timeout' => 0, + 'database' => 0, + 'prefix' => '', + ]; + + /** + * Predis handler config. + */ + public array $predis = [ + 'scheme' => 'tcp', + 'host' => '127.0.0.1', + 'password' => null, + 'port' => 6379, + 'timeout' => 5, + 'database' => 0, + 'prefix' => '', + ]; + /** * Whether to keep the DONE jobs in the queue. */ diff --git a/src/Exceptions/QueueException.php b/src/Exceptions/QueueException.php index 93b240c..f78d4bf 100644 --- a/src/Exceptions/QueueException.php +++ b/src/Exceptions/QueueException.php @@ -11,6 +11,16 @@ public static function forIncorrectHandler(): static return new self(lang('Queue.incorrectHandler')); } + public static function forIncorrectQueueFormat(): static + { + return new self(lang('Queue.incorrectQueueFormat')); + } + + public static function forTooLongQueueName(): static + { + return new self(lang('Queue.tooLongQueueName')); + } + public static function forIncorrectJobHandler(): static { return new self(lang('Queue.incorrectJobHandler')); diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index aad22b4..35ef1f0 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -136,6 +136,9 @@ protected function logFailed(QueueJob $queueJob, Throwable $err): bool */ protected function validateJobAndPriority(string $queue, string $job): void { + // Validate queue + $this->validateQueue($queue); + // Validate jobHandler. if (! in_array($job, array_keys($this->config->jobHandlers), true)) { throw QueueException::forIncorrectJobHandler(); @@ -150,4 +153,18 @@ protected function validateJobAndPriority(string $queue, string $job): void throw QueueException::forIncorrectQueuePriority($this->priority, $queue); } } + + /** + * Validate queue name. + */ + protected function validateQueue(string $queue): void + { + if (! preg_match('/^[a-z0-9_-]+$/', $queue)) { + throw QueueException::forIncorrectQueueFormat(); + } + + if (strlen($queue) > 64) { + throw QueueException::forTooLongQueueName(); + } + } } diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php new file mode 100644 index 0000000..2133ab4 --- /dev/null +++ b/src/Handlers/PredisHandler.php @@ -0,0 +1,147 @@ +predis = new Client($config->predis, ['prefix' => $config->predis['prefix']]); + $this->predis->time(); + } catch (Exception $e) { + throw new CriticalError('Queue: Predis connection refused (' . $e->getMessage() . ').'); + } + } + + /** + * Add job to the queue. + */ + public function push(string $queue, string $job, array $data): bool + { + $this->validateJobAndPriority($queue, $job); + + helper('text'); + + $queueJob = new QueueJob([ + 'id' => random_string('numeric', 16), + 'queue' => $queue, + 'payload' => new Payload($job, $data), + 'priority' => $this->priority, + 'status' => Status::PENDING->value, + 'attempts' => 0, + 'available_at' => Time::now()->timestamp, + ]); + + $result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]); + + $this->priority = null; + + return $result > 0; + } + + /** + * Get job from the queue. + */ + public function pop(string $queue, array $priorities): ?QueueJob + { + $tasks = []; + $now = Time::now()->timestamp; + + foreach ($priorities as $priority) { + if ($tasks = $this->predis->zrangebyscore("queues:{$queue}:{$priority}", '-inf', $now, ['LIMIT' => [0, 1]])) { + if ($this->predis->zrem("queues:{$queue}:{$priority}", ...$tasks)) { + break; + } + $tasks = []; + } + } + + if (empty($tasks[0])) { + return null; + } + + $queueJob = new QueueJob(json_decode((string) $tasks[0], true)); + + // Set the actual status as in DB. + $queueJob->status = Status::RESERVED->value; + $queueJob->syncOriginal(); + + $this->predis->hset("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob)); + + return $queueJob; + } + + /** + * Schedule job for later + */ + public function later(QueueJob $queueJob, int $seconds): bool + { + $queueJob->status = Status::PENDING->value; + $queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp; + + if ($result = $this->predis->zadd("queues:{$queueJob->queue}:{$queueJob->priority}", [json_encode($queueJob) => $queueJob->available_at->timestamp])) { + $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id); + } + + return $result > 0; + } + + /** + * Move job to failed table or move and delete. + */ + public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool + { + if ($keepJob) { + $this->logFailed($queueJob, $err); + } + + return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id); + } + + /** + * Change job status to DONE or delete it. + */ + public function done(QueueJob $queueJob, bool $keepJob): bool + { + if ($keepJob) { + $queueJob->status = Status::DONE->value; + $this->predis->lpush("queues:{$queueJob->queue}::done", [json_encode($queueJob)]); + } + + return (bool) $this->predis->hdel("queues:{$queueJob->queue}::reserved", $queueJob->id); + } + + /** + * Delete queue jobs + */ + public function clear(?string $queue = null): bool + { + if ($queue !== null) { + if ($keys = $this->predis->keys("queues:{$queue}:*")) { + return $this->predis->del($keys) > 0; + } + + return true; + } + + if ($keys = $this->predis->keys('queues:*')) { + return $this->predis->del($keys) > 0; + } + + return true; + } +} diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php new file mode 100644 index 0000000..b58af92 --- /dev/null +++ b/src/Handlers/RedisHandler.php @@ -0,0 +1,172 @@ +redis = new Redis(); + + try { + if (! $this->redis->connect($config->redis['host'], ($config->redis['host'][0] === '/' ? 0 : $config->redis['port']), $config->redis['timeout'])) { + throw new CriticalError('Queue: Redis connection failed. Check your configuration.'); + } + + if (isset($config->redis['password']) && ! $this->redis->auth($config->redis['password'])) { + throw new CriticalError('Queue: Redis authentication failed.'); + } + + if (isset($config->redis['database']) && ! $this->redis->select($config->redis['database'])) { + throw new CriticalError('Queue: Redis select database failed.'); + } + + if (isset($config->redis['prefix']) && ! $this->redis->setOption(Redis::OPT_PREFIX, $config->redis['prefix'])) { + throw new CriticalError('Queue: Redis setting prefix failed.'); + } + } catch (RedisException $e) { + throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').'); + } + } + + /** + * Add job to the queue. + * + * @throws RedisException + */ + public function push(string $queue, string $job, array $data): bool + { + $this->validateJobAndPriority($queue, $job); + + helper('text'); + + $queueJob = new QueueJob([ + 'id' => random_string('numeric', 16), + 'queue' => $queue, + 'payload' => new Payload($job, $data), + 'priority' => $this->priority, + 'status' => Status::PENDING->value, + 'attempts' => 0, + 'available_at' => Time::now()->timestamp, + ]); + + $result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", Time::now()->timestamp, json_encode($queueJob)); + + $this->priority = null; + + return $result > 0; + } + + /** + * Get job from the queue. + * + * @throws RedisException + */ + public function pop(string $queue, array $priorities): ?QueueJob + { + $tasks = []; + $now = Time::now()->timestamp; + + foreach ($priorities as $priority) { + if ($tasks = $this->redis->zRangeByScore("queues:{$queue}:{$priority}", '-inf', $now, ['limit' => [0, 1]])) { + if ($this->redis->zRem("queues:{$queue}:{$priority}", ...$tasks)) { + break; + } + $tasks = []; + } + } + + if (empty($tasks[0])) { + return null; + } + + $queueJob = new QueueJob(json_decode((string) $tasks[0], true)); + + // Set the actual status as in DB. + $queueJob->status = Status::RESERVED->value; + $queueJob->syncOriginal(); + + $this->redis->hSet("queues:{$queue}::reserved", $queueJob->id, json_encode($queueJob)); + + return $queueJob; + } + + /** + * Schedule job for later + * + * @throws RedisException + */ + public function later(QueueJob $queueJob, int $seconds): bool + { + $queueJob->status = Status::PENDING->value; + $queueJob->available_at = Time::now()->addSeconds($seconds)->timestamp; + + if ($result = (int) $this->redis->zAdd("queues:{$queueJob->queue}:{$queueJob->priority}", $queueJob->available_at->timestamp, json_encode($queueJob))) { + $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id); + } + + return $result > 0; + } + + /** + * Move job to failed table or move and delete. + */ + public function failed(QueueJob $queueJob, Throwable $err, bool $keepJob): bool + { + if ($keepJob) { + $this->logFailed($queueJob, $err); + } + + return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id); + } + + /** + * Change job status to DONE or delete it. + * + * @throws RedisException + */ + public function done(QueueJob $queueJob, bool $keepJob): bool + { + if ($keepJob) { + $queueJob->status = Status::DONE->value; + $this->redis->lPush("queues:{$queueJob->queue}::done", json_encode($queueJob)); + } + + return (bool) $this->redis->hDel("queues:{$queueJob->queue}::reserved", $queueJob->id); + } + + /** + * Delete queue jobs + * + * @throws RedisException + */ + public function clear(?string $queue = null): bool + { + if ($queue !== null) { + if ($keys = $this->redis->keys("queues:{$queue}:*")) { + return (int) $this->redis->del($keys) > 0; + } + + return true; + } + + if ($keys = $this->redis->keys('queues:*')) { + return (int) $this->redis->del($keys) > 0; + } + + return true; + } +} diff --git a/src/Language/en/Queue.php b/src/Language/en/Queue.php index d7c1a09..c4bab3c 100644 --- a/src/Language/en/Queue.php +++ b/src/Language/en/Queue.php @@ -7,6 +7,8 @@ ], ], 'incorrectHandler' => 'This queue handler is incorrect.', + 'incorrectQueueFormat' => 'The queue name should consists only lowercase letters or numbers.', + 'tooLongQueueName' => 'The queue name is too long. It should be no longer than 64 letters.', 'incorrectJobHandler' => 'This job name is not defined in the $jobHandlers array.', 'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.', 'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.', diff --git a/tests/DatabaseHandlerTest.php b/tests/DatabaseHandlerTest.php index 26eb20a..167f1ef 100644 --- a/tests/DatabaseHandlerTest.php +++ b/tests/DatabaseHandlerTest.php @@ -11,7 +11,7 @@ use Michalsn\CodeIgniterQueue\Models\QueueJobFailedModel; use ReflectionException; use Tests\Support\Config\Queue as QueueConfig; -use Tests\Support\Database\Seeds\TestQueueSeeder; +use Tests\Support\Database\Seeds\TestDatabaseQueueSeeder; use Tests\Support\TestCase; /** @@ -21,7 +21,7 @@ final class DatabaseHandlerTest extends TestCase { use ReflectionHelper; - protected $seed = TestQueueSeeder::class; + protected $seed = TestDatabaseQueueSeeder::class; private QueueConfig $config; protected function setUp(): void @@ -94,6 +94,38 @@ public function testPushWithPriority() ]); } + public function testPushAndPopWithPriority() + { + $handler = new DatabaseHandler($this->config); + $result = $handler->push('queue', 'success', ['key1' => 'value1']); + + $this->assertTrue($result); + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue', + 'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1']]), + 'priority' => 'low', + ]); + + $result = $handler->setPriority('high')->push('queue', 'success', ['key2' => 'value2']); + + $this->assertTrue($result); + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue', + 'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2']]), + 'priority' => 'high', + ]); + + $result = $handler->pop('queue', ['high', 'low']); + $this->assertInstanceOf(QueueJob::class, $result); + $payload = ['job' => 'success', 'data' => ['key2' => 'value2']]; + $this->assertSame($payload, $result->payload); + + $result = $handler->pop('queue', ['high', 'low']); + $this->assertInstanceOf(QueueJob::class, $result); + $payload = ['job' => 'success', 'data' => ['key1' => 'value1']]; + $this->assertSame($payload, $result->payload); + } + /** * @throws ReflectionException */ @@ -118,6 +150,30 @@ public function testPushWithPriorityException() $handler->setPriority('invalid')->push('queue', 'success', ['key' => 'value']); } + /** + * @throws ReflectionException + */ + public function testPushWithIncorrectQueueFormatException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The queue name should consists only lowercase letters or numbers.'); + + $handler = new DatabaseHandler($this->config); + $handler->push('queue*', 'success', ['key' => 'value']); + } + + /** + * @throws ReflectionException + */ + public function testPushWithTooLongQueueNameException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The queue name is too long. It should be no longer than 64 letters.'); + + $handler = new DatabaseHandler($this->config); + $handler->push(str_repeat('a', 65), 'success', ['key' => 'value']); + } + /** * @throws ReflectionException */ @@ -284,6 +340,14 @@ public function testForget() ]); } + public function testForgetFalse() + { + $handler = new DatabaseHandler($this->config); + $result = $handler->forget(1111); + + $this->assertFalse($result); + } + /** * @throws ReflectionException */ diff --git a/tests/PredisHandlerTest.php b/tests/PredisHandlerTest.php new file mode 100644 index 0000000..bc7d1e5 --- /dev/null +++ b/tests/PredisHandlerTest.php @@ -0,0 +1,294 @@ +config = config(QueueConfig::class); + } + + public function testPredisHandler() + { + $handler = new PredisHandler($this->config); + $this->assertInstanceOf(PredisHandler::class, $handler); + } + + public function testPriority() + { + $handler = new PredisHandler($this->config); + $handler->setPriority('high'); + + $this->assertSame('high', self::getPrivateProperty($handler, 'priority')); + } + + public function testPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name should consists only lowercase letters.'); + + $handler = new PredisHandler($this->config); + $handler->setPriority('high_:'); + } + + /** + * @throws ReflectionException + */ + public function testPush() + { + $handler = new PredisHandler($this->config); + $result = $handler->push('queue', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(1, $predis->zcard('queues:queue:low')); + + $task = $predis->zrangebyscore('queues:queue:low', '-inf', Time::now()->timestamp, ['limit' => [0, 1]]); + $queueJob = new QueueJob(json_decode((string) $task[0], true)); + $this->assertSame('success', $queueJob->payload['job']); + $this->assertSame(['key' => 'value'], $queueJob->payload['data']); + } + + /** + * @throws ReflectionException + */ + public function testPushWithPriority() + { + $handler = new PredisHandler($this->config); + $result = $handler->setPriority('high')->push('queue', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(1, $predis->zcard('queues:queue:high')); + + $task = $predis->zrangebyscore('queues:queue:high', '-inf', Time::now()->timestamp, ['limit' => [0, 1]]); + $queueJob = new QueueJob(json_decode((string) $task[0], true)); + $this->assertSame('success', $queueJob->payload['job']); + $this->assertSame(['key' => 'value'], $queueJob->payload['data']); + } + + public function testPushException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('This job name is not defined in the $jobHandlers array.'); + + $handler = new PredisHandler($this->config); + $handler->push('queue', 'not-exists', ['key' => 'value']); + } + + public function testPushWithPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('This queue has incorrectly defined priority: "invalid" for the queue: "queue".'); + + $handler = new PredisHandler($this->config); + $handler->setPriority('invalid')->push('queue', 'success', ['key' => 'value']); + } + + /** + * @throws ReflectionException + */ + public function testPop() + { + $handler = new PredisHandler($this->config); + $result = $handler->pop('queue1', ['default']); + + $this->assertInstanceOf(QueueJob::class, $result); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(1_234_567_890_654_321, $result->id); + $this->assertSame(0, $predis->zcard('queues:queue1:default')); + $this->assertSame(1, $predis->hexists('queues:queue1::reserved', $result->id)); + } + + public function testPopEmpty() + { + $handler = new PredisHandler($this->config); + $result = $handler->pop('queue123', ['default']); + + $this->assertNull($result); + } + + /** + * @throws ReflectionException + */ + public function testLater() + { + $handler = new PredisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(1, $predis->hexists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $predis->zcard('queues:queue1:default')); + + $result = $handler->later($queueJob, 60); + + $this->assertTrue($result); + $this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(1, $predis->zcard('queues:queue1:default')); + } + + /** + * @throws ReflectionException + */ + public function testFailedAndKeepJob() + { + $handler = new PredisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $err = new Exception('Sample exception'); + $result = $handler->failed($queueJob, $err, true); + + $predis = self::getPrivateProperty($handler, 'predis'); + + $this->assertTrue($result); + $this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $predis->zcard('queues:queue1:default')); + + $this->seeInDatabase('queue_jobs_failed', [ + 'id' => 2, + 'connection' => 'database', + 'queue' => 'queue1', + ]); + } + + /** + * @throws ReflectionException + */ + public function testFailedAndDontKeepJob() + { + $handler = new PredisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $err = new Exception('Sample exception'); + $result = $handler->failed($queueJob, $err, false); + + $predis = self::getPrivateProperty($handler, 'predis'); + + $this->assertTrue($result); + $this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $predis->zcard('queues:queue1:default')); + + $this->dontSeeInDatabase('queue_jobs_failed', [ + 'id' => 2, + 'connection' => 'database', + 'queue' => 'queue1', + ]); + } + + /** + * @throws ReflectionException + */ + public function testDoneAndKeepJob() + { + $handler = new PredisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $result = $handler->done($queueJob, true); + + $predis = self::getPrivateProperty($handler, 'predis'); + + $this->assertTrue($result); + $this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(1, $predis->llen('queues:queue1::done')); + } + + /** + * @throws ReflectionException + */ + public function testDoneAndDontKeepJob() + { + $handler = new PredisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(0, $predis->zcard('queues:queue1:default')); + + $result = $handler->done($queueJob, false); + + $this->assertTrue($result); + $this->assertSame(0, $predis->hexists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $predis->llen('queues:queue1::done')); + } + + /** + * @throws ReflectionException + */ + public function testClear() + { + $handler = new PredisHandler($this->config); + $result = $handler->clear('queue1'); + + $this->assertTrue($result); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(0, $predis->zcard('queues:queue1:default')); + + $result = $handler->clear('queue1'); + $this->assertTrue($result); + } + + /** + * @throws ReflectionException + */ + public function testClearAll() + { + $handler = new PredisHandler($this->config); + $result = $handler->clear(); + + $this->assertTrue($result); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertCount(0, $predis->keys('queues:*')); + + $result = $handler->clear(); + $this->assertTrue($result); + } + + /** + * @throws ReflectionException + */ + public function testRetry() + { + $handler = new PredisHandler($this->config); + $count = $handler->retry(1, 'queue1'); + + $this->assertSame($count, 1); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(2, $predis->zcard('queues:queue1:default')); + + $task = $predis->zrangebyscore('queues:queue1:default', '-inf', Time::now()->timestamp, ['limit' => [0, 2]]); + $queueJob = new QueueJob(json_decode((string) $task[1], true)); + $this->assertSame('failure', $queueJob->payload['job']); + $this->assertSame(['failed' => true], $queueJob->payload['data']); + + $this->dontSeeInDatabase('queue_jobs_failed', [ + 'id' => 1, + ]); + } +} diff --git a/tests/QueueTest.php b/tests/QueueTest.php index f9fe243..7c50942 100644 --- a/tests/QueueTest.php +++ b/tests/QueueTest.php @@ -6,7 +6,7 @@ use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler; use Michalsn\CodeIgniterQueue\Queue; use Tests\Support\Config\Queue as QueueConfig; -use Tests\Support\Database\Seeds\TestQueueSeeder; +use Tests\Support\Database\Seeds\TestDatabaseQueueSeeder; use Tests\Support\TestCase; /** @@ -14,7 +14,7 @@ */ final class QueueTest extends TestCase { - protected $seed = TestQueueSeeder::class; + protected $seed = TestDatabaseQueueSeeder::class; private QueueConfig $config; protected function setUp(): void diff --git a/tests/RedisHandlerTest.php b/tests/RedisHandlerTest.php new file mode 100644 index 0000000..24658e0 --- /dev/null +++ b/tests/RedisHandlerTest.php @@ -0,0 +1,260 @@ +config = config(QueueConfig::class); + } + + public function testRedisHandler() + { + $handler = new RedisHandler($this->config); + $this->assertInstanceOf(RedisHandler::class, $handler); + } + + public function testPriority() + { + $handler = new RedisHandler($this->config); + $handler->setPriority('high'); + + $this->assertSame('high', self::getPrivateProperty($handler, 'priority')); + } + + public function testPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The priority name should consists only lowercase letters.'); + + $handler = new RedisHandler($this->config); + $handler->setPriority('high_:'); + } + + public function testPush() + { + $handler = new RedisHandler($this->config); + $result = $handler->push('queue', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(1, $redis->zCard('queues:queue:low')); + + $task = $redis->zRangeByScore('queues:queue:low', '-inf', Time::now()->timestamp, ['limit' => [0, 1]]); + $queueJob = new QueueJob(json_decode((string) $task[0], true)); + $this->assertSame('success', $queueJob->payload['job']); + $this->assertSame(['key' => 'value'], $queueJob->payload['data']); + } + + public function testPushWithPriority() + { + $handler = new RedisHandler($this->config); + $result = $handler->setPriority('high')->push('queue', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(1, $redis->zCard('queues:queue:high')); + + $task = $redis->zRangeByScore('queues:queue:high', '-inf', Time::now()->timestamp, ['limit' => [0, 1]]); + $queueJob = new QueueJob(json_decode((string) $task[0], true)); + $this->assertSame('success', $queueJob->payload['job']); + $this->assertSame(['key' => 'value'], $queueJob->payload['data']); + } + + public function testPushException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('This job name is not defined in the $jobHandlers array.'); + + $handler = new RedisHandler($this->config); + $handler->push('queue', 'not-exists', ['key' => 'value']); + } + + public function testPushWithPriorityException() + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('This queue has incorrectly defined priority: "invalid" for the queue: "queue".'); + + $handler = new RedisHandler($this->config); + $handler->setPriority('invalid')->push('queue', 'success', ['key' => 'value']); + } + + public function testPop() + { + $handler = new RedisHandler($this->config); + $result = $handler->pop('queue1', ['default']); + + $this->assertInstanceOf(QueueJob::class, $result); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(1_234_567_890_654_321, $result->id); + $this->assertSame(0, $redis->zCard('queues:queue1:default')); + $this->assertTrue($redis->hExists('queues:queue1::reserved', $result->id)); + } + + public function testPopEmpty() + { + $handler = new RedisHandler($this->config); + $result = $handler->pop('queue123', ['default']); + + $this->assertNull($result); + } + + public function testLater() + { + $handler = new RedisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertTrue($redis->hExists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $redis->zCard('queues:queue1:default')); + + $result = $handler->later($queueJob, 60); + + $this->assertTrue($result); + $this->assertFalse($redis->hExists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(1, $redis->zCard('queues:queue1:default')); + } + + public function testFailedAndKeepJob() + { + $handler = new RedisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $err = new Exception('Sample exception'); + $result = $handler->failed($queueJob, $err, true); + + $redis = self::getPrivateProperty($handler, 'redis'); + + $this->assertTrue($result); + $this->assertFalse($redis->hExists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $redis->zCard('queues:queue1:default')); + + $this->seeInDatabase('queue_jobs_failed', [ + 'id' => 2, + 'connection' => 'database', + 'queue' => 'queue1', + ]); + } + + public function testFailedAndDontKeepJob() + { + $handler = new RedisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $err = new Exception('Sample exception'); + $result = $handler->failed($queueJob, $err, false); + + $redis = self::getPrivateProperty($handler, 'redis'); + + $this->assertTrue($result); + $this->assertFalse($redis->hExists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $redis->zCard('queues:queue1:default')); + + $this->dontSeeInDatabase('queue_jobs_failed', [ + 'id' => 2, + 'connection' => 'database', + 'queue' => 'queue1', + ]); + } + + public function testDoneAndKeepJob() + { + $handler = new RedisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $result = $handler->done($queueJob, true); + + $redis = self::getPrivateProperty($handler, 'redis'); + + $this->assertTrue($result); + $this->assertFalse($redis->hExists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(1, $redis->lLen('queues:queue1::done')); + } + + public function testDoneAndDontKeepJob() + { + $handler = new RedisHandler($this->config); + $queueJob = $handler->pop('queue1', ['default']); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(0, $redis->zCard('queues:queue1:default')); + + $result = $handler->done($queueJob, false); + + $this->assertTrue($result); + $this->assertFalse($redis->hExists('queues:queue1::reserved', $queueJob->id)); + $this->assertSame(0, $redis->lLen('queues:queue1::done')); + } + + public function testClear() + { + $handler = new RedisHandler($this->config); + $result = $handler->clear('queue1'); + + $this->assertTrue($result); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(0, $redis->zCard('queues:queue1:default')); + + $result = $handler->clear('queue1'); + $this->assertTrue($result); + } + + public function testClearAll() + { + $handler = new RedisHandler($this->config); + + $result = $handler->clear(); + $this->assertTrue($result); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertCount(0, $redis->keys('queues:*')); + + $result = $handler->clear(); + $this->assertTrue($result); + } + + public function testRetry() + { + $handler = new RedisHandler($this->config); + $count = $handler->retry(1, 'queue1'); + + $this->assertSame($count, 1); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(2, $redis->zCard('queues:queue1:default')); + + $task = $redis->zRangeByScore('queues:queue1:default', '-inf', Time::now()->timestamp, ['limit' => [0, 2]]); + $queueJob = new QueueJob(json_decode((string) $task[1], true)); + $this->assertSame('failure', $queueJob->payload['job']); + $this->assertSame(['failed' => true], $queueJob->payload['data']); + + $this->dontSeeInDatabase('queue_jobs_failed', [ + 'id' => 1, + ]); + } +} diff --git a/tests/_support/Config/Queue.php b/tests/_support/Config/Queue.php index 57f7256..de8f3ce 100644 --- a/tests/_support/Config/Queue.php +++ b/tests/_support/Config/Queue.php @@ -4,6 +4,8 @@ use Michalsn\CodeIgniterQueue\Config\Queue as BaseQueue; use Michalsn\CodeIgniterQueue\Handlers\DatabaseHandler; +use Michalsn\CodeIgniterQueue\Handlers\PredisHandler; +use Michalsn\CodeIgniterQueue\Handlers\RedisHandler; use Tests\Support\Jobs\Failure; use Tests\Support\Jobs\Success; @@ -19,6 +21,8 @@ class Queue extends BaseQueue */ public array $handlers = [ 'database' => DatabaseHandler::class, + 'redis' => RedisHandler::class, + 'predis' => PredisHandler::class, ]; /** @@ -29,6 +33,30 @@ class Queue extends BaseQueue 'getShared' => true, ]; + /** + * Redis and Predis handler config. + */ + public array $redis = [ + 'host' => '127.0.0.1', + 'password' => null, + 'port' => 6379, + 'timeout' => 0, + 'database' => 0, + ]; + + /** + * Predis handler config. + */ + public array $predis = [ + 'scheme' => 'tcp', + 'host' => '127.0.0.1', + 'password' => null, + 'port' => 6379, + 'timeout' => 5, + 'database' => 0, + 'prefix' => '', + ]; + /** * Whether to keep the DONE jobs in the queue. */ diff --git a/tests/_support/Database/Seeds/TestQueueSeeder.php b/tests/_support/Database/Seeds/TestDatabaseQueueSeeder.php similarity index 96% rename from tests/_support/Database/Seeds/TestQueueSeeder.php rename to tests/_support/Database/Seeds/TestDatabaseQueueSeeder.php index 5d3b315..ee6dfd6 100644 --- a/tests/_support/Database/Seeds/TestQueueSeeder.php +++ b/tests/_support/Database/Seeds/TestDatabaseQueueSeeder.php @@ -9,7 +9,7 @@ use Michalsn\CodeIgniterQueue\Models\QueueJobFailedModel; use Michalsn\CodeIgniterQueue\Models\QueueJobModel; -class TestQueueSeeder extends Seeder +class TestDatabaseQueueSeeder extends Seeder { public function run(): void { diff --git a/tests/_support/Database/Seeds/TestRedisQueueSeeder.php b/tests/_support/Database/Seeds/TestRedisQueueSeeder.php new file mode 100644 index 0000000..ef991b1 --- /dev/null +++ b/tests/_support/Database/Seeds/TestRedisQueueSeeder.php @@ -0,0 +1,64 @@ +connect($config->redis['host'], ($config->redis['host'][0] === '/' ? 0 : $config->redis['port']), $config->redis['timeout']); + } catch (RedisException $e) { + throw new CriticalError('Queue: RedisException occurred with message (' . $e->getMessage() . ').'); + } + + $redis->flushDB(); + + $jobQueue = new QueueJob([ + 'id' => '1234567890123456', + 'queue' => 'queue1', + 'payload' => ['job' => 'success', 'data' => []], + 'priority' => 'default', + 'status' => Status::RESERVED->value, + 'attempts' => 0, + 'available_at' => 1_697_269_864, + ]); + $redis->hSet("queues:{$jobQueue->queue}::reserved", $jobQueue->id, json_encode($jobQueue)); + + $jobQueue = new QueueJob([ + 'id' => '1234567890654321', + 'queue' => 'queue1', + 'payload' => ['job' => 'failure', 'data' => []], + 'priority' => 'default', + 'status' => Status::PENDING->value, + 'attempts' => 0, + 'available_at' => 1_697_269_860, + ]); + $redis->zAdd("queues:{$jobQueue->queue}:{$jobQueue->priority}", $jobQueue->available_at->timestamp, json_encode($jobQueue)); + + model(QueueJobFailedModel::class)->insert(new QueueJobFailed([ + 'connection' => 'database', + 'queue' => 'queue1', + 'payload' => ['job' => 'failure', 'data' => ['failed' => true]], + 'priority' => 'default', + 'exception' => 'Exception info', + ])); + } +}