diff --git a/.env.example b/.env.example index ab5c80df..fe849462 100644 --- a/.env.example +++ b/.env.example @@ -30,3 +30,6 @@ RABBITMQ_DEFAULT_VHOST=/ # PAYTMENTS STRIPE_SECRET= STRIPE_PUBLIC= + +# Delay before retrying to process a failed job +QUEUE_RETRY_DELAY=0 diff --git a/src/Contracts/Metadata/MetadataTrait.php b/src/Contracts/Metadata/MetadataTrait.php new file mode 100644 index 00000000..1ebfe140 --- /dev/null +++ b/src/Contracts/Metadata/MetadataTrait.php @@ -0,0 +1,41 @@ +metadata[$key] ?? $default; + } + + /** + * Set metadata for the current object. + * + * @param string $key + * @param mixed $value + * + * @return void + */ + public function setMetadata(string $key, $value) : void + { + $this->metadata[$key] = $value; + } +} diff --git a/src/Contracts/Queue/QueueableTrait.php b/src/Contracts/Queue/QueueableTrait.php index a019f908..49dd67b3 100644 --- a/src/Contracts/Queue/QueueableTrait.php +++ b/src/Contracts/Queue/QueueableTrait.php @@ -15,6 +15,20 @@ trait QueueableTrait */ public string $queue = Queue::JOBS; + /** + * whether to retry the job on fail or not. + * + * @var bool + */ + public bool $useRetry = false; + + /** + * Max quantity of retries for each job. + * + * @var bool + */ + public int $maxRetryQuantity = 0; + /** * Set the desired queue for the job. * diff --git a/src/Jobs/Job.php b/src/Jobs/Job.php index e99fc33f..6723e4bb 100644 --- a/src/Jobs/Job.php +++ b/src/Jobs/Job.php @@ -4,12 +4,14 @@ namespace Baka\Jobs; +use Baka\Contracts\Metadata\MetadataTrait; use Baka\Contracts\Queue\QueueableJobInterface; use Baka\Contracts\Queue\QueueableTrait; abstract class Job implements QueueableJobInterface { use QueueableTrait; + use MetadataTrait; /** * Execute de Jobs. diff --git a/src/Queue/Queue.php b/src/Queue/Queue.php index e066abda..c4ba7c0f 100644 --- a/src/Queue/Queue.php +++ b/src/Queue/Queue.php @@ -4,58 +4,214 @@ namespace Baka\Queue; +use function Baka\envValue; use Phalcon\Di; +use Phalcon\Security\Random; use PhpAmqpLib\Message\AMQPMessage; +use PhpAmqpLib\Wire\AMQPTable; class Queue { /** * default canvas queues system name. */ - const EVENTS = 'events'; - const NOTIFICATIONS = 'notifications'; - const JOBS = 'jobs'; + public const EVENTS = 'events'; + public const NOTIFICATIONS = 'notifications'; + public const JOBS = 'jobs'; + public const EXCHANGE_PREFIX = 'baka_exchange'; + public const EXCHANGE_TYPE = 'direct'; - public static bool $passive = false; - public static bool $durable = true; - public static bool $exclusive = false; - public static bool $auto_delete = false; + private static bool $passive = false; + private static bool $durable = true; + private static bool $exclusive = false; + private static bool $auto_delete = false; + private static bool $noWait = false; /** - * Send a msg to Queue. + * Dechare an exchange on the channel. * * @param string $name - * @param array|object|mixed $msg + * @param string $type + * @param bool $force * - * @return bool + * @return void */ - public static function send(string $name, $msg) : bool + public static function declareExchange(string $name, bool $force = false) : void { $queue = Di::getDefault()->get('queue'); $channel = $queue->channel(); + if ($force) { + $channel->exchange_delete($name); + } + + $channel->exchange_declare( + $name, + self::EXCHANGE_TYPE, + self::getPassive(), + self::getDurable(), + self::getExclusive(), + self::getAutoDelete(), + self::getNoWait() + ); + + $channel->close(); + } + + /** + * Dechare a queue on the channel. + * + * @param string $name + * @param string $exchange + * @param bool $force + * @param int $delay + * + * @return void + */ + public static function declareQueue(string $name, string $exchange, bool $force = false, int $delay = 0) : void + { + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + + if ($force) { + $channel->queue_delete($name); + } + + $args = new AMQPTable(); + $args->set('x-dead-letter-exchange', $exchange); + + if ($delay > 0) { + $args->set('x-message-ttl', $delay); + } + /* - name: $queueName + name: $name passive: false - durable: true // the queue will survive server restarts - exclusive: false // the queue can be accessed in other channels - auto_delete: false //the queue won't be deleted once the channel is closed. + durable: true // The queue will survive server restarts. + exclusive: false // The queue can be accessed in other channels. + auto_delete: false // The queue won't be deleted once the channel is closed. + nowait: false // The client should not wait for a reply. */ - $channel->queue_declare( $name, self::getPassive(), self::getDurable(), self::getExclusive(), - self::getAutoDelete() + self::getAutoDelete(), + self::getNoWait(), + $args ); + $channel->close(); + } + + /** + * Creates all the exchanges and queues for a given name. + * + * @param string $queueName + * @param bool $force + * + * @return void + */ + public static function createFlow(string $queueName, bool $force = false) : void + { + $delay = (int) envValue('QUEUE_RETRY_DELAY', 0); + + if ($delay > 0) { + self::createFlowWithDelay($queueName, $delay, $force); + return; + } + + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + + $exchange = self::getExchangeName($queueName); + + if ($force) { + $channel->exchange_delete($exchange); + $channel->queue_delete($queueName); + } + + self::declareExchange($exchange, $force); + + self::declareQueue($queueName, $exchange, $force); + + $channel->queue_bind($queueName, $exchange, $queueName); + } + + /** + * Creates all the exchanges and queues for a given name. + * Add a "nack handle" exchange that sends the failed jobs to a waiting queue with a "time to live" defined. + * After the time is up a "requeue handle" exchange sends the job back to the main queue. + * For future reference, read https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493. + * + * @param string $queueName + * @param int $delay + * @param bool $force + * + * @return void + */ + public static function createFlowWithDelay(string $queueName, int $delay, bool $force = false) : void + { + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + + $mainExchange = self::getExchangeName($queueName); + $nackHandleExchange = "{$mainExchange}.nack_handle"; + $requeueHandleExchange = "{$mainExchange}.requeue_handle"; + + $waitQueueName = "{$queueName}.wait_queue"; + + if ($force) { + $channel->exchange_delete($mainExchange); + $channel->exchange_delete($nackHandleExchange); + $channel->exchange_delete($requeueHandleExchange); + + $channel->queue_delete($queueName); + $channel->queue_delete($waitQueueName); + } + + self::declareExchange($mainExchange, $force); + self::declareExchange($nackHandleExchange, $force); + self::declareExchange($requeueHandleExchange, $force); + + self::declareQueue($queueName, $nackHandleExchange, $force); + self::declareQueue($waitQueueName, $requeueHandleExchange, $force, $delay); + + $channel->queue_bind($queueName, $mainExchange, $queueName); + $channel->queue_bind($queueName, $requeueHandleExchange, $queueName); + $channel->queue_bind($waitQueueName, $nackHandleExchange, $queueName); + } + + /** + * Send a msg to Queue. + * + * @param string $name + * @param array|object|mixed $msg + * + * @return bool + */ + public static function send(string $name, $msg) : bool + { + $queue = Di::getDefault()->get('queue'); + + $channel = $queue->channel(); + $msg = new AMQPMessage($msg, [ - 'delivery_mode' => 2 + 'delivery_mode' => 2, + 'message_id' => (new Random())->uuid(), ]); - $channel->basic_publish($msg, '', $name); + self::createFlow($name); + + $exchange = self::getExchangeName($name); + + $channel->basic_publish($msg, $exchange, $name); + $channel->close(); return true; @@ -69,7 +225,7 @@ public static function send(string $name, $msg) : bool * * @return void */ - public static function process(string $queueName, callable $callback) : void + public static function process(string $queueName, callable $callback, bool $force = false) : void { $queue = Di::getDefault()->get('queue'); Di::getDefault()->get('log')->info('Starting Queue ' . $queueName); @@ -79,7 +235,7 @@ public static function process(string $queueName, callable $callback) : void */ $channel = $queue->channel(); - $channel->queue_declare($queueName, false, true, false, false); + self::createFlow($queueName, $force); //Fair dispatch https://lukasmestan.com/rabbitmq-broken-pipe-or-closed-connection/ $prefetchSize = null; // message size in bytes or null, otherwise error @@ -89,15 +245,15 @@ public static function process(string $queueName, callable $callback) : void $channel->basic_qos($prefetchSize, $prefetchCount, $applyPerChannel); /* - queueName: Queue from where to get the messages - consumer_tag: Consumer identifier + queueName: Queue from where to get the messages. + consumer_tag: Consumer identifier. no_local: Don't receive messages published by this consumer. no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details. - exclusive: Request exclusive consumer access, meaning only this consumer can access the queue - nowait: - callback: A PHP Callback + exclusive: Request exclusive consumer access, meaning only this consumer can access the queue. + nowait: The client should not wait for a reply. + callback: A PHP Callback. */ - $channel->basic_consume($queueName, '', false, true, false, false, $callback); + $channel->basic_consume($queueName, '', false, false, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); @@ -155,6 +311,18 @@ public static function setDurable(bool $value) : void self::$durable = $value; } + /** + * Set queue $noWait config value. + * + * @param bool $value + * + * @return void + */ + public static function setNoWait(bool $value) : void + { + self::$noWait = $value; + } + /** * Get queue $passive config value. * @@ -194,4 +362,24 @@ public static function getDurable() : bool { return self::$durable; } + + /** + * Get queue $noWait config value. + * + * @return bool + */ + public static function getNoWait() : bool + { + return self::$noWait; + } + + /** + * Get the exchange name fot a given queue. + * + * @return string + */ + public static function getExchangeName($queueName) : string + { + return self::EXCHANGE_PREFIX . ".{$queueName}"; + } } diff --git a/tests/_support/Contracts/Metadata/TestMetadataObject.php b/tests/_support/Contracts/Metadata/TestMetadataObject.php new file mode 100644 index 00000000..39106ee6 --- /dev/null +++ b/tests/_support/Contracts/Metadata/TestMetadataObject.php @@ -0,0 +1,10 @@ +setMetadata('test-key', 'test'); + + $value = $testObject->getMetadata('test-key'); + + $this->assertEquals('test', $value); + } + + public function testGetMetadataInt() + { + $testObject = new TestMetadataObject(); + + $testObject->setMetadata('test-key-int', 1234); + $value = $testObject->getMetadata('test-key-int'); + + $this->assertEquals(1234, $value); + } +}