MQ(Message Queue)消息队列
通过典型的生产者和消费者模式,生产者不断向消息队列生产消息,消费者不断从队列中获取消息。生产和消费的过程都是异步的,实现系统间的解耦。
RabbitMQ
使用erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点的发布和订阅)、可靠性、安全。
AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求其次。
RabbitMQ安装
拉取镜像
docker pull rabbitmq:3.7.7-management
启动容器并设置用户密码
docker run -d --name rabbitmq --hostname rabbitmq -p 5672:5672 -p 15672:15672 -v /home/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_VHOST=myvhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 rabbitmq:3.7.7-management
-d 后台运行容器;
--name 指定容器名;
--hostname 为启动后的rabbitmq指定名称,集群时启动多个rabbitmq需要分别指定hostname,以此区分
-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-v 映射目录或文件;
-e 指定环境变量;
RABBITMQ_DEFAULT_VHOST:默认虚拟主机名(一个hostname下有可以有多个 virtual host);
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码
通过浏览器访问web管理界面
http://****:15672
虚拟主机
虚拟主机概念是RabbitMQ的核心,在用户未自定义虚拟主机前已经内置有虚拟主机,在使用RabbitMQ中,可以进行自定义配置虚拟主机.一个虚拟主机中可以含有多个队列信息。
虚拟主机最大的好处在于可以根据不同的用户分配不同的操作空间。
使用
首先,创建虚拟主机
创建用户,并将用户绑定到虚拟主机上
Java整合RabbitMQ
# IDEA创建一个maven项目
# 在pom.xml导入依赖
<!--引入rabbitmq的相关依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
Hello World 模型
//消息生产者
public class Provider {
@Test
public void testSendMessage() throws IOException, TimeoutException {
//创建连接mq的连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置主机ip
connectionFactory.setHost("120.***");
//设置端口
connectionFactory.setPort(5672);
//设置虚拟主机
connectionFactory.setVirtualHost("/ems");
//设置虚拟主机的账户密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列
//参数1:队列名称,不存在是自动创建
//2:队列是否持久化,true为 队列持久化 ,但并不对消息持久化,重启后消息会丢失,
// false时队列消息都消失
//3:是否独占队列,true独占
//4:消费完成后是否自动删除队列,true为自动删除队列。
//并且消费者也需要设置为true,在消费者消费完关闭通道后就会自动删除
//5:附加参数
channel.queueDeclare("hello", false, false, false, null);
//发布消息
//参数1:交换机名称;2:队列名称;
// 3:传递消息额外设置;设置为null时,消息不能持久化。
// MessageProperties.PERSISTENT_TEXT_PLAIN可实现消息持久化
// 4:消息的具体内容
channel.basicPublish("","hello",null,"hello zhangsan...".getBytes());
channel.close();
connection.close();
}
}
//消费者消费消息
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("120.*");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("123");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
//消费消息
//参数1:消息队列名称;2:开始消息的自动确认机制,3:消费时的毁掉接口
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
}
});
//此时最好不要关闭channel和connection,才能保证消费者始终监听消费队列中的情况。
//channel.close();
//connection.close();
}
}
Work Queue模型(平均分配消息)
当消息处理比较耗时的时候,生产消息的速度远远大于消费的速度。长此以往,消息会慢慢堆积越来越多,无法及时处理。
此时可以采用work queue模型,让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费就会消失,不会被重复执行。
测试结果是
小结:
默认情况下,RabbitMQ将按顺序对每个消息发送给下一个使用中,每个消费者会收到相同数量的消息,这种分发消息的方式称为循环。
消息自动确认机制
RabbitMQ在分配消息给消费者时,无论消费者不论是否完成消息处理,都会自动确认消息:如下autoAck:true。因此消息会堆积在通道中。
将autoAck:false时 关闭 自动确认消息,处理完消息才会向队列确认消息。并设置通道中一次只能有一个消息,channel.basicQos(1);
这样就能实现消费者之间能者多劳。
由于关闭了自动确认消息,所以需要手动确认。见如下代码channel.basicAck(envelope.getDeliveryTag(),false);
参数1:手动确认消息,参数2:false每次确认一个
否则,会出现消息未确认的情况:
消费者代码改进,实现消费者能者多劳。
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
//获取连接通道
Channel channel = connection.createChannel();
//通道中一次只能有一个消息。
channel.basicQos(1);
channel.queueDeclare("work", true, false, false, null);
//消费消息
//参数1:消息队列名称;
// 2:消息的自动确认机制,true时自动确认消息,不论消息是否已经处理完成。消费者都会向队列自动确认,消息可以堆积在通道中
// false时 关闭 自动确认消息,处理完消息才会向队列确认消息,。
// 3:消费时的毁掉接口
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 = " + new String(body));
//由于关闭了自动确认消息,所以需要手动确认
//参数1:手动确认消息,参数2:false每次确认一个
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
fanout 广播模型
生产者发送消息,只能发送到交换机,交换机把消息发送给绑定过的所有的队列,实现一条消息被多个消费者使用。
#广播模式的生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
Channel channel = connection.createChannel();
//将通道绑定交换机,
//参数1:交换机名称
//参数2:交换机类型,fanout为广播模型
channel.exchangeDeclare("logs","fanout");
//发布消息
channel.basicPublish("logs","",null,"fanout test".getBytes());
Utils.closeConnectionAndChannel(channel,connection);
}
}
# 广播模型消费者1
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
Channel channel = connection.createChannel();
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//临时队列绑定交换机
channel.queueBind(queue,"logs","");
channel.basicConsume(queue,true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 = " + new String(body));
}
})
}
}
Routing路由模型
Direct 直连
在fanout模式下,一条消息会被所有订阅的队列都消费。
但是在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct类型的exchange。
Direct类型下:
- 队列和交换机不再是任意绑定。需要指定一个队列RoutingKey。
- 消息的生产者向Exchange发送消息时,也必须指定消息的RoutingKey。
- Exchange根据消息的 RoutingKey与队列RoutingKey进行判断,两者一致时队列才会收到消息。
图解过程:
- P生产者:向Exchange发送消息,并指定一个消息RoutingKey
- X交换机:接收生产者的消息,然后把消息传递给 RoutingKey匹配的队列。
- C1消费者:其所在队列指定了只需要RoutingKey为error的消息。
- C2消费者:其所在队列指定了只需要RoutingKey为info、error、warning的消息。
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
Channel channel = connection.createChannel();
//将通道声明指定交换机,
//参数1:交换机名称
//参数2:交换机类型,direct直连类型
channel.exchangeDeclare("directLogs","direct");
//设置消息的RoutingKey
String routingKey="error";
channel.basicPublish("directLogs",routingKey,null,"direct模式的error1111111111111".getBytes());
Utils.closeConnectionAndChannel(channel,connection);
}
}
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("directLogs","direct");
String queue = channel.queueDeclare().getQueue();
//临时队列绑定交换机,并指定队列的RoutingKey。此时只接收RoutingKey为info的消息
channel.queueBind(queue,"directLogs","info");
//channel.queueBind(queue,"directLogs","error");
channel.basicConsume(queue,true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者info = " + new String(body));
}
});
}
Topic 订阅
Topic类型和Direct类型都是根据RoutingKey把消息路由到不同的队列。只不过Topic类型可以让队列在绑定RoutingKey的时候使用通配符。Topic的RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分割。
通配符
“ * ”:匹配一个单词
“ # ”:匹配一个或多个单词
#Topic之生产者
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
Channel channel = connection.createChannel();
//声明指定交换机,
//参数2:交换机类型,topic
channel.exchangeDeclare("topics","topic");
//设置消息的RoutingKey
String routingKey="user.save.all";
channel.basicPublish("topics",routingKey,null,"topic模式的测试".getBytes());
Utils.closeConnectionAndChannel(channel,connection);
}
}
#topic之消费者
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = Utils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
//临时队列绑定交换机,并指定队列的RoutingKey。此时只接收RoutingKey为user接任意个数字符的消息
channel.queueBind(queue,"topics","user.#");
channel.basicConsume(queue,true,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者user.save = " + new String(body));
}
});
}
}
SpringBoot整合RabbitMQ
<!-- rabbitmq 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
#配置文件application.properties
spring.application.name=rabbitmq-springboot
spring.rabbitmq.host=120.79.28.120
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/ems
#简单测试 helloworld 模式
#生产者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入RabbitMQTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
rabbitTemplate.convertAndSend("hello","hello,world.....");
}
}
# 消费者
/**
* @RabbitListener 声明消费者监听
* queuesToDeclare声明一个队列
*/
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {
/*@RabbitHandler声明receive()为处理消息队列的回调方法*/
@RabbitHandler
public void receive(String message) {
System.out.println("-------------------message=" + message);
}
}
#work模型
#生产者
@Test
public void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work model " + i);
}
}
#两个消费者
@Component
public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
#广播模式
#生产者
@Test
public void testFanout() {
/**
* 参数1:exchange
* 参数2:routingKey
*/
rabbitTemplate.convertAndSend("fanout","", "fanout model ");
}
#消费者
@Component
public class FanoutCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,//创建临时队列
exchange = @Exchange(value = "fanout",type = "fanout"))//绑定交换机
})
public void receive1(String m){
System.out.println("receive1 = " + m);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,//创建临时队列
exchange = @Exchange(value = "fanout",type = "fanout"))//绑定交换机
})
public void receive2(String m){
System.out.println("receive2 = " + m);
}
}
#路由模式
#生产者
@Test
public void testRouting() {
/**
* 参数1:exchange
* 参数2:routingKey
*/
rabbitTemplate.convertAndSend("direct","info", "direct模式进行info路由发送 ");
}
#消费者
@Component
public class RoutingCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "direct",type = "direct"),//自定义交换机和指定模型
key = {"info","error"}//指定路由Key
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,//创建临时队列
exchange = @Exchange(value = "direct",type = "direct"),//自定义交换机和指定模型
key = {"error"}//指定路由Key
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
#此时,receive1有路由key(“info”),所以只有receive1能够收到消息。
#Topic模式,订阅模式,动态路由
#生产者
@Test
public void testTopic() {
/**
* 参数1:exchange
* 参数2:routingKey
*/
rabbitTemplate.convertAndSend("topic","user.save", "user.save模式进行发送 ");
}
#消费者
@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "topic",type = "topic"),//交换机=topic,模式type=topic
key = {"user.*","order.#"}//*匹配一个字符串,#匹配0个或多个
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
}
RabbitMQ 使用场景
解耦
(为面向服务的架构(SOA)提供基本的最终一致性实现)
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。
传统模式的缺点:
- 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
- 订单系统与库存系统耦合
引入消息队列
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
- 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
- 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
- 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。
基于消息的模型,关心的是“通知”,而非“处理”。
短信、邮件通知、缓存刷新等操作使用消息队列进行通知。
异步提升效率
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种
1.串行的方式;2.并行方式
- 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
- 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
- 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
流量削峰
应用场景:流量削锋是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。可以在前端加入消息队列。
引入消息队列的优缺点
优点
优点就是以上的那些场景应用,就是在特殊场景下有其对应的好处,解耦、异步、削峰。缺点
- 系统的可用性降低
系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。 - 系统的复杂性提高
引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序? - 一致性问题
A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。
RabbitMQ 集群
普通模式(同步交换机)
实现 rabbitMQ 的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好用且简单。
队列中的所有消息将在所有节点之间复制,但是消息队列只位于主节点上。
生产者发布的消息只放在Master节点的交换机上。消费者消费时可以从slave节点中消费消息,但如果master节点宕机,slave节点也无法进行消费。
镜像集群模式(队列也能同步)
保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。
镜像队列,目的是为了保证 rabbitMQ 数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步。对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
集群搭建
- 创建文件夹目录
mkdir cluster
cd cluster/
mkdir rabbitmq01 rabbitmq02 rabbitmq03
- docker创建容器
docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -v /home/rabbitmq/cluster/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:3.7-management
参数说明:
--hostname 指定rabbitmq主机名称
--name 指定docker容器名
-v 容器卷挂载,实现docker容器内的文件同步到主机
-p 指定 主机端口:容器端口 之间的映射
-e 指定环境变量
--link 实现两个容器间的互相通信
集群中 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。
因为RabbitMQ是用Erlang实现的,Erlang Cookie相当于不同节点之间相互通讯的秘钥,Erlang节点通过交换Erlang Cookie获得认证。
docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -v /home/rabbitmq/cluster/rabbitmq02:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 rabbitmq:3.7-management
docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -v /home/rabbitmq/cluster/rabbitmq03:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02 rabbitmq:3.7-management
- 启动成功
可以分别访问一下管理界面的网址
http://192.168.9.219:15672
http://192.168.9.219:15673
http://192.168.9.219:15674
默认账号密码:guest/guest
- 容器节点加入集群
docker exec -it rabbitmqCluster01 bash #进入容器1
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
docker exec -it rabbitmqCluster02 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit
#参数“--ram”表示设置为内存节点,忽略此参数默认为磁盘节点。
#@rabbitmq01 表示被加入的主机名,这个主机名就是集群的名称,也是主节点
docker exec -it rabbitmqCluster03 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit
至此,完成普通模式的集群。在管理界面中测试:
主节点rabbitmq01上新建一个Exchange,会立即同步到rabbitmq02、rabbitmq03。
- 实现镜像模式集群
在上面的基础上,再完成以下操作。
#在cluster中任意节点启用策略,策略会自动同步到集群节点
#此处进入第一个节点
docker exec -it rabbitmqCluster01 bash
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition: 镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
all: 表示在集群中所有的节点上进行镜像
exactly: 表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes: 表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params: ha-mode模式需要用到的参数
ha-sync-mode: 进行队列中消息的同步方式,有效值为automatic和manual
priority: 可选参数,policy的优先级
#将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直。
#这行命令在vhost名称为hrsystem创建了一个策略,
#策略名称为ha-all,策略模式为 all 即复制到所有节点,
#包含新增节点,策略正则表达式为 “^” 表示所有匹配所有队列名称。
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
#清除策略
rabbitmqctl clear_policy ha-all
#策略的名称以”two”开始的队列镜像到群集中的任意两个节点,并进行自动同步:
rabbitmqctl set_policy ha-two "^two." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
#以”node”开头的队列镜像到集群中的特定节点的策略:
rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
#在rabbitmq02主机中查看策略:
root@rabbitmq02:/ rabbitmqctl list_policies
Listing policies for vhost "/" ...
/ ha-all ^ all {"ha-mode":"all"} 0
rabbitmq 常用命令
rabbitmqctl list_queues:查看所有队列信息
rabbitmqctl stop_app:关闭应用(关闭当前启动的节点)
rabbitmqctl start_app:启动应用,和上述关闭命令配合使用,达到清空队列的目的
rabbitmqctl reset:从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息(这个命令要在rabbitmqctl stop_app之后使用),重置以后,用户,虚拟vhost,都会清除
rabbitmqctl force_reset:作用和rabbitmqctl reset一样,区别是无条件重置节点,不管当前管理数据库状态以及集群的配置。如果数据库或者集群配置发生错误才使用这个最后的手段
rabbitmqctl status:节点状态
rabbitmqctl add_user username password:添加用户
rabbitmqctl list_users:列出所有用户
rabbitmqctl list_user_permissions username:列出用户权限
rabbitmqctl change_password username newpassword:修改密码
rabbitmqctl add_vhost vhostpath:创建虚拟主机
rabbitmqctl list_vhosts:列出所有虚拟主机
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*":设置用户权限
rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上的所有权限
rabbitmqctl clear_permissions -p vhostpath username:清除用户权限
rabbitmqctl -p vhostpath purge_queue blue:清除队列里的消息
rabbitmqctl delete_user username:删除用户
rabbitmqctl delete_vhost vhostpath:删除虚拟主机
rabbitmqctl cluster_status 查看集群状态
#修改集群节点类型,使用此命令前要停止rabbitmq应用
rabbitmqctl change_cluster_node_type {disc|ram} node_name
#将节点重集群中删除,允许离线执行
rabbitmqctl forget_cluster_node [--offiine]
#在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新
#相应的集群信息。这个和join_cluster不同,他不加入集群
rabbitmqctl update_cluster_nodes {clusternode}
#确保节点可以启动,即使他不是最后一个关闭的节点
rabbitmqctl force_boot
(集群中的节点相继宕机,如果要恢复之前的集群,则需要按照节点宕机的先后顺序,从后向前启动节点,因为最后宕机的节点数据最完善)
#设置集群名称。集群名称在客户端连接的时候回通报给客户端。
#集群名称默认是集群中第一个节点的名称
rabbitmqctl set_cluster_name {name}