okhttp3源码分析

前言

相信大家想仔细认真的看下源码,然后去网上找资料都会有这样的两个感觉

  • 版本不一致,导致和博主分析的有偏差
  • 突然就跳转到某个关键类,根本不知道是从哪里进去的

反正阿简在看源码找资料的时候就是这样,说着说着快流了泪...
那阿简今天来和大家一起慢慢分析一下okhttp拦截器源码

先上总的框架图

okhtt.png

源码分析准备

  • app的build.gradle引入okhttp相关包
    implementation 'com.squareup.okhttp3:okhttp:3.14.4'
  • 创建一个简单的activity,写一个简单的GET请求的okhttp异步方法
    private void okhttpTest() {
        String url = "http://wwww.baidu.com";
        OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(TestInterceptor);
        final Request request = new Request.Builder()
                .url(url)
                .get()//默认就是GET请求,可以不写
                .build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.d(TAG, "onFailure: ");
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.d(TAG, "onResponse: " + response.body().string());
            }
        });
    }

然后开始我们的源码分析~

首先,我们创建了一个OkHttpClient,点进去看它的构造方法,初始化了一个叫做Builder的静态内部类,我们看看初始化了哪些参数(部分参数介绍请看注释)

 public Builder() {
     //任务调度器
      dispatcher = new Dispatcher();
     //支持的协议
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      //ConnectionPool是一个连接池对象,它可以用来管理连接对象,从它的构造方法中可以看到连接池的默认空闲连接数为5个,keepAlive时间为5分钟。
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }

阿简认为,初始化的这些这些成员变量中最重要的是Dispatcher,其他都是一些一看命名就大致知道的成员变量,那我们来看下这个任务调度器包含什么

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

可以看到Dispatcher这个类中包含了两个异步队列,一个同步队列用来保存请求,该文章主要从异步请求分析,所以大家可以记一下这两个异步请求的成员变量名称,后面会用到readyAsyncCallsrunningAsyncCalls

接着我们通过建造者模式创建了一个Request请求:

      final Request request = new Request.Builder()
                .url(url)
                .get()//默认就是GET请求,可以不写
                .build();

可以看到Request里面就是包含一个完整的请求报文的几个成员变量,对这部门计算机网络有疑惑的小伙伴可以参考阿简的这篇文章哟:网络基本知识-http

public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final @Nullable RequestBody body;
  final Map<Class<?>, Object> tags;

紧接着,我们通过okhttpClient去new了一个请求(call),newCall点进去

   Call call = okHttpClient.newCall(request);

可以看到最后是调用到了一个叫RealCall类的一个静态方法newRealCall,该方法创建了一个realCall的成员变量,传入了OkHttpClient 对象,同时创建了一个叫Transmitter的对象,这个类是干嘛的后面会提到(官方注释是:OkHttp的应用层和网络层之间的桥梁)

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
  }

然后开始我们的异步请求

        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                Log.d(TAG, "onFailure: ");
            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                Log.d(TAG, "onResponse: " + response.body().string());
            }
        });

点进去,可以看到Call只有一个实现类,就是我们刚才的那个RealCall,

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

可以看到这里transmitter干了一件事callStart()
然后通过Okhttp的那个成员变量dispatcher(任务调度器)调用了异步方法enqueue(AsyncCall call),传入了我们的CallBack回调监听(观察者模式喔)
然后最最最重要的来啦
我们看RealCall里面的这个内部类AsyncCall,继承了NamedRunnable,NamedRunnable又实现了Runnable 接口,那必然要实现run方法,run方法里面又调用了execute

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();

