1.作用
- 在大多应用中,通过消息服务中间件来提升系统异步通信,扩展解耦能力。
- 消息服务(
消息代理和目的地
)- 当消息发送者发送消息以后,将由消息代理管理,然后消息代理把消息传递到目的地
- 消息队列的俩种目的地
- 1.队列(queue):点对点消息通信(p-to-p)
- 2.主题(topic):发布/订阅消息通信(p/s)
2.消息的消费方式
- 1.点对点式
- 生产者生产并发送消息,消息代理将这些产生的消息放入队列中,消费者从队列这把这些消息获取并消费掉,当消息被读取后将被移出队列
- 消息只有唯一的发送者和接受者,但并不是只能有一个接收者。简记:
一生一代多消费
- 2.发布订阅式
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么接收者就会在消息到达时间同时收到消息。
类似于微信公众号,微信用户订阅这个公众号(主题),当公众号拥有者将消息发送到公众号,关注了这个公众号的所有人都可以同时收到消息
- 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么接收者就会在消息到达时间同时收到消息。
- 3.JMS(Java Message Service) JAVA消息服务
- 基于JVM消息代理的规范。ActiveMQ,HornetMQ是JMS实现
- 4.AMQP(Advanced Message Queuing Protocol)
- 高级消息队列协议,也是一个消息代理的规范,兼容JMS
- RabbitMQ就是AMQP的实现
3.支持
- 1.Spring支持
- spring-jms提供了对JMS的支持
- spring-rabbit提供了对AMQP的支持
- 通过实现ConnectionFactory工厂来连接消息代理
- 提供JmsTemplate,RabbitTemplate来发送消息
- @JmsListener(JMS),@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
- @EnableJms,@EnableRabbit开启支持
- 2.SpringBoot自动配置
- JmsAutoConfiguration
- RabbitAutoConfiguration
4.RabbitMQ使用及概念
1.十大核心
- 1.Message(消息):由消息头和消息体组成。消息体是不透明的,消息头由routing-key(路由键),priority(相对于其他消息的优先权),delivery-mode(指出该消息可能需要持久化存储)等可选属性组成
- 2.Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- 3.Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有四种类型:direct(默认),fanout,topic和headers,类型不同,转发消息的策略也不同
- 4.Queue:消息队列,用来保存消息直到发送给消费者。一个消息可放入一个或多个队列中,消息如果没有被消费,将一直存在队列里面。
- 5.Binding:绑定,用于消息队列和交换器之间的关联。所有交换器可以理解成一个由绑定构成的路由表。
Exchange和Queue的绑定可以是多对多的关系。
- 6.Connection:网络连接,比如一个TCP连接。
- 7.Channel:信道,多路复用连接中的一条独立的双向数据流通道。
- 8.Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
- 9.Virtual Host:虚拟主机,表示一批交换器,消息队列和相关对象。每个vhost本质上是一个小型的RabbitMQ服务器,vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是/。
- 10.Broker:表示消息队服务器实体。
注:在使用RabbitMQ图形管理时(程序控制也一样),交换器中的routing key中的路由规则#代表匹配一个或多个单词,*代表匹配一个单词
2.springboot整合并使用RabbitMQ
- 1.导入所需的pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
- 2.简单测试使用
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
@Test
public void create(){
// 创建交换器
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
// 创建队列
amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
// 创建绑定规则
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqp.hhh", null));
// 删除队列
amqpAdmin.deleteQueue("amqpadmin.queue");
// 删除交换器
amqpAdmin.deleteExchange("amqpadmin.exchange");
}
/**
* 1.单播(点对点)
*/
@Test
public void sendMessagePTP() {
// Message需要自己构造一个,定义消息
// rabbitTemplate.send(exchange, routeKey, message);
// object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbit
// rabbitTemplate.convertAndSend(exchange, routeKey, object);
Map<String, Object> map = new HashMap<>();
map.put("msg", "小小侦探团");
map.put("data", Arrays.asList("柯南", "灰原哀", 123, true));
// 对象被默认序列化以后发送出去
rabbitTemplate.convertAndSend("exchange.direct", "moying.news", map);
}
// 接受数据,如果想要json数据,在配置里面配置
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("moying.news");
System.out.println(o.getClass());
System.out.println(o);
}
/**
* 广播发送消息
*/
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("奥特曼", "lily"));
}
配置json序列化的config如下:
package com.moying.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
注:在使用springboot测试类测试的时候,注意springboot的版本,如果版本是2.2以下,是要导入junit5,并且在使用单元测试的时候可能会发生自动注入为空的情况,导致空指针异常,因此我使用的是2.2以上版本。
- 3.springboot使用注解消费消息
在启动类开启RabbitMQ注解
@EnableRabbit // 开启基于注解的RabbitMQ模式
在方法上使用注解监听
// 只有队列中有注解就调用这个方法
@RabbitListener(queues = "moying.news")
public void receive(Book book){
System.out.println("收到消息:" + book);
}
// 监听消息的body和消息的信息
@RabbitListener(queues = "moying")
public void receiveMsg(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}