java-RabbitMQ指导1#前山翻译

这篇指南是关于java版RabbitMQ的客户端接口,而不是被分为好几个章节的初级教程

在编译和运行期,5.x正式版本需要依赖jdk8库,这意味着,Android系统需要等于或大于7.0版本才被支持使用5.x正式版。4.x正式版本支持jdk6,以及Android系统版本小于7.0才可使用。

该库是一个开源项目,支持下面三种许可证:

这意味着用户可以考虑在以上的许可证下声明其它任何的许可证。举例说明,用户选择Apache Public License2.0,可以将该库使用在商业产品中。在GPLv2许可证下代码库被声明,等等。

该库的API文档是被分开的。

这里有一些命令行工具,用于支持运行java版客户端。

客户端API几乎都是以AMQP0-9-1协议说明为模型,增加的抽象是为了简单使用。

Overview-预览

java版RabbitMQ客户端使用com.rabbitmq.client作为顶级包名,核心的类和接口为:

  • Channel:通道
  • Connection:连接
  • ConnectionFactory:连接工厂
  • Consumer:消费者

操作协议主要是使用Channel接口,Connection被用来创建channels,记录连接生命周期的事件处理者,并且不需要时关闭连接。ConnectionFactory可以初始化Connections,是可以配置连接参数,像vhost或者username。

Connections and Channels-连接和通道

核心的两个类是Connection和Channel,表示遵守AMQP 0-9-1协议的connection和channel。在使用之前需要分别引入:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
    

Connecting to a broker-连接一个中间件

下面的代码使用给定的参数连接到一个AMQP协议的中间件(参数:主机IP,端口号等等):

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();

所有的参数和服务端运行在本地需要的参数是一致的。

另外,也可以使用uri的方式进行连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@hostName:portNumber/virtualHost");
Connection conn = factory.newConnection();

所有的参数和服务端运行在本地需要的参数是一致的。

Connection接口可以被用作创建channel:

Channel channel = conn.createChannel();

这个channel可以用来发送和接受消息,就像之后章节描述的那样。

断开连接的话,仅需要关闭channel和connection:

channel.close();
conn.close();

说明:关闭channel被认为是一种好的习惯,但是并不是严格必须的。当底部connection被关闭时,所有的channel将一定会自动结束。

Using Exchange and Queues-使用转换器和队列

客户端应用需要用到exchanges和queues,对于高级别创建的MAQP块,在使用之前,它们一定要被声明。声明任何类型对象仅仅是为了保证它的存在,如果必要的话就创建它。

继续上面的例子,下面的代码声明了一个转换器和一个队列,并且将它们绑定在一起:

channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);

这将声明下面的对象,通过使用条件的参数定制的对象,目前是只有两个特别指定的参数:

  • 一个有持久性,不会自动删除,direct类型的装换器
  • 一个临时的,一个客户端独有的,会自动删除,并且自动生成名字的队列

上面的方法被调用后会通过给定的路由Key绑定队列到转换器上。

注明:这是一种特殊的方式去声明一个队列,因为只有一个客户需要使用它,它不需要一个明确的名字,当没有客户端使用它时,它将自动被清除掉。如果有好几个客户端想要共用一个名字明确的队列,下面的代码将会有效:

channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

这将会声明:

  • 一个持久化,不会自动删除,direct类型的转换器
  • 一个持久化,不是独有的,不会自动删除,有明确名字的队列

说明:Channel接口中所有的方法很多都是重载的,这些方便简短的方法使用是非常明智的,如exchangeDeclare,queueDeclare和queueBind。也有一些带有许多参数的较长形式,如有必要的话可以重写这些默认的方法。只要需要就给你足够的权限。

"short form , long form"模型被运用在客户端API的始终。

Publishing messages-发布消息

发布消息到装换器,使用Channel.basicPublish,如下:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了更好的控制,你可以重载变量,指定mandatory标记,或者发送设置好消息属性的消息:

channel.basicPublish(exchangeName, routingKey, mandatory,
         MessageProperties.PERSISTENT_TEXT_PLAIN,
         messageBodyBytes);

发送消息的一些属性:分发模式2,1等级和内容类型“text/plain”。你可以创建你自己的消息属性对象,使用Builder类创建你所需要的属性,例如:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .contentType("text/plain")
               .deliveryMode(2)
               .priority(1)
               .userId("bob")
               .build()),
               messageBodyBytes);

