rabbitmq 应用场景与安装

*应用场景*

****秒杀(高并发,数据量大)

****1:**** *异步处理*

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式

(1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

[图片上传失败...(image-b61cb2-1681699137681)]

RegisterController{
    //线性的 100MS
    public String register(){
        emailService.register();//发邮件 50MS 
        smsService.register();//发短信   50MS
    }

}

(2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

[图片上传失败...(image-f7cdae-1681699137681)]

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.

(3)消息队列

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理

由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

*2 应用解耦*

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

[图片上传失败...(image-faf3e5-1681699137681)]

public class OrderService(){

    public void addOrder(){
        if(stockService.hasStock(产品id)){//判断是否还有库存  故障  时间过长
            //下订单
        }
    }

}

这种做法有一个缺点:

· 当库存系统出现故障时,订单就会失败。

· 订单系统和库存系统高耦合.

· 引入消息队列

[图片上传失败...(image-a3d9b5-1681699137681)]

· 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

· 库存系统:订阅下单的消息,获取下单消息,进行库操作。

· 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.

1:库存系统(数据库) 数据库的吞吐能力牵扯到IO,能力反而是最弱的。
2:无法把所有压力,全部交给数据库来实现。
3:下订单====》库存数量(Redis)====》最终订单要入库===》数据库的库存也要更改。
4:下订单====》查询和更改的Redis中的库存。
    就认为下订单完成了。把信息放入消息队列中去。让消息队列慢慢的来执行。
        =======》1:下订单(进入Mysql的数据库)
        =======》2:更改数据库中的库存

3.**** *流量削峰*

流量削峰一般在秒杀活动中 应用广泛

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:

1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

[图片上传失败...(image-34c20e-1681699137681)]

1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.

2.秒杀业务根据消息队列中的请求信息,再做后续处理.

削峰填谷:

[图片上传失败...(image-daccfa-1681699137681)]

4.**** *系统架构*

几个概念说明:

Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

Queue:消息的载体,每个消息都会被投到一个或多个队列。

Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.

Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。

Producer:消息生产者,就是投递消息的程序.

Consumer:消息消费者,就是接受消息的程序.

Channel:消息通道,在客户端的每个连接里,可建立多个channel.

常用命令

查看所有队列消息
rabbitmqctl list_queues


查看所有交换机

rabbitmqctl list_exchanges
 

添加用户
rabbitmqctl add_user admin 123456
注意:添加用户后login failed的问题,原因是未授权,还需执行下面的命令

 

admin赋予管理员权限
rabbitmqctl set_user_tags admin administrator

 

添加权限
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"

 

列出用户权限
rabbitmqctl list_user_permissions username

# 在rabbitmq的内部数据库添加用户;
rabbitmqctl add_user <username> <password>  

# 删除一个用户;
rabbitmqctl delete_user <username>  

# 改变用户密码(也是改变web管理登陆密码);
rabbitmqctl change_password <username> <newpassword>  

# 清除用户的密码,该用户将不能使用密码登陆,但是可以通过SASL登陆如果配置了SASL认证;
rabbitmqctl clear_password <username> 

# 设置用户tags;
rabbitmqctl set_user_tags <username> <tag> ...

# 创建一个vhosts;
rabbitmqctl add_vhost <vhostpath>  

# 删除一个vhosts;
rabbitmqctl delete_vhost <vhostpath>  

# 列出vhosts;
rabbitmqctl list_vhosts [<vhostinfoitem> ...]  

# 返回queue的信息,如果省略了-p参数,则默认显示的是"/"vhosts的信息;
rabbitmqctl list_queues [-p <vhostpath>] [<queueinfoitem> ...]  
 
# 返回exchange的信息;
rabbitmqctl list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...]  
 
# 返回绑定信息;
rabbitmqctl list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] 
 
# 返回链接信息;
rabbitmqctl list_connections [<connectioninfoitem> ...]  
 
# 返回目前所有的channels;
rabbitmqctl list_channels [<channelinfoitem> ...]  

消息中间件的比较:

[图片上传失败...(image-a68938-1681699137681)]

RabbitMQ 选型和对比

1.从社区活跃度

按照目前网络上的资料,RabbitMQactiveMZeroMQ 三者中,综合来看,RabbitMQ 是首选。

