OKHttp(一)dispatcher分发器

简介

OkHttp是当下Android使用最频繁的网络请求框架,由Square公司开源。Google在Android4.4以后开始将源码中的HttpURLConnection底层实现替换为OKHttp,同时现在流行的Retrofit框架底层同样是使用OKHttp的。

优点:

  • 支持Spdy、Http1.X、Http2、Quic以及WebSocket
  • 连接池复用底层TCP(Socket),减少请求延时
  • 无缝的支持GZIP减少数据流量
  • 缓存响应数据减少重复的网络请求
  • 请求失败自动重试主机的其他ip,自动重定向
    …….

tip:本文版本为OkHttp 3.10.0,最新版本为:4.0.1,逻辑与3版本并没有太大变化,但是改为kotlin实现。

   OkHttpClient client = new OkHttpClient();

   void syncGet(String url) throws IOException {
       Request request = new Request.Builder()
               .url(url)
               .build();

       // 执行同步请求
       Call call = client.newCall(request);
       Response response = call.execute();
       
       // 获得响应
       ResponseBody body = response.body();
       System.out.println(body.string());
   }

   void AsyncGet(String url) {
       Request request = new Request.Builder()
               .url(url)
               .build();

       // 执行异步请求
       Call call = client.newCall(request);
       call.enqueue(new Callback() {
           @Override
           public void onFailure(Call call, IOException e) {}

           @Override
           public void onResponse(Call call, Response response) throws IOException {
               // 获得响应
               ResponseBody body = response.body();
               System.out.println(body.string());
           }
       });
   }

使用起来很简单,我们用一张流程图描述一下

使用流程.png

在使用Okhttp发送一次请求时,对于使用者最少要用到OkHttpClientRequestCall这3个对象;
OkHttpClient 中是一些配置信息,比如代理的配置,ssl证书配置等等;
Request 是封装请求参数信息,比如请求地址、请求方法、请求头、请求体等等;
Call 本身是一个接口,实现类为RealCall,execute()是负责同步请求,enqueue()是负责异步请求,两者唯一区别在于一个会直接发起网络请求,而另一个使用OkHttp内置的线程池来进行。
请求任务是如何分配的呢?这就涉及到OkHttp的任务分发器。

Dispatcher 分发器

分发器就是来调配请求任务的,内部会包含一个线程池以及任务队列。在创建OkHttpClient时,我们也可以传递自己定义的线程池来创建分发器。我看一下Dispatcher 的成员属性:

public final class Dispatcher {  
    // 异步请求最大同时请求数量
    private int maxRequests = 64;
    // 异步请求同一域名同时存在的最大请求数量
    private int maxRequestsPerHost = 5;
    // 闲置任务(没有请求时可执行一些任务,由使用者设置) 
    private @Nullable
    Runnable idleCallback;
    //异步请求使用的线程池
    private @Nullable
    ExecutorService executorService;
    //异步请求等待执行队列
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    //异步请求正在执行队列
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    //同步请求正在执行队列
    private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    // 上面说的可配置线程池的构造函数
    public Dispatcher(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public Dispatcher() {}
    // 创建线程池(懒加载)
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
            executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
                    false));
        }
        return executorService;
    }
}

execute() 同步请求

这是我们来看一下我们使用的时候调用Call 对象的同步方法execute()做了什么(实际上调用的是Call的实现类RealCall的execute方法):

    @Override
    public Response execute() throws IOException {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        try {
            // 我们先看这里,调用了dispatcher的executed方法
            client.dispatcher().executed(this);
            Response result = getResponseWithInterceptorChain();
            if (result == null) throw new IOException("Canceled");
            return result;
        } catch (IOException e) {
            eventListener.callFailed(this, e);
            throw e;
        } finally {
            // 任务结束或者出现异常,将任务标记为结束
            client.dispatcher().finished(this);
        }
    }

很明显调用了dispatcher的executed()方法,ok再跟到dispatcher中:

    synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
    }

就一句话,将call添加到正在运行的同步队列中,表示当前的请求任务已经开始执行了,等到请求完成再把它从队列中移除 (dispatcher中的finished方法)。

enqueue() 异步请求

再看一下RealCall的enqueue方法

    @Override
    public void enqueue(Callback responseCallback) {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        // 调用Dispatcher中的enqueue方法
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }

同样的调用了Dispatcher中的enqueue方法,看一眼:

    synchronized void enqueue(AsyncCall call) {
        // 如果同时执行的请求数量不超过64,同时同一域名主机的请求数量不超过5个
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
            // 满足条件将请求加入正在运行的队列中,并且开始执行
            runningAsyncCalls.add(call);
            executorService().execute(call);
        } else {
            // 不满足条件将请求加入等待队列
            readyAsyncCalls.add(call);
        }
    }

