From 506037fbd56e58e38839b53b7e967dc5fc202446 Mon Sep 17 00:00:00 2001 From: Paul Mejia Date: Mon, 8 Nov 2021 17:56:51 -0400 Subject: [PATCH 1/4] Use a waiting queue for the retries --- .env.example | 4 +- src/Queue/Queue.php | 226 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 188 insertions(+), 42 deletions(-) diff --git a/.env.example b/.env.example index be0df47..fe84946 100644 --- a/.env.example +++ b/.env.example @@ -31,5 +31,5 @@ RABBITMQ_DEFAULT_VHOST=/ STRIPE_SECRET= STRIPE_PUBLIC= -# Each job remains in the queue for 15 seconds -QUEUE_JOB_TIME_TO_LIVE=15000 +# Delay before retrying to process a failed job +QUEUE_RETRY_DELAY=0 diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index 87abeb0..a0588aa 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -18,44 +18,81 @@ class Queue public const EVENTS = 'events'; public const NOTIFICATIONS = 'notifications'; public const JOBS = 'jobs'; - public const EXCHANGE_NAME = 'baka_exchange'; + public const EXCHANGE_PREFIX = 'baka_exchange'; public const EXCHANGE_TYPE = 'direct'; - public static bool $passive = false; - public static bool $durable = true; - public static bool $exclusive = false; - public static bool $auto_delete = false; + private static bool $passive = false; + private static bool $durable = true; + private static bool $exclusive = false; + private static bool $auto_delete = false; + private static bool $noWait = false; /** - * Send a msg to Queue. + * Dechare an exchange on the channel. * * @param string $name - * @param array|object|mixed $msg + * @param string $type + * @param bool $force * - * @return bool + * @return void */ - public static function send(string $name, $msg) : bool + public static function declareExchange(string $name, bool $force = false) : void { $queue = Di::getDefault()->get('queue'); $channel = $queue->channel(); - $args = new AMQPTable(); + if ($force) { + $channel->exchange_delete($name); + } - $args->set('x-dead-letter-exchange', self::EXCHANGE_NAME); - $args->set( - 'x-message-ttl', - (int) envValue('QUEUE_JOB_TIME_TO_LIVE', 15000), + $channel->exchange_declare( + $name, + self::EXCHANGE_TYPE, + self::getPassive(), + self::getDurable(), + self::getExclusive(), + self::getAutoDelete(), + self::getNoWait() ); - $channel->exchange_declare(self::EXCHANGE_NAME, self::EXCHANGE_TYPE, false, true, false, false, false, $args); + $channel->close(); + } + + /** + * Dechare a queue on the channel. + * + * @param string $name + * @param string $exchange + * @param bool $force + * @param int $delay + * + * @return void + */ + public static function declareQueue(string $name, string $exchange, bool $force = false, int $delay = 0) : void + { + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + + if ($force) { + $channel->queue_delete($name); + } + + $args = new AMQPTable(); + $args->set('x-dead-letter-exchange', $exchange); + + if ($delay > 0) { + $args->set('x-message-ttl', $delay); + } /* - name: $queueName + name: $name passive: false - durable: true // the queue will survive server restarts - exclusive: false // the queue can be accessed in other channels - auto_delete: false //the queue won't be deleted once the channel is closed. + durable: true // The queue will survive server restarts. + exclusive: false // The queue can be accessed in other channels. + auto_delete: false // The queue won't be deleted once the channel is closed. + nowait: false // The client should not wait for a reply. */ $channel->queue_declare( $name, @@ -63,16 +100,105 @@ public static function send(string $name, $msg) : bool self::getDurable(), self::getExclusive(), self::getAutoDelete(), - false, + self::getNoWait(), $args ); + $channel->close(); + } + + /** + * Creates all the exchanges and queues for a given name. + * + * @param string $queueName + * @param bool $force + * + * @return void + */ + public static function createFlow(string $queueName, bool $force = false) : void + { + $delay = (int) envValue('QUEUE_RETRY_DELAY', 0); + + if ($delay > 0) { + self::createFlowWithDelay($queueName, $delay, $force); + return; + } + + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + + $exchange = self::getExchangeName($queueName); + + if ($force) { + $channel->exchange_delete($exchange); + $channel->queue_delete($queueName); + } + + self::declareExchange($exchange, $force); + + self::declareQueue($queueName, $exchange, $force); + + $channel->queue_bind($queueName, $exchange, $queueName); + } + + public static function createFlowWithDelay(string $queueName, int $delay, bool $force = false) : void + { + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + + $mainExchange = self::getExchangeName($queueName); + $nackHandleExchange = "{$mainExchange}.nack_handle"; + $requeueHandleExchange = "{$mainExchange}.requeue_handle"; + + $waitQueueName = "{$queueName}.wait_queue"; + + if ($force) { + $channel->exchange_delete($mainExchange); + $channel->exchange_delete($nackHandleExchange); + $channel->exchange_delete($requeueHandleExchange); + + $channel->queue_delete($queueName); + $channel->queue_delete($waitQueueName); + } + + self::declareExchange($mainExchange, $force); + self::declareExchange($nackHandleExchange, $force); + self::declareExchange($requeueHandleExchange, $force); + + self::declareQueue($queueName, $nackHandleExchange, $force); + self::declareQueue($waitQueueName, $requeueHandleExchange, $force, $delay); + + $channel->queue_bind($queueName, $mainExchange, $queueName); + $channel->queue_bind($queueName, $requeueHandleExchange, $queueName); + $channel->queue_bind($waitQueueName, $nackHandleExchange, $queueName); + } + + /** + * Send a msg to Queue. + * + * @param string $name + * @param array|object|mixed $msg + * + * @return bool + */ + public static function send(string $name, $msg) : bool + { + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + $msg = new AMQPMessage($msg, [ 'delivery_mode' => 2, 'message_id' => (new Random())->uuid(), ]); - $channel->basic_publish($msg, self::EXCHANGE_NAME, $name); + self::createFlow($name); + + $exchange = self::getExchangeName($name); + + $channel->basic_publish($msg, $exchange, $name); $channel->close(); @@ -87,7 +213,7 @@ public static function send(string $name, $msg) : bool * * @return void */ - public static function process(string $queueName, callable $callback) : void + public static function process(string $queueName, callable $callback, bool $force = false) : void { $queue = Di::getDefault()->get('queue'); Di::getDefault()->get('log')->info('Starting Queue ' . $queueName); @@ -97,19 +223,7 @@ public static function process(string $queueName, callable $callback) : void */ $channel = $queue->channel(); - $args = new AMQPTable(); - - $args->set('x-dead-letter-exchange', self::EXCHANGE_NAME); - $args->set( - 'x-message-ttl', - (int) envValue('QUEUE_JOB_TIME_TO_LIVE', 15000), - ); - - $channel->exchange_declare(self::EXCHANGE_NAME, self::EXCHANGE_TYPE, false, true, false, false, false, $args); - - $channel->queue_declare($queueName, false, true, false, false, false, $args); - - $channel->queue_bind($queueName, self::EXCHANGE_NAME, self::JOBS); + self::createFlow($queueName, $force); //Fair dispatch https://lukasmestan.com/rabbitmq-broken-pipe-or-closed-connection/ $prefetchSize = null; // message size in bytes or null, otherwise error @@ -119,13 +233,13 @@ public static function process(string $queueName, callable $callback) : void $channel->basic_qos($prefetchSize, $prefetchCount, $applyPerChannel); /* - queueName: Queue from where to get the messages - consumer_tag: Consumer identifier + queueName: Queue from where to get the messages. + consumer_tag: Consumer identifier. no_local: Don't receive messages published by this consumer. no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details. - exclusive: Request exclusive consumer access, meaning only this consumer can access the queue - nowait: - callback: A PHP Callback + exclusive: Request exclusive consumer access, meaning only this consumer can access the queue. + nowait: The client should not wait for a reply. + callback: A PHP Callback. */ $channel->basic_consume($queueName, '', false, false, false, false, $callback); @@ -185,6 +299,18 @@ public static function setDurable(bool $value) : void self::$durable = $value; } + /** + * Set queue $noWait config value. + * + * @param bool $value + * + * @return void + */ + public static function setNoWait(bool $value) : void + { + self::$noWait = $value; + } + /** * Get queue $passive config value. * @@ -224,4 +350,24 @@ public static function getDurable() : bool { return self::$durable; } + + /** + * Get queue $noWait config value. + * + * @return bool + */ + public static function getNoWait() : bool + { + return self::$noWait; + } + + /** + * Get the exchange name fot a given queue. + * + * @return string + */ + public static function getExchangeName($queueName) : string + { + return self::EXCHANGE_PREFIX . ".{$queueName}"; + } } From c97d657b02ae039784f1a40dc884963b567c9507 Mon Sep 17 00:00:00 2001 From: Paul Mejia Date: Mon, 8 Nov 2021 18:04:17 -0400 Subject: [PATCH 2/4] Merge queue retry with 0.7 --- src/Queue/Queue.php | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index cc3ed63..a0588aa 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -4,20 +4,22 @@ namespace Baka\Queue; +use function Baka\envValue; use Phalcon\Di; +use Phalcon\Security\Random; use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; class Queue { /** * default canvas queues system name. */ - const EVENTS = 'events'; - const NOTIFICATIONS = 'notifications'; - const JOBS = 'jobs'; - const EXCHANGE_PREFIX = 'baka_exchange'; - const EXCHANGE_TYPE = 'direct'; - + public const EVENTS = 'events'; + public const NOTIFICATIONS = 'notifications'; + public const JOBS = 'jobs'; + public const EXCHANGE_PREFIX = 'baka_exchange'; + public const EXCHANGE_TYPE = 'direct'; private static bool $passive = false; private static bool $durable = true; @@ -92,7 +94,6 @@ public static function declareQueue(string $name, string $exchange, bool $force auto_delete: false // The queue won't be deleted once the channel is closed. nowait: false // The client should not wait for a reply. */ - $channel->queue_declare( $name, self::getPassive(), @@ -189,7 +190,8 @@ public static function send(string $name, $msg) : bool $channel = $queue->channel(); $msg = new AMQPMessage($msg, [ - 'delivery_mode' => 2 + 'delivery_mode' => 2, + 'message_id' => (new Random())->uuid(), ]); self::createFlow($name); @@ -239,7 +241,7 @@ public static function process(string $queueName, callable $callback, bool $forc nowait: The client should not wait for a reply. callback: A PHP Callback. */ - $channel->basic_consume($queueName, '', false, true, false, false, $callback); + $channel->basic_consume($queueName, '', false, false, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); From 2fde9ac506a4fc9c7c105ec67fdded056ea371a0 Mon Sep 17 00:00:00 2001 From: Paul Mejia Date: Tue, 9 Nov 2021 11:05:00 -0400 Subject: [PATCH 3/4] Add description to the Create flow with delay function --- src/Queue/Queue.php | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index a0588aa..c4ba7c0 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -142,6 +142,18 @@ public static function createFlow(string $queueName, bool $force = false) : void $channel->queue_bind($queueName, $exchange, $queueName); } + /** + * Creates all the exchanges and queues for a given name. + * Add a "nack handle" exchange that sends the failed jobs to a waiting queue with a "time to live" defined. + * After the time is up a "requeue handle" exchange sends the job back to the main queue. + * For future reference, read https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493. + * + * @param string $queueName + * @param int $delay + * @param bool $force + * + * @return void + */ public static function createFlowWithDelay(string $queueName, int $delay, bool $force = false) : void { $queue = Di::getDefault()->get('queue'); From 13ef025d9787f163e65d03e21f3526b027abb5b3 Mon Sep 17 00:00:00 2001 From: Paul Mejia Date: Thu, 11 Nov 2021 16:14:00 -0400 Subject: [PATCH 4/4] Add metadata trait for the queue jobs --- src/Contracts/Metadata/MetadataTrait.php | 41 +++++++++++++++++++ src/Contracts/Queue/QueueableTrait.php | 14 +++++++ src/Jobs/Job.php | 2 + .../Contracts/Metadata/TestMetadataObject.php | 10 +++++ .../unit/Contracts/Metadata/MetadataTest.php | 31 ++++++++++++++ 5 files changed, 98 insertions(+) create mode 100644 src/Contracts/Metadata/MetadataTrait.php create mode 100644 tests/_support/Contracts/Metadata/TestMetadataObject.php create mode 100644 tests/unit/Contracts/Metadata/MetadataTest.php diff --git a/src/Contracts/Metadata/MetadataTrait.php b/src/Contracts/Metadata/MetadataTrait.php new file mode 100644 index 0000000..1ebfe14 --- /dev/null +++ b/src/Contracts/Metadata/MetadataTrait.php @@ -0,0 +1,41 @@ +metadata[$key] ?? $default; + } + + /** + * Set metadata for the current object. + * + * @param string $key + * @param mixed $value + * + * @return void + */ + public function setMetadata(string $key, $value) : void + { + $this->metadata[$key] = $value; + } +} diff --git a/src/Contracts/Queue/QueueableTrait.php b/src/Contracts/Queue/QueueableTrait.php index a019f90..49dd67b 100644 --- a/src/Contracts/Queue/QueueableTrait.php +++ b/src/Contracts/Queue/QueueableTrait.php @@ -15,6 +15,20 @@ trait QueueableTrait */ public string $queue = Queue::JOBS; + /** + * whether to retry the job on fail or not. + * + * @var bool + */ + public bool $useRetry = false; + + /** + * Max quantity of retries for each job. + * + * @var bool + */ + public int $maxRetryQuantity = 0; + /** * Set the desired queue for the job. * diff --git a/src/Jobs/Job.php b/src/Jobs/Job.php index e99fc33..6723e4b 100644 --- a/src/Jobs/Job.php +++ b/src/Jobs/Job.php @@ -4,12 +4,14 @@ namespace Baka\Jobs; +use Baka\Contracts\Metadata\MetadataTrait; use Baka\Contracts\Queue\QueueableJobInterface; use Baka\Contracts\Queue\QueueableTrait; abstract class Job implements QueueableJobInterface { use QueueableTrait; + use MetadataTrait; /** * Execute de Jobs. diff --git a/tests/_support/Contracts/Metadata/TestMetadataObject.php b/tests/_support/Contracts/Metadata/TestMetadataObject.php new file mode 100644 index 0000000..39106ee --- /dev/null +++ b/tests/_support/Contracts/Metadata/TestMetadataObject.php @@ -0,0 +1,10 @@ +setMetadata('test-key', 'test'); + + $value = $testObject->getMetadata('test-key'); + + $this->assertEquals('test', $value); + } + + public function testGetMetadataInt() + { + $testObject = new TestMetadataObject(); + + $testObject->setMetadata('test-key-int', 1234); + $value = $testObject->getMetadata('test-key-int'); + + $this->assertEquals(1234, $value); + } +}