Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - SAW-1922: Add support to retry queue jobs on fail #157

Open
wants to merge 6 commits into
base: 0.7
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ RABBITMQ_DEFAULT_VHOST=/
# PAYTMENTS
STRIPE_SECRET=
STRIPE_PUBLIC=

# Delay before retrying to process a failed job
QUEUE_RETRY_DELAY=0
230 changes: 203 additions & 27 deletions src/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,202 @@

namespace Baka\Queue;

use function Baka\envValue;
use Phalcon\Di;
use Phalcon\Security\Random;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class Queue
{
/**
* default canvas queues system name.
*/
const EVENTS = 'events';
const NOTIFICATIONS = 'notifications';
const JOBS = 'jobs';
public const EVENTS = 'events';
public const NOTIFICATIONS = 'notifications';
public const JOBS = 'jobs';
public const EXCHANGE_PREFIX = 'baka_exchange';
public const EXCHANGE_TYPE = 'direct';

public static bool $passive = false;
public static bool $durable = true;
public static bool $exclusive = false;
public static bool $auto_delete = false;
private static bool $passive = false;
private static bool $durable = true;
private static bool $exclusive = false;
private static bool $auto_delete = false;
private static bool $noWait = false;

/**
* Send a msg to Queue.
* Dechare an exchange on the channel.
*
* @param string $name
* @param array|object|mixed $msg
* @param string $type
* @param bool $force
*
* @return bool
* @return void
*/
public static function send(string $name, $msg) : bool
public static function declareExchange(string $name, bool $force = false) : void
PaulMejiaFeliz marked this conversation as resolved.
Show resolved Hide resolved
{
$queue = Di::getDefault()->get('queue');

$channel = $queue->channel();

if ($force) {
$channel->exchange_delete($name);
}

$channel->exchange_declare(
$name,
self::EXCHANGE_TYPE,
self::getPassive(),
self::getDurable(),
self::getExclusive(),
self::getAutoDelete(),
self::getNoWait()
);

$channel->close();
}

/**
* Dechare a queue on the channel.
*
* @param string $name
* @param string $exchange
* @param bool $force
* @param int $delay
*
* @return void
*/
public static function declareQueue(string $name, string $exchange, bool $force = false, int $delay = 0) : void
{
$queue = Di::getDefault()->get('queue');

$channel = $queue->channel();

if ($force) {
$channel->queue_delete($name);
}

$args = new AMQPTable();
$args->set('x-dead-letter-exchange', $exchange);

if ($delay > 0) {
$args->set('x-message-ttl', $delay);
}

/*
name: $queueName
name: $name
passive: false
durable: true // the queue will survive server restarts
exclusive: false // the queue can be accessed in other channels
auto_delete: false //the queue won't be deleted once the channel is closed.
durable: true // The queue will survive server restarts.
exclusive: false // The queue can be accessed in other channels.
auto_delete: false // The queue won't be deleted once the channel is closed.
nowait: false // The client should not wait for a reply.
*/

$channel->queue_declare(
$name,
self::getPassive(),
self::getDurable(),
self::getExclusive(),
self::getAutoDelete()
self::getAutoDelete(),
self::getNoWait(),
$args
);

$channel->close();
}

/**
* Creates all the exchanges and queues for a given name.
*
* @param string $queueName
* @param bool $force
*
* @return void
*/
public static function createFlow(string $queueName, bool $force = false) : void
{
$delay = (int) envValue('QUEUE_RETRY_DELAY', 0);

if ($delay > 0) {
self::createFlowWithDelay($queueName, $delay, $force);
return;
}

$queue = Di::getDefault()->get('queue');

$channel = $queue->channel();

$exchange = self::getExchangeName($queueName);

if ($force) {
$channel->exchange_delete($exchange);
$channel->queue_delete($queueName);
}

self::declareExchange($exchange, $force);

self::declareQueue($queueName, $exchange, $force);

$channel->queue_bind($queueName, $exchange, $queueName);
}

public static function createFlowWithDelay(string $queueName, int $delay, bool $force = false) : void
PaulMejiaFeliz marked this conversation as resolved.
Show resolved Hide resolved
{
$queue = Di::getDefault()->get('queue');

$channel = $queue->channel();

$mainExchange = self::getExchangeName($queueName);
$nackHandleExchange = "{$mainExchange}.nack_handle";
$requeueHandleExchange = "{$mainExchange}.requeue_handle";

$waitQueueName = "{$queueName}.wait_queue";

if ($force) {
$channel->exchange_delete($mainExchange);
$channel->exchange_delete($nackHandleExchange);
$channel->exchange_delete($requeueHandleExchange);

$channel->queue_delete($queueName);
$channel->queue_delete($waitQueueName);
}

self::declareExchange($mainExchange, $force);
self::declareExchange($nackHandleExchange, $force);
self::declareExchange($requeueHandleExchange, $force);

self::declareQueue($queueName, $nackHandleExchange, $force);
self::declareQueue($waitQueueName, $requeueHandleExchange, $force, $delay);

$channel->queue_bind($queueName, $mainExchange, $queueName);
$channel->queue_bind($queueName, $requeueHandleExchange, $queueName);
$channel->queue_bind($waitQueueName, $nackHandleExchange, $queueName);
}

/**
* Send a msg to Queue.
*
* @param string $name
* @param array|object|mixed $msg
*
* @return bool
*/
public static function send(string $name, $msg) : bool
{
$queue = Di::getDefault()->get('queue');

$channel = $queue->channel();

$msg = new AMQPMessage($msg, [
'delivery_mode' => 2
'delivery_mode' => 2,
'message_id' => (new Random())->uuid(),
]);

$channel->basic_publish($msg, '', $name);
self::createFlow($name);

$exchange = self::getExchangeName($name);

$channel->basic_publish($msg, $exchange, $name);

$channel->close();

return true;
Expand All @@ -69,7 +213,7 @@ public static function send(string $name, $msg) : bool
*
* @return void
*/
public static function process(string $queueName, callable $callback) : void
public static function process(string $queueName, callable $callback, bool $force = false) : void
{
$queue = Di::getDefault()->get('queue');
Di::getDefault()->get('log')->info('Starting Queue ' . $queueName);
Expand All @@ -79,7 +223,7 @@ public static function process(string $queueName, callable $callback) : void
*/
$channel = $queue->channel();

$channel->queue_declare($queueName, false, true, false, false);
self::createFlow($queueName, $force);

//Fair dispatch https://lukasmestan.com/rabbitmq-broken-pipe-or-closed-connection/
$prefetchSize = null; // message size in bytes or null, otherwise error
Expand All @@ -89,15 +233,15 @@ public static function process(string $queueName, callable $callback) : void
$channel->basic_qos($prefetchSize, $prefetchCount, $applyPerChannel);

/*
queueName: Queue from where to get the messages
consumer_tag: Consumer identifier
queueName: Queue from where to get the messages.
consumer_tag: Consumer identifier.
no_local: Don't receive messages published by this consumer.
no_ack: If set to true, automatic acknowledgement mode will be used by this consumer. See https://www.rabbitmq.com/confirms.html for details.
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
nowait:
callback: A PHP Callback
exclusive: Request exclusive consumer access, meaning only this consumer can access the queue.
nowait: The client should not wait for a reply.
callback: A PHP Callback.
*/
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
$channel->wait();
Expand Down Expand Up @@ -155,6 +299,18 @@ public static function setDurable(bool $value) : void
self::$durable = $value;
}

/**
* Set queue $noWait config value.
*
* @param bool $value
*
* @return void
*/
public static function setNoWait(bool $value) : void
{
self::$noWait = $value;
}

/**
* Get queue $passive config value.
*
Expand Down Expand Up @@ -194,4 +350,24 @@ public static function getDurable() : bool
{
return self::$durable;
}

/**
* Get queue $noWait config value.
*
* @return bool
*/
public static function getNoWait() : bool
{
return self::$noWait;
}

/**
* Get the exchange name fot a given queue.
*
* @return string
*/
public static function getExchangeName($queueName) : string
{
return self::EXCHANGE_PREFIX . ".{$queueName}";
}
}