2.持久化消息比较

ZeroMq 不支持,ActiveMqRabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。

3.综合技术实现

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。

RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。

4.高并发

毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。

5.比较关注的比较, RabbitMQ 和 Kafka

RabbitMqKafka 成熟,在可用性上,稳定性上,可靠性上,RabbitMQ胜于 Kafka(理论上)。

另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq

还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多。

Windows安装

MQ的其他产品:
ActiveMQ: Apache的JMS消息服务器。Kafka:分布式发布订阅消息系统。

一: RabbitMQ的安装
1、Erlang的安装
下载并运行Erlang for Windows 安装程序。下载地址http://www.erlang.org/downloads

2、RabbitMQ安装程序
运行RabbitMQ安装程序(下载地址http://www.rabbitmq.com/install-windows.html

3、自定义环境变量
该服务将使用其默认设置正常运行。你可以自定义RabbitMQ环境或编辑配置。
(1)erl环境变量配置

ERLANG_HOME=C:\Program Files\erl10.3

在Path中加入

%ERLANG_HOME%\bin;

测试erl配置是否正确,开始-运行-cmd,输入erl

(2)RabbitMQ环境变量配置
RABBITMQ_SERVER=C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14

在Path中加入

%RABBITMQ_SERVER%\sbin;

4、激活rabbitmq_management
在CMD中键入如下命令

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin\rabbitmq-plugins.bat" enable rabbitmq_management

5、启动RabbitMQ服务
直接在命令行界面键入如下命令

net start RabbitMQ

关闭RabbitMQ服务命令如下:

net stop RabbitMQ

二、RabbitMQ测试
localhost:15672访问RabbitMQ的后台管理页面(初始化用户名和密码都是guest)

三、开发:
https://www.rabbitmq.com/getstarted.html
https://www.rabbitmq.com/tutorials/tutorial-one-java.html

Docker安装

docker search rabbitmq

版本

官网地址 https://hub.docker.com/_/rabbitmq

下载

 docker pull rabbitmq:management

ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的DNS服务不会使用resolv.conf文件中的search和domain两个配置,通过DNS来进行服务发现时需要注意。,带有-management的是带有web控制台

启动

创建容器并运行(15672是管理界面的端口,5672是服务的端口。这里顺便将管理系统的用户名和密码设置为admin ilovelaohan)

docker run -dit --name laohanmq \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=ilovelaohan \
    -p 15672:15672 \
    -p 5672:5672 \
    rabbitmq:management

防火墙

端口号开启:

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent

重启防火墙
firewall-cmd --reload

查询有哪些端口是开启的:
firewall-cmd --list-port

Docker的集群

一、为什么使用集群?

内建集群作为RabbitMQ最优秀的功能之一,它的作用有两个:

允许消费者和生产者在Rabbit节点崩溃的情况下继续运行;
通过增加节点来扩展Rabbit处理更多的消息,承载更多的业务量;

二、集群的特点

RabbitMQ的集群是由多个节点组成的,但我们发现不是每个节点都有所有队列的完全拷贝。

RabbitMQ节点不完全拷贝特性

为什么默认情况下RabbitMQ不将所有队列内容和状态复制到所有节点?

有两个原因:

存储空间——如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据。
性能——如果消息的发布需安全拷贝到每一个集群节点,那么新增节点对网络和磁盘负载都会有增加,这样违背了建立集群的初衷,新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。
所以其他非所有者节点只知道队列的元数据,和指向该队列节点的指针。

三: RabbitMQ三种模式:

(1)单一模式。
(2)普通模式(默认的集群模式)。
(3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA方案,在对业务可靠性要求较高的场合中比较适用)。
要实现镜像模式,需要先搭建一个普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。

1:普通模式(默认的集群模式)。

默认的集群模式。需要创建多个 RabbitMQ 节点。但对于 Queue 和消息来说,只存在于其中一个节点,其他节点仅同步元数据,即队列的结构信息。
当消息进入Queue后,如果 Consumer 从创建Queue的这个节点消费消息时,可以直接取出来;但如果 consumer 连接的是其他节点,那 rabbitmq 会把 queue 中的消息从创建它的节点中取出并经过连接节点转发后再发送给consumer。
所以 consumer 应尽量连接每一个节点。并针对同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连接哪个节点,都会从创建 queue 的节点获取消息,会产生瓶颈。

特点:

(1)Exchange 的元数据信息在所有节点上是一致的,而 Queue(存放消息的队列)的完整数据则只会存在于创建它的那个节点上。其他节点只知道这个 queue 的 metadata 信息和一个指向 queue 的 owner node 的指针;

(2)RabbitMQ 集群会始终同步四种类型的内部元数据(类似索引):
  a.队列元数据:队列名称和它的属性;
  b.交换器元数据:交换器名称、类型和属性;
  c.绑定元数据:一张简单的表格展示了如何将消息路由到队列;
  d.vhost元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性;
  因此,当用户访问其中任何一个 RabbitMQ 节点时,通过 rabbitmqctl 查询到的元数据信息都是相同的。

(3)无法实现高可用性,当创建 queue 的节点故障后,其他节点是无法取到消息实体的。如果做了消息持久化,那么得等创建 queue 的节点恢复后,才可以被消费。如果没有持久化的话,就会产生消息丢失的现象。
拉取镜像
docker pull rabbitmq:management
运行容器
docker run -dit --hostname rabbit_host1 --name rabbitmq1 \
    -p 15672:15672 \
    -p 5672:5672 \
    -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
    rabbitmq:management

docker run -dit --hostname rabbit_host2 --name rabbitmq2 \
    -p 5673:5672 \
    -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
    --link rabbitmq1:rabbit_host1 \
    rabbitmq:management

docker run -dit --hostname rabbit_host3 --name rabbitmq3 \
    -p 5674:5672 \
    -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
    --link rabbitmq1:rabbit_host1 \
    --link rabbitmq2:rabbit_host2 \
    rabbitmq:management
主要参数:
-p 15672:15672 management 界面管理访问端口
-p 5672:5672 amqp 访问端口
--link 容器之间连接

Erlang Cookie 值必须相同,也就是一个集群内 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。因为 RabbitMQ 是用Erlang实现的,Erlang Cookie 相当于不同节点之间通讯的密钥,Erlang节点通过交换 Erlang Cookie 获得认证。
加入节点到集群
设置节点1:
docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
设置节点2,加入到集群:
#docker run -d --hostname rabbit_host1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management
#docker run -d --hostname rabbit_host2 --name rabbitmq2 -p 5673:5672 --link rabbitmq1:rabbit_host1 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management
#docker run -d --hostname rabbit_host3 --name rabbitmq3 -p 5674:5672 --link rabbitmq1:rabbit_host1 --link rabbitmq2:rabbit_host2 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:management


docker run -dit --hostname rabbit_host1 --name rabbitmq1 \
    -p 15672:15672 \
    -p 5672:5672 \
    -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
    rabbitmq:management

docker run -dit --hostname rabbit_host2 --name rabbitmq2 \
    -p 5673:5672 \
    -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
    --link rabbitmq1:rabbit_host1 \
    rabbitmq:management

docker run -dit --hostname rabbit_host3 --name rabbitmq3 \
    -p 5674:5672 \
    -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' \
    --link rabbitmq1:rabbit_host1 \
    --link rabbitmq2:rabbit_host2 \
    rabbitmq:management


erlang-cookie




docker exec -it rabbitmq1 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

docker exec -it rabbitmq2 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq_host1
rabbitmqctl start_app
exit


/etc/hosts 中把各个主机节点的hostname 加上


127.0.0.1 rabbitmq_host1
127.0.0.1 rabbitmq_host2
127.0.0.1 rabbitmq_host3

service network restart


Centos安装

1:准备工作:

安装编译工具 如果是最小化安装,那么缺少一些最基础的东西。
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

官网下载地址
http://erlang.org/download/

官方下载地址:http://erlang.org/download/otp_src_18.3.tar.gz

2: 安装包及依赖下载,

Rabbitmq安装主要依赖两个rpm, erlang和socat

使用rpm命令安装,也可以使用yum install *.rpm安装。
sudo rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm

测试是否安装成功:
erl 命令

sudo rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm

sudo rpm -ivh rabbitmq-server-3.7.8-1.el7.noarch.rpm
在安装rabbitmq时提示依赖socat

=================================

3: 启动和关闭:

/sbin/service rabbitmq-server stop #关闭

/sbin/service rabbitmq-server start #启动

/sbin/service rabbitmq-server --ram start #启动 消息内存
默认磁盘存储
参数“--ram”表示设置为内存节点,忽略次参数默认为磁盘节点

/sbin/service rabbitmq-server status #状态

执行sudo service rabbitmq-server start

=================================

4: cd 到/sbin目录下,查看插件

./rabbitmq-plugins list

查看完了,可以查看状态
./rabbitmqctl status

=================================

5:执行命令

rabbitmq-plugins enable rabbitmq_management,打开rabbitmq web管理。

=================================

6:增加用户

rabbitmqctl add_user admin ilovelaohan

rabbitmqctl set_user_tags admin administrator,设置admin为管理员权限

sudo rabbitmqctl set_permissions -p / admin '.' '.' '.*'

rabbitmqctl list_users

添加rabbitmq固定节点名字,保证数据文件不变,添加新用户

echo 'NODENAME=rabbit@r1' | sudo tee -a /etc/rabbitmq/rabbitmq-env.conf

查看集群状态

rabbitmqctl cluster_status

7:防火墙:

sudo service firewalld start
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=4369/tcp --permanent

iptables -I INPUT -p tcp --dport 4369 --syn -j ACCEPT
iptables -I INPUT -p tcp --dport 59984 --syn -j ACCEPT

sudo service firewalld start(如果系统不要求开启防火墙,可以在设置完以后再关闭它)

备注:即使防火墙处于关闭状态,也应该先打开端口再关闭,否则在有些机器上会仍然端口不通。

8:自启动

chkconfig rabbitmq-server on

RabbitMQ用户角色及权限控制
#######################

用户角色

#######################

RabbitMQ的用户角色分类:
none、management、policymaker、monitoring、administrator

RabbitMQ各类角色描述:
none
不能访问 management plugin

management
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。

policymaker
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息

administrator
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections

创建用户并设置角色:
可以创建管理员用户,负责整个MQ的运维,例如:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl add_user user_admin passwd_admin
赋予其administrator角色:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_user_tags user_admin administrator

可以创建RabbitMQ监控用户,负责整个MQ的监控,例如:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl add_user user_monitoring passwd_monitor
赋予其monitoring角色:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_user_tags user_monitoring monitoring

可以创建某个项目的专用用户,只能访问项目自己的virtual hosts

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl add_user user_proj passwd_proj
赋予其monitoring角色:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_user_tags user_proj management

创建和赋角色完成后查看并确认:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl list_users

########################

RabbitMQ 权限控制:

########################
默认virtual host:"/"
默认用户:guest
guest具有"/"上的全部权限,仅能有localhost访问RabbitMQ包括Plugin,建议删除或更改密码。可通过将配置文件中loopback_users置孔来取消其本地访问的限制:
[{rabbit, [{loopback_users, []}]}]

用户仅能对其所能访问的virtual hosts中的资源进行操作。这里的资源指的是virtual hosts中的exchanges、queues等,操作包括对资源进行配置、写、读。配置权限可创建、删除、资源并修改资源的行为,写权限可向资源发送消息,读权限从资源获取消息。比如:
exchange和queue的declare与delete分别需要exchange和queue上的配置权限
exchange的bind与unbind需要exchange的读写权限
queue的bind与unbind需要queue写权限exchange的读权限
发消息(publish)需exchange的写权限
获取或清除(get、consume、purge)消息需queue的读权限

对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq.gen.*|amq.default)'可以匹配server生成的和默认的exchange,'^'不匹配任何资源

