Dubbo源码分析(七) 过滤器

下面我们来分析一下Dubbo的过滤器。在服务调用之前,Dubbo会通过各种filter来对请求的数据进行过滤,现在我们就来分析一下Dubbo的过滤器。先来看一下ProtocolFilterWrapper这个类

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 获得过滤器数组
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    // 倒序循环 Filter ,创建带 Filter 链的 Invoker 对象
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    System.out.println("group:" + group);
    for (Filter filter : filters) {
        System.out.println(filter.getClass());
    }
    return last;
}

再来看一下export方法

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 注册中心
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // 建立带有 Filter 过滤链的 Invoker ,再暴露服务。
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

再看一下refer方法

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 注册中心
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 引用服务,返回 Invoker 对象
    // 给改 Invoker 对象,包装成带有 Filter 过滤链的 Invoker 对象
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

就是在服务暴露和服务引用的时候构建一个过滤器链,执行每个过滤器的方法。
下面介绍几个Dubbo里的filter。
ClassLoaderFilter,类加载器切换过滤器

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // 获得原来的类加载器
    ClassLoader ocl = Thread.currentThread().getContextClassLoader();
    // 切换当前线程的类加载器为服务接口的类加载器
    Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
    // 服务调用
    try {
        return invoker.invoke(invocation);
    } finally {
        // 切换当前线程的类加载器为原来的类加载器
        Thread.currentThread().setContextClassLoader(ocl);
    }
}

ContextFilter,服务提供者的 ContextFilter

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // 创建新的 `attachments` 集合,清理公用的隐式参数
    Map<String, String> attachments = invocation.getAttachments();
    if (attachments != null) {
        attachments = new HashMap<String, String>(attachments);
        attachments.remove(Constants.PATH_KEY);
        attachments.remove(Constants.GROUP_KEY);
        attachments.remove(Constants.VERSION_KEY);
        attachments.remove(Constants.DUBBO_VERSION_KEY);
        attachments.remove(Constants.TOKEN_KEY);
        attachments.remove(Constants.TIMEOUT_KEY);
        attachments.remove(Constants.ASYNC_KEY); // Remove async property to avoid being passed to the following invoke chain.
                                                 // 清空消费端的异步参数
    }
    // 设置 RpcContext 对象
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
 //                .setAttachments(attachments)  // merged from dubbox
            .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
    // mreged from dubbox
    // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
    // 在此过滤器(例如rest协议)之前,我们可能已经在RpcContext中添加了一些附件。
    if (attachments != null) {
        if (RpcContext.getContext().getAttachments() != null) {
            RpcContext.getContext().getAttachments().putAll(attachments);
        } else {
            RpcContext.getContext().setAttachments(attachments);
        }
    }
    // 设置 RpcInvocation 对象的 `invoker` 属性
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    // 服务调用
    try {
        return invoker.invoke(invocation);
    } finally {
        // 移除上下文
        RpcContext.removeContext();
    }
}

