实例讲解阿里Spring Cloud面试题——如何使用自定义过滤器通过Hystrix实现降级处理

一. 前提

在微服务架构中,下游依赖出现问题如果上游调用方不做请求降级处理,下游的异常依赖没有被隔离,很有可能出现因为一两个服务或者小到一两个接口异常导致上游所有服务不可用,甚至影响整个业务线。请求降级处理目前比较主流的依然是Netfilx出品的Hystrix。Hystrix的工作原理是:

  • 把请求基于线程池或者信号量隔离,一旦下游服务在指定配置的超时时间内无法响应会进入预设或者默认的降级实现。
  • 每个请求的状态都会记录下来,在一个滑动窗口内处理失败的比率超过设定的阈值就会触发熔断器(Circle Breaker)开启,熔断器开启之后所有请求都会直接进入预设或者默认的降级逻辑。
  • 熔断器打开后,且距离熔断器打开的时间或上一次试探请求放行的时间超过设定值,熔断器器进入半开状态,允许放行一个试探请求。
  • 请求成功率提高后,基于统计数据确定对熔断器进行关闭,所有请求正常放行。

这里不对Hystrix的细节做更深入分析,而是接着谈谈Spring Cloud Gateway中如何使用Hystrix,主要包括内置的Hystrix过滤器和定制过滤器结合Hystrix实现我们想要的功能。除了要引入spring-cloud-starter-gateway依赖之外,还需要引入spring-cloud-starter-netflix-hystrix。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    </dependency>
</dependencies>

二. 使用内置的Hystrix过滤器

内置的Hystrix过滤器是HystrixGatewayFilterFactory,它支持的配置是:

public static class Config {
    // 如果下面的Setter配置为null的时候,name会作为Hystrix的HystrixCommandKey
    private String name;
    // Hystrix的Setter属性,主要用来配置命令的KEY和其他属性
    private Setter setter;
    // 降级的目标URI,必须以forward开头,URI会匹配到网关应用的控制器方法
    private URI fallbackUri;

    public String getName() {
        return name;
    }

    public Config setName(String name) {
        this.name = name;
        return this;
    }

    public Config setFallbackUri(String fallbackUri) {
        if (fallbackUri != null) {
            setFallbackUri(URI.create(fallbackUri));
        }
        return this;
    }

    public URI getFallbackUri() {
        return fallbackUri;
    }
    
    // 注意这个方法,配置的fallbackUri要以forward开始作为schema,否则会抛异常
    public void setFallbackUri(URI fallbackUri) {
        if (fallbackUri != null && !"forward".equals(fallbackUri.getScheme())) {
            throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
        }
        this.fallbackUri = fallbackUri;
    }

    public Config setSetter(Setter setter) {
        this.setter = setter;
        return this;
    }
}

另外,①全局的Hystrix配置也会对HystrixGatewayFilterFactory生效;②HystrixGatewayFilterFactory可以作为默认过滤器(default-filters)对所有的路由配置作为兜底过滤器并发挥作用。

对于第①点,我们如果在application.yaml中配置如下:

// 执行超时时间为1秒,会对下面路由order_route绑定的HystrixGatewayFilterFactory生效
hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds: 1000

