6.RPC#前山翻译

注:这是RabbitMQ-java版Client的指导教程翻译系列文章,欢迎大家批评指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介绍队列的使用
第三篇Publish/Subscribe介绍转换器以及其中fanout类型
第四篇Routing介绍direct类型转换器
第五篇Topics介绍topic类型转换器
第六篇RPC介绍远程调用

远程过程调用(Remote procedure call )

在第二篇指导教程中,我们学会在多个消费者之间使用工作队列循环分发任务。但是如果我们需要在远程电脑上运行一个程序并且需要返回结果呢。这就是另一个不同的事情,这个模型就是闻名的远程过程调用,简称RPC。

在这篇指导教程中,我们将会使用RabbitMQ去创建一个RPC系统:一个客户端和一个可测试的远程服务。因为我们没有按时间消费的任务去分配,所以将会创建一个仿制的RPC服务,用于返回斐波那契数列。

客户端接口(Client interface)

为了解释RPC服务是如何使用的,我们将创建一个简单的客户端类,它将会暴露出一个call的方法,用于发送RPC请求,并且阻塞住直到结果被返回:

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();

String result = fibonacciRpc.call("4");

System.out.println( "fib(4) is " + result);  

RPC注意

尽管RPC在计算中是一个很常见的模式,但是它仍有很多不足的地点。问题产生的原因程序员没有意识到这个功能正在本地调用,还是服务端反应慢。对这样不可预料的系统结果就会有困惑,就会去增加一些不必要的复杂的调试。滥用RPC可能会导致不可维护,难于理解的代码,而不是简化软件。

请注意,考虑下面的一些建议:

  • 功能显而易见在本地调用还是远程调用。

  • 用文档记录你的系统,确保各组件之间依赖清晰。

  • 处理错误情况。当RPC服务端好久没有反应,客户端如何响应?

处于困境的时候避免使用RPC。如果可以使用,你应该使用异步请求方式,而不是RPC阻塞的方式。异步返回结果会被推送到另一个计算阶段。

返回队列(Callback queue)

一般来说使用RabbitMQ来进行RPC是简单的,一个客户端发出请求消息和一个服务端返回相应消息。为了能够接受到响应,我们需要在请求中发送一个callback队列的地址,我们使用默认的队列(客户端唯一的队列),试试:

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties

.Builder()

.replyTo(callbackQueueName)

.build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息属性

AMQP 0-9-1协议预先定义了14个属性,大部分的属性很少使用,下面有一些说明:

deliveryMode:标记这个消息可以持久化(值为2),或者短暂保留(值为其它的),在第二章里面有提到过这个属性

contentType:用于描述文本的类型。例如经常使用JSON的编码方式,这是经常设置的属性:application/json

replyTo:经常用于存储返回队列的名字

correlationId:对请求的RPC相应是有用的

我们使用一个新的引用:

 import com.rabbitmq.client.AMQP.BasicProperties;

相关联的Id(Correlation Id)

按照目前上述的方法,我们需要为每一个RPC请求都创建一个callBack队列,显然不够高效。幸运的是这里有一种更好的方式,一个客户端我们只需要创建一个callback队列。

这会导致一个新的问题,在队列中接收的响应不知道是哪一个请求的,这就需要使用到correlationId属性,我们为每一个请求都设置唯一correlationId的值,基于这个值我们就能都找到匹配请求的相应。如果我们有一个不匹配correlationId的值,或许可以删除这条消息,因为它不属于我们的请求。

你可能会问:在返回队列中我们为什么应该忽略掉不知道的消息,而不是当做一个错误?在服务端有一种可能,在发送我们一条消息的之后,RPC服务死了,但是反馈信息已经发出去了。如果发生这种情况,重新启动的RPC服务端将会再次处理这个消息,这就是为什么我们必须在客户端处理这个多余的相应。对于RPC也是这样理解的(这一段乱七八糟)。

总结

python-six.png

我们RPC就像上图这样工作:

当客户端创建,它创建一个异步唯一的callback队列

对于一个RPC请求来说,客户端发送带有两个属相的消息:replyTo,被设置callback队列的名称;correlationId被设置为唯一请求的值。

这个请求被发送到rpc-queue队列

在队列中RPC工作者等待着请求,当请求来的时候,它处理工作,把带有反馈队列的消息发送回客户端,使用replyTo中的返回队列。

客户端在callback队列中等待返回数据,当消息来得时候,它会检查correlationId的属性,如果它匹配请求中的correlationId的值,就会返回相应给应用。

综合

斐波那契数列任务:

