基于Netty版本的RPC框架

netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。

1 构建工程 目录 

api 

public interface IRpcHelloService {

        String   hello(String name);

}

public interface IRpcService {

    int add(int a, int b);

    int sub(int a, int b);

    int mult(int a, int b);

    int div(int a, int b);

}

2 protocol

@Data

public class InvokerProtocol implements Serializable {

    private String className;//服务名

    private String methodName;//方法名

    private Class[] prames;//形参列表

    private Object[] values;//实例参数

}

3 provider

public class RpcHelloServiceImpl implements IRpcHelloService {

    public String  hello(String name) {

            return "Hello " + name +"!";

    }

}

public class RpcServiceImpl implements IRpcService {

public int add(int a, int b) {

        return a + b;

    }

public int sub(int a, int b) {

        return a - b;

    }

public int mult(int a, int b) {

        return a * b;

    }

public int div(int a, int b) {

            return a / b;

    }

3 proxy

public class RpcProxy {

    public static T create(Class clazz){

       MethodProxy methodProxy =  new MethodProxy(clazz);

        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, methodProxy);

    }

private static class MethodProxy implements InvocationHandler {

        private Class clazz;

        public MethodProxy(Class clazz) {

            this.clazz = clazz;

        }

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

     if (Object.class.equals(method.getDeclaringClass()))

            return method.invoke(this, args);

        else

                return rpcInvoker(proxy, method, args);

        }

private Object rpcInvoker(Object proxy, Method method, Object[] args) {

            //先构造一个消息协议的内容

            //发起网络请求

            EventLoopGroup worker = new NioEventLoopGroup();

            try {

                InvokerProtocol msg = new InvokerProtocol();

                msg.setClassName(this.clazz.getName());

                msg.setMethodName(method.getName());

                msg.setPrames(method.getParameterTypes());

                msg.setValues(args);

                final RpcProxyHandler proxyHandler =new RpcProxyHandler();

                Bootstrap client = new Bootstrap();

                client.group(worker)

                .channel(NioSocketChannel.class)

                .option(ChannelOption.TCP_NODELAY,  true)

                .handler(new ChannelInitializer(){

         protected void initChannel(SocketChannel socketChannel) throws Exception {

                               //在netty中 把所有的业务逻辑中全部归到一个队列中 这个队列中包含了各种各样的逻辑

                               //pipleline

                                ChannelPipeline channelPipeline = socketChannel.pipeline();

                                //就是对我们处理逻辑的封装

                                //对自定义协议 进行编码 解码

                                channelPipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4, 0,4));

                                //定义编码器

                                channelPipeline.addLast(new LengthFieldPrepender(4));

                                //参数的处理 反序列化java识别的对象

                                channelPipeline.    ("encoder", new ObjectEncoder());

                                channelPipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));

                                //最后一步 完成对数据的解析 执行属于自己的逻辑

                                //1 注册 2 服务位置做登记

                                channelPipeline.addLast(proxyHandler);

                            }

});

                ChannelFuture future = client.connect("localhost", 8080).sync();

                future.channel().writeAndFlush(msg).sync();

                future.channel().closeFuture().sync();

                return proxyHandler.getResponse();

            } catch (Exception e) {

                e.printStackTrace();

            }   finally {

                worker.shutdownGracefully();

            }

            return null;

        }

    }

}

public class RpcProxyHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {

        response = msg;

    }

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {

        cause.printStackTrace();

        System.out.println("client is exception");

    }

    public Object getResponse() {

        return response;

    }

}

4 register

public class RegisterHandler extends ChannelInboundHandlerAdapter {

        private List<String> className = new ArrayList();

        private Map <String, Object>  registerMap = new ConcurrentHashMap();

        //有客户端连上的时候 回调

        public RegisterHandler() {

        //根据包名 将class文件扫描

        scanClass("rpcnetty.provider");

        //根据包名 将class文件扫描到容器中

        doRegister();

    }

    private void doRegister() {

        if (className.isEmpty()) return;

        for (String className : className) {

            try {

                Class clazz = Class.forName(className);

                Class i = clazz.getInterfaces()[0];

                String serviceName = i.getName();

                //本来存的是网络路径 调用的时候解析

                registerMap.put(serviceName, clazz.newInstance());

            } catch (Exception e) {

                e.printStackTrace();

            }

    }

}

private void scanClass(String name) {

        URL url = this.getClass().getClassLoader().getResource(name.replaceAll("\\.", "/"));

        File classpath = new File(url.getFile());

        for (File file : classpath.listFiles()) {

            if (file.isDirectory()) {

                scanClass(name +"." + file.getName());

            } else {

                className.add(name +"." + file.getName().replaceAll(".class", ""));

            }

    }

}

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {

        Object result = new Object();

        InvokerProtocol request = (InvokerProtocol)msg;

        //去容器中找到符合的服务对象

        if (registerMap.containsKey(request.getClassName())) {

            Object service = registerMap.get(request.getClassName());

            Method method = service.getClass().getMethod(request.getMethodName(),             request.getPrames());

            result = method.invoke(service, request.getValues());

        }

            ctx.write(result);

            ctx.flush();

            ctx.close();

    }

