消息中间件RabbitMQ高级讲解!含代码解析!(TTL、死信/延迟队列、消息的确认与追踪)

1. RabbitMQ 高级

1.1. 过期时间TTL

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消
息。

1.1.1. 设置队列TTL

spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml文件中添加
如下内容:

<!--定义过期队列及其属性,不存在则自动创建--> 
<rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
<rabbit:queue-arguments> <!--投递到该队列的消息如果没有消费都将在6秒之后被删除--> 
<entry key="x-message-ttl" value-type="long" value="6000"/> 
</rabbit:queue-arguments> 
</rabbit:queue>

然后在测试类 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java中编写如下方法发送消息到上述定义的队列:

/*
** 过期队列消息 
* 投递到该队列的消息如果没有消费都将在6秒之后被删除 
 */ 
 @Test public void ttlQueueTest(){ 
 //路由键与队列同名 
 rabbitTemplate.convertAndSend("my_ttl_queue", "发送到过期队列my_ttl_queue, 
 6秒内不消费则不能再被消费。"); 
 }

参数 x-message-ttl 的值 必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前 消息 将最多只存活 6 秒钟。

如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。


在这里插入图片描述

1.1.2. 设置消息TTL

消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:

 /*
 ** 过期消息 
 * 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除 
 */ 
 @Test
public void ttlMessageTest(){ 
MessageProperties messageProperties = new MessageProperties();
//设置消息的过期时间,5秒 
messageProperties.setExpiration("5000"); 
Message message = new Message("测试过期消息,5秒钟过期".getBytes(), 
messageProperties); 
//路由键与队列同名 
rabbitTemplate.convertAndSend("my_ttl_queue", message); 
}

expiration 字段以微秒为单位表示 TTL 值。且与 x-message-ttl 具有相同的约束条件。因为
expiration 字段必须为字符串类型,broker 将只会接受以字符串形式表达的数字。
当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。

1.2. 死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。
具体步骤如下面的章节。

1.2.1. 定义死信交换机

spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml文件中添加
如下内容:

<!--定义定向交换机中的持久化死信队列,不存在则自动创建--> 
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/> 
<!--定义广播类型交换机;并绑定上述两个队列--> 
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" 
auto- declare="true"> 
<rabbit:bindings> 
<!--绑定路由键my_ttl_dlx、my_max_dlx,可以将过期的消息转移到my_dlx_queue队 列--> 
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/> 
<rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/> 
</rabbit:bindings> 
</rabbit:direct-exchange>

1.2.2. 队列设置死信交换机

为了测试消息在过期、队列达到最大长度后都将被投递死信交换机上;所以添加配置如下:

spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml文件中添加
如下内容:

<!--定义过期队列及其属性,不存在则自动创建--> 
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto- declare="true"> 
<rabbit:queue-arguments> 
<!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机--> 
<entry key="x-message-ttl" value-type="long" value="6000"/> 
<!--设置当消息过期后投递到对应的死信交换机--> 
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/> 
</rabbit:queue-arguments> 
</rabbit:queue> 
<!--定义限制长度的队列及其属性,不存在则自动创建--> 
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto- declare="true"> 
<rabbit:queue-arguments> 
<!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机--> 
<entry key="x-max-length" value-type="long" value="2"/> 
<!--设置当消息过期后投递到对应的死信交换机--> 
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/> 
</rabbit:queue-arguments> 
</rabbit:queue> 

<!--定义定向交换机 根据不同的路由key投递消息--> 
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" 
auto-declare="true"> 
<rabbit:bindings> 
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/> 
<rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/> 
</rabbit:bindings> 
</rabbit:direct-exchange>

1.2.3. 消息过期的死信队列测试

1)发送消息代码
添加 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java方法

/*** 
过期消息投递到死信队列 
* 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机 (队列)
*/ 
@Test
public void dlxTTLMessageTest(){ 
rabbitTemplate.convertAndSend("my_normal_exchange", "my_ttl_dlx", "测试过 期消息;
6秒过期后会被投递到死信交换机"); 
}

2)在rabbitMQ管理界面中结果
未过期:

在这里插入图片描述

过期后:


在这里插入图片描述

3)流程
具体因为队列消息过期而被投递到死信队列的流程:

在这里插入图片描述

1.2.4. 消息过长的死信队列测试

1)发送消息代码
添加 spring-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java方法

/**
* 超过队列长度消息投递到死信队列 
* 投递到一个正常的队列,但是该队列有设置最大消息数,到最大消息数之后队列中最早的消息会被投 
递到死信交换机(队列) 
*/ 
@Test 
public void dlxMaxMessageTest(){ 
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx", 
"队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是 第1个消息"); 
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx", 
"队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是 第2个消息"); 
rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx", 
"队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是 第3个消息");
}

2)在rabbitMQ管理界面中结果
上面发送的3条消息中的第1条消息会被投递到死信队列中(如果启动了消费者,那么队列消息很快会被取走消费掉);

在这里插入图片描述

3)消费者接收死信队列消息
与过期消息投递到死信队列的代码和配置是共用的,并不需要重新编写。

4)流程
消息超过队列最大消息长度而被投递到死信队列的流程在前面的图中已包含。

1.3. 延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻
拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列来实现;具体如下流程图所示:

在这里插入图片描述

在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消费者再从这些死信队列接收消息就可以实现消息的延迟接收。
延迟队列的应用场景;如:

  • 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要进行支付失败的异常处理(将库存加回去),这时候可以通过使用延迟队列来处理
  • 在系统中如有需要在指定的某个时间之后执行的任务都可以通过延迟队列处理

