一、简介
1.1 什么是AMQP
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
1.2 什么是RabbitMQ
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
1.3 RabbitMQ 的特点
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
- 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认
- 灵活的路由(Flexible Routing):在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange
- 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker
- 高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用
- 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等
- 多语言客户端(Many Clients):RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等
- 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面
- 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么
- 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件
1.4 RabbitMQ 的基本概念
下面展示了RabbitMQ 消息的过程,我们会围绕这张图学习一下 RabbitMQ 的一些基本概念
1.4.1 生产者与消费者
与其它的消息中间件一样,RabbitMQ中包含消息生产者和消息消费者,生产者创建消息发布到代理服务器,消费者从代理服务器获取消息。在实际应用中,生产者和消费者之间的角色是可以相互转换的。
1.4.2 消息
消息由有效载荷(payload)和标签(label)组成。有效载荷就是你想要传输的数据,可以是任何内容,一个数组,一个集合,甚至二进制数据都可以。而标签描述了有效载荷,由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
1.4.3 信道
多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
1.4.4 交换器与绑定(原文)
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:
- fanout:把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
- direct:把消息路由到bindingKey与routingKey完全匹配的Queue中
- topic:把消息路由到bindingKey与routingKey模糊匹配的Queue中
- headers:headers类型的Exchange不依赖于routingKey与bindingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配
1.4.4.1 Fanout
- 生产者(P)发送到Exchange(X)的所有消息都会路由到图中的两个Queue,并最终被两个消费者(C1与C2)消费
- 如果配置了routing_key会被忽略
1.4.4.2 direct
- routingKey=”error”发送消息,则会同时路由到Queue1(amqp.gen-S9b…)和Queue2(amqp.gen-Agl…)
- routingKey=”info”或routingKey=”warning”发送消息,则只会路由到Queue2
- 以其它routingKey发送消息,则不会路由到这两个Queue中
1.4.4.3 topic
- routingKey=”quick.orange.rabbit”发送信息,则会同时路由到Q1与Q2
- routingKey=”lazy.orange.fox”发送信息,则只会路由到Q1
- routingKey=”lazy.brown.fox”发送消息,则只会路由到Q2
- routingKey=”lazy.pink.rabbit”发送消息,则只会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配)
- routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”发送消息,则会被丢弃,它们并没有匹配任何bindingKey
1.4.4.4 header
headers类型的Exchange不依赖于routingKey与bindingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
1.5 虚拟主机
一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”
二、Spring Boot 2.0 整合RabbitMQ
2.1 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 配置文件
spring.application.name=rabbitmq
#IP地址
spring.rabbitmq.host=127.0.0.1
#rabbitmq默认端口号
spring.rabbitmq.port=5672
#账户名和密码
spring.rabbitmq.username=simon
spring.rabbitmq.password=123456
2.3 rabbitmq 配置
@Configuration
public class RabbitConfig {
//fanout 把所有发送到该Exchange的消息路由到所有与它绑定的Queue中
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
//redirect 把消息路由到bindingKey与routingKey完全匹配的Queue中
public static final String DIRECT_QUEUE = "direct.queue";
public static final String DIRECT_EXCHANGE = "direct.exchange";
//topic 把消息路由到bindingKey与routingKey模糊匹配的Queue中
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
/**
* Fanout模式 队列1
*
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1);
}
/**
* Fanout模式 队列2
*
* @return
*/
@Bean
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2);
}
/**
* Fanout模式 交换器
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
/**
* Fanout模式 队列1与交换器绑定
*
* @return
*/
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/**
* Fanout模式 队列2与交换器绑定
*
* @return
*/
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
/**
* direct模式 队列
*
* @return
*/
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE);
}
/**
* direct模式 队列与交换机绑定
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Binding directBinding1() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with("rabbitmq.springboot");
}
/**
* Topic模式 队列1
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1);
}
/**
* Topic模式 队列2
*
* @return
*/
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2);
}
/**
* Topic模式 交换器
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
/**
* Topic模式 队列1与交换器绑定
*
* @return
*/
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("rabbitmq.springboot");
}
/**
* Topic模式 队列2与交换器绑定
*
* @return
*/
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("rabbitmq.#");
}
}
2.4 消息实体
public class User implements Serializable {
private String id;
private String username;
private String password;
public User(String id, String username, String password) {
this.id = id;
this.username = username;
this.password = password;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
2.5 消息发送
package priv.simon.springboot.rabbitmq.sender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import priv.simon.springboot.rabbitmq.config.RabbitConfig;
import priv.simon.springboot.rabbitmq.entity.User;
/**
* Created by simon on 2018/12/20.
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* fanout 发送信息
*
* @param user 用户信息
*/
public void fanoutSend(User user) {
this.rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", user);
}
/**
* direct 发送信息
*
* @param user 用户信息
*/
public void directSend(User user) {
this.rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "rabbitmq.springboot", user);
}
/**
* topic 发送信息
*
* @param user 用户信息
*/
public void topicSend(User user) {
this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"rabbitmq.springboot", user);
this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "rabbitmq.springcloud", user);
}
}
2.6 消息消费
@Component
public class Receiver {
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1)
public void receiveFanout1(User user) {
System.out.println("fanout_queue1接收消息" + user);
}
@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2)
public void receiveFanout2(User user) {
System.out.println("fanout_queue2接收消息" + user);
}
@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE)
public void receiveDirect1(User user) {
System.out.println("direct_queue接收消息" + user);
}
@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1)
public void receiveTopic1(User user) {
System.out.println("topic_queue1接收消息" + user.toString());
}
@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2)
public void receiveTopic2(User user) {
System.out.println("topic_queue2接收消息" + user.toString());
}
}
2.7 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {
@Autowired
private Sender sender;
@Test
public void testFanout(){
User user=new User("1","simon","fanout");
sender.directSend(user);
}
@Test
public void testDirect(){
User user=new User("1","simon","direct");
sender.directSend(user);
}
@Test
public void testTopic(){
User user=new User("1","simon","topic");
sender.directSend(user);
}
}