徒手撸框架--实现 RPC 远程调用

微服务,已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间。又看了看 dubbo 的源码。dubbo 为了做到灵活和解耦,使用了大量的设计模式和 SPI机制,要看懂 dubbo 的代码也不太容易。

按照《徒手撸框架》系列文章的套路,我还是会极简的实现一个 RPC 框架。帮助大家理解 RPC 框架的原理。

广义的来讲一个完整的 RPC 包含了很多组件,包括服务发现,服务治理,远程调用,调用链分析,网关等等。我将会慢慢的实现这些功能,这篇文章主要先讲解的是 RPC 的基石,远程调用的实现。

相信,读完这篇文章你也一定可以自己实现一个可以提供 RPC 调用的框架。

1. RPC 的调用过程

通过一图我们来了解一下 RPC 的调用过程,从宏观上来看看到底一次 RPC 调用经过些什么过程。

当一次调用开始:

client 会调用本地动态代理 proxy

这个代理会将调用通过协议转序列化字节流

通过 netty 网络框架,将字节流发送到服务端

服务端在受到这个字节流后,会根据协议,反序列化为原始的调用,利用反射原理调用服务方提供的方法

如果请求有返回值,又需要把结果根据协议序列化后,再通过 netty 返回给调用方

2. 框架概览和技术选型

看一看框架的组件:

clinet就是调用方。servive是服务的提供者。protocol包定义了通信协议。common包含了通用的一些逻辑组件。

技术选型项目使用 maven 作为包管理工具,json 作为序列化协议,使用spring boot管理对象的生命周期,netty作为 nio 的网路组件。所以要阅读这篇文章,你需要对spring boot和netty有基本的了解。

下面就看看每个组件的具体实现:

3. protocol

其实作为 RPC 的协议,需要考虑只有一个问题–就是怎么把一次方法的调用,变成能够被网络传输的字节流。

首先我们需要定义方法的调用和返回两个实体:

请求:

@Data

public class RpcRequest {

    // 调用编号

    private String requestId;

    // 类名

    private String className;

    // 方法名

    private String methodName;

    // 请求参数的数据类型

    private Class<?>[] parameterTypes;

    // 请求的参数

    private Object[] parameters;

}

结果:

@Data

public class RpcResponse {

    // 调用编号

    private String requestId;

    // 抛出的异常

    private Throwable throwable;

    // 返回结果

    private Object result;

}

确定了,需要序列化的对象,就要确定序列化的协议,实现两个方法,序列化和反序列化两个方法。

public interface Serialization {

    <T> byte[] serialize(T obj);

    <T> T deSerialize(byte[] data,Class<T> clz);

}

可选用的序列化的协议很多比如:

jdk 的序列化方法。(不推荐,不利于之后的跨语言调用)

json 可读性强,但是序列化速度慢,体积大。

protobuf,kyro,Hessian 等都是优秀的序列化框架,也可按需选择。

为了简单和便于调试,我们就选择 json 作为序列化协议,使用jackson作为 json 解析框架。

/**

* @author Zhengxin

*/

public class JsonSerialization implements Serialization {

    private ObjectMapper objectMapper;

    public JsonSerialization(){

        this.objectMapper = new ObjectMapper();

    }

    @Override

    public <T> byte[] serialize(T obj) {

        try {

            return objectMapper.writeValueAsBytes(obj);

        } catch (JsonProcessingException e) {

            e.printStackTrace();

        }

        return null;

    }

    @Override

    public <T> T deSerialize(byte[] data, Class<T> clz) {

        try {

            return objectMapper.readValue(data,clz);

        } catch (IOException e) {

            e.printStackTrace();

        }

        return null;

    }

}

因为 netty 支持自定义 coder 。所以只需要实现 ByteToMessageDecoder 和 MessageToByteEncoder 两个接口。就解决了序列化的问题:

public class RpcDecoder extends ByteToMessageDecoder {

    private Class<?> clz;

    private Serialization serialization;

