概述
本文介绍一下AMQP协议和RabbitMQ中几个比较重要的方法
AMQP
我们知道RabbitMQ是遵从AMQP协议的,换句话说,RabbitMQ就是AMQP协议的Erlang的实现(当然RabbitMQ还支持SROMP、MOTT等协议)。
AMQP的模型架构和RabbitMQ的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。当生产者发送消息时所携带的 RoutingKey与绑定时的 BindingKey相匹配时,消息即被存入相应的队 列之中 。 消费者可以订阅相应的队列来获取消息 。
RabbitMQ中的交换器、交换器类型、队列、绑定、路由键等都是遵循的AMQP协议中的相应的概念
AMQP协议本身包括三层:
-
Module Layer: 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。
例如:客户端可以使用Queue.Declare 命令声明 一个队列或者使用Basic.Consume订阅消费一个队列中的消息。 - Session Layer: 位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
- Transport Layer: 位于最底层,主要传输二进制数据流 ,提供帧的处理、信道复用、错误检测和数据表示等。
AMQP生产者流转过程
让我们回顾一下生产者端的代码片段:
//创建连接
Connection connection = factory.newConnection() ;
//创建信道
Channel channel = connection.createChannel() ;
//消息内容
String message = "Hello RabbitMQ";
//发送消息
channel.basicPublish(EXCHANGE NAME, ROUTING KEY,MessageProperties.PERSISTENT_TEXT_PLAIN ,message.getBytes());
//关闭资源
channel.close() ;
connection .close();
当客户端与Broker建立连接的时候会调用factory.newConnection()方法,这个方法会进一步封装成AMQP协议的报文头发送给Broker,紧接着Broker返回Connection.Start来建立连接,在连接的过程中涉及6 个命令的交互:
- Connection.Start
- Connection.Start-OK
- Connection.Tune
- Connection.Tune-Ok
- Connection.Open
- Connection.Open-Ok
当客户端调用 connection .createChannel 方法准备开启信道的时候,其包装Channel.Open命令发送给 Broker,等待Channel.Open-Ok命令。
当客户端发送消息的时候,需要调用channel.basicPublish 方法,对应的AQMP命令为Basic.Publish,注意这个命令和前面涉及的命令略有不同,这个命令还包含了Content Header和Content Body。ContentHeader里面包含的是消息体的属性,例如投递模式 、优先级等,而 ContentBody包含消息体本身。
当客户端发送完消息需要关闭资源时,涉及以下命令的交互:
- Channel.Close
- Channel.Close-Ok
- Connection.Close
- Connection.Close-Ok
AMQP消费者流转过程
让我们回顾以下消费者端的代码片段
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASS_WORD);
connectionFactory.setVirtualHost(VIRTUAL_HOST);
//这里的创建连接连接方式与生产者的demo略有不同,注意辨别区别
Connection connection = connectionFactory.newConnection(addresses);
Channel channel = connection.createChannel();
//设置客户端最多接收未被ack的消息的个数
channel.basicQos(64);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调函数执行完毕之后 , 关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
消费者端和生产者端一样,也需要与Broker建立连接,创建信道,如果在消费之前设置调用了channel.basicQos(int prefetchCount)函数来设置客户端最大能保持的未确认的消息数那么协议流转还会设计Basic.Qos与Basic.Qos-Ok这两个AMQP命令。
在真正消费之前,消费者端需要向Broker发送Basic.Consume命令,也就是调用channel.basicConsume(...)方法,将信道设置为接收模式,之后Broker回执Basic.Consume-Ok以告诉消费者客户端准备好消费消息。紧接着Broker向消费者客户端推送 (Push) 消息,即Basic.Deliver命令。
消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令,也就是调用channel.basicAck(...)方法,Broker接到消费者的消费确认后认为该消息已被消费者成功消费,将消息从队列中移除。
几个重要的方法
- 连接RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(P ASSWORD);
factory.setVirtualHost(virtualHost) ;
factory.setHost(IP ADDRESS);
factory.setPort(PORT) ;
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
Connection可以用来创建多个Channel实例,在创建之后,Channel可以用来发送或者接收消息了。但是Channel实例不能在线程问共享, 应用程序应该为每一个线程开辟一个Channel。某些情况下Channel的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响友送方确认(publisher confrrm)机制的运行,所以多线程问共享Channel 实例是非线程安全的。
-
使用交换器和队列
交换器和队列是AMQP中 highlevel层面的构建模块,应用程序需确保在使用它们的时候就已经存在了,在使用之前需要先声明(declare) 它们:
channel.exchangeDeclare(exchangeName, "direct" , true) ;
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
上面创建了一个持久化的、非自动删除的、绑定类型为direct的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(此队列的名称由RabbitMQ自动生成)。这里的交换器和队列也都没有设置特殊的参数。
上面的代码也展示了如何使用路由键将队列和交换器绑定起来。上面声明的队列具备如下特性 : 只对当前应用中同一个Connection 层面可用,同一个Connection的不同Channel可共用,并且也会在应用连接断开时自动删除。
如果要在应用中共享一个队列,可以做如下声明:
channel.exchangeDeclare (exchangeName, "direct" , true) ;
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey) ;
这里的队列被声明为持久化的 、非排他的 、非自动删除的,而且也被分配另 一个确定的己知的名称(由客户端分配而非RabbitMQ自动生成)。
生产者和消费者都可以声明一个交换器或者队列。如果尝试声明一个已经存在的交换器或者队列,只要声明的参数完全匹配现存的交换器或者队列, RabbitMQ就可以什么都不做并成功返回。如果声明的参数不匹配则会抛出异常。
注意:RabbitMQ提供的默认交换器为direct类型,且隐式绑定到每一个队列,路由键等于队列名称。
-
声明交换器:exchangeDeclare方法详解
上文说到exchangeDeclare用来声明一个交换器,它有多个重载方法,而这些重载方法都是由下面这个方法中缺省的某些参数构成的:
Exchange.DeclareOk exchangeDeclare(String exchange,
String type ,
boolean durable ,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
各个参数详细说明如下:
- exchange:交换器的名称
- type:交换器的类型(fanout、direct、topic等)
- durable:是否持久化,如果设置为true则表示持久化,会将交换器存盘,在服务器重启的时候不会丢失向光信息。
- autoDelete:设置是否自动删除,autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为 : 当与此交换器连接的客户端都断开时,RabbitMQ会自动删除本交换器。
- internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
- argument:其他一些结构化参数,比如alternate-exchange
exchangeDeclareNoWait方法(了解)
void exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
exchangeDeclareNoWait与exchangeDeclare师出同门,只不过exchangeDeclareNoWait是没有返回值的,意思是不需要服务器返回,而exchangeDeclare返回Exchange.DeclareOk,意思是在客户端声明了一个交换器之后,需要等待服务器的返回(服务器会返回 Exchange.Declare-Ok这个AMQP命令)。
针对 "exchangeDeclareNoWait不需要服务器任何返回值"这一点 ,考虑这样一种情况 :在声明完一个交换器之后(实际服务器还并未完成交换器的创建) ,那么此时客户端紧接着使用这个交换器,必然会发生异常。如果没有特殊的缘由和应用场景,并不建议使用这个方法。
这里还有师出同门的另一个方法 exchangeDeclarePassive,这个方法的定义如下 : Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
这个方法在实际应用过程中还是非常有用的,它主要用来检测相应的交换器是否存在。如果
存在则正常返回:如果不存在则抛出异常 : 404 channel exception,同时 Channel 也会被关闭。
有声明创建交换器的方法,当然也有删除交换器的方法。相应的方法如下 :
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
void exchangeDeleteNoWait(String exchange , boolean ifUnused) throwsIOException;
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throwsIOException;
其中 exchange表示交换器的名称,而ifUnused 用来设置是否在交换器没有被使用的情况下删除 。如果isUnused设置为true,则只有在此交换器没有被使用的情况下才会被删除,如果设置 false,则无论如何这个交换器都要被删除。
-
声明队列:queueDeclare方法详解
queueDeclare方法用来声明一个队列,只有2个重载的方法:
Queue.DeclareOk queueDec1are() throws IOException;
Queue.DeclareOk queueDeclare (String queue,
boolean durable,
boolean exclusive ,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;
不带任何参数的queueDeclare方法默认创建一个由RabbitMQ命名的(类似这种 amq.gen-LhQzlgv3GhDOv8PIDabOXA名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
各个参数详细说明如下:
- queue: 队列的名称。
- durable: 设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
- exclusive: 设置是否排他。为true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
这里需要注意三点:
1⃣️排他队列是基于连接( Connection) 可见的,同一个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列;
2⃣️"首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的。
3⃣️即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。 - autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时才会自动删除。不能把这个参数错误地理解为: 当连接到此队列的所有客户端断开时这个队列自动删除,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
- argurnents: 设置队列的其他一些参数,如x-rnessage-ttl、x-expires、
x -rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-dead letter-routing-key, x-rnax-priority等。
生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道直为"传输" 模式,之后才能声明队列。
类似exchangeDeclareNoWait方法,这里也有一个 queueDeclareNoWait 方法:
void queueDeclareNoWait(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException
方法的返回值也是void,表示不需要服务端的任何返回 。同样也需要注意,在调用完queueDeclareNoWait 方法之后,紧接着使用声明的队列时有可能会发生异常情况。
同样这里还有一个queueDeclarePassive的方法,也比较常用。这个方法用来检测相应的队列是否存在。 如果存在则正常返回 ,如果不存在则抛出异常: 404 channel exception,同时Channel也会被关闭。方法定义如下 :
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
队列也有删除的相应方法 :
Queue.DeleteOk queueDelete(String queue) throws IOException;
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty)throws IOException;
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty)throws IOException ;
其中queue表示队列的名称,ifUnused 可以参考上一小节的交换器。 ifEmpty 设置为true表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。
与队列相关的还有一个有意思的方法:queuepurge,区别于queueDelete,这个方 法用来清空队列中的内容,而不删除队列本身,具体定义如下:
Queue.PurgeOk queuePurge(String queue) throws IOException;
-
队列绑定:queueBind方法详解
queueBind方法用来将队列和交换器进行绑定
Queue.BindOk queueBind(String queue,
String exchange,
String routingKey)throws IOException;
Queue.BindOk queueBind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
参数信息:
- queue:队列名称
- exchange:交换器名称
- routingKey:用来绑定队列和交换器的路由键
- argument:定义绑定的一些参数
不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。具体 方法可以参考如下:
Queue.UnbindOk queueUnbind (String queue,
String exchange,
String routingKey) throws IOException;
Queue.UnbindOk queueUnbind (String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;
-
交换器绑定:exchangeBind方法详解
我们不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定
Exchange.BindOk exchangeBind(String destination ,
String source ,
String routingKey) throws IOException;
Exchange.BindOk exchangeBind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException
绑定之后 ,消息从source交换器转发到destination交换器,某种程度上来说 destination交换器可以看作一个队列,示例代码:
channel.exchangeDeclare("source", "direct", false, true, null) ;
channel.exchangeDeclare("destination", "fanout", false, true, null);
channel.exchangeBind( "destination " , "source " , "exKey");
channel.queueDeclare("queue", false, false, true, null);
channel.queueBind("queue", "destination", "");
channel.basicPublish( "source", "exKey", null , "exToExDemo".getBytes()) ;
生产者发送消息至交换器source中,交换器source根据路由键找到与其匹配的另一个交换器destination,并把消息转发到 destination中, 进而存储在destination绑定的队列queue中
-
发送消息:basicPublish方法详解
Channel类的basicPublish方法用来发送一条消息,例如:
byte[] messageBodyBytes = "Hello, RabbitMQ".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
basicPublish有一些重载方法:
void basicPublish(String exchange,
String routingKey,
BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange,
String routingKey,
boolean mandatory,
BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange,
String routingKey,
boolean mandatory,
boolean immediate,
BasicProperties props,
byte[] body) throws IOException;
参数解释:
- exchange: 交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中(RabbitMQ的默认交换器为direct类型且隐式绑定到每一个队列,路由键等于队列名称)。
- routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
- props: 消息的基本属性集,其包含 14 个属性成员:
contentType
contentEncoding
headers (Map<String , Object>)
deliveryMode
priority
correlationld
replyTo
expiration
messageld
timestamp
type
userld
appld
clusterld - byte[] body: 消息体(payload),真正需要发送的消息 。
- mandatory:当 mandatory参数设为true时,如果交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return 命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接被丢弃 。
-
消费消息详解
RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。
推模式
在推模式中可以通过持续订阅的方式来消费消息,使用到的相关类:
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现,当调用与Consumer相关的API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签来区分,关键代码如下:
boolean autoAck = false ;
channel .basicQos(64);
channel .basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException
String routingKey = envelope .getRoutingKey( );
String contentType = properties.getContentType() ;
long deliveryTag = envelope.getDeliveryTag() ;
// (process the message components here . .. )
channel .basicAck(deliveryTag, false );
}
}) ;
Channel类中basicConsume方法有如下几种形式:
String basicConsume(String queue,
Consumer callback) throws IOException;
String basicConsume(String queue,
boolean autoAck,
Consumer callback) throws IOException;
String bas工cConsume(String queue,
boolean autoAck,
Map<String, Object> arguments ,
Consumer callback) throws IOException;
String basicConsume(String queue,
bool ean autoAck,
String consumerTag,
Consumer callback) throws IOException;
String basicConsume(String queue,
boolean autoAck,
String consumerTag,
boolean noLocal,
boolean exclusive,
Map<String, Object> arguments,
Consumer callback) throws IOException ;
参数说明:
- queue : 队列的名称
- autoAck: 设置是否自动确认。建议设成 false,即不自动确认
- consumerTag: 消费者标签,用来区分多个消费者
- noLocal: 设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
- exclusive: 设置是否排他
- arguments : 设置消费者的其他参数
- callback: 设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如
DefaultConsumer,使用时需要客户端重写 (override) 其中的方法。
对于消费者客户端来说重写handleDelivery方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下 :
void handleConsumeOk(String consumerTag) ;
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);
比如handleShutdownSignal方法,当Channel或者Connection关闭的时候会调用。再者,handleConsumeOk方法会在其他方法之前调用,返回消费者标签。
重写handleCancelOk和handleCancel方法,这样消费端可以在显式地或者隐式地取消订阅的时候调用。也可以通过channel.basicCancel 方法来显式地取消 一个消费者的订阅:
channel.basicCancel(consumerTag) ;
注意上面这行代码会首先触发 handleConsumerOk方法,之后触handleDelivery方法,最后才触发handleCancelOk方法。
和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程池上 , 这意味着消费者客户端可以安全地调用这些阻塞方法,比如channel.queueDeclare、 channel.basicCancel 等。
每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者, 也就是意味着消费者彼此之间没有任何关联。当然也可以在一个Channel中维持多个消费者,
但是要注意一个问题,如果Channel中的一个消费者一直在运行,那么其他消费者的callback 会被"耽搁"
拉模式
通过channel.basicGet方法可以单条地获取消息,其返回值是GetResponeo Channel类的basicGet 方法没有其他重载方法,只有 :
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
其中 queue 代表队列的名称,如果设置 autoAck 为 false, 那么同样需要调用
channel.basicAck来确认消息己被成功接收。
注意:Basic.Consume将信道 (Channel) 置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制.如果只想从队列获得单条消息而不是持续订阅,建议还是使用Basic.Get进行消费.但是不能将 Basic.Get放在一个循环里来代替 Basic.Consume,这样做会严重影响RabbitMQ的性能,如果要实现高吞吐量,消费者理应使用Basic.Consume方法。