SPI机制是jdk5出现的新特性,很多框架用它来做服务发现,简单的说就是根据接口加载实现类,目的是为了实现开发的灵活性,实现可插拔式的系统架构。JDK java.util.servieloader文档下有如下举例说明:
1)定义好服务接口用以提供约束
如:服务类型为 com.example.CodecSet,它用来表示某些协议的编码器/解码器对集合。在这种情况下,它是一个具有两种抽象方法的抽象类:
public abstract Encoder getEncoder(String encodingName);
public abstract Decoder getDecoder(String encodingName);
2)编写实现,指定位置
如果 com.example.impl.StandardCodecs 是 CodecSet 服务的实现,则其 jar 文件还包含一个指定如下的文件:
META-INF/services/com.example.CodecSet
此文件包含一行:
com.example.impl.StandardCodecs # Standard codecs
3)使用ServiceLoader加载实现
ServiceLoader codecSetLoader<codecset>= ServiceLoader.load(CodecSet.class)
dubbo采用微核+插件化的思想,dubbo许多组件的加载都依赖于spi机制,那么dubbo是怎么做的呢,由于jdk原有的spi太过简陋,dubbo重新实现了一套spi机制。
上图为dubbo各种组件,dubbo协议、网络传输、序列化、动态代理、字节码编译、注册中心等都是灵活加载的,自己有默认的实现也提供了其他选择。在dubbo源码中经常会看到如下的插件加载代码。spi实现都在ExtensionLoader这个类里,这个类是带泛型的,
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
加载过程如下:
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
.................................
//从EXTENSION_LOADERS这个集合中根据类型获取扩展加载
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {//获取不到,直接new一个,并放到集合中缓存起来
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}//每个类型都有一个扩展加载器
return loader;
}
获取到扩展的加载器之后,接下来就要获取适配类
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();//先尝试从缓存集合中获取
if (instance == null) {
if(createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {//多线程访问安全性 此处加锁
instance = cachedAdaptiveInstance.get();//double check机制,再次尝试从缓存集合中获取
if (instance == null) {//集合中为空
try {
instance = createAdaptiveExtension();//尝试创建一个
cachedAdaptiveInstance.set(instance);//缓存起来
}
.......................................................
}
return (T) instance;
}
缓存中没有扩展类,创建扩展类
private T createAdaptiveExtension() {
try {//1、先获取扩展类class 2、然后注入扩展
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
//创建扩展类
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();//从文件中读取,此处在后面展开
/**从如下路径下寻找扩展实现定义文件,文件名称为接口名称,文件内容形式为name=扩展实现类路径
* META-INF/services/
* META-INF/dubbo/
* META-INF/dubbo/internal/
*1)若实现类上有@Adaptive注解 则为扩展适配类,缓存到cachedAdaptiveClass中
*2)如果实现类有构造方法参数为接口类型,则为扩展包装类,缓存到cachedWrapperClasses中
*3)都不是则是具体的扩展实现类,查看是否有@Activate注解,有缓存到cachedActivates中
*然后将实现类缓存到cachedNames中
*/
if (cachedAdaptiveClass != null) {//扩展加载器中已经存在,直接返回
return cachedAdaptiveClass;//扩展加载器中适配类是一对一关系
}
//扩展加载器中不存在,动态生成适配类,并缓存到扩展加载器中
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
//动态生成适配类
private Class<?> createAdaptiveExtensionClass() {
//创建代码字符串,只会为接口方法上带有@Adaptive注解的方法生成适配类
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();//找类加载器
/*使用spi机制加载编译器,compiler默认的适配类已经创建好,不需要动态生成,否则会出现死循环
*Compiler默认有两个实现,默认加载Javaassit实现
*/
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
//编译,load到内存,返回
return compiler.compile(code, classLoader);
}
最终动态生成的适配类如下形式所示(以protocol为例):
package com.alibaba.dubbo.demo.rayhong.test;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
// 没有打上@Adaptive的方法如果被调到抛异常
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() "
+ "of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
// 没有打上@Adaptive的方法如果被调到抛异常
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstractint com.alibaba.dubbo.rpc.Protocol.getDefaultPort() "
+ "of interfacecom.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
// 接口中export方法打上@Adaptive注册
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invokerargument == null");
// 参数类中要有URL属性
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invokerargument getUrl() == null");
// 从入参获取统一数据模型URL
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
// 从统一数据模型URL获取协议,协议名就是spi扩展点实现类的key
if (extName == null)
throw new IllegalStateException("Fail to getextension(com.alibaba.dubbo.rpc.Protocol) "
+ "name from url(" + url.toString() + ") usekeys([protocol])");
// 利用dubbo服务查找机制根据名称找到具体的扩展点实现
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
// 调具体扩展点的方法
return extension.export(arg0);
}
// 接口中refer方法打上@Adaptive注册
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) {
// 统一数据模型URL不能为空
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
// 从统一数据模型URL获取协议,协议名就是spi扩展点实现类的key
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) "
+ "name from url(" + url.toString() + ") use keys([protocol])");
// 利用dubbo服务查找机制根据名称找到具体的扩展点实现
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
// 调具体扩展点的方法
return extension.refer(arg0, arg1);
}
}
下面进入适配中接口方法调用分析,在扩展适配类中,根据URL中不同扩展的名称,需要获取具体的扩展实现类(具体包装类),如protocol接口,在服务发布过程中经历registryProtocol--dubboProtocol的转变
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
@SuppressWarnings("unchecked")
public T getExtension(String name) {
Holder<Object> holder = cachedInstances.get(name);//从缓存中接口获取具体实现类实例
if (holder == null) {//没有,new一个空的
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {//线程安全,依然dubbo check
instance = holder.get();
if (instance == null) {
instance = createExtension(name);//构造具体的实现类
holder.set(instance);//设置到缓存中
}
}
}
return (T) instance;
}
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);//此过程可能会有从文件中加载的过程
if (clazz == null) {//获取不到直接抛异常
throw findException(name);
}
T instance = (T) EXTENSION_INSTANCES.get(clazz);//此处依然缓存
if (instance == null) {//没有直接new 缓存起来
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);//找到的具体扩展实现类,注入扩展,类似IoC机制
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
/* 找到扩展包装类,用刚刚找到的扩展实现类
* new出一个扩展实现类出来 进行依赖注入
* 然后再用上次new出的包装类,再次new一个扩展包装类出来,依次下去
* 最后接口方法调用顺序为:包装类1.方法()>包装类2.方法()>具体实现方法()
*/
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
}
}
//为组件实现注入扩展
private T injectExtension(T instance) {
try {
if (objectFactory != null) {//在ExtensionLoader构造方法初始化
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")//方法名称以set开头
&& method.getParameterTypes().length == 1//有一个参数
&& Modifier.isPublic(method.getModifiers())) {//public修饰
Class<?> pt = method.getParameterTypes()[0];//属性类型
try {
// 获取属性名称
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
/**objectFactory也是SPI机制加载的,适配类不需要动态生成
* AdaptiveExtensionFactory --适配类 依次从下面两个实现类获取
* SpiExtensionFactory --- 加载扩展适配类
* SpringExtensionFactory ---根据名称从spring容器中根据名称获取
*/
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);//找到对象,实现注入
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
//ExtensionFactory默认适配类
@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
//ExtensionFactory具体实现类
private final List<ExtensionFactory> factories;
//初始化的时候,获取所有具体实现类,缓存到 factories集合中
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
public <T> T getExtension(Class<T> type, String name) {
for (ExtensionFactory factory : factories) {//遍历每个具体实现类,依次查找,找到返回
T extension = factory.getExtension(type, name);
if (extension != null) {
return extension;
}
}
return null;
}
}
//ExtensionFactory spring加载默认实现
public class SpringExtensionFactory implements ExtensionFactory {
/**
* 继续spi机制获取实现类,此处不进行IoC依赖注入
* SpringExtensionFactory 中context属性的注入
* 是在ServiceBean/ReferenceBean初始化调用
* SpringExtensionFactory.addApplicationContext(applicationContext)完成
s*/
private static final Set<ApplicationContext> contexts = new ConcurrentHashSet<ApplicationContext>();
public static void addApplicationContext(ApplicationContext context) {
contexts.add(context);
}
public static void removeApplicationContext(ApplicationContext context) {
contexts.remove(context);
}
@SuppressWarnings("unchecked")
public <T> T getExtension(Class<T> type, String name) {
for (ApplicationContext context : contexts) {
if (context.containsBean(name)) {
Object bean = context.getBean(name);
if (type.isInstance(bean)) {
return (T) bean;
}
}
}
return null;
}
}