    public RpcDecoder(Class<?> clz,Serialization serialization){

        this.clz = clz;

        this.serialization = serialization;

    }

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        if(in.readableBytes() < 4){

            return;

        }

        in.markReaderIndex();

        int dataLength = in.readInt();

        if (in.readableBytes() < dataLength) {

            in.resetReaderIndex();

            return;

        }

        byte[] data = new byte[dataLength];

        in.readBytes(data);

        Object obj = serialization.deSerialize(data, clz);

        out.add(obj);

    }

}

public class RpcEncoder extends MessageToByteEncoder {

    private Class<?> clz;

    private Serialization serialization;

    public RpcEncoder(Class<?> clz, Serialization serialization){

        this.clz = clz;

        this.serialization = serialization;

    }

    @Override

    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {

        if(clz != null){

            byte[] bytes = serialization.serialize(msg);

            out.writeInt(bytes.length);

            out.writeBytes(bytes);

        }

    }

}

至此,protocol 就实现了,我们就可以把方法的调用和结果的返回,转换为一串可以在网络中传输的 byte[] 数组了。

4. server

server 是负责处理客户端请求的组件。在互联网高并发的环境下,使用 Nio 非阻塞的方式可以相对轻松的应付高并发的场景。netty 是一个优秀的 Nio 处理框架。Server 的关键代码如下:

netty 是基于 Recotr 模型的。所以需要初始化两组线程 boss 和 worker 。boss 负责分发请求,worker 负责执行相应的 handler:

@Bean

  public ServerBootstrap serverBootstrap() throws InterruptedException {

      ServerBootstrap serverBootstrap = new ServerBootstrap();

      serverBootstrap.group(bossGroup(), workerGroup())

              .channel(NioServerSocketChannel.class)

              .handler(new LoggingHandler(LogLevel.DEBUG))

              .childHandler(serverInitializer);

      Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();

      Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();

      for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {

          serverBootstrap.option(option, tcpChannelOptions.get(option));

      }

      return serverBootstrap;

  }

netty 的操作是基于 pipeline 的。所以我们需要把在 protocol 实现的几个 coder 注册到 netty 的 pipeline 中。

ChannelPipeline pipeline = ch.pipeline();

// 处理 tcp 请求中粘包的 coder,具体作用可以自行 google

pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,4));

// protocol 中实现的 序列化和反序列化 coder

pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));

pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));

// 具体处理请求的 handler 下文具体解释

pipeline.addLast(serverHandler);

实现具体的 ServerHandler 用于处理真正的调用。

ServerHandler 继承 SimpleChannelInboundHandler<RpcRequest>。简单来说这个 InboundHandler 会在数据被接受时或者对于的 Channel 的状态发生变化的时候被调用。当这个 handler 读取数据的时候方法 channelRead0() 会被用,所以我们就重写这个方法就够了。

@Override

protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {

    RpcResponse rpcResponse = new RpcResponse();

    rpcResponse.setRequestId(msg.getRequestId());

    try{

        // 收到请求后开始处理请求

        Object handler = handler(msg);

        rpcResponse.setResult(handler);

    }catch (Throwable throwable){

        // 如果抛出异常也将异常存入 response 中

        rpcResponse.setThrowable(throwable);

        throwable.printStackTrace();

    }

    // 操作完以后写入 netty 的上下文中。netty 自己处理返回值。

    ctx.writeAndFlush(rpcResponse);

}

handler(msg) 实际上使用的是 cglib 的 Fastclass 实现的,其实根本原理,还是反射。学好 java 中的反射真的可以为所欲为。

private Object handler(RpcRequest request) throws Throwable {

    Class<?> clz = Class.forName(request.getClassName());

    Object serviceBean = applicationContext.getBean(clz);

    Class<?> serviceClass = serviceBean.getClass();

    String methodName = request.getMethodName();

    Class<?>[] parameterTypes = request.getParameterTypes();

    Object[] parameters = request.getParameters();

    // 根本思路还是获取类名和方法名,利用反射实现调用

    FastClass fastClass = FastClass.create(serviceClass);

    FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);

    // 实际调用发生的地方

    return fastMethod.invoke(serviceBean,parameters);

}