spring:
  cloud:
    gateway:
      routes:
      - id: order_route
        uri: http://localhost:9091
        predicates:
        - Path=/order/**
        filters:
        - name: Hystrix
          args:
            name: HystrixCommand
            fallbackUri: forward:/fallback

配置的hystrix.command.fallbackcmd.execution.isolation.thread.timeoutInMilliseconds会对绑定在路由order_route中的HystrixGatewayFilterFactory生效。

对于第②点,我们可以把HystrixGatewayFilterFactory配置为默认过滤器,这样子所有的路由都会关联此过滤器,但是非必要时建议不要这样做

spring:
  cloud:
    gateway:
      routes:
        - id: order_route
          uri: http://localhost:9091
          predicates:
            - Path=/order/**
      default-filters:
        - name: Hystrix
          args:
            name: HystrixCommand
            fallbackUri: forward:/fallback

笔者在测试的时候,发现上面提到的Setter无法配置,估计是由于Hystrix的Setter对象是经过多重包装,暂时没有办法设置该属性。接着我们要在网关服务加一个控制器方法用于处理重定向的/fallback请求:

@RestController
public class FallbackController {

    @RequestMapping(value = "/fallback")
    @ResponseStatus
    public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) {
        Map<String, Object> result = new HashMap<>(8);
        ServerHttpRequest request = exchange.getRequest();
        result.put("path", request.getPath().pathWithinApplication().value());
        result.put("method", request.getMethodValue());
        if (null != throwable.getCause()) {
            result.put("message", throwable.getCause().getMessage());
        } else {
            result.put("message", throwable.getMessage());
        }
        return Mono.just(result);
    }
}

控制器方法入参会被Spring Cloud Gateway的内部组件处理,可以回调一些有用的类型例如ServerWebExchange实例、具体的异常实例等等。

三. 使用Hystrix定制过滤器

HystrixGatewayFilterFactory在大多数情况下应该可以满足业务需要,但是这里也做一次定制一个整合Hystrix的过滤器,实现的功能如下:

  • 基于每个请求URL创建一个新的Hystrix命令实例进行调用。
  • 每个URL可以指定特有的线程池配置,如果不指定则使用默认的。
  • 每个URL可以配置单独的Hystrix超时时间。

也就是通过Hystrix使用线程池对每种不同的外部请求URL进行隔离。当然,这样的过滤器仅仅在外部请求的不同URL的数量有限的情况下才比较合理,否则有可能创建过多的线程池造成系统性能的下降,适得其反。改造如下:

@Component
public class CustomHystrixFilter extends AbstractGatewayFilterFactory<CustomHystrixFilter.Config> {

    private static final String FORWARD_KEY = "forward";
    private static final String NAME = "CustomHystrix";
    private static final int TIMEOUT_MS = 1000;
    private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider;
    private volatile DispatcherHandler dispatcherHandler;
    private boolean processConfig = false;

    public CustomHystrixFilter(ObjectProvider<DispatcherHandler> dispatcherHandlerProvider) {
        super(Config.class);
        this.dispatcherHandlerProvider = dispatcherHandlerProvider;
    }

    private DispatcherHandler getDispatcherHandler() {
        if (dispatcherHandler == null) {
            dispatcherHandler = dispatcherHandlerProvider.getIfAvailable();
        }

        return dispatcherHandler;
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Collections.singletonList(NAME_KEY);
    }


    @Override
    public GatewayFilter apply(Config config) {
        processConfig(config);
        return (exchange, chain) -> {
            ServerHttpRequest request = exchange.getRequest();
            String path = request.getPath().pathWithinApplication().value();
            int timeout = config.getTimeout().getOrDefault(path, TIMEOUT_MS);
            CustomHystrixCommand command = new CustomHystrixCommand(config.getFallbackUri(), exchange, chain, timeout, path);
            return Mono.create(s -> {
                Subscription sub = command.toObservable().subscribe(s::success, s::error, s::success);
                s.onCancel(sub::unsubscribe);
            }).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> {
                if (throwable instanceof HystrixRuntimeException) {
                    HystrixRuntimeException e = (HystrixRuntimeException) throwable;
                    HystrixRuntimeException.FailureType failureType = e.getFailureType();
                    switch (failureType) {
                        case TIMEOUT:
                            return Mono.error(new TimeoutException());
                        case COMMAND_EXCEPTION: {
                            Throwable cause = e.getCause();
                            if (cause instanceof ResponseStatusException || AnnotatedElementUtils
                                    .findMergedAnnotation(cause.getClass(), ResponseStatus.class) != null) {
                                return Mono.error(cause);
                            }
                        }
                        default:
                            break;
                    }
                }
                return Mono.error(throwable);
            }).then();
        };
    }

    /**
     * YAML解析的时候MAP的KEY不支持'/',这里只能用'-'替代
     *
     * @param config config
     */
    private void processConfig(Config config) {
        if (!processConfig) {
            processConfig = true;
            if (null != config.getTimeout()) {
                Map<String, Integer> timeout = new HashMap<>(8);
                config.getTimeout().forEach((k, v) -> {
                    String key = k.replace("-", "/");
                    if (!key.startsWith("/")) {
                        key = "/" + key;
                    }
                    timeout.put(key, v);
                });
                config.setTimeout(timeout);
            }
        }
    }

    @Override
    public String name() {
        return NAME;
    }

    private class CustomHystrixCommand extends HystrixObservableCommand<Void> {

        private final URI fallbackUri;
        private final ServerWebExchange exchange;
        private final GatewayFilterChain chain;

        public CustomHystrixCommand(URI fallbackUri,
                                    ServerWebExchange exchange,
                                    GatewayFilterChain chain,
                                    int timeout,
                                    String key) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key))
                    .andCommandKey(HystrixCommandKey.Factory.asKey(key))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(timeout)));
            this.fallbackUri = fallbackUri;
            this.exchange = exchange;
            this.chain = chain;
        }

        @Override
        protected Observable<Void> construct() {
            return RxReactiveStreams.toObservable(this.chain.filter(exchange));
        }

        @Override
        protected Observable<Void> resumeWithFallback() {
            if (null == fallbackUri) {
                return super.resumeWithFallback();
            }
            URI uri = exchange.getRequest().getURI();
            boolean encoded = containsEncodedParts(uri);
            URI requestUrl = UriComponentsBuilder.fromUri(uri)
                    .host(null)
                    .port(null)
                    .uri(this.fallbackUri)
                    .build(encoded)
                    .toUri();
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            ServerHttpRequest request = this.exchange.getRequest().mutate().uri(requestUrl).build();
            ServerWebExchange mutated = exchange.mutate().request(request).build();
            return RxReactiveStreams.toObservable(getDispatcherHandler().handle(mutated));
        }
    }

    public static class Config {

        private String id;
        private URI fallbackUri;
        /**
         * url -> timeout ms
         */
        private Map<String, Integer> timeout;

        public String getId() {
            return id;
        }

        public Config setId(String id) {
            this.id = id;
            return this;
        }

        public URI getFallbackUri() {
            return fallbackUri;
        }

        public Config setFallbackUri(URI fallbackUri) {
            if (fallbackUri != null && !FORWARD_KEY.equals(fallbackUri.getScheme())) {
                throw new IllegalArgumentException("Hystrix Filter currently only supports 'forward' URIs, found " + fallbackUri);
            }
            this.fallbackUri = fallbackUri;
            return this;
        }

        public Map<String, Integer> getTimeout() {
            return timeout;
        }

        public Config setTimeout(Map<String, Integer> timeout) {
            this.timeout = timeout;
            return this;
        }
    }
}

