带你手写基于 Spring 的可插拔式 RPC 框架(三)通信协议模块

在写代码之前我们先要想清楚几个问题。

  1. 我们的框架到底要实现什么功能?
    我们要实现一个远程调用的 RPC 协议。
  2. 最终实现效果是什么样的?
    我们能像调用本地服务一样调用远程的服务。
  3. 怎样实现上面的效果?
    前面几章已经给大家说了,使用动态代理,在客户端生成接口代理类使用,在代理类的 invoke 方法里面将方法参数等信息组装成 request 发给服务端,服务端需要起一个服务器一直等待接收这种消息,接收之后使用反射调
    用对应接口的实现类。

首先我们需要实现底层的通信的服务端和客户端,可以有一下几种实现:

  1. 基于 Socket 的客户端和服务端(同步阻塞式,不推荐),大家可以当作一个编程练习,整个和系统没有进行整合,纯粹练习使用。
    基于 Socket 的服务端。
    启动一个阻塞式的 socket server,加入一个线程池实现伪异步。

    public class SocketServer {
    
         private static SocketServer INSTANCE = new SocketServer();
     
         private SocketServer(){};
     
         public static SocketServer getInstance() {
             return INSTANCE;
         }
     
         //没有核心线程数量控制的线程池,最大线程数是 Integer 的最大值,多线程实现伪异步
         ExecutorService executorService = Executors.newCachedThreadPool();
     
         /**
          * 发布服务,bio 模型
          * @param service
          * @param port
          */
         public void publiser(int port){
             try (ServerSocket serverSocket  = new ServerSocket(port);)
             {
                 while (true){
                     Socket socket = serverSocket.accept();//接收请求
                     executorService.execute(new SocketHandler(socket));
                 }
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
     }
    
    

    对应的 hanlder,使用反射调用对应的服务,并通过 sokcet 写回结果。

    public class SocketHandler implements Runnable{
     
         private Socket socket;
     
         public SocketHandler(Socket socket) {
             this.socket = socket;
         }
     
         @Override
         public void run() {
             try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                  ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());)
             {
                 Object o = inputStream.readObject(); //readObject 是 java 反序列化的过程
                 System.out.println(o);
                 Object result = invoke((RpcRequest) o);
                 //写回结果
                 outputStream.writeObject(result);
                 outputStream.flush();
             } catch (IOException e) {
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
         }
     
         private Object invoke(RpcRequest invocation){
     
             //根据方法名和参数类型在 service 里获取方法
             try {
                 String interFaceName = invocation.getInterfaceName();
                 Class impClass = Class.forName(invocation.getImpl());
     
                 Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
                 String result = (String)method.invoke(impClass.newInstance(),invocation.getParams());
                 return result;
             } catch (NoSuchMethodException e) {
                 e.printStackTrace();
             } catch (IllegalAccessException e) {
                 e.printStackTrace();
             } catch (InvocationTargetException e) {
                 e.printStackTrace();
             } catch (InstantiationException e) {
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
     
             return null;
         }
     }
    

    在看客户端,拼装参数,发送给 socket 服务端。

    public class SocketClient {
    
         private static SocketClient INSTANCE = new SocketClient();
     
         private SocketClient(){};
     
         public static SocketClient getInstance() {
             return INSTANCE;
         }
     
         private Socket newSocket(String host, Integer port) {
             System.out.println("创建一个新的 socket 连接");
             try {
                 Socket socket = new Socket(host, port);
                 return socket;
             } catch (IOException e) {
                 System.out.println("建立连接失败");
                 e.printStackTrace();
             }
             return null;
         }
     
         public Object sendRequest(String host, Integer port,RpcRequest rpcRequest) {
             Socket socket = newSocket(host,port);
             try (
                 ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());)
             {
                 outputStream.writeObject(rpcRequest);
                 outputStream.flush();
     
     
                 Object result = inputStream.readObject();
     
                 inputStream.close();
                 outputStream.close();
                 return result;
     
             } catch (Exception e) {
                 e.printStackTrace();
             }
             return null;
         }
     }
    

    通过上面的代码相信大家已经明白了这个流程了,就是一个客户端与服务端通信的过程,将需要调用的方法的参数传到服务端,服务端通过反射完成调用,最后返回结果给客户端。
    下面正式开始。

  2. 基于 Http 请求的客户端和基于 Tomcat 的服务端。
    基于 Tomcat 的服务端,单例模式,只有一个启动服务的 start 方法,监听到的请求通过 DispatcherServlet 处理。

    public class HttpServer {
    
         private static HttpServer INSTANCE = new HttpServer();
         
         private HttpServer(){}
         
         public static HttpServer getInstance(){
             return INSTANCE;
         }
         
         
         /**
          *
          * servlet 容器,tomcat
          * @param hostname
          * @param port
          */
     
         public void start(String hostname,Integer port){
     
             Tomcat tomcat = new Tomcat();
             Server server = tomcat.getServer();
             Service service = server.findService("Tomcat");
     
             Connector connector = new Connector();
             connector.setPort(port);
     
             Engine engine = new StandardEngine();
             engine.setDefaultHost(hostname);
     
             Host host = new StandardHost();
             host.setName(hostname);
     
             String contextPath = "";
             Context context = new StandardContext();
             context.setPath(contextPath);
             context.addLifecycleListener(new Tomcat.FixContextListener()); //声明周期监听器
     
             host.addChild(context);
             engine.addChild(host);
     
             service.setContainer(engine);
             service.addConnector(connector);
     
             tomcat.addServlet(contextPath,"dispatcher", new DispatcherServlet());
             context.addServletMappingDecoded("/*","dispatcher");
     
             try {
                 tomcat.start();
                 tomcat.getServer().await();
             } catch (LifecycleException e) {
                 e.printStackTrace();
             }
         }
     }
    
    

    下面来看请求分发器 DispatcherServlet 的实现,将请求派发给 HttpServletHandler 实现。

    /**
      * tomcat 是 servlet 容器,写一个 servlet
      *
      */
     public class DispatcherServlet extends HttpServlet {
     
         @Override
         protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
             new HttpServletHandler().handler(req,resp);
         }
     }
    

    HttpServletHandler 的实现其实就是解析 request,通过反射调用最后返回结果。

    public class HttpServletHandler{
    
         public void handler(HttpServletRequest req, HttpServletResponse resp) {
     
             try(InputStream inputStream = req.getInputStream();
                 OutputStream outputStream =resp.getOutputStream();){
                 ObjectInputStream ois = new ObjectInputStream(inputStream);
                 RpcRequest invocation = (RpcRequest) ois.readObject();
     
                 // 从注册中心根据接口,找接口的实现类
                 String interFaceName = invocation.getInterfaceName();
                 Class impClass = Class.forName(invocation.getImpl());
     
     
                 Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
                 Object result = method.invoke(impClass.newInstance(),invocation.getParams());
     
                 RpcResponse rpcResponse = new RpcResponse();
                 rpcResponse.setResponseId(invocation.getRequestId());
                 rpcResponse.setData(result);
                 IOUtils.write(toByteArray(rpcResponse),outputStream);
             }catch (IOException e){
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             } catch (NoSuchMethodException e) {
                 e.printStackTrace();
             } catch (IllegalAccessException e) {
                 e.printStackTrace();
             } catch (InvocationTargetException e) {
                 e.printStackTrace();
             } catch (InstantiationException e) {
                 e.printStackTrace();
             }
         }
     
         public byte[] toByteArray (Object obj) {
             byte[] bytes = null;
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             try {
                 ObjectOutputStream oos = new ObjectOutputStream(bos);
                 oos.writeObject(obj);
                 oos.flush();
                 bytes = bos.toByteArray ();
                 oos.close();
                 bos.close();
             } catch (IOException ex) {
                 ex.printStackTrace();
             }
             return bytes;
         }
     
     }
    

    最后来看客户端的实现,通过 post 方法发送数据,最后解析服务端返回的结果。

    public class HttpClient {
     
         private static HttpClient INSTANCE = new HttpClient();
         
         private HttpClient(){}
         
         public static HttpClient getInstance(){
             return INSTANCE;
         }
     
         public Object post(String hostname, Integer port, RpcRequest invocation){
     
             try{
                 URL url = new URL("http",hostname,port,"/");
                 HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
     
                 httpURLConnection.setRequestMethod("POST");
                 httpURLConnection.setDoOutput(true);
     
                 OutputStream outputStream = httpURLConnection.getOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(outputStream);
                 oos.writeObject(invocation);
                 oos.flush();
                 oos.close();
     
                 InputStream inputStream = httpURLConnection.getInputStream();
                 RpcResponse rpcResponse =  (RpcResponse)toObject(IOUtils.toByteArray(inputStream));
                 return rpcResponse.getData();
     
     
             } catch (MalformedURLException e) {
                 e.printStackTrace();
             } catch (IOException e) {
                 e.printStackTrace();
             }
             return null;
     
         }
     
         public Object toObject (byte[] bytes) {
             Object obj = null;
             try {
                 ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
                 ObjectInputStream ois = new ObjectInputStream (bis);
                 obj = ois.readObject();
                 ois.close();
                 bis.close();
             } catch (IOException ex) {
                 ex.printStackTrace();
             } catch (ClassNotFoundException ex) {
                 ex.printStackTrace();
             }
             return obj;
         }
     }
    
    
  3. Netty 模型的客户端和服务端。
    基于 Netty 的服务端,里面的编码器和解码器是我们自己实现的,大家可以先用我注释掉的那部分,等我们写到编码解码器的时候再替换。

    public class NettyServer {
     
         private static NettyServer INSTANCE = new NettyServer();
         
         private static Executor executor = Executors.newCachedThreadPool();
         
         private final static int MESSAGE_LENGTH = 4;
         
         private NettyServer(){};
     
         public static NettyServer getInstance(){
             return INSTANCE;
         }
     
     
         private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
         
         public static void submit(Runnable t){
             executor.execute(t);
         }
         
         public void start(String host, Integer port){
             EventLoopGroup bossGroup = new NioEventLoopGroup(1);
             EventLoopGroup workerGroup = new NioEventLoopGroup();
             
             try{
                 final ServerBootstrap bootstrap = new ServerBootstrap();
                 bootstrap.group(bossGroup,workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 128)
                 .childOption(ChannelOption.SO_KEEPALIVE, true)
                 .childHandler(new ChannelInitializer<SocketChannel>(){
     
                     @Override
                     protected void initChannel(SocketChannel arg0) throws Exception {
                         ChannelPipeline pipeline = arg0.pipeline();
                          //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
                         //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4); 
     //                  pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, NettyServer.MESSAGE_LENGTH, 0, NettyServer.MESSAGE_LENGTH));
                         //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
     //                  pipeline.addLast(new LengthFieldPrepender(NettyServer.MESSAGE_LENGTH));
     //                  pipeline.addLast(new ObjectEncoder());
                         //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
     //                  pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                         //注册解码器NettyDecoderHandler
                         pipeline.addLast(new NettyDecoderHandler(RpcRequest.class, serializeType));
                         //注册编码器NettyEncoderHandler
                         pipeline.addLast(new NettyEncoderHandler(serializeType));
                         pipeline.addLast("handler", new NettyServerHandler());
                         
                     }
                     
                 });
                 Channel channel = bootstrap.bind(host, port).sync().channel();
                 System.out.println("Server start listen at " + port);
             }catch(Exception e){
                 bossGroup.shutdownGracefully();
                 workerGroup.shutdownGracefully();
             }
         }
         
     
     }
    

    服务端对应的 handler,netty 都是这种 handler 模式,handler 里面也是将这个接收的 request 放入线程池中处理。

        public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
     
         private ChannelHandlerContext context;
     
     
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
             System.out.println("server channelRead...");
             System.out.println(ctx.channel().remoteAddress() + "->server:" + rpcRequest.toString());
             InvokeTask it = new InvokeTask(rpcRequest,ctx);
             NettyServer.submit(it);
         }
     
     
         @Override
         public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
             this.context = ctx;     
         }
         
     }
    

    给出 InvokeTask 的对应实现。

    public class InvokeTask implements Runnable{
     
         private RpcRequest invocation;
         private ChannelHandlerContext ctx;
     
         public InvokeTask(RpcRequest invocation,ChannelHandlerContext ctx) {
             super();
             this.invocation = invocation;
             this.ctx = ctx;
         }
     
     
         @Override
         public void run() {
             
             // 从注册中心根据接口,找接口的实现类
             String interFaceName = invocation.getInterfaceName();
             Class impClass = null;
             try {
                 impClass = Class.forName(invocation.getImpl());
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
     
             Method method;
             Object result = null;
             try {
                 method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
                 //这块考虑实现类,是不是应该在 spring 里面拿
                 result = method.invoke(impClass.newInstance(),invocation.getParams());
             } catch (Exception e) {
                 e.printStackTrace();
             }
             RpcResponse rpcResponse = new RpcResponse();
             rpcResponse.setResponseId(invocation.getRequestId());
             rpcResponse.setData(result);
             ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() {
                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
                     System.out.println("RPC Server Send message-id respone:" + invocation.getRequestId());
                 }
             });
     
         }
     
     }
    

    再来看客户端,客户端有两种实现,一种是不能复用 handler(可以立即为 connection)的模式,这种模式并发不太高,另一种是能够复用 handler 的 handlerPool 模式。

    不能复用的模式。

    public class NettyClient {
         private static NettyClient INSTANCE = new NettyClient();
     
         private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
     
         private NettyClient(){};
     
         public static NettyClient getInstance(){
             return INSTANCE;
         }
     
         private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
     
         public void start(String host,Integer port){
     
             Bootstrap bootstrap = new Bootstrap();
             EventLoopGroup group = new NioEventLoopGroup(parallel);
     
             try{
                 bootstrap.group(group)
                         .channel(NioSocketChannel.class)
                         .handler(new ChannelInitializer<SocketChannel>(){
     
                             @Override
                             protected void initChannel(SocketChannel arg0) throws Exception {
                                 ChannelPipeline pipeline = arg0.pipeline();
                                 //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
                                 //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);
     //                          pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                 //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
     //                          pipeline.addLast(new LengthFieldPrepender(4));
     //                          pipeline.addLast(new ObjectEncoder());
                                 //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
     //                          pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                 //注册Netty编码器
                                 System.out.println("11111111:"+serializeType.getSerializeType());
                                 pipeline.addLast(new NettyEncoderHandler(serializeType));
                                 //注册Netty解码器
                                 pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                                 pipeline.addLast("handler", new NettyClientHandler());
     
                             }
     
                         });
                 ChannelFuture future = bootstrap.connect(host,port).sync();
             }catch(Exception e){
                 group.shutdownGracefully();
             }
     
     
         }
     }
    

    在看可复用的模式,固定 handler 数量,目前框架中使用的是可复用模式,上面的不可复用的没用上,为了给大家理解,没有删除。

    public class NettyChannelPoolFactory {
     
         //初始化Netty Channel阻塞队列的长度,该值为可配置信息
         private static final int channelConnectSize = 10;
         
         //Key为服务提供者地址,value为Netty Channel阻塞队列
         private static final Map<URL, ArrayBlockingQueue<Channel>> channelPoolMap = new ConcurrentHashMap<>();
         
         private static NettyChannelPoolFactory INSTANCE = new NettyChannelPoolFactory();
         
         private NettyChannelPoolFactory(){};
         
         public static NettyChannelPoolFactory getInstance(){
             return INSTANCE;
         }
     
         private List<ServiceProvider> serviceMetaDataList = new ArrayList<>();
         
         //根据配置文件里面需要调用的接口信息来初始化 channel
         public void initNettyChannelPoolFactory(Map<String, List<ServiceProvider>> providerMap){
     
             //将服务提供者信息存入serviceMetaDataList列表
             Collection<List<ServiceProvider>> collectionServiceMetaDataList = providerMap.values();
             for (List<ServiceProvider> serviceMetaDataModels : collectionServiceMetaDataList) {
                 if (CollectionUtils.isEmpty(serviceMetaDataModels)) {
                     continue;
                 }
                 serviceMetaDataList.addAll(serviceMetaDataModels);
             }
     
             //获取服务提供者地址列表
             Set<URL> set = new HashSet<>();
             for (ServiceProvider serviceMetaData : serviceMetaDataList) {
                 String serviceIp = serviceMetaData.getIp();
                 int servicePort = serviceMetaData.getPort();
                 URL url = new URL(serviceIp,servicePort);
                 set.add(url);
             }
             
             for(URL url:set){
                 //为每个 ip端口 建立多个 channel,并且放入阻塞队列中
                 int channelSize = 0;
                 while(channelSize < channelConnectSize){
                     Channel channel = null;
                     while(channel == null){
                         channel = registerChannel(url);
                     }
                     
                     channelSize ++;
                     
                     ArrayBlockingQueue<Channel> queue = channelPoolMap.get(url);
                     if(queue == null){
                         queue = new ArrayBlockingQueue<Channel>(channelConnectSize);
                         channelPoolMap.put(url, queue);
                     }
                     queue.offer(channel);
                     
                 }
             }
             
         }
     
         public Channel registerChannel(URL url) {
             final SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
             Bootstrap bootstrap = new Bootstrap();
             EventLoopGroup group = new NioEventLoopGroup(10);
             
             try{
                 bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>(){
     
                     @Override
                     protected void initChannel(SocketChannel arg0) throws Exception {
                         ChannelPipeline pipeline = arg0.pipeline();
                         //ObjectDecoder的基类半包解码器LengthFieldBasedFrameDecoder的报文格式保持兼容。因为底层的父类LengthFieldBasedFrameDecoder
                         //的初始化参数即为super(maxObjectSize, 0, 4, 0, 4);
     //                  pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                         //利用LengthFieldPrepender回填补充ObjectDecoder消息报文头
     //                  pipeline.addLast(new LengthFieldPrepender(4));
     //                  pipeline.addLast(new ObjectEncoder());
                         //考虑到并发性能,采用weakCachingConcurrentResolver缓存策略。一般情况使用:cacheDisabled即可
     //                  pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                         pipeline.addLast(new NettyEncoderHandler(serializeType));
                         //注册Netty解码器
                         pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                         pipeline.addLast("handler", new NettyClientHandler());
                         
                     }
                     
                 });
                 ChannelFuture future = bootstrap.connect(url.getHost(),url.getPort()).sync();
                 Channel channel = future.channel();
                 //等待Netty服务端链路建立通知信号
                 final CountDownLatch connectedLatch = new CountDownLatch(1);
     
                 final List<Boolean> isSuccess = new ArrayList<>(1);
                 future.addListener(new ChannelFutureListener(){
     
                     @Override
                     public void operationComplete(ChannelFuture future)
                             throws Exception {
                         if(future.isSuccess()){
                             isSuccess.add(true);
                         }else{
                             isSuccess.add(false);
                         }
                         connectedLatch.countDown();
                     }
                     
                 });
                 connectedLatch.await();
                 if(isSuccess.get(0)){
                     return channel;
                 }
             }catch(Exception e){
                 group.shutdownGracefully();
                 e.printStackTrace();
             }
             return null;
         }
         //根据 url 获取阻塞队列
         public ArrayBlockingQueue<Channel> acqiure(URL url){
             System.out.println(channelPoolMap.toString());
             return channelPoolMap.get(url);
         }
         
         //channel 使用完毕后进行回收
         public void release(ArrayBlockingQueue<Channel> queue, Channel channel, URL url){
             if(queue == null){
                 return;
             }
             //需要检查 channel 是否可用,如果不可用,重新注册一个放入阻塞队列中
             if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){
                 if (channel != null) {
                     channel.deregister().syncUninterruptibly().awaitUninterruptibly();
                     channel.closeFuture().syncUninterruptibly().awaitUninterruptibly();
                 }
                 Channel c = null;
                 while(c == null){
                     c = registerChannel(url);
                 }
                 queue.offer(c);
                 return;
             }
             queue.offer(channel);
         }
     
     }
    

    给出对应的 handler 实现,在 channelread0 里面读取 server 端返回的信息,因为 netty 是异步的,所以需要 MessageCallBack 来实现我们的同步调用。

    public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    
         private ChannelHandlerContext context;
     
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                 throws Exception {
             cause.printStackTrace();
             ctx.close();
         }
     
         @Override
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             System.out.println("停止时间是:"+new Date());
             System.out.println("HeartBeatClientHandler channelInactive");
         }
     
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             this.context = ctx;
             System.out.println("激活时间是:"+ctx.channel().id());
         }
     
     
         @Override
         protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
             // String res = (String)msg;
             //RpcResponse rpcResponse = (RpcResponse)msg;
             String responseId = rpcResponse.getResponseId();
             MessageCallBack callBack = ResponseHolder.getInstance().mapCallBack.get(responseId);
             if(callBack != null){
                 ResponseHolder.getInstance().mapCallBack.remove(responseId);
                 callBack.over(rpcResponse);
             }
         }
     }
    

    MessageCallBack 的实现。

    public class MessageCallBack {
     
         private RpcRequest rpcRequest;
         
         private RpcResponse rpcResponse;
         
         private Lock lock = new ReentrantLock();
         
         private Condition finish = lock.newCondition();
         
         public MessageCallBack(RpcRequest request) {
             this.rpcRequest = request;
         }
     
         public Object start() throws InterruptedException {
             try {
                 lock.lock();
                 //设定一下超时时间,rpc服务器太久没有相应的话,就默认返回空吧。
                 finish.await(10*1000, TimeUnit.MILLISECONDS);
                 if (this.rpcResponse != null) {
                     return this.rpcResponse.getData();
                 } else {
                     return null;
                 }
             } finally {
                 lock.unlock();
             }
         }
     
         public void over(RpcResponse reponse) {
             try {
                 lock.lock();
                 this.rpcResponse = reponse;
                 finish.signal();
             } finally {
                 lock.unlock();
             }
         }
     
     }
    

