手撕RPC框架

手撕RPC

使用Netty+Zookeeper+Spring实现简易的RPC框架。阅读本文需要有一些Netty使用基础。

服务信息在网络传输,需要讲服务类进行序列化,服务端使用Spring作为容器。服务端发布服务,将接口的全路径当做节点名称,服务的ip+端口作为节点值,存储到Zookeeper中。客户端调用的时候,去Zookeeper查询,获得提供该接口的服务器ip和端口,通过Netty进行调用。

工程引用的jar包

io.netty

netty-all

4.1.32.Final

org.springframework

spring-context

5.1.3.RELEASE

org.apache.zookeeper

zookeeper

3.4.10

1.请求和响应

服务端和客户端进行通信,完成远程服务调用,需要统一封装请求和响应。

请求统一封装类,服务端接收请求之后,通过反射获得客户端传送的对象。

package com.test.rpc.pojo;

import java.io.Serializable;

public class Request implements Serializable {

private String serviceName;//调用的服务名字

private String methodName;//调用的方法名

private Class[] parameterTypes;//方法参数的类型

private Object[] args;//方法的参数

//getter/setter

}

响应统一封装类,服务端返回给客户端的响应

package com.test.rpc.pojo;

import java.io.Serializable;

public class Response implements Serializable {

private Object result;//响应结果

private Exception exception;//响应的异常

//getter/setter

}

2.通用工具类

因为对象在网络上传输需要转换成字节,所以需要序列化。

序列化工具类,可以替换成第三方,如json。

package com.test.rpc.util;

import java.io.*;

/**

* 序列化工具,可以替换成第三方

*/

public class SerializeUtil {

//序列化

public static byte[] serialize(Object o) throws IOException {

ByteArrayOutputStream baos = null;

ObjectOutputStream oos = null;

try {

// 序列化

baos = new ByteArrayOutputStream();

oos = new ObjectOutputStream(baos);

oos.writeObject(o);

byte[] bytes = baos.toByteArray();

return bytes;

} catch (Exception e) {

e.printStackTrace();

} finally {

oos.close();

baos.close();

}

return null;

}

// 反序列化

public static Object unserialize(byte[] bytes)throws IOException {

ByteArrayInputStream bais = null;

ObjectInputStream ois = null;

try {

bais = new ByteArrayInputStream(bytes);

ois = new ObjectInputStream(bais);

return ois.readObject();

} catch (Exception e) {

e.printStackTrace();

} finally {

bais.close();

ois.close();

}

return null;

}

}

Zookeeper工具类,连接zookeeper的工具

package com.test.rpc.util;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import java.io.IOException;

