实现一个简单的RPC系统
provider:
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
public class Provider {
public static void main(String[] args) throws Exception {
//新建一个server socket 端口号为1234,普通的RPC调用可以通过socket来实现
ServerSocket server=new ServerSocket(1234);
//监听socket连接并响应
while(true)
{
Socket socket=server.accept();
ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
//获得服务端要调用的类名
String classname=input.readUTF();
//获得服务端要调用的方法名称
String methodName=input.readUTF();
//获得服务端要调用方法的参数类型
Class<?>[] parameterTypes=(Class<?>[]) input.readObject();
//获得服务端要调用方法的每一个参数的值
Object[] arguments=(Object[]) input.readObject();
//创建类
Class serviceClass=Class.forName(classname);
//创建对象
Object object = serviceClass.newInstance();
//获得该类的对应的方法
Method method=serviceClass.getMethod(methodName, parameterTypes);
//该对象调用指定方法
Object result=method.invoke(object, arguments);
ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
socket.close();
}
}
}
consumer:
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
public class Consumer {
public void doBusiness() throws Exception{
//设置调用类的路径和要调用的方法
String classname="com.dfire.demo.rpc.RPCServiceImpl";
String method="sayHello";
Class[] argumentsType={String.class};
//获取本机计算机名称
InetAddress inetAddress = InetAddress.getLocalHost();
String hostName = inetAddress.getHostName().toString();
Object[] arguments={hostName};
//与10.1.134.145主机建立socket连接进行通讯
Socket socket = new Socket("127.0.0.1",1234);
ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
//输入数据
output.writeUTF(classname);
output.writeUTF(method);
output.writeObject(argumentsType);
output.writeObject(arguments);
//得到返回数据
ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
Object result=input.readObject();
System.out.println(Thread.currentThread().getName()+" "+result);
socket.close();
}
public static void main(String[] args) {
try {
new Consumer().doBusiness();
}catch (Exception e){
e.printStackTrace();
}
}
}
输入命令查看:
lsof -i tcp:1234
上图显示线程16419在监听TCP的通讯,做为RPC的server端
下面我们看下我们的dubbo的通讯监听情况:
先看服务提供者:
lsof -i tcp:20881
下面为服务消费者:
lsof -i tcp:20880
最后边的许多连接表示10.1.134.145连接了多个机器的服务【服务提供者端口为20880】
上面通过两个简单的类Provider和Consumer实现了一个简单的RPC系统,大家应该就能清楚认识到RPC的整个架构和通讯是比较简单的,当然上面的这个RPC系统有很多缺点:
- BIO,不能支持高并发。
- 不支持负载均衡
- 不支持容错机制
- 不支持SPI扩展
- 不支持各种序列化
- 不支持自动注册和发现
当然还有其他很多缺点,业界有很多RPC的框架,例如:dubbo,我们下面来讲一下讲。
下面是从dubbo官网copy下面的一段话,对dubbo的简单描述:
dubbo的拓扑图
我们上面自己实现的RPC系统就缺少Registry和Monitor这两个模块,当然其他模块也很弱
整体设计
- config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
- proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
- registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
- cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口 为 Cluster, Directory, Router, LoadBalance
- monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorServiceprotocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
- exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
- transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codecserialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
调用链:
服务提供者暴露一个服务的过程:
暴露服务时序图
服务消费者消费一个服务的过程
引用服务时序
服务提供 Invoker 和服务消费 Invoker
核心领域模型(Microkernel + Plugin 模式)
Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。
functionalities是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可
执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可
能一个集群实现。Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
dubbo:// 数据传输
Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。反之,Dubbo 缺省协议不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。
+ Transporter: mina, netty, grizzy
+ Serialization: dubbo, hessian2, java, json
+ Dispatcher: all, direct, message, execution, connection
+ ThreadPool: fixed, cached, limited
Dispatcher
对于Dubbo集群中的Provider角色,有IO线程池和业务处理线程池(默认200)两个线程池,所以当业务的并发比较高,或者某些业务处理变慢,业务线程池就很容易被“打满”,抛出“RejectedExecutionException: Thread pool is EXHAUSTED! ”异常。
- all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
- direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
- message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行。
- execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
- connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
<dubbo:provider version="1.0" delay="-5000" timeout="5000" dispatcher="all" threads="400" loadbalance="leastactive" actives="400" />
ThreadPool
fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)。
cached 缓存线程池,空闲一分钟自动删除,需要时重建。
limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超
过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)。
zookeeper 注册中心
Zookeeper 是 Apacahe Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,并推荐使用。
流程说明:
- 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址。
- 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址。
- 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。
Fault Tolerance 容错
- Failover – FailoverClusterInvoker
失败自动切换,尝试其他服务器。 (默认的方案) - Failfast – FailfastClusterInvoker
失败立即抛出异常,。通常用于非幂等性的写操作,比如新增记录。 - Failsafe – FailsafeClusterInvoker
失败忽略异常。通常用于写入审计日志等操作。 - Failback – FailbackClusterInvoker
失败自动恢复,记录日志并定时重试。 通常用于消息通知操作。 - Forking – ForkingClusterInvoker
并行调用多个服务,一个成功立即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=“2” 来设置最大并行数。 - Broadcast – BroadcastClusterInvoker
广播调用所有提供者,任意一个报错则报错
Load Balancing 负载均衡
random (随机,按权重设置随机概率) 默认的策略
round-robin (轮循,按公约后的权重设置轮循比率)
存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,
当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。least-active (最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。) 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大,不支持权重。 【线上用得比较多,注意dubbo低版本有bug】
consistent-hash(一致性Hash,相同参数的请求总是发到同一提供者)
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动,会导致压力分摊不均。支持扩展
需要实现AbstractLoadBalance接口
SPI扩展实现
一、调用拦截扩展
扩展说明:服务提供方和服务消费方调用过程拦截,Dubbo 本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,请注意对性能的影响。dubbo层的限流和熔断可以用这个filter扩展来实现。
约定:
- 用户自定义 filter 默认在内置 filter 之后。
- 特殊值 default,表示缺省扩展点插入的位置。比如:filter=“xxx,default,yyy”,表示 xxx 在缺省 filter 之前,yyy 在缺省 filter 之后。
- 特殊符号 -,表示剔除。比如:filter=“-foo1”,剔除添加缺省扩展点 foo1。比如:filter=“-default”,剔除添加所有缺省扩展点。
- provider 和 service 同时配置的 filter 时,累加所有 filter,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/> 和 <dubbo:service filter="aaa,bbb" />,则 xxx,yyy,aaa,bbb 均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />
接口com.alibaba.dubbo.rpc.Filter
扩展配置
<dubbo:reference filter= “xxx,yyy” />
<dubbo:consumer filter= “xxx,yyy” />
<dubbo:service filter= “xxx,yyy” />
<dubbo:provider filter= “xxx,yyy” />
已知扩展
com.alibaba.dubbo.rpc.filter.EchoFilter
com.alibaba.dubbo.rpc.filter.GenericFilter
com.alibaba.dubbo.rpc.filter.GenericImplFilter
com.alibaba.dubbo.rpc.filter.TokenFilter
com.alibaba.dubbo.rpc.filter.AccessLogFilter
com.alibaba.dubbo.rpc.filter.CountFilter
com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
com.alibaba.dubbo.rpc.filter.ContextFilter
com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
com.alibaba.dubbo.rpc.filter.ExceptionFilter
com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
com.alibaba.dubbo.rpc.filter.DeprecatedFilter
总结得比较简单,摘自我的培训课件,适合根据这个文章来做分享,欢迎大家一起交流。