拿上一篇的rmi的例子进行源码分析:https://www.jianshu.com/p/adb191f83e4c
1.重要类url类图
-
远程对象发布的uml图
比如HelloServiceImpl和RegistryImpl的发布uml图如下
可以发现RegistryImpl注册中心和HelloServiceImpl最终都是实现了Remote的接口,而且两个对象的继承类似,再后面源码分析的时候,后面的暴露服务的逻辑是一样的
rmi中用到的远程引用对象的url图
UnicastServerRef是作为注册中心的远程引用对象,LiveRef主要包含传输层类TcpTransport对象来进行通信,然后UnicastRef包含LiveRef持有传输层类TcpTransport和确定唯一性的ObjID
2.rmi的调用流程
现在我们先忘记Java中有RMI这种东西。假设我们需要自己实现上面例子中的效果,怎么办呢?可以想到的步骤是:
- 编写服务端服务,并将其通过某个服务机的端口暴露出去供客户端调用。
- 编写客户端程序,客户端通过指定服务所在的主机和端口号、将请求封装并序列化,最终通过网络协议发送到服务端。
- 服务端解析和反序列化请求,调用服务端上的服务,将结果序列化并返回给客户端。
- 客户端接收并反序列化服务端返回的结果,反馈给用户。
这是大致的流程,我们不难想到,RMI其实也是帮我们封装了一些细节而通用的部分,比如序列化和反序列化,连接的建立和释放等,下面是RMI的具体流程:
这里涉及到几个新概念:
Stub和Skeleton:这两个的身份是一致的,都是作为代理的存在。客户端的称作Stub,服务端的称作Skeleton。要做到对程序员屏蔽远程方法调用的细节,这两个代理是必不可少的,包括网络连接等细节。
Registry:顾名思义,可以认为Registry是一个“注册所”,提供了服务名到服务的映射。如果没有它,意味着客户端需要记住每个服务所在的端口号,这种设计显然是不优雅的。
3.rmi的源码层面分析
源码分析我参考了这边博客:https://blog.csdn.net/sinat_34596644/article/details/52599688,但是我会对这里的源码分析进行补充
服务端启动Registry服务
Registry registry=LocateRegistry.createRegistry(1099);
从上面这句代码入手,追溯下去,可以发现服务端创建了一个RegistryImpl对象,这里做了一个判断。如果服务端指定的端口号是1099并且系统开启了安全管理器,那么可以在限定的权限集内(listen和accept)绕过系统的安全校验。反之则必须进行安全校验。这里纯粹是为了效率起见。真正做的事情在setUp()方法中,继续看下去。
public RegistryImpl(final int var1) throws RemoteException {
if(var1 == 1099 && System.getSecurityManager() != null) {
//这里不会调用,默认的情况下,如果不配置的话,SecurityManager为null
try {
AccessController.doPrivileged(new PrivilegedExceptionAction() {
public Void run() throws RemoteException {
LiveRef var1x = new LiveRef(RegistryImpl.id, var1);
RegistryImpl.this.setup(new UnicastServerRef(var1x));
return null;
}
}, (AccessControlContext)null, new Permission[]{new SocketPermission("localhost:" + var1, "listen,accept")});
} catch (PrivilegedActionException var3) {
throw (RemoteException)var3.getException();
}
} else {
LiveRef var2 = new LiveRef(id, var1);
//调用setup
this.setup(new UnicastServerRef(var2));
}
}
setUp()方法将指向正在初始化的RegistryImpl对象的远程引用ref(RemoteRef)赋值为传入的UnicastServerRef对象,这里涉及了向上转型。然后继续移交UnicastServerRef的exportObject()方法。
private void setup(UnicastServerRef var1) throws RemoteException {
this.ref = var1;
var1.exportObject(this, (Object)null, true);
}
进入UnicastServerRef的exportObject()方法。可以看到,这里首先为传入的RegistryImpl创建一个代理,这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub对象。然后将UnicastServerRef的skel(skeleton)对象设置为当前RegistryImpl对象。最后用skeleton、stub、UnicastServerRef对象、id和一个boolean值构造了一个Target对象,也就是这个Target对象基本上包含了全部的信息。调用UnicastServerRef的ref(LiveRef)变量的exportObject()方法。
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {
Class var4 = var1.getClass();
Remote var5;
try {
//创建一个代理对象 RegistryImpl_stub,这个类是存在的,所有直接创建,如果是暴露的是普通对象,则会动态代理创建对象
var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);
} catch (IllegalArgumentException var7) {
throw new ExportException("remote object implements illegal remote interface", var7);
}
if(var5 instanceof RemoteStub) { //RegistryImpl_stub继承了RemoteStub对象
//会为UnicastServerRef对象的属性Skeleton skel 创建RegistryImpl_Skel对象
this.setSkeleton(var1);
}
//Target 包含了RegistryImpl,Dispatcher的具体实现类UnicastServerRef,RegistryImpl_stub,ObjID对象
Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);
this.ref.exportObject(var6);
this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);
return var5;
}
到上面为止,我们看到的都是一些变量的赋值和创建工作,还没有到连接层,这些引用对象将会被Stub和Skeleton对象使用。接下来就是连接层上的了。追溯LiveRef的exportObject()方法,很容易找到了TCPTransport的exportObject()方法。这个方法做的事情就是将上面构造的Target对象暴露出去。调用TCPTransport的listen()方法,listen()方法创建了一个ServerSocket,并且启动了一条线程等待客户端的请求。接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中。
public void exportObject(Target var1) throws RemoteException {
synchronized(this) {
//创建ServerSocket,并监听
this.listen();
++this.exportCount;
}
boolean var2 = false;
boolean var12 = false;
try {
var12 = true;
//暴露对象,存放在ObjectTable中
super.exportObject(var1);
var2 = true;
var12 = false;
} finally {
if(var12) {
if(!var2) {
synchronized(this) {
this.decrementExportCount();
}
}
}
}
if(!var2) {
synchronized(this) {
this.decrementExportCount();
}
}
}
到这里,我们已经将RegistryImpl对象创建并且起了服务等待客户端的请求。
客户端获取服务端Rgistry代理
IHelloService helloService = (IHelloService)Naming.lookup("rmi://127.0.0.1/Hello");
从上面的代码看起,容易追溯到LocateRegistry的getRegistry()方法。这个方法做的事情是通过传入的host和port构造RemoteRef对象,并创建了一个本地代理。可以通过Debug功能发现,这个代理对象其实是RegistryImpl_Stub对象。这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量)。但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联,毕竟是两个VM上面的对象,这里我们也可以猜测,代理和远程的Registry对象之间是通过socket消息来完成的。
public static Registry getRegistry(String host, int port,
RMIClientSocketFactory csf)
throws RemoteException
{
Registry registry = null;
if (port <= 0)
port = Registry.REGISTRY_PORT;
if (host == null || host.length() == 0) {
// If host is blank (as returned by "file:" URL in 1.0.2 used in
// java.rmi.Naming), try to convert to real local host name so
// that the RegistryImpl's checkAccess will not fail.
try {
host = java.net.InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
// If that failed, at least try "" (localhost) anyway...
host = "";
}
}
//这里构造了TCPEndpoint,并把host和端口号构造进去了
LiveRef liveRef =
new LiveRef(new ObjID(ObjID.REGISTRY_ID),
new TCPEndpoint(host, port, csf, null),
false);
//构造了UnicastRef通信服务对象
RemoteRef ref =
(csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);
//返回的是RegistryImpl_Stub,并在RegistryImpl_Stub的构造方法注入UnicastRef对象
return (Registry) Util.createProxy(RegistryImpl.class, ref, false);
}
服务端创建服务对象
public class HelloServiceImpl extends UnicastRemoteObject implements IHelloService {
protected HelloServiceImpl() throws RemoteException {
super();
}
@Override
public String sayHello(String msg) throws RemoteException {
return "hello,"+ msg;
}
}
从HelloServiceImpl的构造函数看起。调用了父类UnicastRemoteObject的构造方法,追溯到UnicastRemoteObject的私有方法exportObject()。这里做了一个判断,判断服务的实现是不是UnicastRemoteObject的子类,如果是,则直接赋值其ref(RemoteRef)对象为传入的UnicastServerRef对象。反之则调用UnicastServerRef的exportObject()方法。这里我们是第一种情况。
private static Remote exportObject(Remote obj, UnicastServerRef sref)
throws RemoteException
{
// if obj extends UnicastRemoteObject, set its ref.
if (obj instanceof UnicastRemoteObject) {
((UnicastRemoteObject) obj).ref = sref;
}
return sref.exportObject(obj, null, false);
}
将服务实现绑定到服务端的Registry上,使得客户端只需与Registry交互。
Naming.rebind("rmi://127.0.0.1/Hello",helloService); //注冊中心
从上面这行代码开始看,容易发现Naming的方法全部都是调用的Registry的方法。这里通过host和port找到我们第一步启动的服务端Registry服务对象,追溯到其rebind()方法,可以看到,其实做的事情很是简单,就是把名字和服务实现存进一个Map里面。
public void rebind(String var1, Remote var2) throws RemoteException, AccessException {
checkAccess("Registry.rebind");
this.bindings.put(var1, var2);
}
-
客户端查找远程服务
接下来就是重头戏了,从下面代码看起。
IHelloService helloService = (IHelloService)Naming.lookup("rmi://127.0.0.1/Hello");
追溯下去,获取到远程Registry对象的代理对象之后,调用RegistryImpl_Stub的lookUp()方法。主要代码如下。做的事情是利用上面通过服务端host和port等信息创建的RegistryImpl_stub对象构造RemoteCall调用对象,operations参数中是各个Registry中声明的操作,2指明了是lookUp()操作。接下来分步骤看看...
try {
RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);
try {
ObjectOutput var3 = var2.getOutputStream();
var3.writeObject(var1);
} catch (IOException var18) {
throw new MarshalException("error marshalling arguments", var18);
}
super.ref.invoke(var2);
Remote var23;
try {
ObjectInput var6 = var2.getInputStream();
var23 = (Remote)var6.readObject();
} catch (IOException var15) {
throw new UnmarshalException("error unmarshalling return", var15);
} catch (ClassNotFoundException var16) {
throw new UnmarshalException("error unmarshalling return", var16);
} finally {
super.ref.done(var2);
}
return var23;
}
调用 RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法,将RegistryImpl_Stub对象传了进去,不要忘了构造它的时候我们将服务器的主机端口等信息传了进去,也就是我们把服务器相关的信息也传进了newCall()方法。newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接。(不要忘了上面我们说到过服务端通过TCPTransport的exportObject()方法等待着客户端的请求)
public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {
clientRefLog.log(Log.BRIEF, "get connection");
Connection var6 = this.ref.getChannel().newConnection();
try {
clientRefLog.log(Log.VERBOSE, "create call context");
if(clientCallLog.isLoggable(Log.VERBOSE)) {
this.logClientCall(var1, var2[var3]);
}
StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);
try {
this.marshalCustomCallData(var7.getOutputStream());
} catch (IOException var9) {
throw new MarshalException("error marshaling custom call data");
}
return var7;
} catch (RemoteException var10) {
this.ref.getChannel().free(var6, false);
throw var10;
}
}
连接建立之后自然就是发送请求了。我们知道客户端终究只是拥有Registry对象的代理,而不是真正地位于服务端的Registry对象本身,他们位于不同的虚拟机实例之中,无法直接调用。必然是通过消息进行交互的。看看super.ref.invoke()这里做了什么?容易追溯到StreamRemoteCall的executeCall()方法。看似本地调用,但其实很容易从代码中看出来是通过tcp连接发送消息到服务端。由服务端解析并且处理调用。
try {
if(this.out != null) {
var2 = this.out.getDGCAckHandler();
}
this.releaseOutputStream();
DataInputStream var3 = new DataInputStream(this.conn.getInputStream());
byte var4 = var3.readByte();
if(var4 != 81) {
if(Transport.transportLog.isLoggable(Log.BRIEF)) {
Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4);
}
throw new UnmarshalException("Transport return code invalid");
}
this.getInputStream();
var1 = this.in.readByte();
this.in.readID();
}
至此,我们已经将客户端的服务查询请求发出了。
服务端接收客户端的服务查询请求并返回给客户端结果
这里我用的方法是直接断点在服务端的Thread的run()方法中,因为我们知道服务端已经用线程跑起了服务(当然我是先断点在Registry_Impl的lookUp()方法并查找调用栈找到源头的)。一步一步我们找到了Transport的serviceCall()方法,这个方法是关键。瞻仰一下主要的代码,到ObjectTable.getTarget()为止做的事情是从socket流中获取ObjId,并通过ObjId和Transport对象获取Target对象,这里的Target对象已经是服务端的对象。再借由Target的派发器Dispatcher,传入参数服务实现和请求对象RemoteCall,将请求派发给服务端那个真正提供服务的RegistryImpl的lookUp()方法,这就是Skeleton移交给具体实现的过程了,Skeleton负责底层的操作。
try {
ObjID var40;
try {
var40 = ObjID.read(var1.getInputStream());
} catch (IOException var34) {
throw new MarshalException("unable to read objID", var34);
}
Transport var41 = var40.equals(dgcID)?null:this;
//通过ObjId和Transport对象获取Target对象
Target var5 = ObjectTable.getTarget(new ObjectEndpoint(var40, var41));
final Remote var38;
if(var5 != null && (var38 = var5.getImpl()) != null) {
//这里获取的Dispatcher对象,也就是UnicastServerRef对象
final Dispatcher var6 = var5.getDispatcher();
var5.incrementCallCount();
boolean var8;
try {
transportLog.log(Log.VERBOSE, "call dispatcher");
final AccessControlContext var7 = var5.getAccessControlContext();
ClassLoader var42 = var5.getContextClassLoader();
Thread var9 = Thread.currentThread();
ClassLoader var10 = var9.getContextClassLoader();
try {
var9.setContextClassLoader(var42);
currentTransport.set(this);
try {
AccessController.doPrivileged(new PrivilegedExceptionAction() {
public Void run() throws IOException {
Transport.this.checkAcceptPermission(var7);
//这里会调用UnicastServerRef的dispatch方法
var6.dispatch(var38, var1);
return null;
}
}, var7);
return true;
} catch (PrivilegedActionException var32) {
throw (IOException)var32.getException();
}
} finally {
var9.setContextClassLoader(var10);
currentTransport.set((Object)null);
}
} catch (IOException var35) {
transportLog.log(Log.BRIEF, "exception thrown by dispatcher: ", var35);
var8 = false;
} finally {
var5.decrementCallCount();
}
return var8;
}
throw new NoSuchObjectException("no such object in table");
}
接下来往下面看,UnicastServerRef的dispatch方法的代码如下,由于UnicastServerRef的skel不为null,会调用 this.oldDispatch(var1, var2, var3)的这个方法,但是最终会调用RegistryImpl_Skel的dispatch的方法
public void dispatch(Remote var1, RemoteCall var2) throws IOException {
try {
long var4;
ObjectInput var39;
try {
var39 = var2.getInputStream();
int var3 = var39.readInt();
if (var3 >= 0) {
if (this.skel != null) {
//这里会进行调用
this.oldDispatch(var1, var2, var3);
return;
}
throw new UnmarshalException("skeleton class not found but required for client version");
}
var4 = var39.readLong();
} catch (Exception var35) {
throw new UnmarshalException("error unmarshalling call header", var35);
}
MarshalInputStream var38 = (MarshalInputStream)var39;
var38.skipDefaultResolveClass();
Method var8 = (Method)this.hashToMethod_Map.get(var4);
if (var8 == null) {
throw new UnmarshalException("unrecognized method hash: method not supported by remote object");
}
this.logCall(var1, var8);
Object[] var9 = null;
try {
this.unmarshalCustomCallData(var39);
var9 = this.unmarshalParameters(var1, var8, var38);
} catch (IOException var32) {
throw new UnmarshalException("error unmarshalling arguments", var32);
} catch (ClassNotFoundException var33) {
throw new UnmarshalException("error unmarshalling arguments", var33);
} finally {
var2.releaseInputStream();
}
Object var10;
try {
var10 = var8.invoke(var1, var9);
} catch (InvocationTargetException var31) {
throw var31.getTargetException();
}
try {
ObjectOutput var11 = var2.getResultStream(true);
Class var12 = var8.getReturnType();
if (var12 != Void.TYPE) {
marshalValue(var12, var10, var11);
}
} catch (IOException var30) {
throw new MarshalException("error marshalling return", var30);
}
} catch (Throwable var36) {
Object var6 = var36;
this.logCallException(var36);
ObjectOutput var7 = var2.getResultStream(false);
if (var36 instanceof Error) {
var6 = new ServerError("Error occurred in server thread", (Error)var36);
} else if (var36 instanceof RemoteException) {
var6 = new ServerException("RemoteException occurred in server thread", (Exception)var36);
}
if (suppressStackTraces) {
clearStackTraces((Throwable)var6);
}
var7.writeObject(var6);
} finally {
var2.releaseInputStream();
var2.releaseOutputStream();
}
}
其中在 RegistryImpl_Skel的dispatch的方法里中有
public void dispatch(Remote var1, RemoteCall var2, int var3, long var4) throws Exception {
....
//获取注册中心
RegistryImpl var6 = (RegistryImpl)var1;
....
//查找服务 这里是从注册中心获取 HelloServiceImpl对象
var8 = var6.lookup(var7);
...
}
看看RegistryImpl的lookUp()实现。做了同步控制,并通过服务名从Map中取出服务对象。返回给客户端。还记得我们在bindings中存放的其实是OperationImpl的真正实现,并非是Stub对象。
public Remote lookup(String var1) throws RemoteException, NotBoundException {
Hashtable var2 = this.bindings;
synchronized(this.bindings) {
Remote var3 = (Remote)this.bindings.get(var1);
if(var3 == null) {
throw new NotBoundException(var1);
} else {
return var3;
}
}
}
客户端获取通过lookUp()查询获得的客户端HelloServiceImpl的Stub对象
这里就不多说了。。多说无益。心好累。凭什么服务端返回给客户端的是服务的实现,但是客户端获取到的是Stub对象呢?用同样的断点的方法,我们可以发现问题出在RegistryImpl_stub的lookup方法中的var6.readObject(),里面其实也是创建了一个代理。这就是那个Stub类。
public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException {
try {
RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);
try {
ObjectOutput var3 = var2.getOutputStream();
var3.writeObject(var1);
} catch (IOException var18) {
throw new MarshalException("error marshalling arguments", var18);
}
super.ref.invoke(var2);
Remote var23;
try {
ObjectInput var6 = var2.getInputStream();
//在于该方法会把HelloServiceImpl的对象转成stub对象,
//实际上调用的是MarshalInputStream的resolveProxyClass()
var23 = (Remote)var6.readObject();
} catch (IOException var15) {
throw new UnmarshalException("error unmarshalling return", var15);
} catch (ClassNotFoundException var16) {
throw new UnmarshalException("error unmarshalling return", var16);
} finally {
super.ref.done(var2);
}
return var23;
} catch (RuntimeException var19) {
throw var19;
} catch (RemoteException var20) {
throw var20;
} catch (NotBoundException var21) {
throw var21;
} catch (Exception var22) {
throw new UnexpectedException("undeclared checked exception", var22);
}
}
客户端进行真正地远程服务调用
到目前为止,客户端已经有了Stub对象。就可以和服务端进行愉快地交流了。
细心的朋友可能发现这个例子中的服务实现HelloServiceImpl继承UnicastRemoteObject,就像前面说的,它似乎不会像RegistryImpl一样在服务端生成Skeleton对象。(对于非UnicastRemoteObject的则会生成Skeleton没啥争议)。我的理解是必然会进行一些处理生成Skeleton对象。因为Registry只是用来查找服务,最终调用服务还是得要客户端与服务的连接。这个连接必然由Skeleton为我们屏蔽了。
总结一下:
一定要说明,在 RMI Client 实施正式的 RMI 调用前,它必须通过 LocateRegistry 或者 Naming 方式到 RMI 注册表寻找要调用的 RMI 注册信息。找到 RMI 事务注册信息后,Client 会从 RMI 注册表获取这个 RMI Remote Service 的Stub
信息。在本例子就是HelloServiceImpl的代理类,这个过程成功后, RMI Client 才能开始正式的调用过程。
另外要说明的是 RMI Client 正式调用过程,也不是由 RMIClient 直接访问 Remote Service,而是由客户端获取的Stub 作为 RMI Client 的代理访问 Remote Service 的代理Skeleton
。也就是说真实的请求调用是在 Stub-Skeleton 之间进行的
。Registry 并不参与具体的 Stub-Skeleton 的调用过程,只负责记录“哪个服务名”使用哪一个 Stub,并在 RemoteClient 询问它时将这个 Stub 拿给 Client(如果没有就会报错)。