Fegin-完整请求流程解析

这一章说说基于Fegin的声明式调用请求是怎么个流程

首先我们从构建流程中知道,大体上来说是基于JDK的动态代理机制实现的,那么在JDK的动态代理中,对方法进行增强的类就是InvocationHandler,核心方法就是invoke(),在Fegin中就是FeignInvocationHandler

我们看看这个类

feign.ReflectiveFeign$FeignInvocationHandler

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      //对Object原生方法做几个判断
      if ("equals".equals(method.getName())) {
        try {
          Object
              otherHandler =
              args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
          return equals(otherHandler);
        } catch (IllegalArgumentException e) {
          return false;
        }
      } else if ("hashCode".equals(method.getName())) {
        return hashCode();
      } else if ("toString".equals(method.getName())) {
        return toString();
      }
      //从methodHandle中依据接口的Method获取SynchronousMethodHandler
      return dispatch.get(method).invoke(args);
    }

真正执行的Method是解析好的SynchronousMethodHandler,args是请求的方法参数,从这里看出,真正执行的是构建的
SynchronousMethodHandler

进入SynchronousMethodHandler#invoke方法中

  @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        //看名字就知道是执行并解码返回值
        return executeAndDecode(template);
      } catch (RetryableException e) {
        retryer.continueOrPropagate(e);
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }
Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
      logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
      //真正执行方法的核心    
      response = client.execute(request, options);
      // ensure the request is set. TODO: remove in Feign 10
      response.toBuilder().request(request).build();
    } catch (IOException e) {
      if (logLevel != Logger.Level.NONE) {
        logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
      }
      throw errorExecuting(request, e);
    }
    //省略……………………
}

我们知道在targetToHandlersByName.apply(target)方法中将接口的注解参数使用SpringMvcContract解析生成MethodMetadata,其中就有template,在上面executeAndDecode方法,第一步是将实际的请求地址进行拼装,参数替换,最后形成的就像这样:

http://serverA/1?name=tom?age=12

最后形成一个包含服务名的请求路径,是不是很眼熟,和我们在ribbon中传入的URL很像

这里方法嵌套的很多,没办法,只有一个个分析

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            //获取请求URL 类似:http://serverA/1?name=tom?age=12
            URI asUri = URI.create(request.url());
            //获取服务名 类似:serverA
            String clientName = asUri.getHost();
            //替换掉服务名,类似:http:///1?name=tom?age=12
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            //这一步是将请求封装为RibbonRequest
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);
            //获取当前服务的请求参数
            IClientConfig requestConfig = getClientConfig(options, clientName);
            //核心方法 细讲
            return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                    requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

上面excute()方的核心lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
requestConfig).toResponse(),看第一个方法lbClient(clientName):

org.springframework.cloud.netflix.feign.ribbon.CachingSpringLoadBalancerFactory#create

    public FeignLoadBalancer create(String clientName) {
        if (this.cache.containsKey(clientName)) {
            return this.cache.get(clientName);
        }
        IClientConfig config = this.factory.getClientConfig(clientName);
        ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
        ServerIntrospector serverIntrospector = this.factory.getInstance(clientName, ServerIntrospector.class);
        FeignLoadBalancer client = enableRetry ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
            loadBalancedRetryPolicyFactory, loadBalancedBackOffPolicyFactory, loadBalancedRetryListenerFactory) : new FeignLoadBalancer(lb, config, serverIntrospector);
        this.cache.put(clientName, client);
        return client;
    }

第一步是先去缓存中获取该服务对应的FeignLoadBalancer,如果没有进行创建,我们看看创建的流程

  • 获取该服务对应的配置类
  • 获取服务对应的ILoadBalancer,其实在Ribbon中提到的,这里默认的走的是ZoneAwareLoadBalancer,注意啊,在ZoneAwareLoadBalancer初始话的时候已经完成了和EurekaClient本地注册表的拉取,保存在allServerList(BaseLoadBalancer)中,并启动了定时调度任务,每30S进行一次全量更新。初始的话也是创建和服务对应的Ribbon上下文,从该上下文中获取该服务实例的
  • 获取服务拦截器
  • 是否配置重试机制,一般没有配置,走FeignLoadBalancer

好了,lbClient(clientName)返回的是一个FeignLoadBalancer,接着执行它的executeWithLoadBalancer方法

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }
        
    }
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    //获取在每个服务实例重试的的次数
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    //最多尝试几个服务实例
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    //对于每个服务实例的调用逻辑
    //默认field server是null,通过selectServer()方法获取一个Server
    Observable<T> o = 
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                //对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
                public Observable<T> call(Server server) {
                    //设置上下文
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    //每个Server包含重试逻辑的请求调用
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    //增加Server正在处理的请求计数
                                    loadBalancerContext.noteOpenConnection(stats);

                                    //监听器
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    //计时器
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    //operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
                                    //doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            //记录请求完成
                                            recordStats(tracer, stats, entity, null);
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            //记录请求结束
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            //发生了错误,通知listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            //因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }                            

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        //是否retry
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    if (maxRetrysNext > 0 && server == null)
        //是否retry,如果retry回调用selectServer()返回下一个Server
        o = o.retry(retryPolicy(maxRetrysNext, false));

    //异常处理
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                //如果超过重试次数,则抛异常
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}

首先这个Observable使用的是Java的rx包下面的组件,服务的选取采用selectServer,这个就是

Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   

采用Ribbon的服务选取进行的,上面那一大坨主要是对服务调用包装一些重试策略

具体的重试讲解,后续开文再说

这submit()方法,请求最终还是会回调call(Server server)方法

@Override
public Observable<T> call(Server server) {
   //将服务请求URL替换为真实URL,Ribbon中选取的服务    
   URI finalUri = reconstructURIWithServer(server, request.getUri());
    S requestForServer = (S) request.replaceUri(finalUri);
    try {
        //执行请求逻辑
         return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
        } catch (Exception e) {
          return Observable.error(e);
        }
  }
    @Override
    public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
            throws IOException {
        Request.Options options;
        if (configOverride != null) {
            //配置请求参数
            options = new Request.Options(
                    configOverride.get(CommonClientConfigKey.ConnectTimeout,
                            this.connectTimeout),
                    (configOverride.get(CommonClientConfigKey.ReadTimeout,
                            this.readTimeout)));
        }
        else {
            options = new Request.Options(this.connectTimeout, this.readTimeout);
        }
        //真实请求
        Response response = request.client().execute(request.toRequest(), options);
        return new RibbonResponse(request.getUri(), response);
    }

附上总流程图:


Fegin-完整Http请求流程.png
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容