使用自定义的头文件发布消息,如下:

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude",  51.5252949);
headers.put("longitude", -0.0905493);

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .headers(headers)
               .build()),
               messageBodyBytes);

发送有时间期限的消息,如下:

channel.basicPublish(exchangeName, routingKey,
             new AMQP.BasicProperties.Builder()
               .expiration("60000")
               .build()),
               messageBodyBytes);

在这里我们不会阐述所有的情况。

注明:BasicProperties是AMQP类的内部类,自动持有外部类。

如果一个资源驱动的警报生效,那么通道basicpublish的调用最终会被阻塞。

Channels and Concurrency Considerations(Thread safety)-通道和并发的考虑(线程安全)

首要的规则是,避免在多个线程间共享一个Channel对象。应用倾向于每个线程中使用一个Channel,而不是多个线程中使用有一个Channel。

并发的情况有一些操作是安全的,但是有一些是不安全的,将会导致错误的框架,游离于火线上,如两次消息响应等。

并发的发布在一个共享的Channel中,会导致错误的框架,触发一个连接等级协议的错误,然后连接关闭。因为它在应用代码中需要并确实异步的(Channel#basicPublis一定得在严格的区域中调用)。在多个线程中共享Channels需要Publisher Confirms。我们强烈建议避免并发发送到一个共享的Channel中。

在一个线程中消费和在另一个线程中发送使用共同一个Channel是安全的。

服务器分发消息,保证每个Channel按照顺序保护,这个分发的机制使用java.util.concurrent.ExecutorService。每次连接,提供这个自定义的执行者可以被共享,简单的通过ConnectionFactory#setSharedExecutor的setter方法即可创建Connections。

当应答机制被使用时,考虑在什么线程中使用应答机制很重要,从接受线程将会不一样(Consumer#handleDelivery授权分开给不同的线程),在多个参数中设置为true的应答机制将会是不安全,并且回到时两次应答响应的后果,因此会发生一个Channel-level协议的错误,然后关闭Channel。只有一次应答一个消息才是安全的。

Receiving Messages By Subscription("push API")-通过订阅接受消息

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

最有效率的接受消息的方式是通过Consumer接口建立订阅,一有消息将自动被分发出去,而不需要确切的请求。

个人的订阅总是根据他们消费的标签来获取的。一个消费的标签是消费的标识符,它可以是客户端或者服务端生成的。为了让RabbitMQ生成一个广泛的并且独一无二的标签,重写Channel#basicConsume方法,因为它本身不带有一个Consumer标签参数或者传递一个空的字符串当作consumer标签,然后可以使用Channel#basicConsume方法的返回值。消费标签可以用来取消消费者。

不同的消费者对象一定有不同的消费者标签。在连接中有重复的消费者标签是强烈不建议这么做的,因为在自动恢复连接过程中会导致问题并且消费者被监听时会混淆了监听数据。

最简单的方式去实现一个Consumer的子类DefautlConsumer,这个子类对象可以被CasicConsumer调用建立监听。

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.getContentType();
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

这里,我们特别指出autoAck=false。这是必须的:确保消息分发给了消费者。在handleDelivery方法中可以方面的做大部分事情。正如说明,更多有经验的消费者会重写更多的方法,尤其是当Channels和connection关闭时调用handleShutdownSignal方法;在Consumer的其它回调方法调用之前,验证消费标签handleConsumeOk将会先被调用。

消费者也可以实现handleCancelOk和handleCancel的方法,各自表示明确的取消和含蓄的取消。

你可以含蓄的取消消费者通过Channel.basicCancel:

channel.basicCancel(consumerTag);

传递消费者标签即可。

就好像发布者一样,考虑并发给消费者带来的安全风险是非常重要的。

消费者的回调方法是在一个线程池中调用,和初始化Channel的线程是分开的。这意味着在连接的过程中,消费者可以安全的调用块级的方法,例如Channel#queueDeclare或者Channel#basicCancel。

每个Channel都有它自己分发的线程。对于大多数情况是一个Channel对应一个Consumer,这意味着消费者不会拦截其它的消费者。如果你的一个Channel有多个消费者,应该注意一个长时间运行的Consumer可能会拦截这个Channel上其它消费者的回调方法。

请参考并发的情况(线程安全)并发和相关并发安全的主题章节。

Retrieving Individual Messages("Pull API")-检索自己的消息

为了明确的检索消息, 使用Channel.basicGet方法。该方法返回的值是一个GetResponse对象,从中可以提取出请求头中信息和消息本身:

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...

并且autoAck=false,你必须调用Channel.basicAck方法确认你已经成功收到了消息:

 ...
    channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message
}

Handing unroutable messages-处理没有路由的消息

如果带有强制性标签的消息被发布出去,但是无法被路由,这个中间件将会把消息发回给发送端(通过AMQP.Basic.Return命令)。

为了得到这样返回值的通知,客户端可以实现ReturnListener接口并且调用Channel.addReturnListener方法。如果客户端没有为这个特别的channel配置返回监听,那么相关联的返回消息将默认被丢弃。

channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
    throws IOException {
        ...
    }
});