大家记住这个AsyncCall,后面会调用到这里的关键代码
回到刚才 client.dispatcher().enqueue这里,可以看到Dispatcher中的enqueue方法,我们的请求放入了readyAsyncCalls这个异步队列

  void enqueue(AsyncCall call) {
    synchronized (this) {
     //请求放入了该异步队列
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        AsyncCall existingCall = findExistingCallWithHost(call.host());
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
    promoteAndExecute();
  }

进入promoteAndExecute()方法,这个方法很重要哦,可以看到两个判断条件
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
如果正在执行的异步请求数量大于最大的请求数量(maxRequests 默认等于64)的话
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity
每个主机最大请求数,默认为5

  private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
    //创建一个临时变量List用作储存异步请求
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
       //循环中把请求分别放入临时变量和readyAsyncCalls中
        asyncCall.callsPerHost().incrementAndGet();
        executableCalls.add(asyncCall);
        runningAsyncCalls.add(asyncCall);
      }
      isRunning = runningCallsCount() > 0;
    }

  //遍历该临时变量,执行executeOn
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

我们看到遍历的 asyncCall.executeOn(executorService())这行代码,然后回到了RealCall的内部类AsyncCall的executeOn方法,然后传入了ThreadPoolExecutor线程池对象

    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

执行了 executorService.execute(this),传入了this,this是什么,是不是就是我们刚才的那个实现Runable的实现类AsyncCall本身,其实这里已经很清楚了,传入Runable到线程池,线程池肯定会多态重写执行run方法,run方法又调用了execute

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

然后拦截器来了,大家进入RealCall的

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
      Response response = chain.proceed(originalRequest);
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

可以看到interceptors 这个list中放入了很多拦截器,最新放入的是我们的应用拦截器OkHttpClient.Builder().addInterceptor(TestInterceptor)
这些拦截器的作用我后面会说明
然后就开始了我们的拦截器责任链的代码了(原理也是非常的简单)
这里创建了一个RealInterceptorChain对象,然后开始执行proceed传入原始的请求originalRequest

  public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

每次传入的index+1取集合中的下一个拦截器执行intercept,然后intercept中又执行proceed进行下一次的拦截执行
是不是非常非常的简单!!
然后开始收尾,execute中最后finall执行了 client.dispatcher().finished(this);

  void finished(AsyncCall call) {
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
  }

可以看到finished了promoteAndExecute()中放入runningAsyncCalls的对象,然后结束了
大致就分析到这里

开始分析下刚才说的几个拦截器

RetryAndFollowUpInterceptor,用来实现连接失败的重试和重定向。

BridgeInterceptor,用来修改请求和响应的 header 信息。

CacheInterceptor,用来实现响应缓存。比如获取到的 Response 带有 Date,Expires,Last-Modified,Etag 等 header,表示该 Response 可以缓存一定的时间,下次请求就可以不需要发往服务端,直接拿缓存的。

ConnectInterceptor,用来打开到服务端的连接。其实是调用了 StreamAllocation 的newStream 方法来打开连接的。建联的 TCP 握手,TLS 握手都发生该阶段。过了这个阶段,和服务端的 socket 连接打通。

CallServerInterceptor,用来发起请求并且得到响应。上一个阶段已经握手成功,HttpStream 流已经打开,所以这个阶段把 Request 的请求信息传入流中,并且从流中读取数据封装成 Response 返回。

拦截器分类

  • 应用拦截器,就是我们addInterceptor添加的拦截器
  • 网络拦截器 addNetworkInterceptor添加的拦截器

两个的特点分别是:
应用拦截器
1.不需要关心是否重定向或者失败重连
2.应用拦截器只会调用一次,即使数据来源于缓存
3.自定义的应用拦截器是第一个开始执行的拦截器,所以这句话的意思就是,应用拦截器可以决定是否执行其他的拦截器(如果不想继续往下传递请求,那么就不调用Chain.proceed()),通过Chain.proceed()
看getResponseWithInterceptorChain()方法结合责任链,可以很好的理解这几个特点
网络拦截器
1.允许像重定向和重试一样操作中间响应。
2.网络发生短路时不调用缓存响应。
3.在数据被传递到网络时观察数据。
4.有权获得装载请求的连接。

以下关于官网介绍的这张图区分应用拦截器和网络拦截器,通过getResponseWithInterceptorChain方法可以很好的理解,最新执行的是应用拦截器


END

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容