注:这是RabbitMQ-java版Client的指导教程翻译系列文章,欢迎大家批评指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介绍队列的使用
第三篇Publish/Subscribe介绍转换器以及其中fanout类型
第四篇Routing介绍direct类型转换器
第五篇Topics介绍topic类型转换器
第六篇RPC介绍远程调用
远程过程调用(Remote procedure call )
在第二篇指导教程中,我们学会在多个消费者之间使用工作队列循环分发任务。但是如果我们需要在远程电脑上运行一个程序并且需要返回结果呢。这就是另一个不同的事情,这个模型就是闻名的远程过程调用,简称RPC。
在这篇指导教程中,我们将会使用RabbitMQ去创建一个RPC系统:一个客户端和一个可测试的远程服务。因为我们没有按时间消费的任务去分配,所以将会创建一个仿制的RPC服务,用于返回斐波那契数列。
客户端接口(Client interface)
为了解释RPC服务是如何使用的,我们将创建一个简单的客户端类,它将会暴露出一个call的方法,用于发送RPC请求,并且阻塞住直到结果被返回:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);
RPC注意
尽管RPC在计算中是一个很常见的模式,但是它仍有很多不足的地点。问题产生的原因程序员没有意识到这个功能正在本地调用,还是服务端反应慢。对这样不可预料的系统结果就会有困惑,就会去增加一些不必要的复杂的调试。滥用RPC可能会导致不可维护,难于理解的代码,而不是简化软件。
请注意,考虑下面的一些建议:
功能显而易见在本地调用还是远程调用。
用文档记录你的系统,确保各组件之间依赖清晰。
处理错误情况。当RPC服务端好久没有反应,客户端如何响应?
处于困境的时候避免使用RPC。如果可以使用,你应该使用异步请求方式,而不是RPC阻塞的方式。异步返回结果会被推送到另一个计算阶段。
返回队列(Callback queue)
一般来说使用RabbitMQ来进行RPC是简单的,一个客户端发出请求消息和一个服务端返回相应消息。为了能够接受到响应,我们需要在请求中发送一个callback队列的地址,我们使用默认的队列(客户端唯一的队列),试试:
callbackQueueName = channel.queueDeclare().getQueue();
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
channel.basicPublish("", "rpc_queue", props, message.getBytes());
// ... then code to read a response message from the callback_queue ...
消息属性
AMQP 0-9-1协议预先定义了14个属性,大部分的属性很少使用,下面有一些说明:
deliveryMode:标记这个消息可以持久化(值为2),或者短暂保留(值为其它的),在第二章里面有提到过这个属性
contentType:用于描述文本的类型。例如经常使用JSON的编码方式,这是经常设置的属性:application/json
replyTo:经常用于存储返回队列的名字
correlationId:对请求的RPC相应是有用的
我们使用一个新的引用:
import com.rabbitmq.client.AMQP.BasicProperties;
相关联的Id(Correlation Id)
按照目前上述的方法,我们需要为每一个RPC请求都创建一个callBack队列,显然不够高效。幸运的是这里有一种更好的方式,一个客户端我们只需要创建一个callback队列。
这会导致一个新的问题,在队列中接收的响应不知道是哪一个请求的,这就需要使用到correlationId属性,我们为每一个请求都设置唯一correlationId的值,基于这个值我们就能都找到匹配请求的相应。如果我们有一个不匹配correlationId的值,或许可以删除这条消息,因为它不属于我们的请求。
你可能会问:在返回队列中我们为什么应该忽略掉不知道的消息,而不是当做一个错误?在服务端有一种可能,在发送我们一条消息的之后,RPC服务死了,但是反馈信息已经发出去了。如果发生这种情况,重新启动的RPC服务端将会再次处理这个消息,这就是为什么我们必须在客户端处理这个多余的相应。对于RPC也是这样理解的(这一段乱七八糟)。
总结
我们RPC就像上图这样工作:
当客户端创建,它创建一个异步唯一的callback队列
对于一个RPC请求来说,客户端发送带有两个属相的消息:replyTo,被设置callback队列的名称;correlationId被设置为唯一请求的值。
这个请求被发送到rpc-queue队列
在队列中RPC工作者等待着请求,当请求来的时候,它处理工作,把带有反馈队列的消息发送回客户端,使用replyTo中的返回队列。
客户端在callback队列中等待返回数据,当消息来得时候,它会检查correlationId的属性,如果它匹配请求中的correlationId的值,就会返回相应给应用。
综合
斐波那契数列任务:
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我们声明了斐波那契数列的方法,它表明只有一个有效积极的数输入(不要期望去处理很多的数字,实现起来会很慢很慢的)
下面是RPCServer.javade daima ,这里下载:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] argv) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
String response = "";
try {
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response += fib(n);
} catch (RuntimeException e){
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
//...
}}}
服务端代码是非常直观明了
跟以往一样,先建立连接,创建通道,声明队列。我们可能想要运行多个服务进程,为了创建更多的服务者,我们需要在channel.basicQos方法中设置prefetchCount的值。
我们使用bacisConsume去连接队列,队列中我们提供一个一个返回表单的对象,用于工作并且返回相应。
下面是RPCClient.java的源代码,这里下载:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
public String call(String message) throws IOException, InterruptedException {
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue response = new ArrayBlockingQueue(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
//...
}
客户端代码有一些调用:
我们创建连接,通道,声明了唯一作为响应的返回队列,我们订阅了callback的队列,这样我们就可以接受到RPC的响应。
我们的call方法将会调用RPC的请求。
这里,我们第一次定义了唯一的correlationId值并且保存它。在DefaultConsumer中实现的handleDelivery的方法将会使用该值去和获取到响应的correlationId比较。
下一步,我们发布一个请求的消息,并且带有两个属性:replyTo和correlationId.
这个时候,我们可以停下来,等待合适的反馈。
消费者开启另一个线程中处理消息,在响应前我们应该先搁置主线程。使用BlockingQueue来处理,这就是我们创建只有一个容器ArrayblockingQueue,正如我们只需要去等待一个响应。
这个handleDelivery方法在做一些简单的工作,对于每一个响应消息它都会检查correlationId是否是我们想要的那个,然后把结果发送到blockQueue中。
同时主线程将会等待从BloakingQueue中取出消息。
最后我们返回结果给使用者。
客户端发送请求:
RPCClient fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");
fibonacciRpc.close();
现在希望好好看一下我们例子中的源代码。
跟以前的指导文件中一样编译:
javac -cp $CP RPCClient.java RPCServer.java
我们RPC服务已经准备好了,现在开启服务:
java -cp $CP RPCServer
# => [x] Awaiting RPC requests
运行想要获取斐波那契数列的客户端:
java -cp $CP RPCClient
# => [x] Requesting fib(30)
目前的设计不仅仅只是实现RPC服务的接口,它还有一些其它的重要优势:如果RPC服务太慢,你可以按比例增加运行一个RPC,在一个新的控制台运行第二个RPC服务。
在客户端,RPC只能需要发送和接收一条消息,同步请求像queueDeclare是必须的,因此RPC客户端的结果需要连接网络才能获取到一个简单的RPC请求。(乱七八糟)
我们的代码依然是非常的简单,并没有解决很复杂的问题,像:
- 如果没有运行服务端,客户端如何响应?
- 客户端应该对RPC设置超时操作么?
- 如果服务端发生故障了并且爆出了异常,应该把异常发送给客户端么?
- 在进行处理之前,保护无效的消息么(检查绑定和类型)?
第六节的内容大致翻译完了,这里是原文链接。
终篇是我对RabbitMQ使用理解的总结文章,欢迎讨教。
--谢谢--