返回监听将会被调用。比如, 一个客户端发布一条带有强制标识的消息给'direct'类型的装换器,但是该装换器没有绑定一个队列。

Shutdown Protocol-关闭协议

Overview of the AMQP client shutdown-关闭AMQP客户端的预览

AMQP 0-9-1协议下的connection和channel都有相同的方式去管理网络异常,内部异常和明确的本地关闭。

AMQP 0-9-1中connection和channel有下面的生命周期状态:

  • open:对象准备使用
  • closing:对象已经被通知关闭,有一个关闭的请求对所有底层对象都有效,并且等待它们关闭的完成。
  • closed:这个对象已经从所有底层对象中接收到所有关闭完成的通知,然后以关闭自己为结果

这些对象结尾都是关闭状态,不管被关闭的理由,就好像一个应用请求,一个内部客户端库异常,一个远程网络请求或者网路异常。

AMQP中connection和channel对象拥有下面相关关闭的方法:

  • addShutdownListener(ShutdownListener listener)和removeShutdownListener(ShutdownListener listener)。为了管理所有的监听,当connection和channel对象转换到关闭状态时,所有的监听都会被清除。注意,添加一个ShutdownListener到一个已经被关闭的对象中将立刻会被清除该监听。
  • getCloseReason(),允许调查到底是什么 原因导致对象被关闭
  • isOpen(),用于测试对象是否是打开的状态
  • close(int closeCode,String closeMessage),明确通知对象需要关闭

一些简单的监听可能会像下面那样:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void shutdownCompleted(ShutdownSignalException cause)
    {
        ...
    }
});

information about the circumstances of a shutdown-关闭环境的信息

检索到ShutdownSignalException,它包含所有关闭原因的信息,当然也可以明确的调用getCloseReason()方法,它在ShutdownListener类中的service(ShutdownSignalException cause)方法使用参数cause来调用。

这个ShutdownSignalException类提供了一个分析关闭原因的方法,通过调用isHardError()方法,我们可以获取到是否连接或者通道有错误的信息,通过getResson()方法返回原因的信息。在AMQP方法表格中-不管是AMQP.Channel.Close 或者 AMQP.Connection.Close(如果在库中原因是一些错误是会返回null,例如网络连接失败,这种情况下发生的错误可以通过getCause()获取到错误 ):

public void shutdownCompleted(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = (Connection)cause.getReference();
    if (!cause.isInitiatedByApplication())
    {
      Method reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = (Channel)cause.getReference();
    ...
  }
}

Atomicity and use of the isOpen() method-原子和使用isOPen()方法

channel和connection对象在生产环境的代码中是不推荐使用isOpen()方法。因为这个方法返回的值是依赖于关闭原因,下面的代码将会阐述一些可能的情况:

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and basicQos(1) call
        ...
        channel.basicQos(1);
    }
}

相反,我们应该忽略这样的检查,简单的尝试行为 。如果连接的通道关闭了,代码报出了错误。一个ShutdownSignalException将会被抛出表示对象处在一个无效的状态。我们因该通过try-catch到IOException,或者SocketException。当中间件意外的关闭了连接或者报出ShutdownSignalException,中间件会自动清除关闭:

public void validMethod(Channel channel)
{
    try {
        ...
        channel.basicQos(1);
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}

接着下一篇文章:java-RabbitMQ指导2

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,632评论 0 3
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • 接着上一篇文章:java-RabbitMQ指导1 Advanced Connection options-连接的高...
    前山饭店阅读 946评论 0 1
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1