总体上来看,server 的实现不是很困难。核心的知识点是 netty 的 channel 的使用和 cglib 的反射机制。

5. client

future

其实,对于我来说,client 的实现难度,远远大于 server 的实现。netty 是一个异步框架,所有的返回都是基于 Future 和 Callback 的机制。

所以在阅读以下文字前强烈推荐,我之前写的一篇文章 Future 研究。利用经典的 wite 和 notify 机制,实现异步的获取请求的结果。

/**

* @author zhengxin

*/

public class DefaultFuture {

private RpcResponse rpcResponse;

private volatile boolean isSucceed = false;

private final Object object = new Object();

public RpcResponse getResponse(int timeout){

synchronized (object){

while (!isSucceed){

try {

                    //wait

object.wait(timeout);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

return rpcResponse;

}

}

public void setResponse(RpcResponse response){

if(isSucceed){

return;

}

synchronized (object) {

this.rpcResponse = response;

this.isSucceed = true;

            //notiy

object.notify();

}

}

}

复用资源

为了能够提升 client 的吞吐量,可提供的思路有以下几种:

使用对象池:建立多个 client 以后保存在对象池中。但是代码的复杂度和维护 client 的成本会很高。

尽可能的复用 netty 中的 channel。

之前你可能注意到,为什么要在 RpcRequest 和 RpcResponse 中增加一个 ID。因为 netty 中的 channel 是会被多个线程使用的。当一个结果异步的返回后,你并不知道是哪个线程返回的。这个时候就可以考虑利用一个 Map,建立一个 ID 和 Future 映射。这样请求的线程只要使用对应的 ID 就能获取,相应的返回结果。

/**

* @author Zhengxin

*/

public class ClientHandler extends ChannelDuplexHandler {

    // 使用 map 维护 id 和 Future 的映射关系,在多线程环境下需要使用线程安全的容器

    private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>();

    @Override

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

        if(msg instanceof RpcRequest){

            RpcRequest request = (RpcRequest) msg;

            // 写数据的时候,增加映射

            futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture());

        }

        super.write(ctx, msg, promise);

    }

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if(msg instanceof RpcResponse){

            RpcResponse response = (RpcResponse) msg;

            // 获取数据的时候 将结果放入 future 中

            DefaultFuture defaultFuture = futureMap.get(response.getRequestId());

            defaultFuture.setResponse(response);

        }

        super.channelRead(ctx, msg);

    }

    public RpcResponse getRpcResponse(String requestId){

        try {

            // 从 future 中获取真正的结果。

            DefaultFuture defaultFuture = futureMap.get(requestId);

            return defaultFuture.getResponse(10);

        }finally {

            // 完成后从 map 中移除。

            futureMap.remove(requestId);

        }

    }

}

这里没有继承 server 中的 InboundHandler 而使用了 ChannelDuplexHandler。顾名思义就是在写入和读取数据的时候,都会触发相应的方法。写入的时候在 Map 中保存 ID 和 Future。读到数据的时候从 Map 中取出 Future 并将结果放入 Future 中。获取结果的时候需要对应的 ID。

使用 Transporters 对请求进行封装。

public class Transporters {

    public static RpcResponse send(RpcRequest request){

        NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);

        nettyClient.connect(nettyClient.getInetSocketAddress());

        RpcResponse send = nettyClient.send(request);

        return send;

    }

}

动态代理的实现

动态代理技术最广为人知的应用,应该就是 Spring Aop,面向切面的编程实现。动态的在原有方法Before 或者 After 添加代码。而 RPC 框架中动态代理的作用就是彻底替换原有方法,直接调用远程方法。

代理工厂类:

public class ProxyFactory {

    @SuppressWarnings("unchecked")

    public static <T> T create(Class<T> interfaceClass){

        return (T) Proxy.newProxyInstance(

                interfaceClass.getClassLoader(),

                new Class<?>[]{interfaceClass},

                new RpcInvoker<T>(interfaceClass)

        );

    }

}