ActiveLimitFilter,每服务消费者每服务每方法最大并发调用数限制的过滤器实现类

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    // 获得服务提供者每服务每方法最大可并行执行请求数
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    // 获得 RpcStatus 对象,基于服务 URL + 方法维度
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    if (max > 0) {
        // 获得超时值
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout; // 剩余可等待时间
        int active = count.getActive();
        // 超过最大可并行执行请求数,等待
        if (active >= max) {
            synchronized (count) { // 通过锁,有且仅有一个在等待。
                // 循环,等待可并行执行请求数
                while ((active = count.getActive()) >= max) {
                    // 等待,直到超时,或者被唤醒
                    try {
                        count.wait(remain);
                    } catch (InterruptedException e) {
                    }
                    // 判断是否没有剩余时长了,抛出 RpcException 异常
                    long elapsed = System.currentTimeMillis() - start; // 本地等待时长
                    remain = timeout - elapsed;
                    if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                + invoker.getInterface().getName() + ", method: "
                                + invocation.getMethodName() + ", elapsed: " + elapsed
                                + ", timeout: " + timeout + ". concurrent invokes: " + active
                                + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        long begin = System.currentTimeMillis();
        // 调用开始的计数
        RpcStatus.beginCount(url, methodName);
        try {
            // 服务调用
            Result result = invoker.invoke(invocation);
            // 调用结束的计数(成功)
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
            return result;
        } catch (RuntimeException t) {
            // 调用结束的计数(失败)
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        // 唤醒等待的相同服务的相同方法的请求
        if (max > 0) {
            synchronized (count) {
                count.notify();
            }
        }
    }
}

ExecuteLimitFilter,服务提供者每服务每方法最大可并行执行请求数的过滤器实现类

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    Semaphore executesLimit = null; // 信号量
    boolean acquireResult = false; // 是否获得信号量
    // 获得服务提供者每服务每方法最大可并行执行请求数
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
    if (max > 0) {
        // 获得 RpcStatus 对象,基于服务 URL + 方法维度
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
        // 获得信号量。若获得不到,抛出异常。
//            if (count.getActive() >= max) {
        /**
         * http://manzhizhen.iteye.com/blog/2386408
         * use semaphore for concurrency control (to limit thread number)
         */
        executesLimit = count.getSemaphore(max);
        if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
        }
    }
    long begin = System.currentTimeMillis();
    boolean isSuccess = true;
    // 调用开始的计数
    RpcStatus.beginCount(url, methodName);
    try {
        // 服务调用
        return invoker.invoke(invocation);
    } catch (Throwable t) {
        isSuccess = false; // 标记失败
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    } finally {
        // 调用结束的计数(成功)(失败)
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        // 释放信号量
        if (acquireResult) {
            executesLimit.release();
        }
    }
}

TimeoutFilter,如果服务调用超时,记录告警日志,不干涉服务的运行。

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    long start = System.currentTimeMillis();
    // 服务调用
    Result result = invoker.invoke(invocation);
    // 计算调用时长
    long elapsed = System.currentTimeMillis() - start;
    // 超过时长,打印告警日志
    if (invoker.getUrl() != null
            && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
        if (logger.isWarnEnabled()) {
            logger.warn("invoke time out. method: " + invocation.getMethodName()
                    + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
                    + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
        }
    }
    return result;
}

ExceptionFilter,异常过滤器

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    try {
        // 服务调用
        Result result = invoker.invoke(invocation);

        if (result.hasException() && GenericService.class != invoker.getInterface()) {
            try {
                Throwable exception = result.getException();

                // directly throw if it's checked exception
                // 如果是checked异常,直接抛出
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                    return result;
                }
                // directly throw if the exception appears in the signature
                // 在方法签名上有声明,直接抛出
                try {
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    Class<?>[] exceptionClassses = method.getExceptionTypes();
                    for (Class<?> exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }

                // 未在方法签名上定义的异常,在服务器端打印 ERROR 日志
                // for the exception not found in method's signature, print ERROR message in server's log.
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // 异常类和接口类在同一 jar 包里,直接抛出
                // directly throw if exception class and interface class are in the same jar file.
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }
                // 是JDK自带的异常,直接抛出
                // directly throw if it's JDK exception
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {
                    return result;
                }
                // 是Dubbo本身的异常,直接抛出
                // directly throw if it's dubbo exception
                if (exception instanceof RpcException) {
                    return result;
                }

                // 否则,包装成RuntimeException抛给客户端
                // otherwise, wrap with RuntimeException and throw back to the client
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;
            }
        }
        // 返回
        return result;
    } catch (RuntimeException e) {
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        throw e;
    }
}

TokenFilter,带token的filter

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
    // 获得服务提供者配置的 Token 值
    String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY);
    if (ConfigUtils.isNotEmpty(token)) {
        // 从隐式参数中,获得 Token 值。
        Class<?> serviceType = invoker.getInterface();
        Map<String, String> attachments = inv.getAttachments();
        String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY);
        // 对比,若不一致,抛出 RpcException 异常
        if (!token.equals(remoteToken)) {
            throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
        }
    }
    // 服务调用
    return invoker.invoke(inv);
}

CacheFilter,缓存过滤器实现类

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // 方法开启 Cache 功能
    if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
        // 基于 URL + Method 为维度,获得 Cache 对象。
        Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
        if (cache != null) {
            // 获得 Cache Key
            String key = StringUtils.toArgumentString(invocation.getArguments());
            // 从缓存中获得结果。若存在,创建 RpcResult 对象。
            Object value = cache.get(key);
            if (value != null) {
                return new RpcResult(value);
            }
            // 服务调用
            Result result = invoker.invoke(invocation);
            // 若非异常结果,缓存结果
            if (!result.hasException()) {
                cache.put(key, result.getValue());
            }
            return result;
        }
    }
    // 服务调用
    return invoker.invoke(invocation);
}

ValidationFilter,参数验证过滤器实现类

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (validation != null && !invocation.getMethodName().startsWith("$") // 非泛化调用和回音调用等方法
            && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.VALIDATION_KEY))) { // 方法开启 Cache 功能
        try {
            // 获得 Validator 对象
            Validator validator = validation.getValidator(invoker.getUrl());
            // 使用 Validator ,验证方法参数。若不合法,抛出异常。
            if (validator != null) {
                validator.validate(invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            }
        } catch (RpcException e) {
            throw e;
        } catch (Throwable t) {
            return new RpcResult(t);
        }
    }
    // 服务调用
    return invoker.invoke(invocation);
}

这些filter可以过滤请求,在filter处理后,再进行服务的调用。
Dubbo的过滤器就分析到这里了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,591评论 18 139
  • Dubbo是什么 Dubbo是Alibaba开源的分布式服务框架,它最大的特点是按照分层的方式来架构,使用这种方式...
    Coselding阅读 17,165评论 3 196
  • 家有宠物乐趣多 1 如今的我们,在工作之余,总要找点属于自己的乐事。女的理家教子。男的打猎是不行了,遛狗还可以,可...
    lovingyourself阅读 161评论 0 0
  • 孙秀力 一.回放我在事件当中的是什么情绪? 情绪:生气,不耐烦 事件:一个金融平台,要求投资人在群里看消息,学习自...
    孙秀力阅读 301评论 0 0
  • 00上升双子 当我们说起上升星座的时候 一定会有人问 上升星座是什么? 上升星座该怎么看? 其实上升星座的意思是 ...
    学长的秘密花园阅读 11,335评论 2 2