需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。

为用户赋权:

[plain] view plain copy

在CODE上查看代码片派生到我的代码片
$sudo rabbitmqctl set_permissions -p /vhost1 user_admin '.' '.' '.*'
该命令使用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源

服务器地址:

xawn.f3322.net
web: 15672
代码: 5672

sudo rabbitmqctl add_user team1 2342432
sudo rabbitmqctl set_user_tags team1 management

sudo rabbitmqctl add_user team2 sfdadfsf
sudo rabbitmqctl set_user_tags team2 management

sudo rabbitmqctl add_user team3 sdfsdfsdf
sudo rabbitmqctl set_user_tags team3 management

sudo rabbitmqctl add_user team4 sdfsdfdf
sudo rabbitmqctl set_user_tags team4 management

sudo rabbitmqctl set_permissions -p / team1 '.' '.' '.*'

5、修改rabbitmq.conf,使外部可以访问
cp /usr/share/doc/rabbitmq-server-3.6.6/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

进入rabbit文件夹下:cd /etc/rabbitmq

修改rabbitmq.config文件,使外部可以访问;vim rabbitmq.config
讲 %% {loopback_users, []},

改为:{loopback_users, []}

安装插件:
/sbin/rabbitmq-plugins enable rabbitmq_management

{tcp_listeners, [5672]}, {loopback_users, ["admin"]},

