SPI简介
站在一个框架作者的角度来说,定义一个接口,自己默认给出几个接口的实现类,同时 允许框架的使用者也能够自定义接口的实现。现在一个简单的问题就是:如何优雅的根据一个接口来获取该接口的所有实现类呢?
JDK SPI 正是为了优雅解决这个问题而生,SPI 全称为 (Service Provider Interface),即服务提供商接口,是JDK内置的一种服务提供发现机制。目前有不少框架用它来做服务的扩展发现,简单来说,它就是一种动态替换发现服务实现者的机制。
JDK SPI
JDK为SPI的实现提供了工具类,即java.util.ServiceLoader,ServiceLoader中定义的SPI规范没有什么特别之处,只需要有一个提供者配置文件(provider-configuration file),该文件需要在resource目录META-INF/services下,文件名就是服务接口的全限定名
public class SpiMain {
public static void main(String[] args) {
ServiceLoader<IHello> loaders = ServiceLoader.load(IHello.class);
System.out.println(loaders);
for (IHello hello : loaders) {
System.out.println(hello.sayHello("mergades"));
}
}
}
ServiceLoader 源码
读取文件
private static final String PREFIX = "META-INF/services/";
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}
JDK SPI缺点
- 虽然ServiceLoader也算是使用的延迟加载,但是基本只能通过遍历全部获取,也就是接口的实现类全部加载并实例化一遍。如果你并不想用某些实现类,它也被加载并实例化了,这就造成了浪费。
获取某个实现类的方式不够灵活,只能通过Iterator形式获取,不能根据某个参数来获取对应的实现类。 - 多个并发多线程使用ServiceLoader类的实例是不安全的
Dubbo SPI扩展
Dubbo通过ExtensionLoader 来加载自定义的SPI服务。
Dubbo为了应对各种场景,所有的内部组件都是通过SPI的方式来管理的,
扩展功能
- 方便获取扩展实现:JDK的SPI机制仅仅只能通过接口类名获取所有实现,而ExtensionLoader则通过接口类名和key值获取一个实现。
- IOC依赖注入功能。Adaptive实现,就是生成一个代理类,这样子就根据实际调用参数动态决定要调用的类。
- 使用装饰器模式进行自动增强,自动包装实现。
扩展源码分析
ExtensinLoader初始化
/**
* Protocol. (API/SPI, Singleton, ThreadSafe)
*/
@SPI("dubbo")
public interface Protocol {
/**
* Get default port when user doesn't config the port.
*
* @return default port
*/
int getDefaultPort();
/**
* Export service for remote invocation: <br>
* 1. Protocol should record request source address after receive a request:
* RpcContext.getContext().setRemoteAddress();<br>
* 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
* export the same URL<br>
* 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
*
* @param <T> Service type
* @param invoker Service invoker
* @return exporter reference for exported service, useful for unexport the service later
* @throws RpcException thrown when error occurs during export the service, for example: port is occupied
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* Refer a remote service: <br>
* 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
* needs to correspondingly execute `invoke()` method of `Invoker` object <br>
* 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
* protocol sends remote request in the `Invoker` implementation. <br>
* 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
* connection fails.
*
* @param <T> Service type
* @param type Service class
* @param url URL address for the remote service
* @return invoker service's local proxy
* @throws RpcException when there's any error while connecting to the service provider
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* Destroy protocol: <br>
* 1. Cancel all services this protocol exports and refers <br>
* 2. Release all occupied resources, for example: connection, port, etc. <br>
* 3. Protocol can continue to export and refer new service even after it's destroyed.
*/
void destroy();
}
使用实例
ExtensionLoader<Protocol> protocolLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol dubboProtocol = protocolLoader.getExtension(DubboProtocol.NAME);
可以参考dubbo源码单元测试 com.alibaba.dubbo.common.extensionloader.ExtensionLoaderTest
@Test
public void test_getDefaultExtension() throws Exception {
SimpleExt ext = ExtensionLoader.getExtensionLoader(SimpleExt.class).getDefaultExtension();
assertThat(ext, instanceOf(SimpleExtImpl1.class));
String name = ExtensionLoader.getExtensionLoader(SimpleExt.class).getDefaultExtensionName();
assertEquals("impl1", name);
SimpleExt simpleExt = ExtensionLoader.getExtensionLoader(SimpleExt.class).getExtension("impl2");
assertThat(simpleExt, instanceOf(SimpleExtImpl2.class));
}
如上单元测试可以看到,可以通过ExtensionLoader.getExtensionLoader(SimpleExt.class).getExtension("impl2"); name来加载不同的扩展,相比于JDK SPI机制,方便很多。
通过
- ExtensionLoader.getExtensionLoader(Protocol.class) 获取对应的ExtensionLoader 实例
- protocolLoader.getExtension(DubboProtocol.NAME):根据Key获取相应的扩展实现类实例
配置文件扫描
Dubbo默认依次扫描META-INF/dubbo/internal/、META-INF/dubbo/、META-INF/services/三个classpath目录下的配置文件。配置文件以具体扩展接口全名命名,如:com.alibaba.dubbo.rpc.Filter
如下配置
# Comment 1
impl1=com.alibaba.dubbo.common.extensionloader.ext1.impl.SimpleExtImpl1#Hello World
impl2=com.alibaba.dubbo.common.extensionloader.ext1.impl.SimpleExtImpl2 # Comment 2
impl3=com.alibaba.dubbo.common.extensionloader.ext1.impl.SimpleExtImpl3 # with head space
ExtensionLoader#getExtensionClasses
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
扩展适配器
在dubbo扩展中,适配器模式被广泛应用,其作用在于为同一个接口下面不同的多个实现的调用路由功能,如制定优先级,dubbo提供静态适配和动态适配器。
静态适配器
/**
* AdaptiveCompiler. (SPI, Singleton, ThreadSafe)
*/
@Adaptive
public class AdaptiveCompiler implements Compiler {
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}
}
动态适配器
动态适配器扩展即通过动态代理生成的动态代理类,在dubbo中通过javassist技术生成。
从ExtensionLoader构造器中会调用getAdaptiveExtension()方法触发为当前扩展类型生成适配器
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
ExtensionFactory
@SPI
public interface ExtensionFactory {
/**
* Get extension.
*
* @param type object type.
* @param name object name.
* @return object instance.
*/
<T> T getExtension(Class<T> type, String name);
}
初始化 ExtensionFactory 实例,如果非ExtensionFactory ,则直接获取getAdaptiveExtension()
@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
第一步获取getAdaptiveExtensionClass
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
getExtensionClasses中,再继续调用loadExtensionClasses架子啊默认的扩展机制。
ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension() 中dubbo底层会动态编译,生成。
Protocol产生的AdaptiveExtension如下:
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export( com.alibaba.dubbo.rpc.Invoker arg0)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) {
throw new IllegalArgumentException(
"com.alibaba.dubbo.rpc.Invoker argument == null");
}
if (arg0.getUrl() == null) {
throw new IllegalArgumentException(
"com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,
com.alibaba.dubbo.common.URL arg1)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) {
throw new IllegalArgumentException("url == null");
}
com.alibaba.dubbo.common.URL url = arg1;
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);
return extension.refer(arg0, arg1);
}
}
ExtensionFactory 类似Spring的IOC作用,ExtensionLoader#injectExtension 主要完成IOC功能。
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
/**
* Check {@link DisableInject} to see if we need auto injection for this property
*/
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
由以下代码完成真正的注入:
Object object = objectFactory.getExtension(pt, property);
AdaptiveExtensionFactory实例中的factories的size返回应为2,里面只会保存这两个类实例:
spring=com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory
spi=com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory
因为adaptive=com.alibaba.dubbo.common.extension.factory.AdaptiveExtensionFactory是保存在cachedAdaptiveClass上的
public AdaptiveExtensionFactory() {
ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
for (String name : loader.getSupportedExtensions()) {
list.add(loader.getExtension(name));
}
factories = Collections.unmodifiableList(list);
}
其中,SpringExtensionFactory 中,会更具Spring的ApplicationContext 来获取对应的bean实例。
SpringExtensionFactory
public <T> T getExtension(Class<T> type, String name) {
for (ApplicationContext context : contexts) {
if (context.containsBean(name)) {
Object bean = context.getBean(name);
if (type.isInstance(bean)) {
return (T) bean;
}
}
}
logger.warn("No spring extension (bean) named:" + name + ", try to find an extension (bean) of type " + type.getName());
if (Object.class == type) {
return null;
}
for (ApplicationContext context : contexts) {
try {
return context.getBean(type);
} catch (NoUniqueBeanDefinitionException multiBeanExe) {
logger.warn("Find more than 1 spring extensions (beans) of type " + type.getName() + ", will stop auto injection. Please make sure you have specified the concrete parameter type and there's only one extension of that type.");
} catch (NoSuchBeanDefinitionException noBeanExe) {
if (logger.isDebugEnabled()) {
logger.debug("Error when get spring extension(bean) for type:" + type.getName(), noBeanExe);
}
}
}
logger.warn("No spring extension (bean) named:" + name + ", type:" + type.getName() + " found, stop get bean.");
return null;
}
直接从Spring容器中获取对应的bean
SpiExtensionFactory
@Override
public <T> T getExtension(Class<T> type, String name) {
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (!loader.getSupportedExtensions().isEmpty()) {
return loader.getAdaptiveExtension();
}
}
return null;
}
最终还有由 loader.getAdaptiveExtension() 完成。
AdaptiveExtensionFactory 作用
(1)就是只有当相应的SPI接口的所有方法上是否有带Adaptive注解的方法,如果有就会生成动态类的代码然后进行动态编译(比如使用javassist框架),如果没有带Adaptive注解的方法 ,那就说明该SPI接口是没有Adaptive性质的实现类的,就会拋出异常
(2)动态类的本质也是在实现相应的SPI接口,它最终也是在调一个现成的SPI实现类来工作,这样就会有这样的疑问,那为何不直接指定呢,还非得用动态的呢,这就是为什么凡是在方法上出现Adaptive注解的SPI的Adaptive形式都要动态的原因了,因为dubbo这样一来就可以做到用不同的Adaptive方法,调不同的SPI实现类去处理。
SPI的应用
具体文件内容
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
配置了一个键值对,key为dubbo,值为org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol,在其它几个子包下,也有名称叫做org.apache.dubbo.rpc.Protocol的配置文件,说明Protocol插口有几个对应的插件
可以猜测一下,当<dubbo:protocol/>仅仅配置了name="dubbo",port="20880"时,会加载哪一个协议插件呢,根据名称,可以猜测,加载的DubboProtocol插件。那dubbo是怎样做到的呢,我们来一探究竟。
三个注解
- SPI:这个注解使用在接口上,标识接口是否是extension(扩展或插口),可以接收一个默认的extension名称
- Adaptive: 这个注解可以使用在类或方法上,决定加载哪一个extension,值为字符串数组,数组中的字符串是key值,比如new String[]{"key1","key2"};先在URL中寻找key1的值,如果找到,则使用此值加载extension,如果key1没有,则寻找key2的值,如果key2也没有,则使用接口SPI注解的值,如果接口SPI注解,没有配置默认值,则将接口名按照首字母大写分成多个部分,然后以'.'分隔,例如org.apache.dubbo.xxx.YyyInvokerWrapper接口名会变成yyy.invoker.wrapper,然后以此名称做为key到URL寻找,如果仍没有找到,则抛出IllegalStateException异常;Adaptive注解用在类上,表示此类是它实现接口(插口)的自适应插件
- Activate:这个注解可以使用在类或方法上,用以根据URL的key值判断当前extension是否生效,当一个extension有多个实现时,可以加载特定的extension实现类,例如extension实现类上有注解@Activate("cache, validation"),则当URL上出现"cache”或“validation" key时,当前extension才会生效
ExtensionLoader#getAdaptiveExtensionClass
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses(); //获取所有插件
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass; //存在插件则返回
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();//创建
}
dubbo启动时,加载所有的SPI注解的插件,最终根据我们指定的name="dubbo"来加载对应的dubbo协议。
参考 http://dubbo.apache.org/zh-cn/docs/source_code_guide/dubbo-spi.html