这篇指南是关于java版RabbitMQ的客户端接口,而不是被分为好几个章节的初级教程。
在编译和运行期,5.x正式版本需要依赖jdk8库,这意味着,Android系统需要等于或大于7.0版本才被支持使用5.x正式版。4.x正式版本支持jdk6,以及Android系统版本小于7.0才可使用。
该库是一个开源项目,支持下面三种许可证:
这意味着用户可以考虑在以上的许可证下声明其它任何的许可证。举例说明,用户选择Apache Public License2.0,可以将该库使用在商业产品中。在GPLv2许可证下代码库被声明,等等。
该库的API文档是被分开的。
这里有一些命令行工具,用于支持运行java版客户端。
客户端API几乎都是以AMQP0-9-1协议说明为模型,增加的抽象是为了简单使用。
Overview-预览
java版RabbitMQ客户端使用com.rabbitmq.client作为顶级包名,核心的类和接口为:
- Channel:通道
- Connection:连接
- ConnectionFactory:连接工厂
- Consumer:消费者
操作协议主要是使用Channel接口,Connection被用来创建channels,记录连接生命周期的事件处理者,并且不需要时关闭连接。ConnectionFactory可以初始化Connections,是可以配置连接参数,像vhost或者username。
Connections and Channels-连接和通道
核心的两个类是Connection和Channel,表示遵守AMQP 0-9-1协议的connection和channel。在使用之前需要分别引入:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
Connecting to a broker-连接一个中间件
下面的代码使用给定的参数连接到一个AMQP协议的中间件(参数:主机IP,端口号等等):
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
所有的参数和服务端运行在本地需要的参数是一致的。
另外,也可以使用uri的方式进行连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();
所有的参数和服务端运行在本地需要的参数是一致的。
Connection接口可以被用作创建channel:
Channel channel = conn.createChannel();
这个channel可以用来发送和接受消息,就像之后章节描述的那样。
断开连接的话,仅需要关闭channel和connection:
channel.close();
conn.close();
说明:关闭channel被认为是一种好的习惯,但是并不是严格必须的。当底部connection被关闭时,所有的channel将一定会自动结束。
Using Exchange and Queues-使用转换器和队列
客户端应用需要用到exchanges和queues,对于高级别创建的MAQP块,在使用之前,它们一定要被声明。声明任何类型对象仅仅是为了保证它的存在,如果必要的话就创建它。
继续上面的例子,下面的代码声明了一个转换器和一个队列,并且将它们绑定在一起:
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
这将声明下面的对象,通过使用条件的参数定制的对象,目前是只有两个特别指定的参数:
- 一个有持久性,不会自动删除,direct类型的装换器
- 一个临时的,一个客户端独有的,会自动删除,并且自动生成名字的队列
上面的方法被调用后会通过给定的路由Key绑定队列到转换器上。
注明:这是一种特殊的方式去声明一个队列,因为只有一个客户需要使用它,它不需要一个明确的名字,当没有客户端使用它时,它将自动被清除掉。如果有好几个客户端想要共用一个名字明确的队列,下面的代码将会有效:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
这将会声明:
- 一个持久化,不会自动删除,direct类型的转换器
- 一个持久化,不是独有的,不会自动删除,有明确名字的队列
说明:Channel接口中所有的方法很多都是重载的,这些方便简短的方法使用是非常明智的,如exchangeDeclare,queueDeclare和queueBind。也有一些带有许多参数的较长形式,如有必要的话可以重写这些默认的方法。只要需要就给你足够的权限。
"short form , long form"模型被运用在客户端API的始终。
Publishing messages-发布消息
发布消息到装换器,使用Channel.basicPublish,如下:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
为了更好的控制,你可以重载变量,指定mandatory标记,或者发送设置好消息属性的消息:
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
发送消息的一些属性:分发模式2,1等级和内容类型“text/plain”。你可以创建你自己的消息属性对象,使用Builder类创建你所需要的属性,例如:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
使用自定义的头文件发布消息,如下:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
发送有时间期限的消息,如下:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
在这里我们不会阐述所有的情况。
注明:BasicProperties是AMQP类的内部类,自动持有外部类。
如果一个资源驱动的警报生效,那么通道basicpublish的调用最终会被阻塞。
Channels and Concurrency Considerations(Thread safety)-通道和并发的考虑(线程安全)
首要的规则是,避免在多个线程间共享一个Channel对象。应用倾向于每个线程中使用一个Channel,而不是多个线程中使用有一个Channel。
并发的情况有一些操作是安全的,但是有一些是不安全的,将会导致错误的框架,游离于火线上,如两次消息响应等。
并发的发布在一个共享的Channel中,会导致错误的框架,触发一个连接等级协议的错误,然后连接关闭。因为它在应用代码中需要并确实异步的(Channel#basicPublis一定得在严格的区域中调用)。在多个线程中共享Channels需要Publisher Confirms。我们强烈建议避免并发发送到一个共享的Channel中。
在一个线程中消费和在另一个线程中发送使用共同一个Channel是安全的。
服务器分发消息,保证每个Channel按照顺序保护,这个分发的机制使用java.util.concurrent.ExecutorService。每次连接,提供这个自定义的执行者可以被共享,简单的通过ConnectionFactory#setSharedExecutor的setter方法即可创建Connections。
当应答机制被使用时,考虑在什么线程中使用应答机制很重要,从接受线程将会不一样(Consumer#handleDelivery授权分开给不同的线程),在多个参数中设置为true的应答机制将会是不安全,并且回到时两次应答响应的后果,因此会发生一个Channel-level协议的错误,然后关闭Channel。只有一次应答一个消息才是安全的。
Receiving Messages By Subscription("push API")-通过订阅接受消息
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
最有效率的接受消息的方式是通过Consumer接口建立订阅,一有消息将自动被分发出去,而不需要确切的请求。
个人的订阅总是根据他们消费的标签来获取的。一个消费的标签是消费的标识符,它可以是客户端或者服务端生成的。为了让RabbitMQ生成一个广泛的并且独一无二的标签,重写Channel#basicConsume方法,因为它本身不带有一个Consumer标签参数或者传递一个空的字符串当作consumer标签,然后可以使用Channel#basicConsume方法的返回值。消费标签可以用来取消消费者。
不同的消费者对象一定有不同的消费者标签。在连接中有重复的消费者标签是强烈不建议这么做的,因为在自动恢复连接过程中会导致问题并且消费者被监听时会混淆了监听数据。
最简单的方式去实现一个Consumer的子类DefautlConsumer,这个子类对象可以被CasicConsumer调用建立监听。
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
这里,我们特别指出autoAck=false。这是必须的:确保消息分发给了消费者。在handleDelivery方法中可以方面的做大部分事情。正如说明,更多有经验的消费者会重写更多的方法,尤其是当Channels和connection关闭时调用handleShutdownSignal方法;在Consumer的其它回调方法调用之前,验证消费标签handleConsumeOk将会先被调用。
消费者也可以实现handleCancelOk和handleCancel的方法,各自表示明确的取消和含蓄的取消。
你可以含蓄的取消消费者通过Channel.basicCancel:
channel.basicCancel(consumerTag);
传递消费者标签即可。
就好像发布者一样,考虑并发给消费者带来的安全风险是非常重要的。
消费者的回调方法是在一个线程池中调用,和初始化Channel的线程是分开的。这意味着在连接的过程中,消费者可以安全的调用块级的方法,例如Channel#queueDeclare或者Channel#basicCancel。
每个Channel都有它自己分发的线程。对于大多数情况是一个Channel对应一个Consumer,这意味着消费者不会拦截其它的消费者。如果你的一个Channel有多个消费者,应该注意一个长时间运行的Consumer可能会拦截这个Channel上其它消费者的回调方法。
请参考并发的情况(线程安全)并发和相关并发安全的主题章节。
Retrieving Individual Messages("Pull API")-检索自己的消息
为了明确的检索消息, 使用Channel.basicGet方法。该方法返回的值是一个GetResponse对象,从中可以提取出请求头中信息和消息本身:
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
并且autoAck=false,你必须调用Channel.basicAck方法确认你已经成功收到了消息:
...
channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}
Handing unroutable messages-处理没有路由的消息
如果带有强制性标签的消息被发布出去,但是无法被路由,这个中间件将会把消息发回给发送端(通过AMQP.Basic.Return命令)。
为了得到这样返回值的通知,客户端可以实现ReturnListener接口并且调用Channel.addReturnListener方法。如果客户端没有为这个特别的channel配置返回监听,那么相关联的返回消息将默认被丢弃。
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});
返回监听将会被调用。比如, 一个客户端发布一条带有强制标识的消息给'direct'类型的装换器,但是该装换器没有绑定一个队列。
Shutdown Protocol-关闭协议
Overview of the AMQP client shutdown-关闭AMQP客户端的预览
AMQP 0-9-1协议下的connection和channel都有相同的方式去管理网络异常,内部异常和明确的本地关闭。
AMQP 0-9-1中connection和channel有下面的生命周期状态:
- open:对象准备使用
- closing:对象已经被通知关闭,有一个关闭的请求对所有底层对象都有效,并且等待它们关闭的完成。
- closed:这个对象已经从所有底层对象中接收到所有关闭完成的通知,然后以关闭自己为结果
这些对象结尾都是关闭状态,不管被关闭的理由,就好像一个应用请求,一个内部客户端库异常,一个远程网络请求或者网路异常。
AMQP中connection和channel对象拥有下面相关关闭的方法:
- addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。为了管理所有的监听,当connection和channel对象转换到关闭状态时,所有的监听都会被清除。注意,添加一个ShutdownListener到一个已经被关闭的对象中将立刻会被清除该监听。
- getCloseReason(),允许调查到底是什么 原因导致对象被关闭
- isOpen(),用于测试对象是否是打开的状态
- close(int closeCode,String closeMessage),明确通知对象需要关闭
一些简单的监听可能会像下面那样:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});
information about the circumstances of a shutdown-关闭环境的信息
检索到ShutdownSignalException,它包含所有关闭原因的信息,当然也可以明确的调用getCloseReason()方法,它在ShutdownListener类中的service(ShutdownSignalException cause)方法使用参数cause来调用。
这个ShutdownSignalException类提供了一个分析关闭原因的方法,通过调用isHardError()方法,我们可以获取到是否连接或者通道有错误的信息,通过getResson()方法返回原因的信息。在AMQP方法表格中-不管是AMQP.Channel.Close 或者 AMQP.Connection.Close(如果在库中原因是一些错误是会返回null,例如网络连接失败,这种情况下发生的错误可以通过getCause()获取到错误 ):
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}
Atomicity and use of the isOpen() method-原子和使用isOPen()方法
channel和connection对象在生产环境的代码中是不推荐使用isOpen()方法。因为这个方法返回的值是依赖于关闭原因,下面的代码将会阐述一些可能的情况:
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}
相反,我们应该忽略这样的检查,简单的尝试行为 。如果连接的通道关闭了,代码报出了错误。一个ShutdownSignalException将会被抛出表示对象处在一个无效的状态。我们因该通过try-catch到IOException,或者SocketException。当中间件意外的关闭了连接或者报出ShutdownSignalException,中间件会自动清除关闭:
public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}
接着下一篇文章:java-RabbitMQ指导2