Okhttp连接池


一、ConnectInterceptor拦截器

okhttp连接池与链路

ConnectInterceptor拦截器,intercept()方法,创建Network链路。
每一次RealCall请求类,分配一个StreamAllocation对象,负责RealConnection管理,连接池操作,HttpCodec,网络链路的分配、取消、释放,RealConnection可复用。

@Override
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();//从Chain中获取Request
    StreamAllocation streamAllocation = realChain.streamAllocation();

    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    //处理返回Response
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

RetryAndFollowUpInterceptor拦截器(位于拦截器列表第二项目),intercept()方法,创建StreamAllocation对象,每次递归将引用传给下一个新建Chain。

streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

到达ConnectInterceptor拦截器时,从Chain中取出,StreamAllocation类的newStream()方法,创建/查找链路。

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    ...
    try {
        RealConnection resultConnection = findHealthyConnection(connectTimeout, 
                readTimeout,writeTimeout, connectionRetryEnabled, 
                doExtensiveHealthChecks);
        HttpCodec resultCodec = resultConnection.newCodec(client, this);
        synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
      }
    } catch (IOException e) {
    }
}

findConnection()方法,通过策略寻找一个真正的RealConnection连接,该连接和Server建立个Socket连接,RealConnection类初始化HttpCodec。

public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) 
            throws SocketException {
    if (http2Connection != null) {
        return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
        socket.setSoTimeout(client.readTimeoutMillis());
        source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
        sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
        return new Http1Codec(client, streamAllocation, source, sink);
    }
}

若采用Protocol.HTTP_2协议,即Http2Connection存在,创建Http2Codec,否则,创建Http1Codec,封装BufferedSource和BufferedSink。

这些新初始化的对象StreamAllocation、HttpCodec、RealConnection,一起传递给Chain节点,下一个拦截器使用。

二、分配策略

StreamAllocation类findConnection()方法。

private RealConnection findConnection(int connectTimeout, int readTimeout, 
            int writeTimeout,boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
        //先使用StreamAllocation内部保存的RealConnection
        RealConnection allocatedConnection = this.connection;
        if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
            return allocatedConnection;
        }
        //连接池中寻找
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
            return connection;
        }
        selectedRoute = route;
    }
    ...
    RealConnection result;
    synchronized (connectionPool) {
        Internal.instance.get(connectionPool, address, this, selectedRoute);
        if (connection != null) return connection;
        //创建RealConnection
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result);
    }
    //socket连接
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      //入池
      Internal.instance.put(connectionPool, result);
      ...
    }
    return result;
  }

1,StreamAllocation内部RealConnection连接。
2,从ConnectionPool连接池查找。
3,创建RealConnection对象,保存在StreamAllocation内部,入池。

OkHttpClient类静态代码段初始化一个Internal对象。

static {
    Internal.instance = new Internal() {
        @Override public 
        RealConnection get(ConnectionPool pool, Address address,StreamAllocation streamAllocation, Route route) {
            return pool.get(address, streamAllocation, route);
        }
        @Override 
        public void put(ConnectionPool pool, RealConnection connection) {
            pool.put(connection);
        }
        ....//其他方法.
    };
}

ConnectionPool类get()方法,查询连接池中的RealConnection。

@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    for (RealConnection connection : connections) {
        if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection, true);
            return connection;
        }
    }
    return null;
}

遍历连接池每项RealConnection,当Address相同且StreamAllocationReference数量小于限制,说明是可用连接。
RealConnection内部引用StreamAllocation类型弱引用列表,allocationLimit变量限制StreamAllocation弱引用数量。

public void acquire(RealConnection connection) {
    ..
    this.connection = connection;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}

将可用RealConnection设置成StreamAllocation内部connection,同时将StreamAllocation加入RealConnection内部弱引用列表。
达到限制时,不能再被StreamAllocation使用,新建链路,新RealConnection同样赋值到StreamAllocation内部,将该StreamAllocation加入弱引用列表,最后put连接池。