上面的描述已经很详细了,那么等待队列的请求啥时候才能被临幸呢?有没有注意到
Dispatcher的enqueue方法的参数是AsyncCall,其实他就是一个Runnable。

    final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;

        AsyncCall(Callback responseCallback) {
            super("OkHttp %s", redactedUrl());
            this.responseCallback = responseCallback;
        }

        .....

        @Override
        protected void execute() {
            boolean signalledCallback = false;
            try {
                Response response = getResponseWithInterceptorChain();
                if (retryAndFollowUpInterceptor.isCanceled()) {
                    signalledCallback = true;
                    responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
                } else {
                    signalledCallback = true;
                    responseCallback.onResponse(RealCall.this, response);
                }
            } catch (IOException e) {
                if (signalledCallback) {
                    // Do not signal the callback twice!
                    Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
                } else {
                    eventListener.callFailed(RealCall.this, e);
                    responseCallback.onFailure(RealCall.this, e);
                }
            } finally {
                // 当请求执行完成调用了Dispatcher的finished方法
                client.dispatcher().finished(this);
            }
        }
    }

和同步请求一样,当一个异步请求结束也调用了Dispatcher的finished方法,看一眼:

    void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
    }

调用重载的finished:

    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
            // 先把当前的请求移出正在执行队列
            if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
            // promoteCalls这里是true, 执行promoteCalls()
            if (promoteCalls) promoteCalls();
            runningCallsCount = runningCallsCount();
            idleCallback = this.idleCallback;
        }

        if (runningCallsCount == 0 && idleCallback != null) {
            idleCallback.run();
        }
    }

先把当前的请求移出正在执行队列,然后执行了promoteCalls(),八成我们想要的就在这里了:

    private void promoteCalls() {
        // 如果同时执行的请求数量不超过64个,直接return
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        // 如果等待队列是空的,return!
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
        // 遍历等待队列
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall call = i.next();
            // 检查一下正在执行的同一host的请求数量是不是不满5个
            if (runningCallsForHost(call) < maxRequestsPerHost) {
                // 满足条件,移出等待队列,加入正在执行队列,直接执行请求任务!
                i.remove();
                runningAsyncCalls.add(call);
                executorService().execute(call);
            }
            // 正在执行队列满了,return 吧!
            if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
    }

这里就是我们要找的,当一个异步任务执行完成时,检查正在执行队列有没有空位置,遍历等待队列,把其中满足条件的请求任务拿出来,放到正在执行队列中去执行,执行完了再次检查,如此循环。

Dispatcher中的线程池

Dispatcher中的线程池很有意思,我们来看一下

 public synchronized ExecutorService executorService() {
        if (executorService == null) {
            // 核心线程数0,最大线程数Integer.MAX_VALUE,空闲线程闲置时间 60,        
            // 闲置时间单位 秒,线程等待队列 SynchronousQueue, 以及线程创建工厂
            executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher",
                    false));
        }
        return executorService;
    }

这是Dispatcher中默认创建的线程池,注意里面几个关键的参数,第一个参数核心线程数量0,最大线程数 Integer.MAX_VALUE 和队列 SynchronousQueue。
为啥要这么配置呢?
首先核心线程为0,表示线程池就不用一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收,在一些网络请求不频繁的app上可以节约一些内存。而最大线程Integer.MAX_VALUE与等待队列SynchronousQueue的组合能够得到最大的吞吐量,即当需要线程池执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务!来分析一下:
SynchronousQueue 是一种无容量的队列,当向线程池中加入新请求时,只会有以下几种情况:

  • 由于没有核心线程,第一次进来的请求会直接创建新线程执行。
  • 此时有未被回收的空闲线程,复用该线程执行请求。
  • 没有空闲线程了,新请求需要添加到队列,但是队列的容量为0,相当于满了,此时要判断当前正在执行的线程数是否大于最大线程数,由于最大线程数为Integer.MAX_VALUE,肯定不会超过,所以要新建一个线程执行请求。

可以得出结论,我们的请求任务是可以立即执行的,其实还有另外2种阻塞队列:ArrayBlockingQueueLinkedBlockingQueue,但是它们一般都有容量,当请求任务进入等待队列后,可能会出现以下几种情况:

  • 没有空闲线程了,新请求添加到队列,队列中的请求一直在等待空闲线程,出现阻塞。
  • 没有空闲线程了,等待队列也满了,正在执行的线程数小于最大线程数,新建线程执行请求,出现后提交的任务先执行,而先提交的任务一直在等待的情况,同样阻塞了。
  • 没有空闲线程了,等待队列也满了,正在执行的线程数大于等于最大线程数,请求被拒绝了,尴尬。

总结一句话,Dispatcher的默认线程池参数配置保证了新建的请求都可以被立即执行,避免阻塞。
当然需要注意的时,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个数。那么当设置最大线程数为Integer.MAX_VALUE时,OkHttp同时还有最大请求任务执行个数: 64的限制,这样既解决了这个问题同时也能获得最大吞吐。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容