其实大部分代码和内置的Hystrix过滤器差不多,只是改了命令改造函数部分和配置加载处理的部分。配置文件如下:

spring:
  cloud:
    gateway:
      routes:
        - id: hystrix_route
          uri: http://localhost:9091
          predicates:
            - Host=localhost:9090
          filters:
            - name: CustomHystrix
              args:
                id: CustomHystrix
                fallbackUri: forward:/fallback
                timeout:
                  # 这里暂时用-分隔URL,因为/不支持
                  order-remote: 2000
  application:
    name: route-server
server:
  port: 9090

网关添加一个/fallback处理控制器如下:

@RestController
public class FallbackController {

    @RequestMapping(value = "/fallback")
    @ResponseStatus
    public Mono<Map<String, Object>> fallback(ServerWebExchange exchange, Throwable throwable) {
        Map<String, Object> result = new HashMap<>(8);
        ServerHttpRequest request = exchange.getRequest();
        result.put("path", request.getPath().pathWithinApplication().value());
        result.put("method", request.getMethodValue());
        if (null != throwable.getCause()) {
            result.put("message", throwable.getCause().getMessage());
        } else {
            result.put("message", throwable.getMessage());
        }
        return Mono.just(result);
    }
}

故意在下游服务打断点:

curl http://localhost:9090/order/remote

响应结果:
{
    "path": "/fallback",
    "method": "GET",
    "message": null   # <== 这里由于是超时异常,message就是null
}

刚好符合预期结果。

四. 总结

这篇文章仅仅是对Hystrix和过滤器应用提供一个可用的例子和解决问题的思路,具体如何使用还是需要针对真实的场景。

写在最后

  • 第一:看完点赞,感谢您的认可;
  • ...
  • 第二:随手转发,分享知识,让更多人学习到;
  • ...
  • 第三:记得点关注,每天更新的!!!
  • ...
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,898评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,401评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,058评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,539评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,382评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,319评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,706评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,370评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,664评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,715评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,476评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,326评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,730评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,003评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,275评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,683评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,877评论 2 335