既然是可插拔式框架,那么底层协议一定要是可选择的,所以我们定义一个顶层接口来支持我们选择协议。
start 方法是启动服务端,send 方法是客户端发送数据。

public interface Procotol {
 
     void start(URL url);
     Object send(URL url, RpcRequest invocation);
 }

对应的三个协议的接口实现。
Netty 的实现

public class DubboProcotol implements Procotol {
     @Override
     public void start(URL url) {
         NettyServer nettyServer = NettyServer.getInstance();
         nettyServer.start(url.getHost(),url.getPort());
     }
 
     @Override
     public Object send(URL url, RpcRequest invocation) {
         ArrayBlockingQueue<Channel> queue = NettyChannelPoolFactory.getInstance().acqiure(url);
         Channel channel = null;
         try {
             channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
             if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){
                 channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
                 if(channel == null){
                     channel = NettyChannelPoolFactory.getInstance().registerChannel(url);
                 }
             }
             //将本次调用的信息写入Netty通道,发起异步调用
             ChannelFuture channelFuture = channel.writeAndFlush(invocation);
             channelFuture.syncUninterruptibly();
             MessageCallBack callback = new MessageCallBack(invocation);
             ResponseHolder.getInstance().mapCallBack.put(invocation.getRequestId(), callback);
             try {
                 return callback.start();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             return null;
         } catch (InterruptedException e1) {
             e1.printStackTrace();
         }finally{
             System.out.println("release:"+channel.id());
             NettyChannelPoolFactory.getInstance().release(queue, channel, url);
         }
         return null;
     }
 }

