概述
本文主要介绍一下RabbitMQ中的备份交换器、死信队列、延迟队列以及优先级队列
备份交换器
mandatory参数
在介绍备份交换器之前我们在回顾一下channel.basicPublish中的mandatory参数,它有当消息传递过程中不可达目的地时将消息返回给生产者的功能:mandatory参数如果设为true,当交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者 。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃。
那么生产者如何知道发布的消息有没有被正确的路由到合适的队列呢?这时候可以通过调用channel.addReturnListener监听器实现:
channel.basicPublish(EXCHANGE_NAME, "", true,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return 返回的结果是: "+ message);
}
});
上面代码中生产者没有成功地将消息路由到队列,此时RabbitMQ会通过Basic.Return返回"mandatory test"这条消息,之后生产者客户端通过ReturnListener监昕到了这个事件,上面代码的最后输出应该是:"Basic.Return 返回的结果是 : mandatory test"。
备份交换器,英文名称为:Altemate Exchange(简称AE),生产者在发送消息的时候如果不设置mandatory参数 ,那么消息在未被路由的情况下将会丢失 : 如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。
声明备份交换器
声明备份交换器可以在调用channel.exchangeDeclare方法时添加alternate-exchange参数来实现,也可以通过策略Policy的方式实现。如果两者同时使用,则前者的优先级更高,会覆盖掉Policy的设置。
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "myAe");
//声明交换器
channel.exchangeDeclare("normalExchange", "direct", true, false, args);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
//声明队列
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("unroutedQueue", true, false, false, null);
//交换器与队列绑定
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueBind( "unroutedQueue", "myAe", "" ) ;
上面的代码中声明了两个交换器:normalExchange和myAe,分别绑定了normalQueue和uroutedQueue这两个队列,同时将myAe设置为normalExchange的备份交换器(注意myAe的交换器类型为fanout)。
如果此时发送一条消息到normalExchange上,当路由键等于"normalKey"的时候,消息能正确路由到normalQueue这个队列中。如果路由键设为其他值,比如"errorKey" 则消息不能被正确地路由到与normalExchange绑定的任何队列上,此时就会发送给myAe,进而发送到unroutedQueue这个队列。
备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,如若想设置为direct或者topic的类型也没有什么不妥,个人建议使用fanout类型:考虑这样一种情况,如果备份交换器的类型是direct,并且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息仍丢失,这样备份交换器只能存储携带路由键key1的消息。
需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。
对于备份交换器,总结了以下几种特殊情况:
- 如果设置的备份交换器不存在,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器没有绑定任何队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务端都不会有异常出现,此时消息会丢失。
- 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
设置消息过期时间(TTL:Time to Live)
RabbitMQ支持对消息和队列设置过期时间
设置过期时间的方式可以在声明队列时给队列设置过期时间属性,如果一个队列设置了过期时间,那么队列中的所有消息都有相同的过期时间,也可以在发送消息时对消息本身进行单独设置
如果我们同时设置了队列过期时间和消息过期时间,那么消息最终的TTL取最小值,消息在队列中生存时间如果超过设置的TTL值就会变成"死信"(Dead Message),消费者将无法再收到该条消息(不是绝对的,可以使用死信队列)
对于给队列设置过期时间这种方式一旦消息过期,就会从队列中移除,而给消息设置过期时间这种方式,即使消息过期,也不会马上从队列中移除,因为每条消息是否过期是在即将投递到消费者之前判定的。
为什么这两种方法处理的方式不一样?因为队列设置过期时间这种方式,队列中己过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期的消息即可。而给消息设置过期时间,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。
-
给队列设置过期时间
在队列声明channel.queueDeclare方法中加入x-message-ttl可以实现给队列设置过期时间,参数的单位是毫秒。
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);
也可以通过Policy的方式来设置TTL,示例如下 :
./rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
如果不设置TTL则表示此消息不会过期
如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
-
给消息单独设置过期时间
在发送消息channel.basicPublish方法中加入expiration的属性参数,单位为毫秒。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
//持久化消息
builder.deliveryMode(2);
//设置消息过期时间为60000ms
builder.expiration("60000");
AMQP.BasicProperties properties = builder.build() ;
channel.basicPublish(exchangeName, routingKey, mandatory, properties,"Hello RabbitMQ".getBytes());
死信队列
死信队列全称为Dead-Letter-Exchange(DLX),也可以称为死信交换器或死信邮箱,当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信的情形:
-- 消息被拒绝,并且设置requeque参数为false
-- 消息过期
-- 队列长度达到最大值
DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 ,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理。
配置死信队列
- 方式一:通过在队列声明channel.queueDeclare方法中设置x-dead-letter-exchange参数可以为一个队列添加一个死信队列DLX
//先声明一个死信队列: dlx_exchange
channel.exchangeDeclare("dlx_exchange", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange ");
//为队列myqueue添加死信队列
channel.queueDeclare("myqueue", false, false, false, args);
RabbitMQ允许我们为死信队列指定路由键,如果没有特殊指定,则使用原队列的路由键。
- 方式二:当然这里也可以通过Policy的方式设置:
./rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":" dlx_exchange"}' --apply-to queues
对于 RabbitMQ 来说,死信队列是一个非常有用的特性,消息不能被消费者正确消费(消费者调用了Basic.Nack 或者 Basic.Reject) 而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
延迟队列
延迟队列存储的对象是对应的延迟消息,所谓延迟消息就是指消息被发送后,并不想让消费者立即拿到消息,而是在等待特定时间后消费者才可以拿到这个消息进行消费。
在AMQP协议中,或者RabbitMQ本身没有直接支持延迟队列的功能,但是可以通过前面介绍的DLX和TTL模拟出延迟队列的功能:假设一个应用中需要将每条消息都设置为10秒的延迟,生产者通过 exchange.normal这个交换器将发送的消息存储在 queue.normal这个队列中。消费者订阅的并非是 queue.normal 这个队列,而是它的死信队列queue.dlx 。当 消息从queue.normal 这个队列中过期之后被存入queue.dlx这个队列中,消费者就恰巧消费到了延迟10秒的这条消息。
优先级队列
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
可以通过设置队列的x-max-priority参数来实现
Map<String, Object> args = new HashMap<String, Object>() ;
args.put( "x-max-priority", 10) ;
channel.queueDeclare("queue.priority", true, fa1se, false, args) ;
上面的代码演示的是如何配置一个队列的最大优先级。在此之后,需要在发送时在消息中设置消息当前的优先级:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5) ;
AMQP.BasicProperties properties = builder.build() ;
channel.basicPublish("exchange_priority", "rk_priority", properties, "Hello RabbitMQ".getBytes());
上面的代码中设置消息的优先级为5。默认最低为0,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个也是有前提的 : 如果在消费者的消费速度大于生产者的速度,且 Broker中没有消息堆积的情况下,对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。