private static int fib(int n) {

      if (n == 0) return 0;

      if (n == 1) return 1;

      return fib(n-1) + fib(n-2);

    }

我们声明了斐波那契数列的方法,它表明只有一个有效积极的数输入(不要期望去处理很多的数字,实现起来会很慢很慢的)
下面是RPCServer.javade daima ,这里下载

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = null;

        try {

            connection = factory.newConnection();

            Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {

                @Override

                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();

                   String response = "";

                    try {

                        String message = new String(body,"UTF-8");

                         int n = Integer.parseInt(message);

                         System.out.println(" [.] fib(" + message + ")");

                         response += fib(n);

                     }  catch (RuntimeException e){

                          System.out.println(" [.] " + e.toString());

                     }  finally {

                             channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                            channel.basicAck(envelope.getDeliveryTag(), false);

                 }

              }

         };

         channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

       //...

}}}

服务端代码是非常直观明了
跟以往一样,先建立连接,创建通道,声明队列。我们可能想要运行多个服务进程,为了创建更多的服务者,我们需要在channel.basicQos方法中设置prefetchCount的值。
我们使用bacisConsume去连接队列,队列中我们提供一个一个返回表单的对象,用于工作并且返回相应。
下面是RPCClient.java的源代码,这里下载

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.UUID;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;

    private Channel channel;

    private String requestQueueName = "rpc_queue";

    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    connection = factory.newConnection();

    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue();

}

    public String call(String message) throws IOException, InterruptedException {

    String corrId = UUID.randomUUID().toString();

    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();

    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

    final BlockingQueue response = new ArrayBlockingQueue(1);

    channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

        @Override

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        if (properties.getCorrelationId().equals(corrId)) {

             response.offer(new String(body, "UTF-8"));

        }

       }

      });

     return response.take();

    }

  public void close() throws IOException {

   connection.close();

   }

//...

}

客户端代码有一些调用:

我们创建连接,通道,声明了唯一作为响应的返回队列,我们订阅了callback的队列,这样我们就可以接受到RPC的响应。

我们的call方法将会调用RPC的请求。

这里,我们第一次定义了唯一的correlationId值并且保存它。在DefaultConsumer中实现的handleDelivery的方法将会使用该值去和获取到响应的correlationId比较。

下一步,我们发布一个请求的消息,并且带有两个属性:replyTo和correlationId.

这个时候,我们可以停下来,等待合适的反馈。

消费者开启另一个线程中处理消息,在响应前我们应该先搁置主线程。使用BlockingQueue来处理,这就是我们创建只有一个容器ArrayblockingQueue,正如我们只需要去等待一个响应。

这个handleDelivery方法在做一些简单的工作,对于每一个响应消息它都会检查correlationId是否是我们想要的那个,然后把结果发送到blockQueue中。

同时主线程将会等待从BloakingQueue中取出消息。

最后我们返回结果给使用者。
客户端发送请求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");

String response = fibonacciRpc.call("30");

System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

现在希望好好看一下我们例子中的源代码。

跟以前的指导文件中一样编译:

 javac -cp $CP RPCClient.java RPCServer.java

我们RPC服务已经准备好了,现在开启服务:

java -cp $CP RPCServer

# => [x] Awaiting RPC requests

运行想要获取斐波那契数列的客户端:

java -cp $CP RPCClient

# => [x] Requesting fib(30)

目前的设计不仅仅只是实现RPC服务的接口,它还有一些其它的重要优势:如果RPC服务太慢,你可以按比例增加运行一个RPC,在一个新的控制台运行第二个RPC服务。

在客户端,RPC只能需要发送和接收一条消息,同步请求像queueDeclare是必须的,因此RPC客户端的结果需要连接网络才能获取到一个简单的RPC请求。(乱七八糟)

我们的代码依然是非常的简单,并没有解决很复杂的问题,像:

  • 如果没有运行服务端,客户端如何响应?
  • 客户端应该对RPC设置超时操作么?
  • 如果服务端发生故障了并且爆出了异常,应该把异常发送给客户端么?
  • 在进行处理之前,保护无效的消息么(检查绑定和类型)?

第六节的内容大致翻译完了,这里是原文链接

终篇是我对RabbitMQ使用理解的总结文章,欢迎讨教。
--谢谢--

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • “ 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列...
    落羽成霜丶阅读 3,974评论 1 41
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,302评论 1 15
  • 亲爱的爱π: 你好啊!未来已经来到了,你实现了当初你曾经有过的梦想吗? 你还记得自己曾经的梦想吗? ...
    美生活阅读 143评论 0 1