1.4. 消息确认机制

确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。

1.4.1 发布确认

有两种方式:消息发送成功确认和消息发送失败回调。

  • 消息发送成功确认
    spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml connectionFactory中启用消息确认:
<!-- publisher-confirms="true" 表示:启用了消息确认 --> 
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" 
port="${rabbitmq.port}" 
username="${rabbitmq.username}" 
password="${rabbitmq.password}" 
virtual-host="${rabbitmq.virtual-host}" 
publisher-confirms="true" /> 

配置消息确认回调方法如下:

<!-- 消息回调处理类 --> 
<bean id="confirmCallback" class="com.itheima.rabbitmq.MsgSendConfirmCallBack"/> 
<!--定义rabbitTemplate对象操作可以在代码中方便发送消息--> 
<!-- confirm-callback="confirmCallback" 表示:消息失败回调 --> 
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" 
confirm-callback="confirmCallback"/>

消息确认回调方法com.itheima.rabbitmq.MsgSendConfirmCallBack如下:

 public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { 
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 if (ack) { 
 System.out.println("消息确认成功...."); } 
 else { 
 //处理丢失的消息 System.out.println("消息确认失败," + cause);
     } 
      }
   }

功能测试如下:
发送消息
com.itheima.rabbitmq.ProducerTest#queueTest

 @Test public void queueTest(){
  //路由键与队列同名
 rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消 息。"); 
 }

管理界面确认消息发送成功


在这里插入图片描述

消息确认回调


在这里插入图片描述
  • 消息发送失败回调

在spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml

connectionFactory 中启用回调:

<!-- publisher-returns="true" 表示:启用了失败回调 --> 
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" 
port="${rabbitmq.port}" 
username="${rabbitmq.username}" 
password="${rabbitmq.password}" 
virtual-host="${rabbitmq.virtual-host}" 
publisher-returns="true" />

配置消息失败回调方法如下:

注意:同时需配置mandatory="true",否则消息则丢失

<!-- 消息失败回调类 --> 
<bean id="sendReturnCallback" 
class="com.itheima.rabbitmq.MsgSendReturnCallback"/> 
<!-- return-callback="sendReturnCallback" 表示:消息失败回调 ,同时需配置 
mandatory="true",否则消息则丢失--> 
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" 
confirm-callback="confirmCallback" return-callback="sendReturnCallback" 
mandatory="true"/>

消息失败回调方法com.itheima.rabbitmq.MsgSendReturnCallback如下:

public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback { 
public void returnedMessage(Message message, int i, String s, String s1, String s2) { 
String msgJson = new String(message.getBody()); 
System.out.println("Returned Message:"+msgJson); 
}
 }

功能测试如下:

模拟消息发送失败

com.itheima.rabbitmq.ProducerTest#testFailQueueTest

@Test 
public void testFailQueueTest() throws InterruptedException { 
//exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 
replyText:NO_ROUTE 
amqpTemplate.convertAndSend("test_fail_exchange", "", "测试消息发送失败进行确认 应答。"); 
}

失败回调结果如下:


在这里插入图片描述

1.4.2 事务支持

场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。

外部事务的配置:spring-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml

<!-- channel-transacted="true" 表示:支持事务操作 --> 
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" 
confirm-callback="confirmCallback" return-callback="sendReturnCallback" 
channel-transacted="true" /> 

<!--平台事务管理器--> 
<bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> 
<property name="connectionFactory" ref="connectionFactory"/> 
</bean>
  • 模拟业务处理失败的场景:

测试类或者测试方法上加入@Transactional注解

@Transactional 
public class ProducerTest 
@Test 
public void queueTest2(){ 
//路由键与队列同名
 rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息- -01。"); System.out.println("----------------dosoming:可以是数据库的操作,也可以是其他业务 
 类型的操作---------------");
  //模拟业务处理失败 
  System.out.println(1/0); 
  rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息- -02。");
  }

测试结果:


在这里插入图片描述

1.5. 消息追踪

消息中心的消息追踪需要使用Trace实现,Trace是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。可通过插件形式提供可视化界面。Trace启动后会自动创建系统Exchange:amq.rabbitmq.trace ,每个队列会自动绑定该Exchange,绑定后发送到队列的消息都会记录到Trace日志。

1.5.1 消息追踪启用与查看

以下是trace的相关命令和使用(要使用需要先rabbitmq启用插件,再打开开关才能使用):

命令集 描述
rabbitmq-plugins list 查看插件列表
rabbitmq-plugins enable rabbitmq_tracing rabbitmq启用trace插件
rabbitmqctl trace_on 打开trace的开关
rabbitmqctl trace_on -p itcast 打开trace的开关(itcast为需要日志追踪的 vhost)
rabbitmqctl trace_off 关闭trace的开关
rabbitmq-plugins disable rabbitmq_tracing rabbitmq关闭Trace插件
rabbitmqctl set_user_tags heimaadministrator 只有administrator的角色才能查看日志界面

安装插件并开启 trace_on 之后,会发现多个 exchange:amq.rabbitmq.trace ,类型为:topic。


在这里插入图片描述

1.5.2 日志追踪

第一步:发送消息

rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--01。");

发送成功,web查看多了一条消息


在这里插入图片描述

第二步:查看trace

在这里插入图片描述

第三步:点击Tracing查看Trace log files

在这里插入图片描述

第四步:点击itcast-trace.log确认消息轨迹正确性

在这里插入图片描述

浏览器截图:


在这里插入图片描述
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容