Skip to content

Commit

Permalink
AMQP 延时消息示例
Browse files Browse the repository at this point in the history
  • Loading branch information
Yurunsoft committed Aug 31, 2023
1 parent f1a3071 commit 4ede7df
Showing 1 changed file with 288 additions and 0 deletions.
288 changes: 288 additions & 0 deletions doc/components/mq/amqp.md
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,291 @@ class TestProcess extends BaseProcess
| heartbeat | 心跳时间,默认`0` |
| channelRpcTimeout | 频道 RPC 超时时间,默认`0.0` |
| sslProtocol | ssl 协议,默认`null` |

## 使用示例

### 延时消息

支付宝、微信支付都有一个逻辑,就是用户支付成功后会通过 HTTP 请求来通知我们的接口。

接口如果没有按照约定的格式返回成功,会定时重试N次,每次延时不同,直到超过一定次数后,就不再重试。

下面是一个支付通知的消费者示例:

**消费者类:**

```php
<?php

namespace PayService\Module\Pay\AMQP\PayNotify;

use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BaseConsumer;
use Imi\AMQP\Contract\IMessage;
use Imi\AMQP\Enum\ConsumerResult;
use Imi\Bean\Annotation\AnnotationManager;
use Imi\Bean\Annotation\Bean;
use Imi\Bean\BeanFactory;
use Imi\Log\Log;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

/**
* @Bean("PayNotifyConsumer")
*
* @Exchange(name="exchange-pay-notify")
*
* @Queue(name="pay-notify", arguments={"x-dead-letter-exchange"="exchange-pay-notify-dead"})
*
* @Exchange(name="exchange-pay-notify-step-1")
*
* @Queue(name="pay-notify-step-1", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=15000})
*
* @Exchange(name="exchange-pay-notify-step-2")
*
* @Queue(name="pay-notify-step-2", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=15000})
*
* @Exchange(name="exchange-pay-notify-step-3")
*
* @Queue(name="pay-notify-step-3", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=30000})
*
* @Exchange(name="exchange-pay-notify-step-4")
*
* @Queue(name="pay-notify-step-4", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=180000})
*
* @Exchange(name="exchange-pay-notify-step-5")
*
* @Queue(name="pay-notify-step-5", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=600000})
*
* @Exchange(name="exchange-pay-notify-step-6")
*
* @Queue(name="pay-notify-step-6", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1200000})
*
* @Exchange(name="exchange-pay-notify-step-7")
*
* @Queue(name="pay-notify-step-7", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1800000})
*
* @Exchange(name="exchange-pay-notify-step-8")
*
* @Queue(name="pay-notify-step-8", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1800000})
*
* @Exchange(name="exchange-pay-notify-step-9")
*
* @Queue(name="pay-notify-step-9", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=1800000})
*
* @Exchange(name="exchange-pay-notify-step-10")
*
* @Queue(name="pay-notify-step-10", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=3600000})
*
* @Exchange(name="exchange-pay-notify-step-11")
*
* @Queue(name="pay-notify-step-11", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=10800000})
*
* @Exchange(name="exchange-pay-notify-step-12")
*
* @Queue(name="pay-notify-step-12", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=10800000})
*
* @Exchange(name="exchange-pay-notify-step-13")
*
* @Queue(name="pay-notify-step-13", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=10800000})
*
* @Exchange(name="exchange-pay-notify-step-14")
*
* @Queue(name="pay-notify-step-14", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=21600000})
*
* @Exchange(name="exchange-pay-notify-step-15")
*
* @Queue(name="pay-notify-step-15", arguments={"x-dead-letter-exchange"="exchange-pay-notify", "x-message-ttl"=21600000})
*
* @Consumer(tag="payNotify", queue="pay-notify", exchange="exchange-pay-notify", message=\PayService\Module\Pay\AMQP\PayNotify\PayNotifyMessage::class)
*/
class PayNotifyConsumer extends BaseConsumer
{
/**
* 定义消费者.
*
* @return void
*/
protected function declareConsumer(): void
{
// 定义死信
$this->channel->exchange_declare('exchange-pay-notify-dead', AMQPExchangeType::DIRECT, false, true, false);
$this->channel->queue_declare('pay-notify-dead', false, true, false, false);
$this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead');
$this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead', 'pay-notify');
$this->channel->queue_bind('pay-notify-dead', 'exchange-pay-notify-dead', 'pay-notify-dead');
parent::declareConsumer();
}

/**
* 消费任务
*
* @param \PayService\Module\Pay\AMQP\PayNotify\PayNotifyMessage $message
*
* @return void
*/
protected function consume(IMessage $message)
{
try
{
// 是否需要重试
$needRetry = false;
// HTTP通知逻辑
// ...
// $needRetry = true; // 通知失败,需要重试
}
// 你也可以定义一个用于重试的异常,在上面逻辑中抛出
// catch (RetryException $re)
// {
// $needRetry = true;
// }
catch (\Throwable $th)
{
throw $th;
}
finally
{
if (isset($th))
{
$result = ConsumerResult::REJECT;
}
elseif ($needRetry)
{
$stepCount = \count(AnnotationManager::getClassAnnotations(BeanFactory::getObjectClass($this), Queue::class));
if ($message->getRetryCount() < $stepCount - 1)
{
$newMessage = clone $message;
$step = $message->getRetryCount() + 1;
$newMessage->setRetryCount($step);
$amqpMessage = $newMessage->getAMQPMessage();
$amqpMessage->set('delivery_mode', AMQPMessage::DELIVERY_MODE_PERSISTENT);
$amqpMessage->setBody($newMessage->getBody());
$queueName = 'pay-notify-step-' . $step;
$exchangeName = 'exchange-pay-notify-step-' . $step;
$this->channel->queue_bind($queueName, $exchangeName);
$this->channel->basic_publish($amqpMessage, $exchangeName);
}
else
{
$result = ConsumerResult::REJECT;
}
}
}

return $result ?? ConsumerResult::ACK;
}
}
```

**PayNotifyMessage:**

```php
<?php

namespace PayService\Module\Pay\AMQP\PayNotify;

use Imi\AMQP\Message;

class PayNotifyMessage extends Message
{
/**
* 支付订单ID.
*
* @var int
*/
private $payOrderId;

/**
* 延时多少秒执行.
*
* @var int
*/
private $retryCount = 0;

public function __construct()
{
parent::__construct();
$this->format = \Imi\Util\Format\Json::class;
}

/**
* 设置主体数据.
*
* @param mixed $data
*
* @return self
*/
public function setBodyData($data): self
{
foreach ($data as $k => $v)
{
$this->$k = $v;
}

return $this;
}

/**
* 获取主体数据.
*
* @return mixed
*/
public function getBodyData()
{
return [
'payOrderId' => $this->payOrderId,
'retryCount' => $this->retryCount,
];
}

/**
* Get 支付订单ID.
*
* @return int
*/
public function getPayOrderId()
{
return $this->payOrderId;
}

/**
* Set 支付订单ID.
*
* @param int $payOrderId 支付订单ID
*
* @return self
*/
public function setPayOrderId(int $payOrderId)
{
$this->payOrderId = $payOrderId;

return $this;
}

/**
* Get 延时多少秒执行.
*
* @return int
*/
public function getRetryCount()
{
return $this->retryCount;
}

/**
* Set 延时多少秒执行.
*
* @param int $retryCount 延时多少秒执行
*
* @return self
*/
public function setRetryCount(int $retryCount)
{
$this->retryCount = $retryCount;

return $this;
}
}
```

0 comments on commit 4ede7df

Please sign in to comment.