前言
消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。
详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。通过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。
并举以实例对死信队列、持久化操作进行一一介绍。
一、RabbitMQ 与 AMQP 的关系
1.1 AMQP简介
AMQP(Advanced Message Queue Protocol 高级消息队列协议)是一个消息队列协议,它支持符合条件的客户端和消息代理中间件(message middleware broker)进行通讯。RabbitMQ 则是 AMQP 协议的实现者,主要用于在分布式系统中信息的存储发送与接收,RabbitMQ 的服务器端用 Erlang 语言编写,客户端支持多种开发语言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。
1.2 ActiveMQ、RabbitMQ、Kafka 对比
现在在市场上有 ActiveMQ、RabbitMQ、Kafka 等多个常用的消息队列框架,与其他框架对比起来,RabbitMQ 在易用性、扩展性、高可用性、多协议、支持多语言客户端等方面都有不俗表现。
1.2.1 AcitveMQ 特点
ActiveMQ 是 Apache 以 Java 语言开发的消息模型,它完美地支持 JMS(Java Message Service)消息服务,客户端支持 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种开主发语言,支持OpenWire、Stomp、REST、XMPP、AMQP 等多种协议。ActiveMQ 采用异步消息传递方式,在设计上保证了多主机集群,客户端-服务器,点对点等模式的有效通信。从开始它就是按照 JMS 1.1 和 J2EE 1.4 规范进行开发,实现了消息持久化,XA,事务支撑等功能。经历多年的升级完善,现今已成为 Java 应用开发中主流的消息解决方案。但相比起 RabbitMQ、Kafka 它的主要缺点表现为资源消耗比较大,吞吐量较低,在高并发的情况下系统支撑能力较弱。如果系统全程使用 Java 开发,其并发量在可控范围内,或系统需要支持多种不同的协议,使用 ActiveMQ 可更轻便地搭建起消息队列服务。
1.2.2 Kafka 特点
Kafka 天生是面向分布式系统开发的消息队列,它具有高性能、容灾性、可动态扩容等特点。Kafka 与生俱来的特点在于它会把每个Partition 的数据都备份到不同的服务器当中,并与 ZooKeeper 配合,当某个Broker 故障失效时,ZooKeeper 服务就会将通知生产者和消费者,从备份服务器进行数据恢复。在性能上 Kafka 也大大超越了传统的 ActiveMQ、RabbitMQ ,由于 Kafka 集群可支持动态扩容,在负载量到达峰值时可动态增加新的服务器进集群而无需重启服务。但由于 Kafka 属于分布式系统,所以它只能在同一分区内实现消息有序,无法实现全局消息有序。而且它内部的监控机制不够完善,需要安装插件,依赖ZooKeeper 进行元数据管理。如果系统属于分布式管理机制,数据量较大且并发量难以预估的情况下,建议使用 Kafka 队列。
1.2.3 RabbitMQ 对比
由于 ActiveMQ 过于依赖 JMS 的规范而限制了它的发展,所以 RabbitMQ 在性能和吞吐量上明显会优于 ActiveMQ。
由于上市时间较长,在可用性、稳定性、可靠性上 RabbitMq 会比 Kafka 技术成熟,而且 RabbitMq 使用 Erlang 开发,所以天生具备高并发高可用的特点。而 Kafka 属于分布式系统,它的性能、吞吐量、TPS 都会比 RabbitMq 要强。
二、RabbitMQ 的实现原理
2.1 生产者(Producer)、消费者(Consumer)、服务中心(Broker)之间的关系
首先简单介绍 RabbitMQ 的运行原理,在 RabbitMQ 使用时,系统会先安装并启动 Broker Server,也就是 RabbitMQ 的服务中心。无论是生产者 (Producer),消费者(Consumer)都会通过连接池(Connection)使用 TCP/IP 协议(默认)来与 BrokerServer 进行连接。然后 Producer 会把 Exchange / Queue 的绑定信息发送到 Broker Server,Broker Server 根据 Exchange 的类型逻辑选择对应 Queue ,最后把信息发送到与 Queue 关联的对应 Consumer 。
2.2 交换器(Exchange)、队列(Queue)、信道(Channel)、绑定(Binding)的概念
2.2.1 交换器 Exchange
Producer 建立连接后,并非直接将消息投递到队列 Queue 中,而是把消息发送到交换器 Exchange,由 Exchange 根据不同逻辑把消息发送到一个或多个对应的队列当中。目前 Exchange 提供了四种不同的常用类型:Fanout、Direct、Topic、Header。
- Fanout类型
此类型是最为常见的交换器,它会将消息转发给所有与之绑定的队列上。比如,有N个队列与 Fanout 交换器绑定,当产生一条消息时,Exchange 会将该消息的N个副本分别发给每个队列,类似于广播机制。
- Direct类型
此类型的 Exchange 会把消息发送到 Routing_Key 完全相等的队列当中。多个 Cousumer 可以使用相同的关键字进行绑定,类似于数据库的一对多关系。比如,Producer 以 Direct 类型的 Exchange 推送 Routing_Key 为 direct.key1 的队列,系统再指定多个 Cousumer 绑定 direct.key1。如此,消息就会被分发至多个不同的 Cousumer 当中。
- Topic类型
此类型是最灵活的一种方式配置方式,它可以使用模糊匹配,根据 Routing_Key 绑定到包含该关键字的不同队列中。比如,Producer 使用 Topic类型的 Exchange 分别推送 Routing_Key 设置为 topic.guangdong.guangzhou 、topic.guangdong.shenzhen 的不同队列,Cousumer 只需要把 Routing_Key 设置为 topic.guangdong.# ,就可以把所有消息接收处理。
- Headers类型
该类型的交换器与前面介绍的稍有不同,它不再是基于关键字 Routing_Key 进行路由,而是基于多个属性进行路由的,这些属性比路由关键字更容易表示为消息的头。也就是说,用于路由的属性是取自于消息 Header 属性,当消息 Header 的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。
2.2.2 Queue 队列
Queue 队列是消息的载体,每个消息都会被投入到 Queue 当中,它包含 name,durable,arguments 等多个属性,name 用于定义它的名称,当 durable(持久化)为 true 时,队列将会持久化保存到硬盘上。反之为 false 时,一旦 Broker Server 被重启,对应的队列就会消失,后面还会有例子作详细介绍。
2.2.3 Channel 通道
当 Broker Server 使用 Connection 连接 Producer / Cousumer 时会使用到信道(Channel),一个 Connection上可以建立多个 Channel,每个 Channel 都有一个会话任务,可以理解为逻辑上的连接。主要用作管理相关的参数定义,发送消息,获取消息,事务处理等。
2.2.4 Binding 绑定
Binding 主要用于绑定交换器 Exchange 与 队列 Queue 之间的对应关系,并记录路由的 Routing-Key。Binding 信息会保存到系统当中,用于 Broker Server 信息的分发依据。
三、RabbitMQ 应用实例
3.1 Rabbit 常用类说明
3.1.1 RabbitTemplate 类
Spring 框架已经封装了 RabbitTemplate 对 RabbitMQ 的绑定、队列发送、接收进行简化管理
3.2 初探 RabbitMQ
在官网下载并成功安装完 RabbitMQ 后,打开默认路径 http://localhost:15672/#/ 即可看到 RabbitMQ 服务中心的管理界面
3.2.1 Producer 端开发
先在 pom 中添加 RabbitMQ 的依赖,并在 application.yml 中加入 RabbitMQ 帐号密码等信息。此例子,我们尝试使用 Direct 交换器把队列发送到不同的 Consumer。
**********************pom *************************
<project>
.............
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
</project>
**************** application.yml ****************
spring:
application:
name: rabbitMqProducer
rabbitmq:
host: localhost
port: 5672
username: admin
password: 12345678
virtual-host: /LeslieHost
首先使用 CachingConnectionFactory 建立链接,通过 BindingBuilder 绑定 Exchange、Queue、RoutingKey之间的关系。
然后通过 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把信息发送到 Broken Server
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME,true,true);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send() {
for(int n=0;n<100;n++){
template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm the first queue! "+String.valueOf(n),getCorrelationData());
template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm the second queue! "+String.valueOf(n),getCorrelationData());
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
此时,打开 RabbitMQ 管理界面,可看到 Producer 已经向 Broken Server 的 direct.first / direct.second 两个 Queue 分别发送100 个 Message
3.2.2 Consumer 端开发
分别建立两个不同的 Consumer ,一个绑定 direct.first 别一个绑定 direct.second , 然后通过注解 @RabbitListener 监听不同的 queue,当接到到 Producer 推送队列时,显示队列信息。
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Configuration
@RabbitListener(queues="direct.first")
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}
运行后可以观察到不同的 Consumer 会收到不同队列的消息
如果觉得使用 Binding 代码绑定过于繁琐,还可以直接在监听类RabbitMqListener中使用 @QueueBinding 注解绑定
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.second"),
key="directKey2"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}
运行结果
四、Producer 端的消息发送与监控
前面一节已经介绍了RabbitMQ的基本使用方法,这一节将从更深入的层面讲述 Producer 的应用。
试想一下这种的情形,如果因 RabbitTemplate 发送时 Exchange 名称绑定错误,或 Broken Server 因网络问题或服务负荷过大引发异常,Producer 发送的队列丢失,系统无法正常工作。此时,开发人员应该进行一系列应对措施进行监测,确保每个数据都能正常推送到 Broken Server 。有见及此,RabbitMQ 专门为大家提供了两种解决方案,一是使用传统的事务模式,二是使用回调函数,下面为大家作详介绍。
4.1 Producer 端的事务管理
在需要使用事务时,可以通过两种方法
第一可以调用 channel 类的方法以传统模式进行管理,事务开始时调用 channel.txSelect(),信息发送后进行确认 channel.txCommit(),一旦捕捉到异常进行回滚 channel.txRollback(),最后关闭事务。
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send1(HttpServletResponse response)
throws InterruptedException, IOException, TimeoutException{
Channel channel=template.getConnectionFactory().createConnection().createChannel(true);
.......
try{
channel.txSelect();
channel.basicPublish("ErrorExchange", BindingConfig.Routing_Key_First, new AMQP.BasicProperties(),"Nothing".getBytes());
channel.txCommit();
}catch(Exception e){
channel.txRollback();
}finally{
channel.close();
}
......
......
......
}
}
第二还可以直接通过 RabbitTemplate 的配置方法 void setChannelTransacted(bool isTransacted) 直接开启事务
public class ProducerController {
@Autowired
private ConnectionConfig connection;
@Autowired
@Bean
private RabbitTemplate template(){
RabbitTemplate template=new RabbitTemplate(connection.getConnectionFactory());
template.setChannelTransacted(true);
return template;
}
@RequestMapping("/send")
@Transactional(rollbackFor=Exception.class)
public void send(HttpServletResponse response) throws InterruptedException, IOException,TimeoutException{
..........
..........
..........
}
}
4.2 利用 ConfirmCallback 回调确认消息是否成功发送到 Exchange
使用事务模式消耗的系统资源比较大,系统往往会处理长期等待的状态,在并发量较高的时候也有可能造成死锁的隐患。有见及此,系统提供了轻量级的回调函数方式进行异步处理。
当需要确认消息是否成功发送到 Exchange 的时候,可以使用 ConfirmCallback 回调函数。使用该函数,系统推送消息后,该线程便会得到释放,等 Exchange 接收到消息后系统便会异步调用 ConfirmCallback 绑定的方法进行处理。ConfirmCallback 只包含一个方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法会把每条数据发送到 Exchange 时候的 ack 状态(成功/失败),cause 成败原因,及对应的 correlationData(CorrelationData 只包含一个属性 id,是绑定发送对象的唯一标识符) 返还到 Producer,让Producer 进行相应处理。
注意:在绑定 ConfirmCallback 回调函数前,请先把 publisher-confirms 属性设置为 true
spring:
application:
name: rabbitmqproducer
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: 12345678
virtual-host: /LeslieHost
例如:下面的例子,特意将 RabbitTemplate 发送时所绑定的 Exchange 名称填写为错误名称 “ ErrorExchange ”,造成发送失败,然后在回调函数中检查失败的原因。
Producer 端代码:
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPublisherConfirms(true);
factory.setPublisherReturns(true);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Component
public class MyConfirmCallback implements ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// TODO 自动生成的方法存根
// TODO 自动生成的方法存根
if(ack){
System.out.println(correlationData.getId()+" ack is: true! \ncause:"+cause);
}else
System.out.println(correlationData.getId()+" ack is: false! \ncause:"+cause);
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@Autowired
private MyConfirmCallback confirmCallback;
@RequestMapping("/send")
public void send() {
template.setConfirmCallback(confirmCallback);
for(int n=0;n<2;n++){
template.convertAndSend("ErrorExchange",
BindingConfig.RoutingKey1,"I'm the first queue! "
+String.valueOf(n),getCorrelationData());
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Consumer端代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}
运行结果:
4.3 绑定 CorrelationData 与发送对象的关系
上面的例子当中,CorrelationData 只是用一个随机的 UUID 作为 CorrelationID,而在现实的应用场景中,由于 ConfirmCallback 只反回标识值 CorrelationData,而没有把队列里的对象值也一同返回。所以,在推送队列时可以先用 Key-Value 保存 CorrelationID 与所发送信息的关系,这样当 ConfirmCallback 回调时,就可根据 CorrelationID 找回对象,作进一步处理。
下面例子,我们把要发送的对象放在虚拟数据 DataSource 类中,用 DataRelation 记录 CorrelationID 与发送对象 OrderID 的关系,然后在回调函数 ConfirmCallback 中根据 CorrelationID 查找对应的 OrderEntity,如果发送成功,则删除绑定。如果发送失败,可以重新发送或根据情况再作处理。
Producer端代码:
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPublisherConfirms(true);
factory.setPublisherReturns(true);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
//Exchange 使用 direct 模式
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Data
public class OrderEntity implements Serializable{
private String id;
private String goods;
private Double price;
private Integer count;
public OrderEntity(String id,String goods,Double price,Integer count){
this.id=id;
this.goods=goods;
this.price=price;
this.count=count;
}
public OrderEntity(){}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoods() {
return goods;
}
public void setGoodsId(String goods) {
this.goods = goods;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
@Component
public class DataSource {
//加入虚拟数据
private static List<OrderEntity> list=new ArrayList<OrderEntity>(
Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
new OrderEntity("002","Huwei P30 Plus",5400.00,1),
..........));
public DataSource(){
}
public List<OrderEntity> getOrderList(){
return list;
}
//根据Id获取对应order
public OrderEntity getOrder(String id){
for(OrderEntity order:list){
if(order.getId()==id)
return order;
}
return null;
}
}
public class DataRelation {
public static Map map=new HashMap();
//绑定关系
public static void add(String key,String value){
if(!map.containsKey(key))
map.put(key,value);
}
//返回orderId
public static Object get(String key){
if(map.containsKey(key))
return map.get(key);
else
return null;
}
//根据 orderId 删除绑定关系
public static void del(String key){
if(map.containsKey(key))
map.remove(key);
}
}
@Component
public class MyConfirmCallback implements ConfirmCallback {
@Autowired
private DataSource datasource;
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String correlationId=correlationData.getId();
//根据 correclationId取回对应的orderId
String orderId=DataRelation.get(correlationId).toString();
//在datasource中找回对应的order
OrderEntity order=datasource.getOrder(orderId);
if(ack){
System.out.println("--------------------ConfirmCallback-------------------\n"
+" order's ack is true!\nId:"+order.getId()+" Goods:"+order.getGoods()
+" Count:"+order.getCount().toString()+" Price:"+order.getPrice());
DataRelation.del(correlationId); //操作完成删除对应绑定
}else {
System.out.println(order.getId()+" order's ack is: false! \ncause:"+cause);
//可在记录日志后把Order推送到队列进行重新发送
.......
}
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@Autowired
private MyConfirmCallback confirmCallback;
@Autowired
private DataSource dataSource;
@RequestMapping("/send")
public void send() throws InterruptedException, IOException{
//绑定 ConfirmCallback 回调函数
template.setConfirmCallback(confirmCallback);
for(OrderEntity order:dataSource.getOrderList()){
CorrelationData correlationData=getCorrelationData();
//保存 CorrelationId 与 orderId关系
DataRelation.add(correlationData.getId(), order.getId());
//把 order 插入队列
template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Consumer 端代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}
运行结果
4.4 利用 ReturnCallback 处理队列 Queue 错误
使用 ConfirmCallback 函数只能判断消息是否成功发送到 Exchange,但并不能保证消息已经成功进行队列 Queue。所以,系统预备了另一个回调函数 ReturnCallback 来监听 Queue 队列处理的成败。如果队列错误绑定不存在的 queue,或者 Broken Server 瞬间出现问题末能找到对应的 queue,系统就会激发 Producer 端 ReturnCallback 的回调函数来进行错误处理。 ReturnCallback 回调接口只包含一个方法 void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey),它会把出错的 replyCode,replyText,exchange,routingKey等值都一起返还。与 ConfirmCallback 不同的是,returnedMessage 会把队列中的对象保存到 Message 的 Body 属性中并返还到回调函数。
注意:在绑定 ReturnCallback 回调函数前,请先把 publisher-returns 及 mandatory 属性设置为 true 。 mandatory 参数默认为 false,用于判断 broken server是否把错误的对象返还到 Producer。如末进行设置,系统将把错误的消息丢弃。
下面例子我们在调用 convertAndSend 方法时特意把 routingKey 设置为 ErrorKey,触发 ReturnCallback 回调,然后在 ReturenCallback 的回调方法显示 replyCode,replyText,exchange,routingKey 等值,并把队列中对象属性一并显示。
Producer 端代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setPublisherConfirms(true);
factory.setPublisherReturns(true);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
}
@Data
public class OrderEntity implements Serializable{
private String id;
private String goods;
private Double price;
private Integer count;
public OrderEntity(String id,String goods,Double price,Integer count){
this.id=id;
this.goods=goods;
this.price=price;
this.count=count;
}
public OrderEntity(){}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoods() {
return goods;
}
public void setGoodsId(String goods) {
this.goods = goods;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
}
@Component
public class DataSource {
//虚拟数据
private static List<OrderEntity> list=new ArrayList<OrderEntity>(
Arrays.asList(new OrderEntity("001","Nikon D750",13990.00,1),
new OrderEntity("002","Huwei P30 Plus",5400.00,1),
......));
public DataSource(){
}
public List<OrderEntity> getOrderList(){
return list;
}
//根据Id获取对应order
public OrderEntity getOrder(String id){
for(OrderEntity order:list){
if(order.getId()==id)
return order;
}
return null;
}
}
@Component
public class MyReturnCallback implements ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey){
//把messageBody反序列化为 OrderEntity对象
OrderEntity order=convertToOrder(message.getBody());
//显示错误原因
System.out.println("-------------ReturnCallback!------------\n"
+" exchange:"+exchange+" replyCode:"+String.valueOf(replyCode)
+" replyText:"+replyText+" key:"+routingKey+"\n OrderId:"+order.getId()
+" Goods:"+order.getGoods()+" Count:"+order.getCount().toString()
+" Price:"+order.getPrice()+" ");
}
//把byte[]反序列化为 OrderEntity对象
private OrderEntity convertToOrder(byte[] bytes){
OrderEntity order=null;
ByteArrayInputStream bis = new ByteArrayInputStream (bytes);
ObjectInputStream ois;
try {
ois = new ObjectInputStream (bis);
Object obj = ois.readObject();
order=(OrderEntity)obj;
ois.close();
bis.close();
} catch (IOException | ClassNotFoundException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
return order;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@Autowired
private MyReturnCallback returnCallback;
@Autowired
private DataSource dataSource;
@RequestMapping("/send")
public void send() throws InterruptedException, IOException{
//把 mandatory 属性设定为true
template.setMandatory(true);
//绑定 ReturnCallback 回调函数
template.setReturnCallback(returnCallback);
for(OrderEntity order:dataSource.getOrderList()){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","ErrorKey",order,correlationData);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Consumer 代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
@RabbitListener(bindings=@QueueBinding(
exchange=@Exchange(value="directExchange"),
value=@Queue(value="direct.first"),
key="directKey1"))
public class RabbitMqListener {
@RabbitHandler
public void handler(String message){
System.out.println(message);
}
}
@SpringBootApplication
public class App {
public static void main(String[] args){
SpringApplication.run(App.class, args);
}
}
运行结果:
五、Consumer 消息接收管控
在第四节主要介绍了 Producer 端的队列发送与监控,它只能管理 Producer 与 Broker Server 之间的通信,但并不能确认 Consumer 是否能成功接收到队列,在这节内容将介绍 Consumer 端的队列接收与监听。前面几节里,Consumer 端都是简单地直接使用 RabbitListener 对队列进行监听,其实 RabbitMQ 已经为用户准备了功能更强大的 MessageListenerContainer 容器用于管理 Message ,下面将为大家介绍。
5.1 AbstractMessageListenerContainer 介绍
AbstractMeessageListenerContainer 虚拟类是 RabbitMQ 封装好的一个容器,本身并没有对消息进行处理,而是把消息的处理方式交给了 MessageListener 。而它的主要功能是实现 MessageListener 的绑定,ApplicationContext 上下文的绑定,ErrorHandler 的错误处理方法的绑定、对消息消费的开始、结束等等默认参数进行配置,让开发人员可以在容器中对 Consumer 实现统一管理。SimpleMessageListenerContainer、DirectMessageLinstenerCoontainer 都是它的子类,分别应用于不同的场景,在下面会再作详细介绍。
MessageListener 是监听消息最常用 Listener,它只包含了一个方法 void onMessage(Message message),这是消息接收最常用的一个方法,开发者只需要实现此方法即可对接收到的 Message 进行处理。
ChannelAwareMessageListener 相当于是 MessageListener的一个扩展,包含了方法 void onMessage(Message message, Channel channel),除了对 Message 进行处理外,还可以对接收此 Message 的 Channel 进行检测。
5.2 SimpleMessageListenerContainer 常用方法
SimpleMessageListenerContainer 是最常用的 MessageListener 容器,它可以通过下面的方法设置默认消费者数量与最大的消费者数量。下面例子中尝试把 consurrentConsumers 设置为3,把maxConcurrentConsumers 设置为4,并同时监控 direct 模式交换器的 direct.first,direct.second 队列。
通过截图可以看到,系统默认会为每个 queue 都创建 3 个 consumers,不同的 queue 中的 consumers 是共享相同的 3 个 channel 。
当 Producer 端发送消息时,consumers 的实际数量可根据 maxConcurrentConsumers 的配置限制进行扩展。
Producer 端代码
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
"queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2"," queue2"+" "+n.toString(),correlationData);
Thread.currentThread().sleep(30);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Consumer 端代码:
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class SimpleMessListener {
@Autowired
private RabbitTemplate template;
private int index=0;
@Bean
public SimpleMessageListenerContainer messageContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 绑定Queue1/Queue2
container.setQueueNames("direct.first");
container.addQueueNames("direct.second");
//设置默认 consumer 数为3
container.setConcurrentConsumers(3);
//设置最大 consumer 数为4
container.setMaxConcurrentConsumers(4);
//标记 consumerTag
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
//绑定MessageListener显示接收信息
container.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message message) {
// TODO 自动生成的方法存根
Thread thread=Thread.currentThread();
MessageProperties messProp=message.getMessageProperties();
try {
System.out.println(" ConsumerTag:"+messProp.getConsumerTag()
+" ThreadId is:"+thread.getId()+" Queue:"+messProp.getConsumerQueue()
+" "+new String(message.getBody(),"UTF-8"));
} catch (UnsupportedEncodingException e) {
// TODO 自动生成的 catch 块
e.printStackTrace();
}
}
});
return container;
}
}
运行结果
5.3 SimpleMessageListenerContainer 的运作原理
在 SimpleMessageListenerContainer 模式中,无论系统监听多少个 queue 队列,channel 都是共享的,类似上面的例子,4个 channel 会把接收到不同的队列请求并分发到对应的 consumer 进行处理。这样做的好处是系统可以通过 concurrentConsumers、maxConcurrentConsumers 灵活设定当前队列中消费者的数量,系统可以跟据实际需求灵活处理。但由于每个 channel 都是在固定线程中运行的,一个 channel 要游走于多个 consumer 当中,这无疑增加了系统在上下文切换中的开销。下面用系统提供的 ChannelAwareMessageListener 接口,以更直观的例子说明一下 SimpleMessageListenerContainer 当中 channel、queue、consumer 之间的关系。
Producer 端代码
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
" queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2",
"queue2"+" "+n.toString(),correlationData);
Thread.currentThread().sleep(30);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Consumer 端代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class SimpleMessListener {
@Autowired
private RabbitTemplate template;
@Autowired
private ConnectionConfig connectionConfig;
private int index=0;
@Bean
public SimpleMessageListenerContainer messageContainer(){
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 绑定Queue1/Queue2
container.setQueueNames("direct.first");
container.addQueueNames("direct.second");
//设置默认 consumer 数为3
container.setConcurrentConsumers(3);
//设置最大 consumer 数为4
container.setMaxConcurrentConsumers(4);
//标记 consumerTag
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
//绑定ChannelAwareMessageListener显示接收信息
container.setChannelAwareMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
// TODO 自动生成的方法存根
// TODO 自动生成的方法存根
Thread thread=Thread.currentThread();
System.out.println("Channel:"+channel.getChannelNumber()
+" ThreadId is:"+thread.getId()
+" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
+" Queue:"+message.getMessageProperties().getConsumerQueue());
}
});
return container;
}
}
运行结果:
观察运行结果可以看到:每个 channel 都在固定的线程中运行,一个 channel 会向不同的 consumer 发送队列信息。了解 channel、thread、queue、consumer 之间的关系,会对 SimpleMessageListenerContainer 有更深入认识。
5.4 DirectMessageListenerContainer
SimpleMessageListenerContainer 是经典的容器,使用 channel 共享,一旦某个 channel 关闭或重启,意味着每个队列 queue 中使用当前 channel 的 consumer 都会受到影响。 有见及此,在 RabbitMQ 2.0 后,系统引入了 DirectMessageListenerContainer ,它允许每个 consumer 都有各自的对应的 channel 的,channel 只管理负责管理当前 consumer 的通道。这样令 consumer 运用更灵活,同时线程并没有跟 channel 绑定,而是由独立的线程池进行管理,这是更好地解决了 SimpleMessageListenerContainer 中上下文切换所带来的资源消耗问题。
下面的例子,我们尝试使用把 consumersPerQueue 设置为 4,并同时监控 direct 模式 exchange 的 direct.first,direct.second 队列。
从管理界面可以看到,系统会为每个 consumer 都生成一个独立的 channel 进行管理。
Producer 端代码
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
" queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2",
"queue2"+" "+n.toString(),correlationData);
Thread.currentThread().sleep(30);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Consumer 端代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class DirectMessListener {
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private RabbitTemplate template;
private int index=0;
@Bean
public DirectMessageListenerContainer messageContainer(){
DirectMessageListenerContainer container=new DirectMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 设置每个队列的 consumer 数量
container.setConsumersPerQueue(4);
container.addQueueNames("direct.first");
container.addQueueNames("direct.second");
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
// TODO 自动生成的方法存根
// TODO 自动生成的方法存根
Thread thread=Thread.currentThread();
System.out.println("Channel:"+channel.getChannelNumber()
+" ThreadId is:"+thread.getId()
+" ConsumerTag:"+message.getMessageProperties().getConsumerTag()
+" Queue:"+message.getMessageProperties().getConsumerQueue());
}
});
return container;
}
}
通过运行结果进一步可以证实,consumer 信息接收是由独立的线程池进行管理的,并没有与 channel 绑定,每个 consumer 都有自己单独的 channel,即使 channel 发生问题时,也不会对其他的 consumer 发生影响,这正是 DirectMessageListenerContainer 的优胜之处。
5.5 Consumer 的信息接收确认方式
在第四节曾经介绍过在 Producer 端利用 ConfirmCallback / ReturnCallback 监控信息的发送,在这节将为大家在 Consumer 端监控信息的接收。
Consumer 的信息接收确认模式可以通过 AcknowledgeMode 设定,一共有三种模式:NONE、MANUAL、AUTO,默认是 AUTO 模式。其中 NONE 为系统确认,MANUAL 是手动确认。
而 AUTO 为自动模式,系统可以根据执行情况自动发送 ack / nack。如果方法未抛出异常,则发送 ack。如果抛出异常 AmqpRejectAndDontRequeueException 顾名思义消息被拒绝且不会重新加入队列。如果方法抛出非 AmqpRejectAndDontRequeueException 异常,则系统发送 nack 消息重归队列。
Channel 消息接收的常用方法
AcknowledgeMode 配置为 MANUAL 后,用户可通过 Channel 类的 void basicAck(long deliveryTag, boolean multiple) 方法手动确认消息接收是否成功。
若检测到有异常,可通过void basicReject(long deliveryTag, boolean requeue) 或 void basicNack(long deliveryTag, boolean multiple, boolean requeue) 确认是否重新把消息推送。
通过配置 prefetchCount 可设置 consumer 每次接收到的信息数量,系统默认值为 250,这表示当 consumer 队列接收到 250 请求其状态皆为 unacked 时,broker server 将暂停向 consumer 发送消息,待消息处理后再继续。
下面例子中我们尝试把 prefetchCount 设置为 10,即每个 consumer 单次最多接收到的消息为 10 条,并把 consumersPerQueue 设置为 4,然后把 AcknowledgeMode 设置为 MANUAL,通过手动确认消息接收,一旦发生错误,消息重新加入队列。
Producer 端代码
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send(HttpServletResponse response) throws InterruptedException, IOException{
for(Integer n=0;n<100;n++){
CorrelationData correlationData=getCorrelationData();
template.convertAndSend("directExchange","directKey1",
" queue1"+" "+n.toString(),correlationData);
template.convertAndSend("directExchange","directKey2",
"queue2"+" "+n.toString(),correlationData);
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
运行后可看到 Broker Server 每条 queue 会有 100 条数据处于待处理状态
Consumer 端代码
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class BindingConfig {
public final static String first="direct.first";
public final static String second="direct.second";
public final static String Exchange_NAME="directExchange";
public final static String RoutingKey1="directKey1";
public final static String RoutingKey2="directKey2";
@Bean
public Queue queueFirst(){
return new Queue(first);
}
@Bean
public Queue queueSecond(){
return new Queue(second);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(Exchange_NAME);
}
//利用BindingBuilder绑定Direct与queueFirst
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1);
}
//利用BindingBuilder绑定Direct与queueSecond
@Bean
public Binding bindingExchangeSecond(Queue queueSecond, DirectExchange directExchange){
return BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2);
}
}
@Configuration
public class DirectMessListener {
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private RabbitTemplate template;
private int index=0;
@Bean
public DirectMessageListenerContainer messageContainer(){
DirectMessageListenerContainer container=new DirectMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 设置每个队列的 consumer 数量
container.setConsumersPerQueue(4);
// 设置每个 consumer 每次的接收的消息数量为10个
container.setPrefetchCount(10);
// 使用MANUAL进行手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.addQueueNames("direct.first");
container.addQueueNames("direct.second");
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
Thread thread=Thread.currentThread();
MessageProperties prop=message.getMessageProperties();
try{
System.out.println("Channel:"+channel.getChannelNumber()
+" ThreadId is:"+thread.getId()
+" ConsumerTag:"+prop.getConsumerTag()
+" Queue:"+prop.getConsumerQueue());
//通过Tag单个确认
channel.basicAck(prop.getDeliveryTag(), false);
}catch(Exception ex){
//判定单个接收失败,重新加入consumer队列
channel.basicReject(prop.getDeliveryTag(), true);
}
thread.sleep(1000);
}
});
return container;
}
}
观察信息接收情况,每个 consumer 一次可处理10条信息,对队列进行分批处理。
六、死信队列
死信队列(Dead-Letter-Exchange) 可被看作是死信交换器。当消息在一个队列中变成死信后,它能被重新被发送到特定的交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:
- 消息被拒绝,requeue 被设置为 false, 可通过上一介绍的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成设置 ;
- 消息过期;
- 队列超出最大长度。
其实死信队列 DLX 也是一个正常的交换器,和一般的交换器没有什么区别,我们可以用一般建立队列的方法,建立一个死信队列。然后建立一个正常的队列,在正常队列中加入参数 x-dead-letter-exchange、x-dead-letter-routing-key 与死信队列进行绑定,完成绑定后在管理界面 Features 选项中 direct.queue.first 会显示 DLX DLK。这时当被绑定的队列出现超时,超长,或被拒绝时(注意requeue被设置为false时,对会激发死信),信息就会流入死信队列被处理。
具体的例子Producer端:
@Configuration
public class BindingConfig {
public final static String Queue_First="direct.queue.first";
public final static String Exchange_Name="directExchange";
public final static String Routing_Key_First="directKey1";
@Bean
public Queue queueFirst(){
return new Queue(this.Queue_First);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(this.Exchange_Name);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
System.out.println(host);
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Controller
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/send")
public void send() {
for(int n=0;n<10;n++){
template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! "
+String.valueOf(n),getCorrelationData());
}
}
private CorrelationData getCorrelationData(){
return new CorrelationData(UUID.randomUUID().toString());
}
}
Customer 端
@Configuration
public class BindingConfig {
//普通队列参数
public final static String Queue_First="direct.queue.first";
public final static String Exchange_Name="directExchange";
public final static String Routing_Key_First="directKey1";
//死信队列参数
public final static String Queue_Dead="direct.queue.dead";
public final static String Exchange_Dead="directDead";
public final static String Routing_Key_Dead="directDeadKey";
@Bean
public Queue queueFirst(){
Map<String, Object> args=new HashMap<String,Object>();
//声明当前死信的 Exchange
args.put("x-dead-letter-exchange", this.Exchange_Dead);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
//把死信队列的参数绑定到当前队列中
return QueueBuilder.durable(Queue_First).withArguments(args).build();
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange(this.Exchange_Name);
}
@Bean
public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
}
@Bean
public Queue queueDead(){
return new Queue(this.Queue_Dead);
}
@Bean
public DirectExchange directExchangeDead(){
return new DirectExchange(this.Exchange_Dead);
}
@Bean
public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
}
}
@Configuration
public class ConnectionConfig {
@Value("${spring.rabbitmq.host}")
public String host;
@Value("${spring.rabbitmq.port}")
public int port;
@Value("${spring.rabbitmq.username}")
public String username;
@Value("${spring.rabbitmq.password}")
public String password;
@Value("${spring.rabbitmq.virtual-host}")
public String virtualHost;
@Bean
public ConnectionFactory getConnectionFactory(){
CachingConnectionFactory factory=new CachingConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
return factory;
}
}
@Configuration
public class DirectMessListener {
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private RabbitTemplate template;
private int index=0,normalIndex=0,deadIndex=0;
@Bean
public DirectMessageListenerContainer messageContainer(){
DirectMessageListenerContainer container=new DirectMessageListenerContainer();
container.setConnectionFactory(connectionConfig.getConnectionFactory());
// 设置每个队列的 consumer 数量
container.setConsumersPerQueue(4);
// 设置每个 consumer 每次的接收的消息数量
container.setPrefetchCount(10);
// 使用MANUAL手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 监听队列
container.addQueueNames(BindingConfig.Queue_First);
container.addQueueNames(BindingConfig.Queue_Dead);
container.setConsumerTagStrategy(queue -> "consumer"+(++index));
container.setMessageListener(new ChannelAwareMessageListener(){
@Override
public void onMessage(Message message, com.rabbitmq.client.Channel channel)
throws Exception {
MessageProperties prop=message.getMessageProperties();
if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
System.out.println("This is a normal queue! "+(++normalIndex));
//把当前的队列转送到死信队列中
channel.basicReject(prop.getDeliveryTag(), false);
}
if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
System.out.println("This is a dead queue! "+(++deadIndex));
//模拟对死信队列处理
Thread.currentThread().sleep(5000);
.......
//处理完毕
channel.basicAck(prop.getDeliveryTag(), false);
}
}
});
return container;
}
}
通过管理界面可以看,信息会先发送到 direct.queue.first,然后被放进死信队列作处理。
运行结果
死信队列最常用的场景可以在订单支付,流程审批等环节。例如在 京、淘 等平台,当下单成功后,客户要在一定的时间内完成支付操作,否则订单被视作无效,这些业务流程就可以使用死信队列来处理。
七、持久化操作
RabbitMq 的持久化操作包含有 Queue 持久化、Message 持久化和 Exchange 持久化三类。
7.1 Queue 的持久化
队列持久化只需要在 Queue 的构造函数 public Queue(String name, boolean durable) 把 durable 参数置为 true 就可实现。如果队列不设置持久化( (durable 默认为 false), 那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。
7.2 Message 持久化
设置了Queue 持久化以后,当 RabbitMQ 服务重启之后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无意义,所以通常列队持久化会与消息持久化共同使用。
在 RabbitMQ 原生态的框架下,需要把信息属性设置为 MessageProperties.PERSISTENT TEXT PLAIN 才会实现消息的持久化。
而在 Spring 框架下,由于在使用回调函数时需要把 Message 重新返回队列再进行处理,所以 Message 默认已经是持久化的。
7.3 Exchage 的持久化
交换器持久化可通过构造函数 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 参数置为 true 就可实现,而 autoDelete 则是指在所在消费者都解除订阅的情况下自动删除。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是消息不再发送到该 Exchange 。对一个长期使用的交换器来说,持久化还是有其必要性的。
本章总结
RabbitMQ 发展至今,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是密不可分的。
相比于传统的 ActiveMQ 和分布式 Kafka,它具有自己独有的特点。
希望文章有帮于大家对 RabbitMQ 消息队列方面有更深入的了解,在不同的开发环境中灵活运用。
由于时间仓促,文章当中有不明确的地方或有错漏敬请点明。
对 JAVA 开发有兴趣的朋友欢迎加入QQ群:833145934 里面资深架构师会分享一些整理好的录制视频录像和BATJ面试题:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多。
共同探讨!