*应用场景*
****秒杀(高并发,数据量大)
****1:**** *异步处理*
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式
(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
[图片上传失败...(image-b61cb2-1681699137681)]
RegisterController{
//线性的 100MS
public String register(){
emailService.register();//发邮件 50MS
smsService.register();//发短信 50MS
}
}
(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
[图片上传失败...(image-f7cdae-1681699137681)]
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
(3)消息队列
引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
*2 应用解耦*
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
[图片上传失败...(image-faf3e5-1681699137681)]
public class OrderService(){
public void addOrder(){
if(stockService.hasStock(产品id)){//判断是否还有库存 故障 时间过长
//下订单
}
}
}
这种做法有一个缺点:
· 当库存系统出现故障时,订单就会失败。
· 订单系统和库存系统高耦合.
· 引入消息队列
[图片上传失败...(image-a3d9b5-1681699137681)]
· 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
· 库存系统:订阅下单的消息,获取下单消息,进行库操作。
· 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.
1:库存系统(数据库) 数据库的吞吐能力牵扯到IO,能力反而是最弱的。
2:无法把所有压力,全部交给数据库来实现。
3:下订单====》库存数量(Redis)====》最终订单要入库===》数据库的库存也要更改。
4:下订单====》查询和更改的Redis中的库存。
就认为下订单完成了。把信息放入消息队列中去。让消息队列慢慢的来执行。
=======》1:下订单(进入Mysql的数据库)
=======》2:更改数据库中的库存
3.**** *流量削峰*
流量削峰一般在秒杀活动中 应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
[图片上传失败...(image-34c20e-1681699137681)]
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.
削峰填谷:
[图片上传失败...(image-daccfa-1681699137681)]
4.**** *系统架构*
几个概念说明:
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接里,可建立多个channel.
常用命令
查看所有队列消息
rabbitmqctl list_queues
查看所有交换机
rabbitmqctl list_exchanges
添加用户
rabbitmqctl add_user admin 123456
注意:添加用户后login failed的问题,原因是未授权,还需执行下面的命令
admin赋予管理员权限
rabbitmqctl set_user_tags admin administrator
添加权限
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
列出用户权限
rabbitmqctl list_user_permissions username
# 在rabbitmq的内部数据库添加用户;
rabbitmqctl add_user <username> <password>
# 删除一个用户;
rabbitmqctl delete_user <username>
# 改变用户密码(也是改变web管理登陆密码);
rabbitmqctl change_password <username> <newpassword>
# 清除用户的密码,该用户将不能使用密码登陆,但是可以通过SASL登陆如果配置了SASL认证;
rabbitmqctl clear_password <username>
# 设置用户tags;
rabbitmqctl set_user_tags <username> <tag> ...
# 创建一个vhosts;
rabbitmqctl add_vhost <vhostpath>
# 删除一个vhosts;
rabbitmqctl delete_vhost <vhostpath>
# 列出vhosts;
rabbitmqctl list_vhosts [<vhostinfoitem> ...]
# 返回queue的信息,如果省略了-p参数,则默认显示的是"/"vhosts的信息;
rabbitmqctl list_queues [-p <vhostpath>] [<queueinfoitem> ...]
# 返回exchange的信息;
rabbitmqctl list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]
# 返回绑定信息;
rabbitmqctl list_bindings [-p <vhostpath>] [<bindinginfoitem> ...]
# 返回链接信息;
rabbitmqctl list_connections [<connectioninfoitem> ...]
# 返回目前所有的channels;
rabbitmqctl list_channels [<channelinfoitem> ...]
消息中间件的比较:
[图片上传失败...(image-a68938-1681699137681)]
RabbitMQ 选型和对比
1.从社区活跃度
按照目前网络上的资料,RabbitMQ
、activeM
、ZeroMQ
三者中,综合来看,RabbitMQ
是首选。
2.持久化消息比较
ZeroMq
不支持,ActiveMq
和RabbitMq
都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。
3.综合技术实现
可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
RabbitMq
/ Kafka
最好,ActiveMq
次之,ZeroMq
最差。当然ZeroMq
也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。
4.高并发
毋庸置疑,RabbitMQ
最高,原因是它的实现语言是天生具备高并发高可用的erlang
语言。
5.比较关注的比较, RabbitMQ 和 Kafka
RabbitMq
比Kafka
成熟,在可用性上,稳定性上,可靠性上,RabbitMQ胜于 Kafka(理论上)。
另外,Kafka
的定位主要在日志等方面, 因为Kafka
设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq
。
还有就是,Kafka
的性能(吞吐量、TPS
)比RabbitMq
要高出来很多。
Windows安装
MQ的其他产品:
ActiveMQ: Apache的JMS消息服务器。Kafka:分布式发布订阅消息系统。
一: RabbitMQ的安装
1、Erlang的安装
下载并运行Erlang for Windows 安装程序。下载地址http://www.erlang.org/downloads
2、RabbitMQ安装程序
运行RabbitMQ安装程序(下载地址http://www.rabbitmq.com/install-windows.html)
3、自定义环境变量
该服务将使用其默认设置正常运行。你可以自定义RabbitMQ环境或编辑配置。
(1)erl环境变量配置
ERLANG_HOME=C:\Program Files\erl10.3
在Path中加入
%ERLANG_HOME%\bin;
测试erl配置是否正确,开始-运行-cmd,输入erl
(2)RabbitMQ环境变量配置
RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14
在Path中加入
%RABBITMQ_SERVER%\sbin;
4、激活rabbitmq_management
在CMD中键入如下命令
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
5、启动RabbitMQ服务
直接在命令行界面键入如下命令
net start RabbitMQ
关闭RabbitMQ服务命令如下:
net stop RabbitMQ
二、RabbitMQ测试
localhost:15672访问RabbitMQ的后台管理页面(初始化用户名和密码都是guest)
三、开发:
https://www.rabbitmq.com/getstarted.html
https://www.rabbitmq.com/tutorials/tutorial-one-java.html
Docker安装
docker search rabbitmq
版本
官网地址 https://hub.docker.com/_/rabbitmq
下载
docker pull rabbitmq:management
ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的DNS服务不会使用resolv.conf文件中的search和domain两个配置,通过DNS来进行服务发现时需要注意。,带有-management的是带有web控制台
启动
创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin ilovelaohan)
docker run -dit --name laohanmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=ilovelaohan \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq:management
防火墙
端口号开启:
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
重启防火墙
firewall-cmd --reload
查询有哪些端口是开启的:
firewall-cmd --list-port
Docker的集群
一、为什么使用集群?
内建集群作为RabbitMQ最优秀的功能之一,它的作用有两个:
允许消费者和生产者在Rabbit节点崩溃的情况下继续运行;
通过增加节点来扩展Rabbit处理更多的消息,承载更多的业务量;
二、集群的特点
RabbitMQ的集群是由多个节点组成的,但我们发现不是每个节点都有所有队列的完全拷贝。
RabbitMQ节点不完全拷贝特性
为什么默认情况下RabbitMQ不将所有队列内容和状态复制到所有节点?
有两个原因:
存储空间——如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据。
性能——如果消息的发布需安全拷贝到每一个集群节点,那么新增节点对网络和磁盘负载都会有增加,这样违背了建立集群的初衷,新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。
所以其他非所有者节点只知道队列的元数据,和指向该队列节点的指针。
三: RabbitMQ三种模式:
(1)单一模式。
(2)普通模式(默认的集群模式)。
(3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA方案,在对业务可靠性要求较高的场合中比较适用)。
要实现镜像模式,需要先搭建一个普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。
1:普通模式(默认的集群模式)。
默认的集群模式。需要创建多个 RabbitMQ 节点。但对于 Queue 和消息来说,只存在于其中一个节点,其他节点仅同步元数据,即队列的结构信息。
当消息进入Queue后,如果 Consumer 从创建Queue的这个节点消费消息时,可以直接取出来;但如果 consumer 连接的是其他节点,那 rabbitmq 会把 queue 中的消息从创建它的节点中取出并经过连接节点转发后再发送给consumer。
所以 consumer 应尽量连接每一个节点。并针对同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连接哪个节点,都会从创建 queue 的节点获取消息,会产生瓶颈。
特点:
(1)Exchange 的元数据信息在所有节点上是一致的,而 Queue(存放消息的队列)的完整数据则只会存在于创建它的那个节点上。其他节点只知道这个 queue 的 metadata 信息和一个指向 queue 的 owner node 的指针;
(2)RabbitMQ 集群会始终同步四种类型的内部元数据(类似索引):
a.队列元数据:队列名称和它的属性;
b.交换器元数据:交换器名称、类型和属性;
c.绑定元数据:一张简单的表格展示了如何将消息路由到队列;
d.vhost元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性;
因此,当用户访问其中任何一个 RabbitMQ 节点时,通过 rabbitmqctl 查询到的元数据信息都是相同的。
(3)无法实现高可用性,当创建 queue 的节点故障后,其他节点是无法取到消息实体的。如果做了消息持久化,那么得等创建 queue 的节点恢复后,才可以被消费。如果没有持久化的话,就会产生消息丢失的现象。
拉取镜像
docker pull rabbitmq:management
运行容器
docker run -dit --hostname rabbit_host1 --name rabbitmq1 \
-p 15672:15672 \
-p 5672:5672 \
-e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
rabbitmq:management
docker run -dit --hostname rabbit_host2 --name rabbitmq2 \
-p 5673:5672 \
-e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
--link rabbitmq1:rabbit_host1 \
rabbitmq:management
docker run -dit --hostname rabbit_host3 --name rabbitmq3 \
-p 5674:5672 \
-e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
--link rabbitmq1:rabbit_host1 \
--link rabbitmq2:rabbit_host2 \
rabbitmq:management
主要参数:
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--link 容器之间连接
Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。因为 RabbitMQ 是用Erlang实现的,Erlang Cookie 相当于不同节点之间通讯的密钥,Erlang节点通过交换 Erlang Cookie 获得认证。
加入节点到集群
设置节点1:
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
设置节点2,加入到集群:
#docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management
#docker run -d --hostname rabbit_host2 --name rabbitmq2 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management
#docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management
docker run -dit --hostname rabbit_host1 --name rabbitmq1 \
-p 15672:15672 \
-p 5672:5672 \
-e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
rabbitmq:management
docker run -dit --hostname rabbit_host2 --name rabbitmq2 \
-p 5673:5672 \
-e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
--link rabbitmq1:rabbit_host1 \
rabbitmq:management
docker run -dit --hostname rabbit_host3 --name rabbitmq3 \
-p 5674:5672 \
-e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
--link rabbitmq1:rabbit_host1 \
--link rabbitmq2:rabbit_host2 \
rabbitmq:management
erlang-cookie
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
docker exec -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq_host1
rabbitmqctl start_app
exit
/etc/hosts 中把各个主机节点的hostname 加上
127.0.0.1 rabbitmq_host1
127.0.0.1 rabbitmq_host2
127.0.0.1 rabbitmq_host3
service network restart
Centos安装
1:准备工作:
安装编译工具 如果是最小化安装,那么缺少一些最基础的东西。
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
官网下载地址
http://erlang.org/download/
官方下载地址:http://erlang.org/download/otp_src_18.3.tar.gz
2: 安装包及依赖下载,
Rabbitmq安装主要依赖两个rpm, erlang和socat
使用rpm命令安装,也可以使用yum install *.rpm安装。
sudo rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm
测试是否安装成功:
erl 命令
sudo rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
sudo rpm -ivh rabbitmq-server-3.7.8-1.el7.noarch.rpm
在安装rabbitmq时提示依赖socat
=================================
3: 启动和关闭:
/sbin/service rabbitmq-server stop #关闭
/sbin/service rabbitmq-server start #启动
/sbin/service rabbitmq-server --ram start #启动 消息内存
默认磁盘存储
参数“--ram”表示设置为内存节点,忽略次参数默认为磁盘节点
/sbin/service rabbitmq-server status #状态
执行sudo service rabbitmq-server start
=================================
4: cd 到/sbin目录下,查看插件
./rabbitmq-plugins list
查看完了,可以查看状态
./rabbitmqctl status
=================================
5:执行命令
rabbitmq-plugins enable rabbitmq_management,打开rabbitmq web管理。
=================================
6:增加用户
rabbitmqctl add_user admin ilovelaohan
rabbitmqctl set_user_tags admin administrator,设置admin为管理员权限
sudo rabbitmqctl set_permissions -p / admin '.' '.' '.*'
rabbitmqctl list_users
添加rabbitmq固定节点名字,保证数据文件不变,添加新用户
echo 'NODENAME=rabbit@r1' | sudo tee -a /etc/rabbitmq/rabbitmq-env.conf
查看集群状态
rabbitmqctl cluster_status
7:防火墙:
sudo service firewalld start
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=4369/tcp --permanent
iptables -I INPUT -p tcp --dport 4369 --syn -j ACCEPT
iptables -I INPUT -p tcp --dport 59984 --syn -j ACCEPT
sudo service firewalld start(如果系统不要求开启防火墙,可以在设置完以后再关闭它)
备注:即使防火墙处于关闭状态,也应该先打开端口再关闭,否则在有些机器上会仍然端口不通。
8:自启动
chkconfig rabbitmq-server on
RabbitMQ用户角色及权限控制
#######################
用户角色
#######################
RabbitMQ的用户角色分类:
none、management、policymaker、monitoring、administrator
RabbitMQ各类角色描述:
none
不能访问 management plugin
management
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
policymaker
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
administrator
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
创建用户并设置角色:
可以创建管理员用户,负责整个MQ的运维,例如:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl add_user user_admin passwd_admin
赋予其administrator角色:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_user_tags user_admin administrator
可以创建RabbitMQ监控用户,负责整个MQ的监控,例如:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl add_user user_monitoring passwd_monitor
赋予其monitoring角色:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_user_tags user_monitoring monitoring
可以创建某个项目的专用用户,只能访问项目自己的virtual hosts
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl add_user user_proj passwd_proj
赋予其monitoring角色:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_user_tags user_proj management
创建和赋角色完成后查看并确认:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl list_users
########################
RabbitMQ 权限控制:
########################
默认virtual host:"/"
默认用户:guest
guest具有"/"上的全部权限,仅能有localhost访问RabbitMQ包括Plugin,建议删除或更改密码。可通过将配置文件中loopback_users置孔来取消其本地访问的限制:
[{rabbit, [{loopback_users, []}]}]
用户仅能对其所能访问的virtual hosts中的资源进行操作。这里的资源指的是virtual hosts中的exchanges、queues等,操作包括对资源进行配置、写、读。配置权限可创建、删除、资源并修改资源的行为,写权限可向资源发送消息,读权限从资源获取消息。比如:
exchange和queue的declare与delete分别需要exchange和queue上的配置权限
exchange的bind与unbind需要exchange的读写权限
queue的bind与unbind需要queue写权限exchange的读权限
发消息(publish)需exchange的写权限
获取或清除(get、consume、purge)消息需queue的读权限
对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq.gen.*|amq.default)'不匹配任何资源
需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。
为用户赋权:
[plain] view plain copy
在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_permissions -p /vhost1 user_admin '.' '.' '.*'
该命令使用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
服务器地址:
xawn.f3322.net
web: 15672
代码: 5672
sudo rabbitmqctl add_user team1 2342432
sudo rabbitmqctl set_user_tags team1 management
sudo rabbitmqctl add_user team2 sfdadfsf
sudo rabbitmqctl set_user_tags team2 management
sudo rabbitmqctl add_user team3 sdfsdfsdf
sudo rabbitmqctl set_user_tags team3 management
sudo rabbitmqctl add_user team4 sdfsdfdf
sudo rabbitmqctl set_user_tags team4 management
sudo rabbitmqctl set_permissions -p / team1 '.' '.' '.*'
5、修改rabbitmq.conf,使外部可以访问
cp /usr/share/doc/rabbitmq-server-3.6.6/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
进入rabbit文件夹下:cd /etc/rabbitmq
修改rabbitmq.config文件,使外部可以访问;vim rabbitmq.config
讲 %% {loopback_users, []},
改为:{loopback_users, []}
安装插件:
/sbin/rabbitmq-plugins enable rabbitmq_management
{tcp_listeners, [5672]}, {loopback_users, ["admin"]},
通过命令查看rabbitmq的状态发现
./rabbitmqctl status
结果
Status of node rabbit@localhost
[{pid,4151},
?{running_applications,
? ? ?[{rabbitmq_management,"RabbitMQ Management Console","3.6.11"},
? ? ? {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.11"},
? ? ? {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.11"},
? ? ? {rabbit,"RabbitMQ","3.6.11"},
? ? ? {os_mon,"CPO ?CXC 138 46","2.2.14"},
? ? ? {cowboy,"Small, fast, modular HTTP server.","1.0.4"},
? ? ? {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
? ? ? {ssl,"Erlang/OTP SSL application","5.3.3"},
? ? ? {public_key,"Public key infrastructure","0.21"},
? ? ? {cowlib,"Support library for manipulating Web protocols.","1.0.2"},
? ? ? {crypto,"CRYPTO version 2","3.2"},
? ? ? {amqp_client,"RabbitMQ AMQP Client","3.6.11"},
? ? ? {rabbit_common,
? ? ? ? ? "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
? ? ? ? ? "3.6.11"},
? ? ? {inets,"INETS ?CXC 138 49","5.9.8"},
? ? ? {mnesia,"MNESIA ?CXC 138 12","4.11"},
? ? ? {compiler,"ERTS ?CXC 138 10","4.9.4"},
? ? ? {xmerl,"XML parser","1.3.6"},
? ? ? {syntax_tools,"Syntax tools","1.6.13"},
? ? ? {asn1,"The Erlang ASN1 compiler version 2.0.4","2.0.4"},
? ? ? {sasl,"SASL ?CXC 138 11","2.3.4"},
? ? ? {stdlib,"ERTS ?CXC 138 10","1.19.4"},
? ? ? {kernel,"ERTS ?CXC 138 10","2.16.4"}]},
?{os,{unix,linux}},
?{erlang_version,
? ? ?"Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:64] [hipe] [kernel-poll:true]\n"},
?{memory,
? ? ?[{connection_readers,0},
? ? ? {connection_writers,0},
? ? ? {connection_channels,0},
? ? ? {connection_other,2800},
? ? ? {queue_procs,82208},
? ? ? {queue_slave_procs,0},
? ? ? {plugins,406168},
? ? ? {other_proc,23722384},
? ? ? {metrics,142848},
? ? ? {mgmt_db,139192},
? ? ? {mnesia,65240},
? ? ? {other_ets,2331656},
? ? ? {binary,802256},
? ? ? {msg_index,87440},
? ? ? {code,26963978},
? ? ? {atom,992409},
? ? ? {other_system,19562285},
? ? ? {total,75300864}]},
?{alarms,[]},
?{listeners,[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"::"}]},
?{vm_memory_calculation_strategy,rss},
?{vm_memory_high_watermark,0.4},
?{vm_memory_limit,1583901900},
?{disk_free_limit,50000000},
?{disk_free,10830860288},
?{file_descriptors,
? ? ?[{total_limit,924},{total_used,3},{sockets_limit,829},{sockets_used,0}]},
?{processes,[{limit,1048576},{used,332}]},
?{run_queue,0},
?{uptime,11},
?{kernel,{net_ticktime,60}}]
其中
{listeners,[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"::"}]},
这是修改后的
{amqp,5672,"0.0.0.0"}
修改前
{amqp,5672,"127.0.0.1"}
amqp绑定死了本地ip
通过修改rabbitmq-env.conf配置文件:/etc/rabbitmq/rabbitmq-env.conf
vim /etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@localhost
NODE_IP_ADDRESS=127.0.0.1
NODE_IP_ADDRESS=0.0.0.0
NODE_IP_ADDRESS修改成0.0.0.0,也可以设置为空值,表示匹配所有ip
rabbitmqctl change_password guest ilovelaohan
Centos下的集群
一、为什么使用集群?
内建集群作为RabbitMQ最优秀的功能之一,它的作用有两个:
允许消费者和生产者在Rabbit节点崩溃的情况下继续运行;
通过增加节点来扩展Rabbit处理更多的消息,承载更多的业务量;
二、集群的特点
RabbitMQ的集群是由多个节点组成的,但我们发现不是每个节点都有所有队列的完全拷贝。
RabbitMQ节点不完全拷贝特性
为什么默认情况下RabbitMQ不将所有队列内容和状态复制到所有节点?
有两个原因:
存储空间——如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据。
性能——如果消息的发布需安全拷贝到每一个集群节点,那--------------------------么新增节点对网络和磁盘负载都会有增加,这样违背了建立集群的初衷,新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。
所以其他非所有者节点只知道队列的元数据,和指向该队列节点的指针。
三: RabbitMQ模式大概分为以下三种:
(1)单一模式。
(2)普通模式(默认的集群模式)。
(3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA方案,在对业务可靠性要求较高的场合中比较适用)。
要实现镜像模式,需要先搭建一个普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。
普通模式
具体步骤:
1:配置hosts文件
更改两台MQ节点的计算机名分别为r1、r2,然后修改hosts配置文件。
vi /etc/hostname //其他两台相同
r1.localdomain
2:修改配置能够互相PING通。
vi /etc/hosts
192.168.2.101 r1 //注意不能带.注意-主机名称也要更改
192.168.2.102 r2
添加rabbitmq固定节点名字,保证数据文件不变,添加新用户
echo 'NODENAME=rabbit@r1' | sudo tee -a /etc/rabbitmq/rabbitmq-env.conf
3.拷贝erlang.cookie
Rabbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群镜像。Erlang的集群中各节点是经由过程一个magic cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,文件是400的权限。所以必须保证各节点cookie一致,不然节点之间就无法通信。
cat /var/lib/rabbitmq/.erlang.cookie
QZERHXYJPHUNILXVQOQO
用scp的方式将r1节点的.erlang.cookie的值复制到其他节点中。
1:停服务
2:再拷贝
scp /var/lib/rabbitmq/.erlang.cookie root@192.168.2.102:/var/lib/rabbitmq/.erlang.cookie
4:将r2作为内存节点加入r1节点集群中
在r2执行如下命令:
rabbitmqctl stop_app //停掉rabbit应用
rabbitmqctl join_cluster --ram rabbit@r1 //加入到磁盘节点
rabbitmqctl join_cluster rabbit@r1
rabbitmqctl start_app //启动rabbit应用
5:查看集群状态
rabbitmqctl cluster_status
在RabbitMQ集群集群中,必须至少有一个磁盘节点,否则队列元数据无法写入到集群中,当磁盘节点宕掉时,集群将无法写入新的队列元数据信息。
第一、成功的搭建了haproxy+rabbitmq集群,确认集群肯定没问题的。
第二、3个集群节点中两个ram和一个disc。
第三、目前java程序的写法是往第一个ram节点发送消息,再从第二个ram节点接收消息,disc节点只做同步不做操作。
Hostname mismatch: node "rabbit@localhost" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@localhost"
于是修改rabbitmq-env.conf配置文件(rabbitmq默认路径:/etc/rabbitmq/rabbitmq-env.conf)
在集群每台机器上执行 vim /etc/rabbitmq/rabbitmq-env.conf(该文件默认不存在,需手动添加),添加配置如下
[root@master rabbitmq]# vi /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODENAME=rabbit@rabbit-node1
.安装RabbitMQ集群十步走!(3.6.5版本、采用rpm安装方式,我们要安装3个节点的集群,192.168.11.71 192.168.11.72 192.168.11.73、以71节点为例:)
第一步:下载所需依赖包
Rabbitmq基于erlang语言编写,所以必须要有。
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
socat包为秘钥包,如果没有会提示缺少此依赖。
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
rabbitmq-server-3.6.5核心服务包。
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
第二步:安装并配置
安装命令:
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安装成功后:修改配置
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件
修改:loopback_users 中的 <<"guest">>,只保留guest
第三步:3个节点同时进行前两步骤操作:
第四步:启停单点服务与开启管控台
/etc/init.d/rabbitmq-server start | stop | status | restart
分别启动三个节点,然后执行命令启动控制台插件如下:
rabbitmq-plugins enable rabbitmq_management最后使用 guest/guest登录成功即可!
第五步:接下来进行集群构建:
71、72、73任意一个节点为Master(这里选择71为Master)
也就是说我们需要把71的Cookie文件同步到72、73节点上去,先停止所有服务器:/etc/init.d/rabbitmq-server stop
然后进入制定目录(/var/lib/rabbitmq/)并远程copy 文件到72、73节点,如下操作:scp /var/lib/rabbitmq/.erlang.cookie到192.168.11.72和192.168.11.73中
第六步:组成集群
首先启动三个节点:rabbitmq-server -detached
然后把72和73分别加入到71中,组成集群 [--ram]为节点以什么方式加入到集群中
ram为内存 存储 默认不加为disk磁盘存储,操作如下:
node72:rabbitmqctl stop_app
node72:rabbitmqctl join_cluster [--ram] rabbit@bhz71
node72:rabbitmqctl start_app
第七步:修改集群名称:
rabbitmqctl set_cluster_name rabbitmq_cluster1
第八步:查看集群状态:
rabbitmqctl cluster_status ,
第九步:构建镜像队列,任意节点执行命令如下:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
第十步:查看管控台发现集群已经构建成功!
RabbitMQ与SpringBoot2.x整合
生产者端:
第一步:pom.xml配置如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bfxy</groupId>
<artifactId>rabbitmq-springboot-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-springboot-producer</name>
<description>rabbitmq-springboot-producer</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
第二步:application.properties配置文件
spring.rabbitmq.addresses=192.168.11.71:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.publisher-confirms=true #confirm模式
spring.rabbitmq.publisher-returns=true #return机制
spring.rabbitmq.template.mandatory=true #与return机制结合配置次属性
第三步:编写RabbitSender生产端代码
package com.example.springboot.producer;
import java.util.Map;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
public class RabbitSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("异常处理....");
}
}
};
//回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
//发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("1234567890");
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
}
消费者:
第一步:pom文件同生产者一致
第二步:application.properties配置文件
spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.listener.simple.acknowledge-mode=manual #手工签收
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
第三步:RabbitRecever消费端代码
package com.example.springboot.conusmer;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class RabbitReceiver {@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1",
durable="true"),
exchange = @Exchange(value = "exchange-1",
durable="true",
type= "topic",
ignoreDeclarationExceptions = "true"),
key = "springboot.*"
)
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端Payload: " + message.getPayload());
Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
生产端测试:
package com.example.springboot;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.bfxy.springboot.producer.RabbitSender;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Test
public void contextLoads() {
}
@Autowired
private RabbitSender rabbitSender;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Test
public void testSender1() throws Exception {
Map<String, Object> properties = new HashMap<>();
properties.put("number", "12345");
properties.put("send_time", simpleDateFormat.format(new Date()));
rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
}