什么是死信队列
消息变成死信的三种情况:
- 消息被拒绝(basic.reject / basic.nack),并且requeue = false
- 消息TTL过期
- 队列达到最大长度
“死信”是RabbitMQ中的一种消息机制,当消息出现以上三种情况,就会变成死信,死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
什么是延迟队列
延迟队列就是利用TTL和死信队列来实现的,在过期时间后将消息转发到死信队列,在死信队列做逻辑处理。从上图中可以看出,如果普通消费者收到消息后啥也不干,等到了过期时间消息就会转发到死信队列中,相当于,消息只是在普通消费者那里暂存了TTL时间,然后交给了死信队列,也就实现了延时效果。
延迟队列使用场景
- 订单超时未支付自动关闭
- 召回N天前注册的用户
- 订单购买成功N分钟后检查下游环节是否正常,如开通会员权益
如何配置死信队列
在业务队列中配置死信队列信息
'x-dead-letter-exchange' => '',//死信交换机
'x-dead-letter-routing-key' => '',//死信路由key
'x-message-ttl' => '',//超时时间,单位毫秒
'x-max-length' => '',//队列最大长度
如何让消息过期
让消息过期有2种方式,一种是在队列中设置x-message-ttl
//声明支付通知队列
$channel->queue_declare($queue_pay_notice,false,false,false,false,false,new \PhpAmqpLib\Wire\AMQPTable([
'x-dead-letter-exchange'=>$exchange_order_delay,
'x-dead-letter-routing-key'=>$routing_key_order_delay,
'x-message-ttl'=>10000,//过期时间10S
]));
这种设置方式会让每个消息的过期时间都一样,都是10S,如果想灵活的设置过期时间,可以再发送消息的时候设置expiration
$msg = new \PhpAmqpLib\Message\AMQPMessage('hello',['expiration'=>10000]) //10s后过期;
$channel->basic_publish(msg, $exchange_name, $routing_key);
如何实现死信队列
普通消费者
死信队列在普通消费者中的代码需要处理的工作是最多的,需要声明普通队列和死信队列,普通交换机和死信交换机,并绑定普通交换机的路由键,死信队列的路由键,并在普通队列中配置死信队列的交换机和路由键。
生产者
生产者还是一如既往只要负责发消息即可。生产者并不知道消息最终会进入什么流程,他只管把消息发送到对应的普通交换机即可。
死信队列消费者
死信队列消费者也不需要关心之前的逻辑,只要将收到的消息做对应的处理即可。
PHP demo
生产者 send.php
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
$queue_order = 'queue_order';
$queue_delay_order = 'queue_delay_order';
$exchange_delay_order = 'exchange_delay_order';
$routing_delay_order = 'routing_delay_order';
$delay_ttl = 10 * 1000;
//场景 关闭10分钟后未支付的订单
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->queue_declare($queue_order, false, false, false, false, false, New \PhpAmqpLib\Wire\AMQPTable([
'x-dead-letter-exchange' => $exchange_delay_order,
'x-dead-letter-routing-key' => $routing_delay_order,
'x-message-ttl' => $delay_ttl
]));
//绑定死信队列
$channel->queue_declare($queue_delay_order, false, false, false, false);
$channel->exchange_declare($exchange_delay_order, 'direct', false, false, false);
$channel->queue_bind($queue_delay_order, $exchange_delay_order, $routing_delay_order);
for ($i = 1; $i <= 10; $i++) {
$msg = json_encode(['orderId' => $i,'time'=>date('H:i:s')]);
echo 'seng msg' . $msg . PHP_EOL;
$msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
$channel->basic_publish($msg, '',$queue_order);
}
$channel->close();
$conn->close();
业务消费者 receive.php
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
$queue_order = 'queue_order';
//场景 关闭10分钟后未支付的订单
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->basic_consume($queue_order, '', false, false, false, false, function ($msg) {
$data = json_decode($msg->body, true);
if ($data['orderId'] % 3 == 0) {//3,6,9
//确认消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//ack
echo 'get msg & ack orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
} else if ($data['orderId'] % 3 == 1) {//1,4,7,10
//不确认消息
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);//nack
echo 'get msg & nack orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
} else {//2,5,8
//拒绝消息
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
echo 'get msg & reject orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
}
});
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$conn->close();
延迟队列消费者 delay_receive.php
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
$queue_delay_order = 'queue_delay_order';
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->queue_declare($queue_delay_order, false, false, false, false);
$channel->basic_consume($queue_delay_order, '', false, false, false, false, function ($msg) {
echo date('H:i:s').' get msg ' . $msg->body . PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//确认收到消息
});
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$conn->close();
可以看到 nack和reject的消息进入到了死信队列中。如果没有运行receive.php,那么消息将会在设置的$delay_ttl后进入死信队列。