下面我们来分析一下Dubbo的过滤器。在服务调用之前,Dubbo会通过各种filter来对请求的数据进行过滤,现在我们就来分析一下Dubbo的过滤器。先来看一下ProtocolFilterWrapper这个类
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获得过滤器数组
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
// 倒序循环 Filter ,创建带 Filter 链的 Invoker 对象
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
System.out.println("group:" + group);
for (Filter filter : filters) {
System.out.println(filter.getClass());
}
return last;
}
再来看一下export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 注册中心
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 建立带有 Filter 过滤链的 Invoker ,再暴露服务。
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
再看一下refer方法
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 引用服务,返回 Invoker 对象
// 给改 Invoker 对象,包装成带有 Filter 过滤链的 Invoker 对象
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
就是在服务暴露和服务引用的时候构建一个过滤器链,执行每个过滤器的方法。
下面介绍几个Dubbo里的filter。
ClassLoaderFilter,类加载器切换过滤器
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获得原来的类加载器
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
// 切换当前线程的类加载器为服务接口的类加载器
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
// 服务调用
try {
return invoker.invoke(invocation);
} finally {
// 切换当前线程的类加载器为原来的类加载器
Thread.currentThread().setContextClassLoader(ocl);
}
}
ContextFilter,服务提供者的 ContextFilter
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 创建新的 `attachments` 集合,清理公用的隐式参数
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY); // Remove async property to avoid being passed to the following invoke chain.
// 清空消费端的异步参数
}
// 设置 RpcContext 对象
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
// mreged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
// 在此过滤器(例如rest协议)之前,我们可能已经在RpcContext中添加了一些附件。
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}
// 设置 RpcInvocation 对象的 `invoker` 属性
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
// 服务调用
try {
return invoker.invoke(invocation);
} finally {
// 移除上下文
RpcContext.removeContext();
}
}
ActiveLimitFilter,每服务消费者每服务每方法最大并发调用数限制的过滤器实现类
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获得服务提供者每服务每方法最大可并行执行请求数
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 获得 RpcStatus 对象,基于服务 URL + 方法维度
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
// 获得超时值
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout; // 剩余可等待时间
int active = count.getActive();
// 超过最大可并行执行请求数,等待
if (active >= max) {
synchronized (count) { // 通过锁,有且仅有一个在等待。
// 循环,等待可并行执行请求数
while ((active = count.getActive()) >= max) {
// 等待,直到超时,或者被唤醒
try {
count.wait(remain);
} catch (InterruptedException e) {
}
// 判断是否没有剩余时长了,抛出 RpcException 异常
long elapsed = System.currentTimeMillis() - start; // 本地等待时长
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
// 调用开始的计数
RpcStatus.beginCount(url, methodName);
try {
// 服务调用
Result result = invoker.invoke(invocation);
// 调用结束的计数(成功)
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
// 调用结束的计数(失败)
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
// 唤醒等待的相同服务的相同方法的请求
if (max > 0) {
synchronized (count) {
count.notify();
}
}
}
}
ExecuteLimitFilter,服务提供者每服务每方法最大可并行执行请求数的过滤器实现类
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null; // 信号量
boolean acquireResult = false; // 是否获得信号量
// 获得服务提供者每服务每方法最大可并行执行请求数
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
// 获得 RpcStatus 对象,基于服务 URL + 方法维度
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// 获得信号量。若获得不到,抛出异常。
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
executesLimit = count.getSemaphore(max);
if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
// 调用开始的计数
RpcStatus.beginCount(url, methodName);
try {
// 服务调用
return invoker.invoke(invocation);
} catch (Throwable t) {
isSuccess = false; // 标记失败
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
// 调用结束的计数(成功)(失败)
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
// 释放信号量
if (acquireResult) {
executesLimit.release();
}
}
}
TimeoutFilter,如果服务调用超时,记录告警日志,不干涉服务的运行。
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
long start = System.currentTimeMillis();
// 服务调用
Result result = invoker.invoke(invocation);
// 计算调用时长
long elapsed = System.currentTimeMillis() - start;
// 超过时长,打印告警日志
if (invoker.getUrl() != null
&& elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
if (logger.isWarnEnabled()) {
logger.warn("invoke time out. method: " + invocation.getMethodName()
+ " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
+ invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
}
}
return result;
}
ExceptionFilter,异常过滤器
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
// 服务调用
Result result = invoker.invoke(invocation);
if (result.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = result.getException();
// directly throw if it's checked exception
// 如果是checked异常,直接抛出
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return result;
}
// directly throw if the exception appears in the signature
// 在方法签名上有声明,直接抛出
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return result;
}
}
} catch (NoSuchMethodException e) {
return result;
}
// 未在方法签名上定义的异常,在服务器端打印 ERROR 日志
// for the exception not found in method's signature, print ERROR message in server's log.
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// 异常类和接口类在同一 jar 包里,直接抛出
// directly throw if exception class and interface class are in the same jar file.
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return result;
}
// 是JDK自带的异常,直接抛出
// directly throw if it's JDK exception
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return result;
}
// 是Dubbo本身的异常,直接抛出
// directly throw if it's dubbo exception
if (exception instanceof RpcException) {
return result;
}
// 否则,包装成RuntimeException抛给客户端
// otherwise, wrap with RuntimeException and throw back to the client
return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
return result;
}
}
// 返回
return result;
} catch (RuntimeException e) {
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
throw e;
}
}
TokenFilter,带token的filter
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 获得服务提供者配置的 Token 值
String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY);
if (ConfigUtils.isNotEmpty(token)) {
// 从隐式参数中,获得 Token 值。
Class<?> serviceType = invoker.getInterface();
Map<String, String> attachments = inv.getAttachments();
String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY);
// 对比,若不一致,抛出 RpcException 异常
if (!token.equals(remoteToken)) {
throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
}
}
// 服务调用
return invoker.invoke(inv);
}
CacheFilter,缓存过滤器实现类
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 方法开启 Cache 功能
if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
// 基于 URL + Method 为维度,获得 Cache 对象。
Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
if (cache != null) {
// 获得 Cache Key
String key = StringUtils.toArgumentString(invocation.getArguments());
// 从缓存中获得结果。若存在,创建 RpcResult 对象。
Object value = cache.get(key);
if (value != null) {
return new RpcResult(value);
}
// 服务调用
Result result = invoker.invoke(invocation);
// 若非异常结果,缓存结果
if (!result.hasException()) {
cache.put(key, result.getValue());
}
return result;
}
}
// 服务调用
return invoker.invoke(invocation);
}
ValidationFilter,参数验证过滤器实现类
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (validation != null && !invocation.getMethodName().startsWith("$") // 非泛化调用和回音调用等方法
&& ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.VALIDATION_KEY))) { // 方法开启 Cache 功能
try {
// 获得 Validator 对象
Validator validator = validation.getValidator(invoker.getUrl());
// 使用 Validator ,验证方法参数。若不合法,抛出异常。
if (validator != null) {
validator.validate(invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
}
} catch (RpcException e) {
throw e;
} catch (Throwable t) {
return new RpcResult(t);
}
}
// 服务调用
return invoker.invoke(invocation);
}
这些filter可以过滤请求,在filter处理后,再进行服务的调用。
Dubbo的过滤器就分析到这里了。