本文章是在网易云课堂的课程学习中编写,部分图片从网易云课堂ppt引用
一、RabbitMQ简介
是一个开源的AMQP实现
二、RabbitMQ安装运行
1、安装依赖环境
在 http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本
在 https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,
erlang-*.centos.x86_64.rpm
就是centos版本的。-
复制下载地址后,使用wget命令下载
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
-
安装 Erlang
sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
-
安装 socat
sudo yum install -y socat
2、安装RabbitMQ
-
在官方下载页面找到CentOS7版本的下载链接,下载rpm安装包
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
提示:可以在
https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本
-
安装RabbitMQ
sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
3、启动和关闭
- 启动服务
sudo systemctl start rabbitmq-server
若启动报错,可查看日志信息
-
查看状态
sudo systemctl status rabbitmq-server
-
停止服务
sudo systemctl stop rabbitmq-server
-
设置开机启动
sudo systemctl enable rabbitmq-server
4、RabbitMQ基本配置
RabbitMQ有一套默认的配置,一般能满足日常开发需求。若需要修改,需要自己创建一个配置文件
touch /etc/rabbitmq/rabbitmq.conf
官网配置项说明:
https://www.rabbitmq.com/configure.html
5、RabbitMQ管理界面
RabbitMQ安装包中带有管理插件,但要手动激活
-
开启插件
rabbitmq-plugins enable rabbitmq_management
说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。
-
添加用户
rabbitmqctl add_user admin admin
-
为用户分配操作权限
rabbitmqctl set_user_tags admin administrator
在一个RabbitMQ中,可以划分多个虚拟主机
- 为用户分配资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
配置完毕后,可以在浏览器打开 15672 端口的控制台页面,账号密码是 admin/admin
6、RabbitMQ端口
RabbitMQ会绑定一些端口,安装完后并启动服务后,还不能进行外部通信,需要将这些端口添加至防火墙。
-
添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
-
重启防火墙
sudo firewall-cmd --reload
下面对一些端口做一下介绍:
4369
是Erlang的端口/结点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似DNS的作用。5672,5671
一般使用的是5672端口,没有使用SSL。使用SSL的话,是用5671端口。25672
可理解为是用于管理的端口,用于RabbitMQ节点间和CLI工具通信,配合4369使用。15672
HTTP管理端口,通过这个端口打开web可视化的管理页面,用于管理RabbitMQ,需要启用management插件。61613, 61614
插件相关的端口,当STOMP插件启用的时候打开,作为STOMP客户端端口(根据是否使用TLS选择)1883, 8883
插件相关的端口,当MQTT插件启用的时候打开,作为MQTT客户端端口(根据是否使用TLS选择) 。默认使用的是188315674
基于WebSocket的STOMP客户端端口(当插件Web STOMP启用的时候打开)15675
基于WebSocket的MQTT客户端端口(当插件Web MQTT启用的时候打开)
7、RabbitMQ角色
none
不能访问management插件management
查看自己的virtual hosts中的queues、exchanges、bindings等资源policymaker
比management角色多了些功能,专门用来管理相关的策略。比如查看、创建和删除自己的virtual hosts所属的policies和parametersmonitoring
比management角色多了些功能,主要用来监控。可查看所有virtual hosts,其他用户的connections、channels,节点级别的数据(比如clustering、memory情况)等administrator
权限最大的角色
三、RabbitMQ的简单使用
1、maven依赖
- 在Java中使用RabbitMQ
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
</dependencies>
- 在Spring中使用RabbitMQ
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、队列生产者
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.100.242");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道。一个连接可以创建多个channel
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 消息内容
String message = "Hello World!";
// 6、发送消息
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息已发送!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3、队列消费者
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// 4、从链接中创建通道
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
* 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
* 一般在队列和交换器绑定时使用
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 6、定义收到消息后的回调
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 7、监听队列
channel.basicConsume("queue1", true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
四、AMQP协议
AMQP ( Advanced Message Queuing Protocol) 高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。可基于实现AMQP,来实现消息中间件。
1、AMQP结构
2、流转过程
AMQP生产者流转过程
connection - channel - publish - close
AMQP消费者流转过程
五、RabbitMQ核心概念
核心概念
- Producer:生产者,创建消息发布到RabbitMQ中
- Consumer:消费者,获取消息体
- Broker:消息中间件的服务节点
- 虚拟主机:每个broker可定义多个虚拟主机,像mysql-server可以定义多个db。RabbitMQ默认的vhost(虚拟主机)是 /
- connection:连接,一个connection可以创建任意个channel
- channel:建立在connection上的通道
- queue:队列,用于存储消息
- RoutingKey:路由键,需要与交换类型和绑定键(BindingKey)结合使用。生产者发送消息给交换器时,会指定一个RoutingKey,指定路由规则
- Binding:绑定,将交换器与队列关联起来
-
exchange:交换器,将生产者发来的消息路由到一个或多个队列。若路由不到,则根据生产者的属性配置,返回给生产者或直接丢弃。
exchange有四种模式,fanout、direct、topic、headers模式,若不指定交换机,则使用默认交换机,根据消息中指定的 queue 的名称,匹配到对应的queue。
fanout模式:绑定了的所有queue都会收到消息;
direct模式:将消息路由到BindingKey和Routing Key完全匹配的队列;
topic模式:与direct类似,但可通过通配符进行模糊匹配。* 代表一个单词,# 代表多个单词
headers模式:根据消息中的 header 属性匹配。
整体运转流程:
使用 exchange
1、生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符
// 路由关系如下:com.# --> queue-1 *.order.* ---> queue-2
// 消息内容
String message = "Hello";
// 发送消息到topic_test交换器上
channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
2、消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息
final String queueName = Thread.currentThread().getName();
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 定义消息接收回调对象
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(queueName + " 开始接收消息");
System.in.read();
3、使用 fanout 型交换器实现 发布订阅模式。
启动Consumer 类会开启两个消费者,Producer 类运行后,两个消费者都能接收到消息
【注意】利用临时队列,可随时添加一个queue,且不会互相影响。比如可以启多个消费者服务,则exchange可以绑定多个临时队列,从而收到发往exchange的消息。即发布-订阅思想
1)消费者:通过一个临时队列和交换器绑定,接收发送到交换器上的消息
public class Consumer {
private static Runnable receive = new Runnable() {
public void run() {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
final String clientName = Thread.currentThread().getName();
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("消费者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 代码定义交换器,不管是生产者或消费者都可以定义
channel.exchangeDeclare("ps_test", "fanout");
// 还可以定义一个临时队列,连接关闭后会自动删除,此队列是一个排他队列
String queueName = channel.queueDeclare().getQueue();
// 将队列和交换器绑定
channel.queueBind(queueName, "ps_test", "");
// 定义消息接收回调对象
DeliverCallback callback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume(queueName, true, callback, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
}
});
System.out.println(clientName + " 开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
new Thread(receive, "c1").start();
}
}
2)生产者:将消息发送到fanout类型的交换器上
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setHost("192.168.100.242");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3、从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4、从链接中创建通道
channel = connection.createChannel();
// 定义fanout类型的交换器
channel.exchangeDeclare("ps_test", "fanout");
// 消息内容
String message = "Hello Publish";
// 发送消息到ps_test交换器上
channel.basicPublish("ps_test", "", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
4、实际使用中,若往队列queue1中发送多条消息,queue1中堆积了大量消息,要如何加入消息的处理?
——可以创建消费者集群
basicQos参数
- 控制推送消息的大小,提前预处理机制,有助于提高数据处理效率。
- 在并发低于 1w 情况下,一般使用默认值即可,也可根据业务情况调整。若设置的太小,操作太频繁,不好。若设置的太大,某个消费者堆积处理太多消息,其他消费者分不到消息处理。
// 同一时刻,服务器会发送10条消息给消费者
channel.basicQos(10);