通过命令查看rabbitmq的状态发现

./rabbitmqctl status
结果
Status of node rabbit@localhost
[{pid,4151},
?{running_applications,
? ? ?[{rabbitmq_management,"RabbitMQ Management Console","3.6.11"},
? ? ? {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.11"},
? ? ? {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.11"},
? ? ? {rabbit,"RabbitMQ","3.6.11"},
? ? ? {os_mon,"CPO ?CXC 138 46","2.2.14"},
? ? ? {cowboy,"Small, fast, modular HTTP server.","1.0.4"},
? ? ? {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
? ? ? {ssl,"Erlang/OTP SSL application","5.3.3"},
? ? ? {public_key,"Public key infrastructure","0.21"},
? ? ? {cowlib,"Support library for manipulating Web protocols.","1.0.2"},
? ? ? {crypto,"CRYPTO version 2","3.2"},
? ? ? {amqp_client,"RabbitMQ AMQP Client","3.6.11"},
? ? ? {rabbit_common,
? ? ? ? ? "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
? ? ? ? ? "3.6.11"},
? ? ? {inets,"INETS ?CXC 138 49","5.9.8"},
? ? ? {mnesia,"MNESIA ?CXC 138 12","4.11"},
? ? ? {compiler,"ERTS ?CXC 138 10","4.9.4"},
? ? ? {xmerl,"XML parser","1.3.6"},
? ? ? {syntax_tools,"Syntax tools","1.6.13"},
? ? ? {asn1,"The Erlang ASN1 compiler version 2.0.4","2.0.4"},
? ? ? {sasl,"SASL ?CXC 138 11","2.3.4"},
? ? ? {stdlib,"ERTS ?CXC 138 10","1.19.4"},
? ? ? {kernel,"ERTS ?CXC 138 10","2.16.4"}]},
?{os,{unix,linux}},
?{erlang_version,
? ? ?"Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:64] [hipe] [kernel-poll:true]\n"},
?{memory,
? ? ?[{connection_readers,0},
? ? ? {connection_writers,0},
? ? ? {connection_channels,0},
? ? ? {connection_other,2800},
? ? ? {queue_procs,82208},
? ? ? {queue_slave_procs,0},
? ? ? {plugins,406168},
? ? ? {other_proc,23722384},
? ? ? {metrics,142848},
? ? ? {mgmt_db,139192},
? ? ? {mnesia,65240},
? ? ? {other_ets,2331656},
? ? ? {binary,802256},
? ? ? {msg_index,87440},
? ? ? {code,26963978},
? ? ? {atom,992409},
? ? ? {other_system,19562285},
? ? ? {total,75300864}]},
?{alarms,[]},
?{listeners,[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"::"}]},
?{vm_memory_calculation_strategy,rss},
?{vm_memory_high_watermark,0.4},
?{vm_memory_limit,1583901900},
?{disk_free_limit,50000000},
?{disk_free,10830860288},
?{file_descriptors,
? ? ?[{total_limit,924},{total_used,3},{sockets_limit,829},{sockets_used,0}]},
?{processes,[{limit,1048576},{used,332}]},
?{run_queue,0},
?{uptime,11},
?{kernel,{net_ticktime,60}}]

其中

{listeners,[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"::"}]},

这是修改后的

{amqp,5672,"0.0.0.0"}

修改前

{amqp,5672,"127.0.0.1"}

amqp绑定死了本地ip
通过修改rabbitmq-env.conf配置文件:/etc/rabbitmq/rabbitmq-env.conf

vim /etc/rabbitmq/rabbitmq-env.conf

NODENAME=rabbit@localhost

NODE_IP_ADDRESS=127.0.0.1

NODE_IP_ADDRESS=0.0.0.0

NODE_IP_ADDRESS修改成0.0.0.0,也可以设置为空值,表示匹配所有ip

rabbitmqctl change_password guest ilovelaohan

Centos下的集群

一、为什么使用集群?
内建集群作为RabbitMQ最优秀的功能之一,它的作用有两个:

允许消费者和生产者在Rabbit节点崩溃的情况下继续运行;
通过增加节点来扩展Rabbit处理更多的消息,承载更多的业务量;

二、集群的特点
RabbitMQ的集群是由多个节点组成的,但我们发现不是每个节点都有所有队列的完全拷贝。

RabbitMQ节点不完全拷贝特性

为什么默认情况下RabbitMQ不将所有队列内容和状态复制到所有节点?

有两个原因:

存储空间——如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据。
性能——如果消息的发布需安全拷贝到每一个集群节点,那--------------------------么新增节点对网络和磁盘负载都会有增加,这样违背了建立集群的初衷,新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。
所以其他非所有者节点只知道队列的元数据,和指向该队列节点的指针。

三: RabbitMQ模式大概分为以下三种:
(1)单一模式。
(2)普通模式(默认的集群模式)。
(3) 镜像模式(把需要的队列做成镜像队列,存在于多个节点,属于RabbiMQ的HA方案,在对业务可靠性要求较高的场合中比较适用)。
要实现镜像模式,需要先搭建一个普通集群模式,在这个模式的基础上再配置镜像模式以实现高可用。

普通模式

具体步骤:
1:配置hosts文件

更改两台MQ节点的计算机名分别为r1、r2,然后修改hosts配置文件。

vi /etc/hostname    //其他两台相同
  r1.localdomain

2:修改配置能够互相PING通。

vi /etc/hosts
192.168.2.101 r1  //注意不能带.注意-主机名称也要更改
192.168.2.102 r2

添加rabbitmq固定节点名字,保证数据文件不变,添加新用户

echo 'NODENAME=rabbit@r1' | sudo tee -a /etc/rabbitmq/rabbitmq-env.conf

3.拷贝erlang.cookie
Rabbitmq的集群是依附于erlang的集群来工作的,所以必须先构建起erlang的集群镜像。Erlang的集群中各节点是经由过程一个magic cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,文件是400的权限。所以必须保证各节点cookie一致,不然节点之间就无法通信。

cat /var/lib/rabbitmq/.erlang.cookie 
QZERHXYJPHUNILXVQOQO

用scp的方式将r1节点的.erlang.cookie的值复制到其他节点中。

1:停服务
2:再拷贝

scp /var/lib/rabbitmq/.erlang.cookie root@192.168.2.102:/var/lib/rabbitmq/.erlang.cookie

4:将r2作为内存节点加入r1节点集群中
在r2执行如下命令:

rabbitmqctl stop_app    //停掉rabbit应用

rabbitmqctl join_cluster --ram rabbit@r1 //加入到磁盘节点
rabbitmqctl join_cluster  rabbit@r1

rabbitmqctl start_app  //启动rabbit应用

5:查看集群状态
rabbitmqctl cluster_status

在RabbitMQ集群集群中,必须至少有一个磁盘节点,否则队列元数据无法写入到集群中,当磁盘节点宕掉时,集群将无法写入新的队列元数据信息。

第一、成功的搭建了haproxy+rabbitmq集群,确认集群肯定没问题的。
第二、3个集群节点中两个ram和一个disc。
第三、目前java程序的写法是往第一个ram节点发送消息,再从第二个ram节点接收消息,disc节点只做同步不做操作。

Hostname mismatch: node "rabbit@localhost" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@localhost"

于是修改rabbitmq-env.conf配置文件(rabbitmq默认路径:/etc/rabbitmq/rabbitmq-env.conf)
在集群每台机器上执行 vim /etc/rabbitmq/rabbitmq-env.conf(该文件默认不存在,需手动添加),添加配置如下

[root@master rabbitmq]# vi /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODENAME=rabbit@rabbit-node1


.安装RabbitMQ集群十步走!(3.6.5版本、采用rpm安装方式,我们要安装3个节点的集群,192.168.11.71 192.168.11.72 192.168.11.73、以71节点为例:)

第一步:下载所需依赖包

Rabbitmq基于erlang语言编写,所以必须要有。

wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm



socat包为秘钥包,如果没有会提示缺少此依赖。

wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm



rabbitmq-server-3.6.5核心服务包。

wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm

第二步:安装并配置

安装命令:

rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm

rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm

安装成功后:修改配置

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app文件

修改:loopback_users 中的 <<"guest">>,只保留guest

第三步:3个节点同时进行前两步骤操作:

第四步:启停单点服务与开启管控台

/etc/init.d/rabbitmq-server start | stop | status | restart

分别启动三个节点,然后执行命令启动控制台插件如下:

rabbitmq-plugins enable rabbitmq_management最后使用 guest/guest登录成功即可!

第五步:接下来进行集群构建:

71、72、73任意一个节点为Master(这里选择71为Master)

也就是说我们需要把71的Cookie文件同步到72、73节点上去,先停止所有服务器:/etc/init.d/rabbitmq-server stop

然后进入制定目录(/var/lib/rabbitmq/)并远程copy 文件到72、73节点,如下操作:scp /var/lib/rabbitmq/.erlang.cookie到192.168.11.72和192.168.11.73中

第六步:组成集群

首先启动三个节点:rabbitmq-server -detached

然后把72和73分别加入到71中,组成集群 [--ram]为节点以什么方式加入到集群中

ram为内存 存储 默认不加为disk磁盘存储,操作如下:

node72:rabbitmqctl stop_app

node72:rabbitmqctl join_cluster [--ram] rabbit@bhz71

node72:rabbitmqctl start_app

第七步:修改集群名称:

rabbitmqctl set_cluster_name rabbitmq_cluster1

第八步:查看集群状态:

rabbitmqctl cluster_status ,

第九步:构建镜像队列,任意节点执行命令如下:

rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

第十步:查看管控台发现集群已经构建成功!

RabbitMQ与SpringBoot2.x整合

生产者端:

第一步:pom.xml配置如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.bfxy</groupId>
<artifactId>rabbitmq-springboot-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>rabbitmq-springboot-producer</name>
<description>rabbitmq-springboot-producer</description>

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

</project>

第二步:application.properties配置文件

spring.rabbitmq.addresses=192.168.11.71:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true  #confirm模式
spring.rabbitmq.publisher-returns=true   #return机制
spring.rabbitmq.template.mandatory=true  #与return机制结合配置次属性

第三步:编写RabbitSender生产端代码

package com.example.springboot.producer;

import java.util.Map;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;  

//回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.err.println("correlationData: " + correlationData);
        System.err.println("ack: " + ack);
        if(!ack){
            System.err.println("异常处理....");
        }
    }
};