public class ZkUtil {

private ZooKeeper zk;

public ZkUtil(String address, int sessionTimeout) {

try {

zk = new ZooKeeper(address, sessionTimeout, null);

} catch (IOException e) {

e.printStackTrace();

}

}

public Stat exist(String path, boolean needWatch) {

try {

return this.zk.exists(path, needWatch);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

public boolean createNode(String path, String data) {

try {

this.exist(path, true);

zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

public boolean updateNode(String path, String data) {

try {

this.exist(path, true);

zk.setData(path, data.getBytes(), -1);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

public boolean deleteNode(String path) {

try {

this.exist(path, true);

zk.delete(path, -1);

return true;

} catch (Exception e) {

e.printStackTrace();

return false;

}

}

public String getNodeData(String path) {

try {

byte[] data = zk.getData(path, false, null);

return new String(data);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

return null;

}

public void close() {

try {

if (zk != null) {

zk.close();

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

3.Netty编码解码

package com.test.rpc.netty;

import com.test.rpc.util.SerializeUtil;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

/**

* 消息编码器,实现服务消息序列化

*/

public class ServiceEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf out) throws Exception {

byte[] bytes = SerializeUtil.serialize(o);//序列化

int dataLength = bytes.length; //读取消息的长度

out.writeInt(dataLength); //先将消息长度写入,也就是消息头,解决粘包和半包的问题

out.writeBytes(bytes);

}

}

package com.test.rpc.netty;

import com.test.rpc.util.SerializeUtil;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

/**

* 通信解码,实现消息的反序列化

*/

public class ServiceDecoder extends ReplayingDecoder {

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List out) throws Exception {

int length = in.readInt();//消息头,消息的总长度

byte[] content = new byte[length];

in.readBytes(content);

Object o = SerializeUtil.unserialize(content); //byte数据转化为我们需要的对象。

out.add(o);

}

}

4.服务端代码

为了发布服务方便,定义一个注解,让Spring扫描此注解,统一发布服务

package com.test.rpc.server;

import org.springframework.stereotype.Component;

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

@Target({ElementType.TYPE})

@Retention(RetentionPolicy.RUNTIME)

@Component

public @interface RpcService {

Class value();//发布的服务类型

}

注册服务就是把服务名称和提供者地址存到Zookeeper中。

package com.test.rpc.server;

import com.test.rpc.util.ZkUtil;

import org.apache.zookeeper.data.Stat;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.Map;

public class ServerContext {

private static String ip = "127.0.0.1";//服务端地址

private static int port = 8000;//服务端口

private static final ApplicationContext applicationContext;//spring容器

public static Object getBean(Class clazz) {//获得容器中对象

return applicationContext.getBean(clazz);

}

static {

//创建Spring容器,如果在web环境的话可以用Listener

applicationContext = new ClassPathXmlApplicationContext("spring-server.xml");

ZkUtil zookeeper = applicationContext.getBean(ZkUtil.class);

//通过注解获得bean

Map serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class);

if (serviceBeanMap != null) {

for (Object bean : serviceBeanMap.values()) {

RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);

String serviceName = "/" + rpcService.value().getName();//注册的节点名,是接口全路径

Stat stat= zookeeper.exist(serviceName,false);

if(stat!=null){//如果节点存在就删除

zookeeper.deleteNode(serviceName);

}

zookeeper.createNode(serviceName, ip + ":" + port);//注册服务

}

}

}

}

package com.test.rpc.server;

import com.test.rpc.pojo.Request;

import com.test.rpc.pojo.Response;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import java.lang.reflect.Method;

/**

* 处理客户端发送的请求

*/

public class ServerRequestHandler extends SimpleChannelInboundHandler {

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) throws Exception {

Response response = new Response();

try {

Object result = handleRequest(request);

response.setResult(result);

} catch (Exception e) {

e.printStackTrace();

response.setException(e);

}

//写入响应并关闭

channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

}

private Object handleRequest(Request request) throws Exception {

String serviceName = request.getServiceName();//被的调用的服务

String methodName = request.getMethodName();//被调用的方法

Class[] parameterTypes = request.getParameterTypes();//方法参数的类型

Object[] args = request.getArgs();//方法的参数

//实例化被调用的服务对象

Class clazz = Class.forName(serviceName);

Object target = ServerContext.getBean(clazz);//从容器获得对象

//获得方法对象

Method method = clazz.getMethod(methodName, parameterTypes);

Object result = method.invoke(target, args);//执行方法

return result;//返回结果

}

}

服务启动类

package com.test.rpc.server;

import com.test.rpc.netty.ServiceDecoder;

import com.test.rpc.netty.ServiceEncoder;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

public class ServerMain {

public static void main(String[] args) {

int port=8000;

new ServerContext();

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(new ServiceEncoder());//编码

ch.pipeline().addLast(new ServiceDecoder());//解码

ch.pipeline().addLast(new ServerRequestHandler());//请求处理

}

})

.option(ChannelOption.SO_BACKLOG, 128)

.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture f = b.bind(port).sync();

f.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}

}

5. 待发布的服务

package com.test.rpc.service;

public interface HelloService {

String hello(String message);

}

package com.test.rpc.service;

import com.test.rpc.server.RpcService;

@RpcService(HelloService.class)//自定义的注解

public class HelloServiceImpl implements HelloService {

@Override

public String hello(String message) {

return "server response:" + message;

}

}

spring-server.xml配置文件


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

6. 客户端代码

package com.test.rpc.client;

import com.test.rpc.netty.ServiceDecoder;

import com.test.rpc.netty.ServiceEncoder;

import com.test.rpc.pojo.Request;

import com.test.rpc.pojo.Response;

import com.test.rpc.util.ZkUtil;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioSocketChannel;

import org.springframework.context.ApplicationContext;

public class ClientSender extends SimpleChannelInboundHandler {

private ApplicationContext applicationContext;

public ClientSender(ApplicationContext applicationContext) {

this.applicationContext = applicationContext;

}

private Response response;

@Override

protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {

this.response = response;//处理响应

}

//发送请求

public Response send(Request request) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(Channel ch) {

ch.pipeline().addLast(new ServiceEncoder());

ch.pipeline().addLast(new ServiceDecoder());

ch.pipeline().addLast(ClientSender.this);

}

});

//去注册中心查看服务发布者

ZkUtil zk = applicationContext.getBean(ZkUtil.class);

String address = zk.getNodeData("/"+request.getServiceName());

String[] serverAndPort = address.split(":");//查到的服务提供者地址

// 连接 RPC 服务器

ChannelFuture future = bootstrap.connect(serverAndPort[0], Integer.parseInt(serverAndPort[1])).sync();

// 写入 RPC 请求数据并关闭连接

Channel channel = future.channel();

channel.writeAndFlush(request).sync();

channel.closeFuture().sync();

return response;

} finally {

group.shutdownGracefully();

}

}

}

请求动态代理类,调用服务的时候需要创建动态代理,把服务对象封装成Request

package com.test.rpc.client;

import com.test.rpc.pojo.Request;

import com.test.rpc.pojo.Response;

import org.springframework.beans.BeansException;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.lang.reflect.Proxy;

public class RequestProxy implements ApplicationContextAware {

private ApplicationContext applicationContext;

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

//动态代理

public T createProxy(final Class interfaceClass) throws Exception {

return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {

@Override

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

Request request = new Request();

request.setServiceName(method.getDeclaringClass().getName());

request.setMethodName(method.getName());

request.setParameterTypes(method.getParameterTypes());

request.setArgs(args);

ClientSender sender = new ClientSender(applicationContext);

Response response = sender.send(request);

if (response.getException() != null) {

throw response.getException();

}

return response.getResult();

}

});

}

}

spring-client.xml


xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

package com.test.rpc.client;

import com.test.rpc.service.HelloService;

import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ClientMain {

public static void main(String[] args) throws Exception {

ApplicationContext applicationContext=new ClassPathXmlApplicationContext("spring-client.xml");

RequestProxy proxy=applicationContext.getBean(RequestProxy.class);

HelloService helloService=proxy.createProxy(HelloService.class);

System.out.println(helloService.hello("netty"));

}

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容