    //发生异常 回调

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {

        }

}

public class RegisterHandler extends ChannelInboundHandlerAdapter {

   private List<String> className = new ArrayList();

   private Map<String, Object> registerMap = new ConcurrentHashMap();

    //有客户端连上的时候 回调

    public RegisterHandler() {

         //根据包名 将class文件扫描

        scanClass("rpcnetty.provider");

        //根据包名 将class文件扫描到容器中

        doRegister();

    }

private void doRegister() {

        if (className.isEmpty()) return;

        for (String className : className) {

    try {

                Class clazz = Class.forName(className);

                Class i = clazz.getInterfaces()[0];

                String serviceName = i.getName();

                //本来存的是网络路径 调用的时候解析

                registerMap.put(serviceName, clazz.newInstance());

            } catch (Exception e) {

            e.printStackTrace();

            }

        }

}

private void scanClass(String name) {

        URL url = this.getClass().getClassLoader().getResource(name.replaceAll("\\.", "/"));

        File classpath = new File(url.getFile());

        for (File file : classpath.listFiles()) {

                if (file.isDirectory()) {

                    scanClass(name +"." + file.getName());

            } else {

                    className.add(name +"." + file.getName().replaceAll(".class", " "));

            }

    }

}

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        Object result = new Object();

        InvokerProtocol request = (InvokerProtocol)msg;

        //去容器中找到符合的服务对象

        if (registerMap.containsKey(request.getClassName())) {

            Object service = registerMap.get(request.getClassName());

            Method method = service.getClass().getMethod(request.getMethodName(),             request.getPrames());

            result = method.invoke(service, request.getValues());

        }

            ctx.write(result);

           ctx.flush();

            ctx.close();

    }

    //发生异常 回调

    @Override

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {

    }

}

public class RpcRegister {

    private int port =8080;

    public RpcRegister(int port) {

        this.port = port;

    }

    public void start() {

        try {

            ServerBootstrap server = new ServerBootstrap();

            //基于 nio 来实现 selector主线程 worker 线程

            //主线程池

            EventLoopGroup bossGroup = new NioEventLoopGroup();

            //子线程池

            EventLoopGroup workGroup = new NioEventLoopGroup();

            server.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)

                    .childHandler(new ChannelInitializer() {

                            protected void initChannel(SocketChannel ch)throws Exception {

                        / /在netty中 把所有的业务逻辑中全部归到一个队列中 这个队列中包含了各种各样的逻辑

                            //pipleline

                            ChannelPipeline channelPipeline = ch.pipeline();

                            //就是对我们处理逻辑的封装

                            //对自定义协议 进行编码 解码

                            channelPipeline.addLast(new                                 LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4, 0,4));

                            //定义编码器

                            channelPipeline.addLast(new LengthFieldPrepender(4));

                            //参数的处理 反序列化java识别的对象

                            channelPipeline.addLast("encoder", new ObjectEncoder());

                            channelPipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,                                 ClassResolvers.cacheDisabled(null)));

                            //最后一步 完成对数据的解析 执行属于自己的逻辑

                            //1 注册 2 服务位置做登记

                            channelPipeline.addLast(new RegisterHandler());

                        }

        })

             .option(ChannelOption.SO_BACKLOG, 128)

             .childOption(ChannelOption.SO_KEEPALIVE, true);

            //正式启动服务相当于死循环轮询

            ChannelFuture future =  server.bind(this.port).sync();

            System.out.println("GP RPC Register start listen at : " + this.port);

            future.channel().closeFuture().sync();

        } catch (Exception e) {

            e.printStackTrace();

        }

}

    public static void main(String[] args) {

            new RpcRegister(8080).start();

    }

}

consumer

public class RpcConsumer {

    public static void main(String[] args) {

       IRpcService service = RpcProxy.create(IRpcService.class);

        System.out.println(" 8 + 2 = " + service.add(8, 2));

        System.out.println(" 8 - 2 = " + service.sub(8, 2));

        System.out.println(" 8 * 2 = " + service.mult(8, 2));

        System.out.println(" 8 / 2 = " + service.div(8, 2));

    }

测试 启动 注册中心 

运行结果:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容