//回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
            String exchange, String routingKey) {
        System.err.println("return exchange: " + exchange + ", routingKey: " 
            + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
    }
};

//发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
    MessageHeaders mhs = new MessageHeaders(properties);
    Message msg = MessageBuilder.createMessage(message, mhs);
    rabbitTemplate.setConfirmCallback(confirmCallback);
    rabbitTemplate.setReturnCallback(returnCallback);
    //id + 时间戳 全局唯一 
    CorrelationData correlationData = new CorrelationData("1234567890");
    rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}

}

消费者:

第一步:pom文件同生产者一致

第二步:application.properties配置文件

spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手工签收
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

第三步:RabbitRecever消费端代码

package com.example.springboot.conusmer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component
public class RabbitReceiver {@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "queue-1", 
        durable="true"),
        exchange = @Exchange(value = "exchange-1", 
        durable="true", 
        type= "topic", 
        ignoreDeclarationExceptions = "true"),
        key = "springboot.*"
        )
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
    System.err.println("--------------------------------------");
    System.err.println("消费端Payload: " + message.getPayload());
    Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    //手工ACK
    channel.basicAck(deliveryTag, false);
}
}

生产端测试:

package com.example.springboot;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.bfxy.springboot.producer.RabbitSender;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Test
public void contextLoads() {
}

@Autowired
private RabbitSender rabbitSender;

private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

@Test
public void testSender1() throws Exception {
     Map<String, Object> properties = new HashMap<>();
     properties.put("number", "12345");
     properties.put("send_time", simpleDateFormat.format(new Date()));
     rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容