RealConnection代表一个真正的链路,封装BufferedSource、BufferedSink、路由、socket、协议、Handshake握手信息。

若是新链接,进行Socket连接,connect()方法,connectSocket()方法建立连接。

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
    //创建Socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
}

创建一个Socket,Platform#connectSocket方法,连接。

public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
}

成功,和Server建立一个Socket连接,失败,抛出异常。

三、连接清理

ConnectionPool连接池管理所有Socket连接,当有新的请求时,从池中分配一个链路。
ArrayDeque双向队列,线性连续空间,双向开口,在头尾两端插入删除高效,同时具有队列和栈性质,缓存常用。
默认支持5个并发keepalive,链路生命为5分钟,即链路数据传输完成,可保持5分钟的存活时间。
自动清除线程,将查找超过5分钟的链路,关闭socket。

任务清理流程

ConnectionPool的get()/put()操作方法。

void put(RealConnection connection) {
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);//执行清理任务
    }
    connections.add(connection);//加入连接池队列
}

线程池执行cleanupRunnable清理任务,设置cleanupRunning标志位。实质上是一个阻塞的清理任务。若while一直运行,下次put()将不会触发。

while (true) {
    long waitNanos = cleanup(System.nanoTime());
    if (waitNanos == -1) return;
    if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (ConnectionPool.this) {
            try {
                ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
        }
    }
}

waitNanos等待下次清理的间隔时间,-1表示不需要再次清理,退出循环,0表示立即再次清理,wait()方法等待,释放锁与时间片

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;
    synchronized (this) {
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
            // 是否在使用
            if (pruneAndGetAllocationCount(connection, now) > 0) {
                inUseConnectionCount++;
                continue;
            }
            idleConnectionCount++;
            //找空闲最长的
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
                longestIdleDurationNs = idleDurationNs;
                longestIdleConnection = connection;
            }
        }  
        //决定是否清理
        if (longestIdleDurationNs >= this.keepAliveDurationNs
                || idleConnectionCount > this.maxIdleConnections) {
             //删除连接
            connections.remove(longestIdleConnection);
        } else if (idleConnectionCount > 0) {
            return keepAliveDurationNs - longestIdleDurationNs;
        } else if (inUseConnectionCount > 0) {
            return keepAliveDurationNs;
        } else {
            cleanupRunning = false;
            return -1;
        }
    }
    //关闭Socket
    closeQuietly(longestIdleConnection.socket());
    return 0;
}

遍历连接,pruneAndGetAllocationCount()方法查看连接是否在使用。inUseConnectionCount,正在使用数量,自增,idleConnectionCount,空闲数量,未使用自增。
idleDurationNs,空闲连接,计算已空闲时间,查找到空闲时间最长的。

清理算法

1,longestIdleDurationNs(最长空闲时间),大于5min或空闲数量超过5个(默认),将对应longestIdleConnection连接删除,返回0,下一次立即清理。
2,不足5min,空闲连接数量<5,返回timeout(距离5min还有多久),线程阻塞timeout后,再次清理。
3,空闲连接=0,且正使用连接>0,返回5min,最长等待时间。
4,空闲和正使用连接都0,返回-1,不清理,线程结束。设置cleanupRunning标志位,下次put()方法重新唤起cleanupRunnable任务。

清理过程

从连接池队列ArrayDeque中删除该项连接。
closeQuietly()方法关闭Socket。

using连接判断

遍历RealConnection内部Reference<StreamAllocation>列表
reference.get()不空,说明有StreamAllocation正引用RealConnection。
reference.get()不存在,说明StreamAllocation已被Jvm清理,同时,从references列表中删除该项reference。
若references列表是空,说明references弱引用都是空,没有StreamAllocation使用该连接。

四、总结

ConnectInterceptor拦截器负责Network连接。

每一个RealCall请求对应一个StreamAllocation。

真正的连接RealConnection复用。

连接池采用ArrayDeque双向队列数据结构。

连接清理任务。


任重而道远

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345