如何调用他人的远程服务?
由于各服务部署在不同机器,服务间的调用免不了网络通信过程,服务消费方每调用一个服务都要写一坨网络通信相关的代码,不仅复杂而且极易出错。要让网络通信细节对使用者透明,我们需要对通信细节进行封装,我们先看下一个RPC调用的流程涉及到哪些通信细节:
- 服务消费方(client)调用以本地调用方式调用服务;
- client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
- client stub找到服务地址,并将消息发送到服务端;
- server stub收到消息后进行解码;
- server stub根据解码结果调用本地的服务;
- 本地服务执行并将结果返回给server stub;
- server stub将返回结果打包成消息并发送至消费方;
- client stub接收到消息,并进行解码;
- 服务消费方得到最终结果。
RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。
1. 怎么做到透明化远程服务调用?
怎么封装通信细节才能让用户像以本地调用方式调用远程服务呢?对java来说就是使用代理!java代理有两种方式:1) jdk 动态代理;2)字节码生成。尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,大部分公司实现RPC框架时还是选择动态代理方式。我们这个最简易版的自然也是采用动态代理的方式。
2. 怎么对消息进行编码和解码?
2.1 确定消息数据结构
- 接口名称:在我们的例子里接口名是“HelloWorldService”,如果不传,服务端就不知道调用哪个接口了;
- 方法名:一个接口内可能有很多方法,如果不传方法名服务端也就不知道调用哪个方法;
- 参数类型&参数值参数类型有很多,比如有bool、int、long、double、string、map、list,甚至如struct(class)以及相应的参数值;超时时间
2.2 序列化
从RPC的角度上看,主要看三点:1)通用性,比如是否能支持Map等复杂的数据结构;2)性能,包括时间复杂度和空间复杂度,由于RPC框架将会被公司几乎所有服务使用,如果序列化上能节约一点时间,对整个公司的收益都将非常可观,同理如果序列化上能节约一点内存,网络带宽也能省下不少;3)可扩展性,对互联网公司而言,业务变化飞快,如果序列化协议具有良好的可扩展性,支持自动增加新的业务字段,而不影响老的服务,这将大大提供系统的灵活度。我们的是最简易版所以就采用了jdk序列化的方式来处理。
开始撸代码
-
初始化工程
首先创建2个项目分别是server和client;server项目下两个模块分别是rpc-server-api和rpc-server-provider。
为什么server项目要创建两个模块?
client在调用服务端的服务时需要知道服务端的一些信息,client可以依赖于这个模块。我们的项目中SDK和契约包就是提供了这个功能。而真正的实现是放在rpc-server-provider中。
- rpc-server-api
public interface IHelloService {
String sayHello(String content);
String saveUser(User user);
}
请求参数类
private String className;
private String methodName;
private Object[] parameters;
-
rpc-server-provider
首先rpc-server-provider是依赖rpc-server-api的。我们写一个实现类,来实现api中定义的接口。
public class HelloServiceImpl implements IHelloService{ @Override public String sayHello(String content) { System.out.println("request in sayHello:"+content); return "Say Hello:"+content; } }
我这么写好了实现远程要怎么才能调用的到呢?我们还需把服务暴露出去,那就需要一个服务暴露的方法。这里就是不断去接受请求,每一个socket交给一个processorHandler来处理。
public class RpcProxyServer {
ExecutorService executorService = Executors.newCachedThreadPool();
public void publisher(Object service, int port) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
while (true) {//不断接受请求
Socket socket = serverSocket.accept();//BIO
//每一个socket 交给一个processorHandler来处理
executorService.execute(new ProcessorHandler(socket, service));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
具体processorHandler的代码,从socket中获取请求对象,并是使用请求对象调用本服务方法,同时返回方法执行结果,将返回结果写入socket中。
public class ProcessorHandler implements Runnable {
private Socket socket;
private Object service;
public ProcessorHandler(Socket socket, Object service) {
this.socket = socket;
this.service = service;
}
@Override
public void run() {
try (InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//输入流中应该有什么东西?
//请求哪个类,方法名称、参数
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = invoke(rpcRequest); //反射调用本地服务
objectOutputStream.writeObject(result);
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
private Object invoke(RpcRequest request) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
//反射调用
Object[] args = request.getParameters(); //拿到客户端请求的参数
Class<?>[] types = new Class[args.length]; //获得每个参数的类型
for (int i = 0; i < args.length; i++) {
types[i] = args[i].getClass();
}
Class clazz = Class.forName(request.getClassName()); //跟去请求的类进行加载
Method method = clazz.getMethod(request.getMethodName(), types); //sayHello, saveUser找到这个类中的方法
return method.invoke(service, args);
}
}
大功告成,把服务发布出去。
/**
* Hello world!
*
*/
public class App {
public static void main( String[] args ){
IHelloService helloService=new HelloServiceImpl();
RpcProxyServer proxyServer=new RpcProxyServer();
// 发布到8080端口
proxyServer.publisher(helloService,8080);
}
}
-
客户端代码开撸。我们现在在客户端依赖了服务端的api(SDK、契约包)如何才能实现调用远程方法呢?类似于服务端代理类。
public class RpcProxyClient { public <T> T clientProxy(final Class<T> interfaceCls,final String host,final int port){ return (T)Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls},new RemoteInvocationHandler(host,port)); } }
public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //请求数据的包装 RpcRequest rpcRequest=new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); //远程通信 RpcNetTransport netTransport=new RpcNetTransport(host,port); Object result=netTransport.send(rpcRequest); return result; } }
处理网络传输的类
public class RpcNetTransport { private String host; private int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } public Object send(RpcRequest request) { Object result = null; try (//建立连接 Socket socket = new Socket(host, port); //网络socket ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) { outputStream.writeObject(request); //序列化() outputStream.flush(); result = inputStream.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); } return result; }
最后使用这个远程调用:
public class App { public static void main(String[] args) { RpcProxyClient rpcProxyClient = new RpcProxyClient(); IHelloService iHelloService = rpcProxyClient.clientProxy(IHelloService.class,"localhost",8080); } }
最后画一张来总结一下整个流程