当 proxyFactory 生成的类被调用的时候,就会执行 RpcInvoker 方法。

public class RpcInvoker<T> implements InvocationHandler {

    private Class<T> clz;

    public RpcInvoker(Class<T> clz){

        this.clz = clz;

    }

    @Override

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        RpcRequest request = new RpcRequest();

        String requestId = UUID.randomUUID().toString();

        String className = method.getDeclaringClass().getName();

        String methodName = method.getName();

        Class<?>[] parameterTypes = method.getParameterTypes();

        request.setRequestId(requestId);

        request.setClassName(className);

        request.setMethodName(methodName);

        request.setParameterTypes(parameterTypes);

        request.setParameters(args);

        return Transporters.send(request).getResult();

    }

}

看到这个 invoke 方法,主要三个作用,

生成 RequestId。

拼装 RpcRequest。

调用 Transports 发送请求,获取结果。

至此终于,整个调用链完整了。我们终于完成了一次 RPC 调用。

与 Spring 集成

为了使我们的 client 能够易于使用我们需要考虑,定义一个自定义注解 @RpcInterface 当我们的项目接入 Spring 以后,Spring 扫描到这个注解之后,自动的通过我们的 ProxyFactory 创建代理对象,并存放在 spring 的 applicationContext 中。这样我们就可以通过 @Autowired 注解直接注入使用了。

@Target({ElementType.TYPE})

@Retention(RetentionPolicy.RUNTIME)

public @interface RpcInterface {

}

@Configuration

@Slf4j

public class RpcConfig implements ApplicationContextAware,InitializingBean {

private ApplicationContext applicationContext;

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

@Override

public void afterPropertiesSet() throws Exception {

Reflections reflections = new Reflections("com.xilidou");

DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

        // 获取 @RpcInterfac 标注的接口

Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);

for (Class<?> aClass : typesAnnotatedWith) {

            // 创建代理对象,并注册到 spring 上下文。

beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));

}

log.info("afterPropertiesSet is {}",typesAnnotatedWith);

}

}

终于我们最简单的 RPC 框架就开发完了。下面可以测试一下。

6. Demo

api

@RpcInterface

public interface IHelloService {

    String sayHi(String name);

}

server

IHelloSerivce 的实现:

@Service

@Slf4j

public class TestServiceImpl implements IHelloService {

    @Override

    public String sayHi(String name) {

        log.info(name);

        return "Hello " + name;

    }

}

启动服务:

@SpringBootApplication

public class Application {

    public static void main(String[] args) throws InterruptedException {

        ConfigurableApplicationContext context = SpringApplication.run(Application.class);

        TcpService tcpService = context.getBean(TcpService.class);

        tcpService.start();

    }

}

`

client

@SpringBootApplication()

public class ClientApplication {

    public static void main(String[] args) {

        ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);

    IHelloService helloService = context.getBean(IHelloService.class);

        System.out.println(helloService.sayHi("doudou"));

    }

}

运行以后输出的结果:

Hello doudou

总结

终于我们实现了一个最简版的 RPC 远程调用的模块。

想了解更多可加 扣扣 7 9 8 8 9 1 7 1 0

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • RPC框架远程调用的实现方式在原理上是比较简单的,即将调用的方法(接口名、方法名、参数类型、参数)序列化之后发送到...
    谜碌小孩阅读 3,074评论 0 13
  • 在小的时候,经常听爷爷说过。许多鬼神的传说。
    友谊天长地久阅读 155评论 0 0
  • 当夜晚一来时 我就开始写诗 记录今天发生的事 所有快乐、忧愁 都随笔尖的滚动 被带进回忆 记录,不是为了别的 当然...
    想要养只喵阅读 238评论 0 1
  • 5月29韶关资格审查,6月4湖南资格审查,6月9乳源事业单位笔试,6月13广东省考面试,6月16端午假期父亲节回家...
    更向远行阅读 240评论 0 0