一、概念介绍
Rabbitmq的整体结构图如下:
- Producer:生产者是投递消息(消息包含两个部分,消息体及消息标签)的一方
- Consumer:消费者是接受消息的一方
- Borker:消息中间件的服务节点
- Queue:队列是RabbitMQ的内部对象,用于存储消息(多个消费者可以同时订阅一个队列,这是队列中的消息会被平均分配到各个消费者)
- Exchange:交换机,交换机有四种类型:
- fanout:他会把所有发送到该交换机的消息路由到所有与该交换机绑定的队列中
- direct:他会把消息路由到那些
BindingKey
与RoutingKey
完全匹配的队列上 -
topic:
- header:该类型的交换机不依赖于路由键的匹配规则,而是根据发送消息内容中的
header
属性进行匹配
- RoutingKey:路由键,生产者将消息发送给交换机的时候,会指定一个
RoutingKey
,用来指定这个消息的路由规则。 - Binding:绑定。RabbitMQ中通过绑定将交换器与队列关联起来,在绑定时会指定一个绑定键
BindingKey
- Connection:一条客户端与
Broker
之间建立的TCP连接。 - Channel:
AMOP
信道,每一个都会被指派一个唯一的ID,信道是建立在Connection
上的虚拟连接。RabbitMQ处理的每条AMQP指令都是通过信道完成的(在多线程间共享Channel
实例是非线程安全的)。
引入Channel的原因:对于操作系统而言,建立和销毁TCP连接时非常昂贵的代价。RabbitMQ采用了类似于NIO的做法,选择TCP连接的复用,不仅可以减少性能的开销,同时也便于管理。每个线程把持一个信道,同时RabbitMQ可以确保每一个线程的私秘境,就像拥有了独立的连接一样。当每个信道的流量不是很大的时候,复用Connection可以有效的节省TCP资源。但是当信道本身的流量很大的时候,这时候复用Connection就会产生性能的瓶颈
二、建立连接
在建立连接后,可以通过捕获ShutdownSignalException
来判断Connection
或者Channel
是否已断开。生产者与消费者都可以声明一个交换机或者队列。如果尝试声明一个已经存在的交换机或队列,只要声明的参数完全匹配现有参数,那么RabbitMQ就可以什么都不做并成功返回,否则抛出异常。
Exchange声明参数详解
- autoDelete:是否自动删除。自动删除的前提是至少有一个队列或者交换机与这个交换机绑定过,之后所有与这个交换机绑定的队列或者交换机都与此解绑。
- internal:是否内置交换机。内置交换机,客户端程序无法直接发送消息到这个交换机,只能通过交换机路由到交换机的这种方式发送消息。
Queue声明参数详解
- durable:是否持久化。持久化的队列会存盘,再服务器重启的时候也可以保证不丢失相关信息。
- exclusive:是否排他。排他队列仅对首次声明它的连接可见,并在连接断开时自动删除。排它队列是基于连接可见的,同一个连接的不同信道,是可以同时访问同一个连接创建的排他队列的。如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名队列的。无论排他队列是否是持久化的,一旦连接关闭或者客户端退出,排他队列均会自动被删除。
- autoDelete:是否自动删除。自动删除的前提是,至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除(与生产者无关,必须是消费者客户端)。
ExchangeBind
不仅可以将交换机与队列绑定,也可以将交换机与交换机绑定。
RabbitMQ的消息存在消息队列中,交换机并不真正的消耗服务器的性能,但是队列会。
断线重连
Automatic Recovery From Network Failures
三、消息消费
RabbitMQ的消息消费方式有两种:推
与拉
模式。推模式采用Basic.Consume
,拉模式调用Basic.Get
进行消费。
推模式
在推模式中,可以通过订阅的方式来消费消息,不同的订阅采用不同的消费者标签(consumerTag
)来区分彼此。与生产者一样,消费者客户端也要考虑线程安全的问题。消费者客户端的callback
会被分配到与Channel
不同的线程上执行。
每一个Channel都有自己独立的线程,最常用的方法是一个Channel对应一个消费者,不推荐在一个Channel中维护多个消费者
拉模式
通过basicGet
方法可以单条的获取消息。
GetResponse response = channel.basicGet(QUEUE_NAME,false); //autoback=false
channel.basicAck(response.getEnvelope().getDeliveryTag(),false); //使用basicAck手动确认接收
推模式会将信道
Channel
设置为投递模式,直到取消队列的订阅为止。在投递模式期间,RabbitMQ会不断地推送消息给消费者,当然,推送消息的个数还会收到Basic.Qos
的限制。如果只是想从队列中获取单条信息而不是持续订阅,推荐使用拉模式。
消息分发
当RabbitMQ的队列拥有多个消费者时,队列收到的消息将以轮询的方式发送给消费者,并且每条消息只会发送给订阅列表中的一个消费者。RabbitMQ可以通过使用channel.basicQos
的方法限制信道上的消费者所能保持的最大为确认消息数量,以此实现更为“公平的”分配原则。例如如下代码:
channel.basicQos(5)
表示消费者所能接受未确认消息的总体个数。
消息的顺序性
目前许多资料显示RabbitMQ是可以保证消息的顺序性,但是这个观点是十分不可靠的,或者说这个观点是十分有局限性的。在不使用RabbitMQ的高级特性,也没有消息丢失,网络故障等等的理想情况下,并且只有一个消费者时,同时最好只有一个生产者时,是可以保证保证消息的顺序性。
因此要保证消息的顺序性,需要根据具体的业务来保证顺序及幂等性的。
消费者的确认与拒绝
为了保证消息从队列可靠达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时。可以指定autoAck
参数,当autoAck=false
时候,RabbitMQ会等待消费者显示的回复确认信号后才从内存(或硬盘)中移除消息(实际上时先打上删除标记,之后才会删除)。当autoAck=true
时,RabbitMQ会把送出的消息设置为确认状态,然后从内存(或硬盘)中移除消息,无论消费者是否已经正确的消费了消息。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递的唯一依据是消费该消息的消费者是否已经断开
消息拒绝
RabbitMQ2.0版本开始引入了Reject
命令,消费者客户端可以调用channel.basicReject
方法告诉RabbitMQ拒绝这个消息
void basicReject(long deliveryTag, boolean requeue)
其中deliveryTag
为长整数类型,表示消息的编号。requeue=true
则RabbitMQ会重新将消息存入队列,否则立刻将消息从队列中移除。
Reject
命令一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用Nack
这个命令。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
其中multiple=false
表示拒绝当前deliveryTag
的消息,multiple=true
表示拒绝该编号之前,所有未被当前消费者确认的消息。
basicReject
或者basicNack
中的requeue
设置为false,可以启用死信功能
四、消息投递
在生产者客户端投递消息时,可以通过设置mandatory
或者immediate(已弃用)
参数来实现当消息无法达到目的地时返回给生产者这一功能。而RabbitMQ提供的备份交换器功能可以将未能被交换机路由的消息(没有绑定队列或没有匹配的绑定)存储起来,而不是返回给生产者客户端。
mandatory
当mandatory=true
时,交换机无法将消息投递到符合条件的队列时,RabbitMQ会调用Basic.Return
命令,将消息返回给生产者。当mandatory=false
时,出现上述情景,消息直接被丢弃
immediate
该参数在3.0版本之后已经废弃
备份交换机
由于mandatory
参数的使用会增加生产者客户端的代码复杂程度,可以使用AE(Alternate Exchange)交换机达到不丢失消息的目的。
可以通过在声明交换机的时候,指定alternate-exchange
参数来实现为交换机添加备用交换机。
备用交换机与普通交换机在技术上没有太大区别,为了方便使用,建议将备用交换机设置为
topic
类型。消息发送到备用交换机时的路由键和从生产者发出的路由键是一样的。
消息过期时间(TTL)
TTL,即过期时间。RabbitMQ可以对消息和队列设置TTL
设置消息的过期时间
目前有两种方法设置消息的TTL,第一种是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种是对消息本身进行单独的设置,每条消息的TTL可以不同。如果同时设置了两种方法,则以二者之间较小者为准。
消息在队列中的时间一旦超过设置的TTL值,就会变成死信,消费者无法再收到该消息(消息会从队列中删除)。如果不设置TTL,则此消息不会过期;如果TTL设置为0,则表示除非此消息可以直接投递到消费者,否则立即丢弃。
设置队列的过期时间
为队列设置过期时间,可以控制队列被自动删除前处于未使用状态的时间。未使用的意思是队列上没有任何消费者,队列也没有被重新声明,并且过期时间段内也未调用过Basic.Get
命令。
死信队列
DLX
(Dead-Letter-Exchange),死信交换机。当一个消息在队列中变成死信后,它能被重新发送到另一个交换机中,这个交换机就是DLX
,绑定死信交换机的队列为死信队列。
消息变为死信一般有如下几种情况:
- 消息被Rejected,并且
requeue
设置为false - 消息过期
- 队列达到最大长度
延迟队列
延迟队列存储的对象是延迟消息。RabbitMQ本身不支持延迟队列特性,如果需要使用RabbitMQ实现延迟消息,有如下途径:
- 延迟队列插件
- DLX和TTL配合模拟延迟队列(但是这种方式具有局限性,不能灵活的控制每个消息的过期时间。虽然RabbitMQ支持设置消息的过期时间,但通过这种方式设置,当消息过期时,消息并不能立刻从队列中抹去,因为每条消息的过期时间是在即将投递给消费者时判断的。因此这种延迟队列,只适应于为整个消息队列设置统一的过期时间的方式)
优先级队列
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备有限被消费的权力。
可以通过设置队列的x-max-priority
参数来实现。
Map<string,object> args = new HashMap(string,object);
args.put("x-max-priority",10);
channel.queueDeclare("queueName",true,false.false,args);
在设置了队列的最大优先级后,需要在发送时在消息中设置消息当前的下优先级。默认最低为0
如果消费者的消费速度大于生产者的投递速度,且Broker中没有消息堆积,那么对发送的消息设置优先级就没有什么实际意义了。
五、持久化
在RabbitMQ中,持久化分为三个部分:交换机、队列、消息
- 交换机:如果交换机不设置持久化,那么当RabbitMQ重启后,相关的交换机元数据会丢失,不过不会丢失消息,只是不能将消息发送到这个交换机中了
- 队列:如果队列不设置持久化,那么当RabbitMQ重启后,相关队列的元数据会丢失,并且队列中的消息也会丢失。
- 消息:队列的持久化能够保证其本身的元数据不会丢失,但是并不能保证内部所存储的消息不丢失。要确保消息不丢失,需要将消息设置为持久化的。
有时将所有队列跟消息都设置为持久化并不是非常好的决定,这样会严重影响RabbitMQ的性能。在选择是否要将消息持久化时,需要在可靠性与吞吐量之间做一个权衡。
交换机、队列、消息均设置为持久化之后也不能保证消息100%的不丢失,例如:
- 消费者将
autoAck
设置为true,并且在接受消息后,还没来得及处理消息就宕机了。此时消息会丢失 - 持久化消息正确存储RabbitMQ之后,还需要有一段时间(虽然很短,但是确实存在)的处理才能存储至系统磁盘中。RabbitMQ并不会为每条消息都进行磁盘同步(
fsync
)的处理,可能消息仅仅保存到了操作系统缓存之中,此时宕机依旧会造成消息丢失的情况。(这个问题可以通过发送方确认机制来避免,详情见下节)
六、生产者确认
在使用RabbitMQ时,虽然持久化可以在很大程度上解决消息丢失的问题,但是如何确保生产者将消息投递到服务器也非常重要。默认情况下,生产者是不知道消息是否正确的投递到服务器上的,如果消息在抵达服务器前已经丢失,那么持久化也就没有了意义。针对这种情况,RabbitMQ提供两种解决方案:
- 通过事务机制
- 通过发送方确认(
publisher confirm
)机制实现
事务机制(AMQP协议层面)
RabbitMQ的客户端提供三个与事务机制相关的方法:
-
channel.txSelect
:用于将当前信道(channel
)设置为事务模式 -
channel.txCommit
:用于提交事务 -
channel.txRollback
:用于事务回滚
如果事务提交成功,则消息一定到达了RabbitMQ中。如果事务在提交执行之前由于RabbitMQ异常崩溃或其他原因抛出了异常,这时候我们可以将其捕获,并执行事务回滚(与传统的数据库事务不同,这里一定要显示的调用回滚)。
发送方确认机制
虽然事务能过解决发送方和RabbitMQ之间的消息确认问题,但是事务机制是比较消耗性能的。虽然从AMQP协议层面上很难有更好的办法,但是RabbitMQ提供了一个改进方案,即发送方确认机制。
生产者将信道设置成(confirm
)模式,一旦信道进入该模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到匹配的队列后,RabbitMQ会发送一个确认(Ack)给生产者(包含消息的唯一ID),这样生产者便知道了该消息已经正确到达目的地。如果消息和队列是持久化的,那么RabbitMQ会在消息写入磁盘后发出确认回复(但是如果消息没能正确的投递到队列上,也会发送确认回复)。
事务机制在一条消息发送之后会使客户端阻塞并等待RabbitMQ的响应,接到响应之后才能继续发送下一条消息。相比之下发送放确认机制是异步的,一旦发布一条消息,生产者可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后(ack或nack)生产者便可以通过回调的方式处理该响应事件。
生产者确认机制在大部分的客户端中有3种使用方式,分别为同步模式
、批量同步模式
、异步模式
。同步模式
下的QPS其实与事务机制相比并没有特别大的提升。批量同步模式
当遇到超时或被nack
的消息时,往往需要重新批量发送全部的消息,导致性能更差。
异步模式
的编程实现最为复杂。客户端Channel需要监听Ack
,Nack
两个回调函数方法(具体实例可以参看Publisher Confirms)
与事务机制对比,
同步模式
的生产者确认机制会少发一个命令帧报文。事务机制发送publish
,commit
,commit-ok
3条命令,而生产者确认模式发送publich
,ack
这2条命令。
注意要点:
- 事务机制和生产者确认机制是互斥的,不能共存。
- 事务机制和生产者确认机制确保的是消息可以正确的发送至RabbitMQ,这里的含义是被正确的发送到RabbitMQ的交换机,如果此交换机没有匹配的队列,那么消息也会丢失。因此在使用这两种机制的时候,还需使用备份交换机等技术来保证消息传输的可靠性。
七、消息传送保障
消息的传输一般是业务系统接入消息中间件的时候首要考虑的问题,一般可分为三个层级:
- 最多一次:消息可能丢失,但是绝对不会重复传输
- 最少一次:消息绝对不能丢失,但是可能会重复传输
- 恰好一次:每条消息肯定会被传输一次且仅传输一次
RabbitMQ支持其中的“最多一次”和“最少一次”。其中“最少一次”需要考虑一下几个问题:
- 生产者开启事务或者确认机制
- 生产者需要使用备份交换机确保消息被路由到队列中而不是被丢弃
- 消息,队列都要持久化
- 消费者关闭
autoAck
,然后手动确认。
“恰好一次”目前RabbitMQ是无法保证的,其实大都数主流的中间件都没有去重机制。因此在业务上来处理“恰好一次”更为合适。
八、关闭连接
在应用程序使用完之后,需要关闭连接,释放资源:
channel.close();
conn.close();
主动关闭Channel
是个好习惯,但不是必须的。在Connection
关闭的时候,其下的Channel
也会自动关闭。
在AMQP中的Connection和Channel具有如下的生命周期:
- Open:开启状态,代表当前对象可以使用。
- Closing:正在关闭。当前对象被显示的调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行响应的操作,并等待这些关闭操作的完成。
- Closed:已经关闭的状态。当前对象已经接收到其内部所有对象已关闭动作的通知,并且自身也关闭了。
Connection
与Channel
最终都会成为Closed
状态,无论程序是正常关闭或是客户端的异常、网络等原因导致的。
九、RabbitMQ配置
RabbitMQ提供了三种方式来定制化服务
环境变量
RabbitMQ的环境变量都是以RabbitMQ_
开头的,可以在Shell环境变量中配置,也可以在rabbitmq-env.conf这个文件中配置。如果在非Shell中配置,需要去掉前缀RabbitMQ_
。优先级为Shell,其次rabbitmq-env.conf,最后默认配置。推荐在rabbitmq-env.conf
文件中修改配置。
- rabbitmq-env.conf:默认在
$RABBITMQ_HOME/etc/rabbitmq
目录下,也可以通过在启动RabbitMQ服务的时候,通过指令RABBITMQ_CONF_ENV_FILE
变量老更改此文件路 - 默认配置:在
$RABBITMQ_HOME/sbin/rabbitmq-defaults
文件中。
配置文件
由于不同的操作系统和安装包的默认配置文件位置不一样。因此查看配置文件地址的最好方法就是看RabbitMQ的启动日志。
可以通过RABBITMQ_CONFIG_FILE
或rabbitmq-env.conf
来设置文件地址。