http 的实现

public class HttpProcotol implements Procotol {
     @Override
     public void start(URL url) {
         HttpServer httpServer = HttpServer.getInstance();
         httpServer.start(url.getHost(),url.getPort());
     }
 
     @Override
     public Object send(URL url, RpcRequest invocation) {
         HttpClient httpClient = HttpClient.getInstance();
         return httpClient.post(url.getHost(),url.getPort(),invocation);
     }
 }

Socket 的实现

public class SocketProcotol implements Procotol {
     @Override
     public void start(URL url) {
         SocketServer socketServer = SocketServer.getInstance();
         socketServer.publiser(url.getPort());
     }
 
     @Override
     public Object send(URL url, RpcRequest invocation) {
         SocketClient socketClient = SocketClient.getInstance();
         return socketClient.sendRequest(url.getHost(),url.getPort(),invocation);
     }
 }

这样一个可选择协议的模型就实现了,我们可已通过这个模块选择协议,并且与服务端通信。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到...
    谜碌小孩阅读 3,077评论 0 13
  • Netty实践与NIO原理 一、阻塞IO与非阻塞IO Linux网络IO模型(5种) (1)阻塞IO模型 所有文件...
    fly_wings阅读 222评论 0 0
  • 原文出处: [黄勇](http://my.oschina.net/huangyong/blog/361751) R...
    卡普奇诺108阅读 513评论 0 2
  • RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,...
    守住阳光阅读 2,655评论 0 4
  • 还未到傍晚,天色已经整个阴沉下来,厚厚的云层盖住了邪恶的都市。有个老妇人跟在一只瘸腿狗后面,没人在意他们。风中带着...
    歌尔德蒙阅读 520评论 0 2