- RabbitMQ采用Erlang编写,需安装语言库才能运行RabbitMQ代理服务器。
- AMQP:高级消息队列协议。
- Github代码
理解消息通信
生产者和消费者
生产者创建消息并创建标签,然后发布到代理服务器RabbitMQ。
消息包含两部分内容
- 有效载荷:需传输的数据,可以是任何内容。
- 标签:用于描述有效载荷,并指定谁将获得消息的拷贝。
消费者连接到代理服务器,并订阅到队列。
消费者只会接收到有效载荷。在消息路由过程中,消息的标签并没有随有效载荷一同传递。
建立RabbitMQ连接
- 创建
应用程序→RabbitMQ代理服务器
的TCP连接。- 在TCP连接上创建信道,AMQP命令也是通过信道发送的。每条信道都会被分配一个唯一的ID。
队列
AMQP消息路由三要素
- 交换器
- 队列
- 绑定
消费者通过以下两种方式从特定的队列中接收消息
- 连续获得:通过AMQP的
basic.consume
命令订阅。这样做会将信道置为接收模式,直到取消对队列的订阅为止。订阅了消息后,消费者在消费|拒绝最近接收的消息后,就能从队列中自动接收下一条消息。- 获得单条:通过AMQP的
basic.get
命令订阅。basic.get
会订阅消息,获得单条消息,然后取消订阅。
消费者接收到的每一条消息都必须进行确认,确认后RabbitMQ才会从队列移除该消息
- 通过AMQP的
basic.ack
命令显示地向RabbitMQ发送确认。- 订阅到队列时设置auto_ack参数为true,则消费者接收到消息,RabbitMQ会自动视其确认了消息。
拒绝接收消息
- 把消费者从RabbitMQ服务器断开连接。
- 使用AMQP的
basic.reject
命令(RabbitMQ2.0.0+)
。其参数requeue可指定该消息是否发送给下一个消费者进行处理。在新版本中,该参数置为false会让消息进入死信队列,被拒绝而不重入队列的消息,可用于检测问题。
声明队列
- 使用AMQP的
queue.declare
命令来创建队列。 - 若消费者在同一条信道上订阅了另一个队列,需先取消订阅并将信道置为传输模式。
- 消费者创建队列时需指定队列名称,若未指定Rabbit会分配一个随机名称并在
queue.declare
命令的响应中返回。
- 参数说明
- exclusive:是否私有,可用于限制一个队列只有一个消费者。
- auto-delete:是否自动移除,可用于只有一个消费者服务。结合
exclusive
和auto-delete
,可实现当消费者断开连接时,则移除队列。- passive:检测是否已存在队列,存在则成功返回;否则不创建队列且返回一个错误。
- 声明一个已存在的队列
- 参数完全匹配现存的队列,Rabbit什么都不做,并成功返回。参数不匹配则会失败。
生产者还是消费者来创建队列
发送出去的消息如果路由到了不存在的队列,Rabbit会忽略它们
- 避免消息进入黑洞而丢失,则生产者和消费者都应该尝试去创建队列。
- 可忽略消息进入黑洞而丢失,或者自定义方法来重新发布未处理消息,则可只让自己的消费者来声明队列。
交换器和绑定
- 消息通过路由键绑定到交换器,服务器会根据路由将消息从交换器路由到队列。
- 如果路由的消息不匹配任何绑定模式,则消息将进入黑洞。
使用交换器和绑定的优点
- 完成不同的使用场景
- 对于消息的生产者来说,它不需要关心服务器的另一端队列和消费者的逻辑。
交换器四种类型
- direct:若路由键匹配,消息就被投递到相应的队列。交换器名称为空白字符串。
- fanout:会将收到的消息广播到所有绑定的队列中。
- topic:可让来自不同源头的消息到达同一个队列。
- headers:允许匹配AMQP消息的header而非路由键,headers和direct交换器完全一致,但性能会差很多,几乎不用。
队列绑定到交换器可使用通配符
-
.
可将路由键分割成若干个部分 -
*
匹配任意文本 -
#
匹配所有规则,没有分割概念,将.
视为关键字的匹配部分
虚拟主机和隔离
虚拟主机vhost
- 默认vhost:/
- 默认用户和密码:guest
- Rabbit管理工具:RabbitMQ安装目录下的rabbitmqctl,该命令不带参数则显示帮助信息。
- 拥有自己的队列,交换器和绑定。
- 拥有自己的权限机制,vhost之间是绝对隔离的。
- 可只运行一个Rabbit,然后按需启动若干个vhost。
- 在RabbitMQ集群上创建vhost时,整个集群上都会创建该vhost。
rabbitmqctl参数
-
-n rabbit@[server_name|ip]
:指定远程节点,默认为本地。需确保运行Rabbit节点的服务器和运行rabbitmqctl的工作站安装了相同的Erlang cookie。
管理vhost
- 创建vhost:
rabbitmqctl add_vhost [vhost_name]
- 删除vhost:
rabbitmqctl delete_vhost [vhost_name]
- 列出Rabbit服务器上的vhost:
rabbitmqctl list_vhosts
持久化策略
设置参数实现持久化
- 消息投递模式设置为2(持久化)
- 发送到持久化的交换器
- 到达持久化的队列
- 队列和交换器的durable属性:是否持久化。默认fasle。
- 消息的投递模式设值为2:标记成持久化。
- 持久化方式实现:将持久化消息写入持久化日志文件。
- 持久化消息在RabbitMQ内建集群环境下工作得并不好。
- 持久化消息→持久化交换器:Rabbit会在消息提交到日志文件后才发送响应。
- 持久化消息→非持久化队列,则会将该消息从日志中移除。
AMQP事务
优点:
- 填补了生产者发布消息和RabbitMQ将它们提交到磁盘过程中差生错误的漏洞。
缺点:
- 会降低大约2~10倍的消息吞吐量。
- 会使生产者应用程序产生同步。
发送方确认模式
需要将信道置为confirm模式,只能通过重新创建信道来关闭该设置。
优点:
- 异步执行
- 更加轻量级,对Rabbit性能的影响几乎可以忽略不计。
消息的生命周期
生产者
需要完成的任务
- 创建RabbitMQ连接
- 获取信道
- 声明交换器
- 创建消息
- 发布消息
- 关闭信道
- 关闭RabbitMQ连接
- 代码展示
消费者
需要完成的任务
- 创建RabbitMQ连接
- 获取信道
- 声明交换器
- 声明队列
- 队列和交换器绑定
- 消费信息
- 关闭信道
- 关闭RabbitMQ连接
- 代码展示
*发送方确认模式进行确认投递
- 若同时拥有众多运行的信道,则需要为每条信道维护一个内部的ID计数器。
- 代码展示
Rabbit的运行管理
- Erlang节点和Erlang应用程序。
- 节点:指代RabbitMQ服务器实例。
- JVM:一个实例一个应用程序。
- Erlang:一个节点可同时运行多个应用程序。
- 如果应用程序崩溃了,节点会自动尝试重启应用程序。
- RabbitMQ节点:通常指RabbitMQ应用程序和其所在的Erlang节点。
管理节点
- 启动Erlang节点和Rabbit应用程序:
rabbitmq -server
,参数-detached
:后台启动。- 停止节点(应用程序和节点同时关闭):
rabbitmqctl stop
,参数-n rabbit@[hostname]
可指定远程节点。- 只停止RabbitMQ程序:
rabbitmqctl stop_app
Rabbit配置文件
rabbitmq.config
- Erlang数据结构
- mnesia:Mnesia数据库配置选项,Mnesia是RabbitMQ用来存储交换器和队列元数据的。Mnesia配置选项
- rabbit:Rabbit特定的配置选项
- 配置文件不能完成对RabbitMQ的访问控制
管理权限
单个用户可以跨越多个vhost进行授权。
用户管理
- 创建用户:
rabbitmqctl add_user username password
- 删除用户:
rabbitmqctl delete_user username
,访问控制条目一并删除。- 列出用户:
rabbitmqctl list_users
- 修改用户密码:
rabbitmqctl change_password username new_password
Rabbit权限系统
- 创建权限:
rabbitmqctl set_permissions -p vhost_name username ".*" ".*" ".*"
,参数-p
指定vhost(默认为/),后面三个正则式参数分别对应:配置,写,读权限。- 列出指定vhost的权限信息:
rabbitmqctl list_permissions -p vhost_name
- 删除权限:
rabbitmqctl clear_permissions -p vhost_name username
- 列出用户在Rabbit上的所有权限信息:
rabbitmqctl list_user_permissions username
列出队列,交换器和绑定相关信息
列出队列以及队列信息
- 列出队列:
rabbitmqctl list_queues -p vhost_name
,参数-p
指定vhost(默认为/)- 列出队列其他信息:
rabbitmqctl list_queues queue_info_item...
,queue_info_items选项列表
列出交换器信息
rabbitmqctl list_exchanges
:默认返回交换器名称和类型。rabbitmqctl list_exchanges exchange_info_item...
:指定返回其他信息。exchange_info_items选项列表
列出绑定信息
rabbitmqctl list_bindings
,参数-p
可指定vhost(默认为/),默认返回的信息每行包含:交换器名称,队列名称,路由键和参数。
RabbitMQ日志
LOG_BASE环境变量
- rabbitmq-server:
LOG_BASE=/var/log/rabbitmq
- rabbit-sasl.log:Erlang运行相关信息。
- rabbit.log:Rabbit运行相关信息,如网络流量,用户,交换器队列等。
轮换日志
rabbitmqctl rotate_logs suffix
:切换新日志文件,原日志文件名+suffix
Erlang常见错误
Erlang cookie
- Erlang节点通过交换作为令牌的Erlang cookie以获得认证。
- 通常存储在~/.erlang.cookie
Erlang节点
- 长节点名name:
rabbit@hostname.network.tld
- 短节点名sname:
rabbit@hostname
,默认方式
Mnesia数据库
- RabbitMQ启动时会先启动Mnesia
- MNESIA_BASE目录写权限,Mnesia基于主机名创建数据库。
Erlang相关操作
- 启动节点:
erl -sname node_name
- 列出已连接的节点:
node().
- 查看已运行的节点:
net_adm:name().
,使用net_adm
模块调用name()
函数,此处显示的RabbitMQ端口并不是AMQP连接的端口。- ping检测其它节点:
net_adm:ping('rabbit@hostname').
,响应为pong则连接成功,为pang则连接失败。需共享Erlang cookie。- 远程执行rabbit函数:
rpc:call('rabbit@hostname',module_name,function_name,[show_items])
,如:rpc:call('rabbit@hostname',erlang,system_info,[process_count])
,rpc:call('rabbit@hostname',mnesia,info,[])
- 退出节点:
q().
RabbitMQ应用程序设计
从同步编程模型转向异步编程模型
- 图片上传并行处理
- 告警通知邮件
- 实现分布式远程过程调用
发后既忘处理模型
- 批处理:将单张图片上传并转换成众多其它尺寸和格式。
- 告警通知:不需要担心发送目标和发送方式。
- 用户积分奖励
- 代码展示
实现RPC远程调用
私有队列和发送确认
- 生产者通过
消息头的reply_to
字段确定队列名称,并监听队列等待应答。- 消费者能够检查
reply_to字段
,然后创建包含应答内容的新的消息,并以队列名称作为路由键。- 此处可在用于消费的同一条信道上发布应答消息。
- 使用
reply_to
作为发布应答消息的目的地;发布时无须指定交换器。- 代码展示