Preface
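Okio is a lightweight IO library built by Square, one of the heavyweights of the Android world, and it is the foundation of the well-known networking library OkHttp. Okio combines java.io and java.nio, provides both blocking and non-blocking IO, and optimizes low-level structures such as buffering, so you can fetch, store, and process data more easily and quickly.
This article analyzes the core of Okio in detail. Because Okio's codebase is small and elegant (the core is roughly 5,000 lines), the article works bottom-up. It first discusses the shortcomings of Java IO and introduces Okio's overall structure, then analyzes each module in turn, including the buffer module and the timeout module. After that, it traces the execution of blocking and non-blocking IO through the source code, and it closes with a summary of Okio's optimization ideas and design essentials.
I'd like to use this article to introduce this elegant IO library and to discuss some design questions with you. I hope it gives you a good understanding of Okio, and perhaps even makes you happy to leave Java's native IO behind and adopt Okio as an everyday development tool.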
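If you are unsure about the basic IO models (blocking IO, non-blocking IO, synchronous IO, asynchronous IO, multiplexing, BIO, NIO, AIO), the following are good background material:
Linux IO models and select, poll, epoll explained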
Java NIO Tutorial
Java NIO - Ron Hitchens
Source code:
https://github.com/square/okio
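The article is long, so here is the overall outline first:
- Preface
- Starting from Java IO
- Okio's structure
- The buffer structure
- The timeout mechanism
- The custom string type ByteString
- Flow analysis
- Summary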
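Starting from Java IO
Class explosion from many independently extended decorators
Anyone who has used Java IO knows that its streams are cumbersome and heavy. This is mainly because the Java IO hierarchy is built and extended with the decorator pattern, which makes it large and complex. There are four base abstract classes alone (InputStream, OutputStream, Reader, Writer), and supporting each combination of features requires yet another independent subclass, so the number of subclasses explodes, with each class serving one particular IO need.
Below is a piece of Java IO code. Even a very simple requirement takes this much code, and most of us have long been annoyed by it.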
// Java IO
public static void writeTest(File file) {
    try {
        FileOutputStream fos = new FileOutputStream(file);
        OutputStream os = new BufferedOutputStream(fos);
        DataOutputStream dos = new DataOutputStream(os);
        dos.writeUTF("write string by utf-8.\n");
        dos.writeInt(1234);
        // Closing the outermost decorator flushes the buffer and closes os and fos too.
        dos.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
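Implementing the same thing with Okio is clearly much easier. Okio's classes are also deliberately designed for method chaining, and used properly, chaining yields concise, elegant, readable code; many frameworks are designed this way nowadays. (Strictly speaking, DataOutputStream.writeUTF prefixes the string with a 2-byte length and uses modified UTF-8, whereas writeUtf8 writes plain UTF-8 bytes, so the two files are not byte-for-byte identical; the comparison is about the API.)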
// Okio
public static void writeTest(File file) {
    try {
        Okio.buffer(Okio.sink(file))
            .writeUtf8("write string by utf-8.\n")
            .writeInt(1234)
            .close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
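Chaining works because every write method returns the sink itself instead of void. The sketch below shows only that idea, using a hypothetical ChainSink class over a ByteArrayOutputStream; it is not Okio's API and does no segment-based buffering.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not Okio's API: each write method returns `this`,
// which is what makes chains like writeUtf8(...).writeInt(...) possible.
final class ChainSink {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    ChainSink writeUtf8(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
        return this;
    }

    ChainSink writeInt(int i) {
        // Big-endian, the same byte order as DataOutputStream.writeInt.
        out.write((i >>> 24) & 0xff);
        out.write((i >>> 16) & 0xff);
        out.write((i >>> 8) & 0xff);
        out.write(i & 0xff);
        return this;
    }

    byte[] toByteArray() {
        return out.toByteArray();
    }
}
```

Usage: `new ChainSink().writeUtf8("hi").writeInt(1234).toByteArray()` yields six bytes, two for the text and four for the int.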
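The bottleneck of blocking IO
The blocking nature of traditional Java sockets was once one of the most important limits on the scalability of Java programs. Each socket connection needed its own thread to manage it, and the resulting thread switching sharply degraded performance. With non-blocking IO, a single thread can manage many connections, and non-blocking IO is the foundation of many complex, high-performance programs.
Servers are the usual place to consider non-blocking socket channels, because they make it much easier to manage many socket channels at once. Clients can also benefit from one or a few non-blocking socket channels, though: for example, a GUI program can stay focused on user requests while maintaining sessions with one or more servers. Non-blocking mode is useful in many programs.
To address this, Java 1.4 added the nio library, which introduced Buffer, Channel, Selector and related concepts and implemented a non-blocking, multiplexed IO model.
Okio took a different route. It wraps Java's native streams and adds its own mechanism, the watchdog, so that calls do not block indefinitely. Why it builds on native streams rather than Channels is something I can only guess at. Okio was designed mainly for network communication, and TCP/IP is itself a stream-oriented protocol, so plain Java streams are a natural fit underneath. Using a watchdog instead of a Selector keeps IO operations lighter, which suits mobile devices better.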
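Okio's structure
Without further ado, here is the class diagram. The diagram below shows some of Okio's core classes (some decorator and utility classes are omitted).
The class diagram is very simple, which is one reason Okio is so lightweight.
There are only two fundamental interfaces, Sink and Source, which roughly play the roles of OutputStream and InputStream in Java's native IO. They define only the most basic IO operations.
BufferedSink and BufferedSource extend Sink and Source respectively, adding a wide range of read and write methods.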
public interface BufferedSink extends Sink {
Buffer buffer();
BufferedSink write(ByteString byteString) throws IOException;
BufferedSink write(byte[] source) throws IOException;
BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
long writeAll(Source source) throws IOException;
BufferedSink write(Source source, long byteCount) throws IOException;
BufferedSink writeUtf8(String string) throws IOException;
BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
BufferedSink writeString(String string, int beginIndex, int endIndex, Charset charset)
throws IOException;
BufferedSink writeByte(int b) throws IOException;
BufferedSink writeShort(int s) throws IOException;
BufferedSink writeShortLe(int s) throws IOException;
BufferedSink writeInt(int i) throws IOException;
BufferedSink writeIntLe(int i) throws IOException;
BufferedSink writeLong(long v) throws IOException;
BufferedSink writeLongLe(long v) throws IOException;
BufferedSink writeDecimalLong(long v) throws IOException;
BufferedSink writeHexadecimalUnsignedLong(long v) throws IOException;
@Override void flush() throws IOException;
BufferedSink emit() throws IOException;
BufferedSink emitCompleteSegments() throws IOException;
OutputStream outputStream();
}
public interface BufferedSource extends Source {
Buffer buffer();
boolean exhausted() throws IOException;
void require(long byteCount) throws IOException;
boolean request(long byteCount) throws IOException;
byte readByte() throws IOException;
short readShort() throws IOException;
short readShortLe() throws IOException;
int readInt() throws IOException;
int readIntLe() throws IOException;
long readLong() throws IOException;
long readLongLe() throws IOException;
long readDecimalLong() throws IOException;
long readHexadecimalUnsignedLong() throws IOException;
void skip(long byteCount) throws IOException;
ByteString readByteString() throws IOException;
ByteString readByteString(long byteCount) throws IOException;
int select(Options options) throws IOException;
byte[] readByteArray() throws IOException;
byte[] readByteArray(long byteCount) throws IOException;
int read(byte[] sink) throws IOException;
void readFully(byte[] sink) throws IOException;
int read(byte[] sink, int offset, int byteCount) throws IOException;
void readFully(Buffer sink, long byteCount) throws IOException;
long readAll(Sink sink) throws IOException;
String readUtf8() throws IOException;
String readUtf8(long byteCount) throws IOException;
@Nullable String readUtf8Line() throws IOException;
String readUtf8LineStrict() throws IOException;
String readUtf8LineStrict(long limit) throws IOException;
int readUtf8CodePoint() throws IOException;
String readString(Charset charset) throws IOException;
String readString(long byteCount, Charset charset) throws IOException;
long indexOf(byte b) throws IOException;
long indexOf(byte b, long fromIndex) throws IOException;
long indexOf(byte b, long fromIndex, long toIndex) throws IOException;
long indexOf(ByteString bytes) throws IOException;
long indexOf(ByteString bytes, long fromIndex) throws IOException;
long indexOfElement(ByteString targetBytes) throws IOException;
long indexOfElement(ByteString targetBytes, long fromIndex) throws IOException;
boolean rangeEquals(long offset, ByteString bytes) throws IOException;
boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
throws IOException;
InputStream inputStream();
}
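Buffer implements both BufferedSink and BufferedSource, bringing everything together, and adds further data-processing operations: it is a buffer you can read from, write to, and process data in. Buffer relies on the ByteString class for some of its data operations, and the two work together. For reasons of space, only the declarations of some of Buffer's additional methods are listed below; see the source for the implementations.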
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
@Nullable Segment head;
long size;
public long size();
public Buffer copyTo(OutputStream out) throws IOException;
public Buffer copyTo(OutputStream out, long offset, long byteCount) throws IOException;
public Buffer copyTo(Buffer out, long offset, long byteCount);
public Buffer writeTo(OutputStream out) throws IOException;
public Buffer writeTo(OutputStream out, long byteCount) throws IOException;
public Buffer readFrom(InputStream in) throws IOException;
public Buffer readFrom(InputStream in, long byteCount) throws IOException;
private void readFrom(InputStream in, long byteCount, boolean forever) throws IOException;
public byte getByte(long pos);
int selectPrefix(Options options);
public void clear();
Segment writableSegment(int minimumCapacity);
List<Integer> segmentSizes();
public ByteString md5();
public ByteString sha1();
public ByteString sha256();
public ByteString sha512() ;
private ByteString digest(String algorithm);
public ByteString hmacSha1(ByteString key);
public ByteString hmacSha256(ByteString key);
public ByteString hmacSha512(ByteString key);
private ByteString hmac(String algorithm, ByteString key);
public ByteString snapshot();
public ByteString snapshot(int byteCount);
}
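RealBufferedSink and RealBufferedSource implement BufferedSink and BufferedSource. They implement all the interface methods and each holds a Buffer internally, so they are the classes that actually perform buffered reads and writes.
The Okio class acts as a simple factory: its public static methods produce all kinds of Sinks and Sources.
Buffer stores its data not in an array but in a circular doubly linked list of Segment objects. Segments are reused in a flyweight-like fashion and managed by SegmentPool.
The timeout module consists mainly of Timeout and its subclass AsyncTimeout.
The buffer structure
The buffer is the most important part of Okio; many of its optimization ideas show up here and are well worth studying. Okio's buffer design makes a careful trade-off between CPU and memory usage, that is, between time and space, and it is both ingenious and efficient.
The buffer module consists mainly of three classes, Buffer, Segment, and SegmentPool, related as shown in the figure below. The container that actually stores data inside a Buffer is a circular linked list of Segments. Segments not currently in use are kept by SegmentPool in a singly linked list, which avoids frequent GC and memory churn and increases reuse.
Segment is the basic unit of storage and a node in the linked list. Its source is shown below.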
final class Segment {
static final int SIZE = 8192;
static final int SHARE_MINIMUM = 1024;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
Segment(byte[] data, int pos, int limit) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.owner = false;
this.shared = true;
}
public @Nullable Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
public Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM) {
prefix = new Segment(this);
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
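A Segment's byte array is divided into three regions by pos and limit, as shown in the figure below. The red part, [0, pos), has already been read and is stale; the green part, [pos, limit), has been written but not yet read; the yellow part, [limit, SIZE), is unused and can take new data. The design resembles java.nio's buffers but is more convenient. A java.nio buffer requires extra calls to manage its state, for example flip() to switch from writing to reading, so users must understand its internal structure well. In Okio, reads advance pos and writes advance limit, so there is no mode to switch, and the structure is transparent: users don't need to know it to use the API.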
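For contrast, here is the java.nio behaviour described above, using only the standard ByteBuffer API: after writing, flip() must be called before the same bytes can be read back. FlipDemo is a hypothetical wrapper for illustration.

```java
import java.nio.ByteBuffer;

// java.nio's ByteBuffer makes the caller switch modes explicitly:
// flip() sets limit = position and position = 0 so written data can be read.
final class FlipDemo {
    static int writeThenRead() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putInt(1234);   // write mode: position advances to 4
        buf.flip();         // read mode: limit = 4, position = 0
        return buf.getInt();
    }

    static boolean readWithoutFlipFails() {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putInt(1234);
        // Without flip(), position is 4 and limit is 16, so getInt() reads
        // the zero bytes after the data instead of the value just written.
        return buf.getInt() != 1234;
    }
}
```

A Segment avoids this by keeping two separate cursors: pos for the next read and limit for the next write.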
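Operations provided by Segment:
public Segment push(Segment segment)
Node insertion. Inserts segment immediately after the current node and returns the inserted node.
public @Nullable Segment pop()
Node removal. Unlinks the current node from the doubly linked list and returns its successor, or null if the current node was the only node in the list.
public Segment split(int byteCount)
Node split. Splits one node into two: the first (new) node gets the data in [pos, pos+byteCount) and the original node keeps [pos+byteCount, limit). The new node is inserted before the original and returned, as shown in the figure below.
There is a trick here. If the new prefix has at least SHARE_MINIMUM (1024) bytes, it is created with the sharing constructor Segment(Segment shareFrom), which makes a shallow copy: both nodes reference the same data array, which saves allocating and copying memory. Otherwise a node is taken from SegmentPool and the bytes are actually copied. The source comment explains why: "Avoid short shared segments. These are bad for performance because they are readonly and may lead to long chains of short segments." This is clearly a trade-off.
public void compact()
Node merge. If the predecessor is writable (its owner flag is set) and has enough room for this node's data (SIZE - prev.limit, plus prev.pos if prev is not shared, must be at least limit - pos), this node's data is written into the predecessor, and this node is unlinked and recycled.
public void writeTo(Segment sink, int byteCount)
Moves byteCount bytes from the current node into sink. If sink does not have byteCount bytes of room after its limit, sink's unread data is first shifted to the start of its array (moved back by pos bytes) to make room.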
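The operations above can be sketched in a self-contained class. MiniSegment is a simplified, hypothetical class that mirrors the field names and the push, pop and split logic of the source, under one assumption: there is no SegmentPool, so a short prefix is copied into a freshly allocated segment instead of a pooled one.

```java
// Simplified sketch of Okio's Segment. Assumption: no SegmentPool; short prefixes
// are copied into a newly allocated segment.
final class MiniSegment {
    static final int SIZE = 8192;
    static final int SHARE_MINIMUM = 1024;

    final byte[] data;
    int pos;         // next byte to read
    int limit;       // next byte to write
    boolean shared;  // other segments reference the same array
    boolean owner;   // this segment may append to data
    MiniSegment next = this; // a lone segment forms a one-node circular list
    MiniSegment prev = this;

    MiniSegment() {
        data = new byte[SIZE];
        owner = true;
    }

    MiniSegment(byte[] data, int pos, int limit) {
        this.data = data;
        this.pos = pos;
        this.limit = limit;
        this.shared = true;
        this.owner = false;
    }

    /** Inserts segment after this one and returns it. */
    MiniSegment push(MiniSegment segment) {
        segment.prev = this;
        segment.next = next;
        next.prev = segment;
        next = segment;
        return segment;
    }

    /** Unlinks this segment; returns its successor, or null if it was alone. */
    MiniSegment pop() {
        MiniSegment result = next != this ? next : null;
        prev.next = next;
        next.prev = prev;
        next = null;
        prev = null;
        return result;
    }

    /** Splits off the first byteCount readable bytes into a new segment placed before this one. */
    MiniSegment split(int byteCount) {
        if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
        MiniSegment prefix;
        if (byteCount >= SHARE_MINIMUM) {
            // Large prefix: share the array, no copy.
            prefix = new MiniSegment(data, pos, pos + byteCount);
            shared = true;
        } else {
            // Small prefix: copy, to avoid short read-only shared segments.
            prefix = new MiniSegment();
            System.arraycopy(data, pos, prefix.data, 0, byteCount);
            prefix.limit = byteCount;
        }
        pos += byteCount;
        prev.push(prefix);
        return prefix;
    }
}
```

Splitting a 4000-byte segment at 2000 shares the array; splitting off 10 more bytes copies them.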
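SegmentPool is very simple. It keeps a singly linked list of unused Segments. The pool is capped at 64 KiB, so it holds at most 8 Segments of 8 KiB each. It provides two thread-safe methods (guarded by synchronized blocks), one to take a Segment and one to recycle it. Note that shared Segments are never recycled, since another Segment may still be reading their array.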
final class SegmentPool {
static final long MAX_SIZE = 64 * 1024; // 64 KiB.
static @Nullable Segment next;
static long byteCount;
private SegmentPool() {
}
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
}
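The pool's bookkeeping can be sketched with plain byte arrays standing in for Segments. MiniPool is hypothetical: it keeps the 64 KiB cap, the LIFO order of the singly linked list, and the detail of allocating outside the lock, but it uses an ArrayDeque instead of threading Segments through their next pointers, and it is an instance object rather than static state.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of SegmentPool's bookkeeping; byte arrays stand in for Segments.
final class MiniPool {
    static final int SEGMENT_SIZE = 8192;    // Segment.SIZE
    static final long MAX_SIZE = 64 * 1024;  // 64 KiB, i.e. at most 8 segments

    private final ArrayDeque<byte[]> free = new ArrayDeque<>(); // used as a LIFO stack
    private long byteCount;                  // bytes currently held by the pool

    byte[] take() {
        synchronized (this) {
            byte[] segment = free.pollFirst();
            if (segment != null) {
                byteCount -= SEGMENT_SIZE;
                return segment;
            }
        }
        // Pool is empty: allocate (and zero-fill) outside the lock, as the source does.
        return new byte[SEGMENT_SIZE];
    }

    void recycle(byte[] segment) {
        synchronized (this) {
            if (byteCount + SEGMENT_SIZE > MAX_SIZE) return; // pool full: leave it to the GC
            byteCount += SEGMENT_SIZE;
            free.addFirst(segment);
        }
    }

    synchronized int pooledCount() {
        return free.size();
    }
}
```

Recycling ten arrays keeps only the first eight, and take() hands back the most recently recycled one.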
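Segments are actually split and merged in Buffer's write(Buffer source, long byteCount) method, which moves the first byteCount bytes of source into the current Buffer. Since both Buffers store their data in circular linked lists, the method takes nodes off the head of the source list one by one, appends them to the tail of the destination list, and checks whether each newly appended node can be merged (compacted) into its predecessor. If only part of a Segment is to be moved, the bytes are either copied directly into the destination's tail (when it is writable and has room) or the Segment is split so that the part to be moved becomes its own node.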
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
// ...
@Override
public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
if (byteCount < (source.head.limit - source.head.pos)) {
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
source.head = source.head.split((int) byteCount);
}
}
// Remove the source's head segment and append it to our tail.
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
}
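The essential point of write(Buffer, long), that whole segments change owner by relinking rather than by copying bytes, can be shown with a hypothetical ChunkBuffer that stores byte[] chunks in an ArrayDeque. Unlike Okio, this sketch always copies when it has to split a chunk and never compacts small chunks together.

```java
import java.util.ArrayDeque;
import java.util.Arrays;

// Hypothetical sketch: a buffer as a queue of byte[] chunks. Whole chunks move between
// buffers by reference; bytes are copied only when a head chunk has to be split.
final class ChunkBuffer {
    final ArrayDeque<byte[]> chunks = new ArrayDeque<>();
    long size;

    void append(byte[] chunk) {
        chunks.addLast(chunk);
        size += chunk.length;
    }

    /** Moves the first byteCount bytes of source to the end of this buffer. */
    void write(ChunkBuffer source, long byteCount) {
        if (source == this) throw new IllegalArgumentException("source == this");
        if (byteCount < 0 || byteCount > source.size) throw new IllegalArgumentException();
        while (byteCount > 0) {
            byte[] head = source.chunks.pollFirst();
            if (byteCount < head.length) {
                // Only part of the head is needed: split it and put the remainder back.
                int n = (int) byteCount;
                source.chunks.addFirst(Arrays.copyOfRange(head, n, head.length));
                head = Arrays.copyOfRange(head, 0, n);
            }
            // Move the (possibly split) head chunk by reference.
            source.size -= head.length;
            chunks.addLast(head);
            size += head.length;
            byteCount -= head.length;
        }
    }
}
```

Moving 5 bytes out of chunks {1,2,3} and {4,5,6,7} relinks the first array as-is and splits only the second.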
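At this point the structure of Okio's buffers should be clear.
The timeout mechanism
The base class Timeout
Okio uses the Timeout class to bound IO operations in time. It supports two ways of expressing a limit: a duration (timeout) and an absolute point in time (deadline). Either or both can be set; when both are set, whichever expires first applies. Here is the source: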
public class Timeout {
private boolean hasDeadline;
private long deadlineNanoTime;
private long timeoutNanos;
// ...
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
try {
boolean hasDeadline = hasDeadline();
long timeoutNanos = timeoutNanos();
if (!hasDeadline && timeoutNanos == 0L) {
monitor.wait(); // There is no timeout: wait forever.
return;
}
// Compute how long we'll wait.
long waitNanos;
long start = System.nanoTime();
if (hasDeadline && timeoutNanos != 0) {
long deadlineNanos = deadlineNanoTime() - start;
waitNanos = Math.min(timeoutNanos, deadlineNanos);
} else if (hasDeadline) {
waitNanos = deadlineNanoTime() - start;
} else {
waitNanos = timeoutNanos;
}
// Attempt to wait that long. This will break out early if the monitor is notified.
long elapsedNanos = 0L;
if (waitNanos > 0L) {
long waitMillis = waitNanos / 1000000L;
monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));
elapsedNanos = System.nanoTime() - start;
}
// Throw if the timeout elapsed before the monitor was notified.
if (elapsedNanos >= waitNanos) {
throw new InterruptedIOException("timeout");
}
} catch (InterruptedException e) {
throw new InterruptedIOException("interrupted");
}
}
}
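As you can see, Timeout's approach to timeouts is fairly simple. First, it has three instance fields:
private boolean hasDeadline;     // whether a deadline (absolute time) is set
private long deadlineNanoTime;   // the deadline, in System.nanoTime() units
private long timeoutNanos;       // the timeout duration, in nanoseconds
Then there are a number of getters and setters, which are unremarkable and are omitted from the code above for brevity. Two methods deal with timing:
public void throwIfReached() throws IOException
Throws InterruptedIOException if the current thread has been interrupted or the deadline has passed.
public final void waitUntilNotified(Object monitor) throws InterruptedIOException
First it handles the special case where neither a timeout nor a deadline is set: it waits indefinitely until notified. Otherwise it computes how long to wait (the smaller of the timeout and the time remaining until the deadline, if both are set) and calls monitor.wait for that long. If the full wait time elapsed without a notification, it throws InterruptedIOException("timeout"). Like Object.wait, it must be called while holding monitor's lock.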
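The wait-time computation in waitUntilNotified can be isolated as a pure function. WaitTime and its parameters are hypothetical; the branching mirrors the source, and toWaitArgs shows how the nanosecond total is split into the millisecond and nanosecond arguments that Object.wait(long, int) expects.

```java
// Sketch of how waitUntilNotified decides how long to wait. The helper and its
// parameters are hypothetical; the branches mirror the source.
final class WaitTime {
    /** Returns nanoseconds to wait, or -1 meaning "wait forever". */
    static long waitNanos(boolean hasDeadline, long deadlineNanoTime, long timeoutNanos, long now) {
        if (!hasDeadline && timeoutNanos == 0L) return -1L; // no limit at all
        if (hasDeadline && timeoutNanos != 0L) {
            return Math.min(timeoutNanos, deadlineNanoTime - now); // whichever expires first
        } else if (hasDeadline) {
            return deadlineNanoTime - now;
        } else {
            return timeoutNanos;
        }
    }

    /** Object.wait takes whole milliseconds plus a 0..999999 nanosecond remainder. */
    static long[] toWaitArgs(long waitNanos) {
        long millis = waitNanos / 1_000_000L;
        return new long[] {millis, waitNanos - millis * 1_000_000L};
    }
}
```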
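The asynchronous timeout class AsyncTimeout
The class that actually implements asynchronous timeouts is AsyncTimeout, a subclass of Timeout; its main logic is shown in the figure below. The class maintains a queue of pending AsyncTimeout objects ordered by remaining time, implemented as a singly linked list, so the node that will time out first is at the front. It defines an inner class Watchdog, which runs in the background as a daemon thread, repeatedly takes the front of the queue, and checks whether its timeout has been reached; if so, it calls that AsyncTimeout's timedOut() method. timedOut() is empty and meant to be overridden by subclasses.
AsyncTimeout has two methods, source and sink, that wrap an input and an output; both return proxy objects. The source shows that the proxies' read and write methods (and the Sink's flush and close) call enter() to put the timeout into the queue, then perform the real operation, and finally call exit() whether the operation succeeded or failed. exit() removes the node from the queue and reports whether the timeout already fired, in which case a timeout exception is thrown (wrapping the original IOException, if there was one).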
public class AsyncTimeout extends Timeout {
// ...
static @Nullable AsyncTimeout head;
private boolean inQueue;
private @Nullable AsyncTimeout next;
private long timeoutAt;
protected void timedOut() {
}
public final Source source(final Source source) {
return new Source() {
@Override
public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public void close() throws IOException {
boolean throwOnTimeout = false;
try {
source.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public Timeout timeout() {
return AsyncTimeout.this;
}
// ...
};
}
public final Sink sink(final Sink sink) {
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0L) {
// Count how many bytes to write. This loop guarantees we split on a segment boundary.
long toWrite = 0L;
for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
int segmentSize = s.limit - s.pos;
toWrite += segmentSize;
if (toWrite >= byteCount) {
toWrite = byteCount;
break;
}
}
// Emit one write. Only this section is subject to the timeout.
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, toWrite);
byteCount -= toWrite;
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
@Override
public void flush() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.flush();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override
public Timeout timeout() {
return AsyncTimeout.this;
}
// ...
};
}
}
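enter() puts the node into the timeout queue; the actual insertion is done by scheduleTimeout(AsyncTimeout node, long timeoutNanos, boolean hasDeadline). This method is synchronized on the class. If head is null (no timeout is pending and no watchdog is running), it creates the sentinel head node and starts a new Watchdog daemon thread. It then computes the absolute time at which the node times out and inserts the node into the queue in order of remaining time. If the new node ends up at the front, it wakes the watchdog with notify() so the watchdog recomputes how long to wait.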
public class AsyncTimeout extends Timeout {
// ...
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
private long remainingNanos(long now) {
return timeoutAt - now;
}
}
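The ordered insertion in scheduleTimeout, together with the removal logic of cancelScheduledTimeout covered next, can be exercised without threads. TimeoutNode and TimeoutQueue are hypothetical names; the sketch keeps the sentinel head, the sort key (remaining time), the "landed at the front" signal that triggers notify(), and the "not found means timed out" rule, but omits locking and deadlines.

```java
// Hypothetical sketch of AsyncTimeout's queue logic, without locking or threads.
final class TimeoutNode {
    final String name;
    long timeoutAt;    // absolute time (in the caller's clock) when this node times out
    TimeoutNode next;

    TimeoutNode(String name) {
        this.name = name;
    }
}

final class TimeoutQueue {
    private final TimeoutNode head = new TimeoutNode("sentinel"); // never times out itself

    /** Inserts node sorted by remaining time; returns true if it became the first element. */
    boolean schedule(TimeoutNode node, long now, long timeoutNanos) {
        node.timeoutAt = now + timeoutNanos;
        long remaining = node.timeoutAt - now;
        for (TimeoutNode prev = head; true; prev = prev.next) {
            if (prev.next == null || remaining < prev.next.timeoutAt - now) {
                node.next = prev.next;
                prev.next = node;
                return prev == head; // at the front: the watchdog must be woken
            }
        }
    }

    /** Unlinks node; returns true if it was not found, meaning it had already timed out. */
    boolean cancel(TimeoutNode node) {
        for (TimeoutNode prev = head; prev != null; prev = prev.next) {
            if (prev.next == node) {
                prev.next = node.next;
                node.next = null;
                return false;
            }
        }
        return true;
    }

    TimeoutNode first() {
        return head.next;
    }
}
```

Scheduling timeouts of 300, 100 and 200 yields the order 100, 200, 300; only the first two insertions land at the front.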
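Exit handling involves the following methods. They remove the node from the queue and, if the node had already timed out, throw (or return) an appropriate exception. The key observation is in cancelScheduledTimeout: if the node is no longer in the queue, the watchdog must already have removed it, so the timeout occurred.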
public class AsyncTimeout extends Timeout {
// ...
final void exit(boolean throwOnTimeout) throws IOException {
boolean timedOut = exit();
if (timedOut && throwOnTimeout) throw newTimeoutException(null);
}
final IOException exit(IOException cause) throws IOException {
if (!exit()) return cause;
return newTimeoutException(cause);
}
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
// Returns true if the timeout occurred.
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}
// The node wasn't found in the linked list: it must have timed out!
return true;
}
protected IOException newTimeoutException(@Nullable IOException cause) {
InterruptedIOException e = new InterruptedIOException("timeout");
if (cause != null) {
e.initCause(cause);
}
return e;
}
}
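The watchdog repeatedly calls awaitTimeout() while holding the class lock. If the queue is empty, it waits up to IDLE_TIMEOUT_MILLIS (60 seconds); if the queue is still empty afterwards, the thread exits, and a new watchdog is started the next time a timeout is scheduled. Otherwise it checks the front node's timeout: if it hasn't been reached, it waits for the remaining time; if it has, it removes the node from the queue and then, outside the lock, calls the node's timedOut() callback. The watchdog is very efficient: when there is nothing to do, it is either waiting or has exited.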
public class AsyncTimeout extends Timeout {
private static final long IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60);
private static final long IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS);
private static final class Watchdog extends Thread {
Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // The idle timeout elapsed.
: null; // The situation has changed.
}
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
head.next = node.next;
node.next = null;
return node;
}
}
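Each pass of awaitTimeout() makes one of three decisions: an idle wait on an empty queue, a wait until the front node is due, or unlinking the front node and handing it back for timedOut(). The hypothetical WatchdogStep below makes that decision with the clock passed in, so it can be tested without threads; the real method also calls AsyncTimeout.class.wait(...) and signals the idle-exit case by returning head.

```java
// Hypothetical sketch of one pass of awaitTimeout(), with the clock passed in.
final class WatchNode {
    final long timeoutAt;
    WatchNode next;

    WatchNode(long timeoutAt) {
        this.timeoutAt = timeoutAt;
    }
}

final class WatchdogStep {
    static final long IDLE_TIMEOUT_NANOS = 60_000_000_000L; // 60 s, as in the source

    /** Outcome of one pass: a node whose timedOut() should run, or how long to wait. */
    static final class Step {
        final WatchNode timedOut; // non-null: already unlinked, call timedOut() outside the lock
        final long waitNanos;     // > 0: wait this long (or until notified), then re-check

        Step(WatchNode timedOut, long waitNanos) {
            this.timedOut = timedOut;
            this.waitNanos = waitNanos;
        }
    }

    static Step next(WatchNode sentinel, long now) {
        WatchNode node = sentinel.next;
        if (node == null) {
            return new Step(null, IDLE_TIMEOUT_NANOS); // empty queue: idle wait, maybe exit
        }
        long waitNanos = node.timeoutAt - now;
        if (waitNanos > 0) {
            return new Step(null, waitNanos);          // front node not due yet
        }
        sentinel.next = node.next;                     // front node is due: unlink it
        node.next = null;
        return new Step(node, 0L);
    }
}
```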
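The custom string type ByteString
ByteString is a custom byte-string class. Like String, it is designed to be immutable: its data cannot be modified after it is created. Java has no keyword that marks a class as immutable, so making an object immutable takes some discipline:
- Don't provide any methods that modify the object's state
- Ensure the class can't be extended
- Make all fields final
- Make all fields private
- Ensure exclusive access to any mutable components
Immutable objects have many benefits. They are inherently thread-safe and need no synchronization, so there is no locking overhead, and they can share their internals freely. The downside is that every distinct value needs its own object, so many objects get created.
Besides being immutable, ByteString holds two fields, the byte[] data and a String, so once the String has been computed, converting between bytes and String costs almost nothing. The price is keeping two references, a clear case of trading space for time; Okio goes to great lengths for performance. The String field is marked transient, so it is excluded from serialization; after deserialization it is computed lazily when first needed, which saves work. (As the listing below shows, ByteString follows the rules above in spirit rather than to the letter: the class is not declared final, data is package-private rather than private, and the cached utf8 and hashCode fields are not final.)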
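The rules above, applied to a byte string, might look like the following sketch. ImmutableBytes is a hypothetical class, not Okio's ByteString: it is final, copies arrays on the way in and out, and caches the decoded String in a transient field that is recomputed lazily, for example after deserialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the immutability techniques described above (not Okio's class):
// final class, private final data, defensive copies, and a lazily cached transient String.
final class ImmutableBytes implements java.io.Serializable {
    private final byte[] data;
    private transient String utf8; // derived cache: not serialized, rebuilt on demand

    private ImmutableBytes(byte[] data) {
        this.data = data;
    }

    static ImmutableBytes of(byte... bytes) {
        return new ImmutableBytes(bytes.clone()); // copy in: the caller can't mutate us later
    }

    static ImmutableBytes encodeUtf8(String s) {
        ImmutableBytes result = new ImmutableBytes(s.getBytes(StandardCharsets.UTF_8));
        result.utf8 = s; // the String is already known, so cache it for free
        return result;
    }

    String utf8() {
        String result = utf8;
        // Benign race: two threads may both decode, but they produce equal Strings.
        return result != null ? result : (utf8 = new String(data, StandardCharsets.UTF_8));
    }

    byte[] toByteArray() {
        return data.clone(); // copy out: never expose the internal array
    }

    int size() {
        return data.length;
    }

    @Override public boolean equals(Object o) {
        return o instanceof ImmutableBytes && Arrays.equals(data, ((ImmutableBytes) o).data);
    }

    @Override public int hashCode() {
        return Arrays.hashCode(data);
    }
}
```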
A look at its methods shows at a glance what ByteString offers.
public class ByteString implements Serializable, Comparable<ByteString> {
final byte[] data;
transient int hashCode; // Lazily computed; 0 if unknown.
transient String utf8; // Lazily computed.
ByteString(byte[] data);
public static ByteString of(byte... data);
public static ByteString of(byte[] data, int offset, int byteCount);
public static ByteString of(ByteBuffer data);
public static ByteString encodeUtf8(String s);
public static ByteString encodeString(String s, Charset charset);
public String utf8();
public String string(Charset charset);
public String base64();
public ByteString md5();
public ByteString sha1();
public ByteString sha256();
public ByteString sha512();
private ByteString digest(String algorithm);
public ByteString hmacSha1(ByteString key);
public ByteString hmacSha256(ByteString key);
public ByteString hmacSha512(ByteString key);
private ByteString hmac(String algorithm, ByteString key);
public String base64Url();
public static @Nullable ByteString decodeBase64(String base64);
public String hex();
public static ByteString decodeHex(String hex);
private static int decodeHexDigit(char c);
public static ByteString read(InputStream in, int byteCount) throws IOException;
public ByteString toAsciiLowercase();
public ByteString toAsciiUppercase();
public ByteString substring(int beginIndex);
public ByteString substring(int beginIndex, int endIndex);
public int size();
public byte[] toByteArray();
byte[] internalArray();
public ByteBuffer asByteBuffer();
public void write(OutputStream out) throws IOException;
void write(Buffer buffer);
public boolean rangeEquals(int offset, ByteString other, int otherOffset, int byteCount);
public boolean rangeEquals(int offset, byte[] other, int otherOffset, int byteCount);
public final boolean startsWith(ByteString prefix);
public final boolean startsWith(byte[] prefix);
public final boolean endsWith(ByteString suffix);
public final boolean endsWith(byte[] suffix);
public final int indexOf(ByteString other);
public final int indexOf(ByteString other, int fromIndex);
public final int indexOf(byte[] other);
public int indexOf(byte[] other, int fromIndex);
public final int lastIndexOf(ByteString other);
public final int lastIndexOf(ByteString other, int fromIndex);
public final int lastIndexOf(byte[] other);
public int lastIndexOf(byte[] other, int fromIndex);
@Override public boolean equals(Object o);
@Override public int hashCode();
@Override public int compareTo(ByteString byteString);
@Override public String toString();
static int codePointIndexToCharIndex(String s, int codePointCount);
private void readObject(ObjectInputStream in) throws IOException;
private void writeObject(ObjectOutputStream out) throws IOException;
}
Flow analysis
Blocking calls
Let's return to the blocking call from the beginning of the article and trace how it executes. The code is:
Okio.buffer(Okio.sink(file))
.writeUtf8("write string by utf-8.\n")
.writeInt(1234).close();
First, Okio.sink(file).
// Okio.java
public static Sink sink(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return sink(new FileOutputStream(file));
}
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
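The source shows that Okio.sink(file) ends up calling Okio.sink(final OutputStream out, final Timeout timeout). The OutputStream passed in is a newly created FileOutputStream, which shows that a Sink merely wraps a native Java stream. It can be seen as a proxy for the native stream that adds some processing around writes, while the underlying writes are performed by the FileOutputStream. The Timeout passed in is created with the default constructor, so neither a timeout nor a deadline is set.
The call returns a Sink that overrides write(Buffer source, long byteCount) in preparation for RealBufferedSink. This method writes byteCount bytes from the Buffer to the native Java stream, segment by segment, updating the Buffer's size and the state of the Segments involved, and recycling each Segment once it is fully consumed. Note that timeout.throwIfReached() does not delay anything: before each chunk it checks whether the thread has been interrupted or the deadline has passed, and throws InterruptedIOException if so. The write itself is blocking IO. The returned Sink also overrides close(), flush(), and so on, which simply operate on the native Java stream.
With the Sink in hand, we enter Okio.buffer(Sink sink).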
// Okio.java
public static BufferedSink buffer(Sink sink) {
return new RealBufferedSink(sink);
}
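This method is very simple: it just creates and returns a RealBufferedSink, passing in the Sink. RealBufferedSink keeps a reference to that Sink and can also be seen as its proxy, since writes eventually go to the Sink. RealBufferedSink also owns a Buffer that serves as the container for buffered data.
The next call is RealBufferedSink.writeUtf8(String string).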
// RealBufferedSink.java
@Override public BufferedSink writeUtf8(String string) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeUtf8(string);
return emitCompleteSegments();
}
// Buffer.java
@Override public Buffer writeUtf8(String string) {
return writeUtf8(string, 0, string.length());
}
@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
if (string == null) throw new IllegalArgumentException("string == null");
if (beginIndex < 0) throw new IllegalArgumentException("beginIndex < 0: " + beginIndex);
if (endIndex < beginIndex) {
throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex);
}
if (endIndex > string.length()) {
throw new IllegalArgumentException(
"endIndex > string.length: " + endIndex + " > " + string.length());
}
// Transcode a UTF-16 Java String to UTF-8 bytes.
for (int i = beginIndex; i < endIndex;) {
int c = string.charAt(i);
if (c < 0x80) {
Segment tail = writableSegment(1);
byte[] data = tail.data;
int segmentOffset = tail.limit - i;
int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);
// Emit a 7-bit character with 1 byte.
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
// Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
// improvement over independent calls to writeByte().
while (i < runLimit) {
c = string.charAt(i);
if (c >= 0x80) break;
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
}
int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).
tail.limit += runSize;
size += runSize;
} else if (c < 0x800) {
// Emit a 11-bit character with 2 bytes.
writeByte(c >> 6 | 0xc0); // 110xxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} else if (c < 0xd800 || c > 0xdfff) {
// Emit a 16-bit character with 3 bytes.
writeByte(c >> 12 | 0xe0); // 1110xxxx
writeByte(c >> 6 & 0x3f | 0x80); // 10xxxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} else {
// c is a surrogate. Make sure it is a high surrogate & that its successor is a low
// surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character.
int low = i + 1 < endIndex ? string.charAt(i + 1) : 0;
if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
writeByte('?');
i++;
continue;
}
// UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)
// UTF-16 low surrogate: 110111yyyyyyyyyy (10 bits)
// Unicode code point: 00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)
int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);
// Emit a 21-bit character with 4 bytes.
writeByte(codePoint >> 18 | 0xf0); // 11110xxx
writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
writeByte(codePoint >> 6 & 0x3f | 0x80); // 10xxyyyy
writeByte(codePoint & 0x3f | 0x80); // 10yyyyyy
i += 2;
}
}
return this;
}
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
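RealBufferedSink.writeUtf8 calls writeUtf8 on its internal Buffer, which writes the String into the Buffer encoded as UTF-8. UTF-8 is a variable-length prefix code for Unicode: ASCII characters take one byte and other code points take two to four bytes, and the leading bits of each byte indicate its role. Java Strings are UTF-16, so each char (or surrogate pair) is transcoded by the branches above.
Note that before actually writing, the code calls writableSegment(int minimumCapacity) to get a Segment with enough room: it reuses the tail if it is owned and has at least minimumCapacity bytes free, and otherwise appends a fresh Segment from SegmentPool.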
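To check the branch structure of writeUtf8 in isolation, here is a standalone sketch that follows the same cases (1, 2, 3 and 4 bytes, plus '?' for an unpaired surrogate) but writes into a ByteArrayOutputStream. Utf8Sketch is a hypothetical name, and the ASCII fast path and segment handling are left out. For well-formed input its output should match String.getBytes(StandardCharsets.UTF_8).

```java
import java.io.ByteArrayOutputStream;

// Standalone sketch of the transcoding branches in Buffer.writeUtf8, writing into a
// ByteArrayOutputStream instead of Segments (no ASCII fast path).
final class Utf8Sketch {
    static byte[] encode(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < s.length(); ) {
            int c = s.charAt(i);
            if (c < 0x80) {                        // 1 byte: 0xxxxxxx
                out.write(c);
                i++;
            } else if (c < 0x800) {                // 2 bytes: 110xxxxx 10xxxxxx
                out.write(c >> 6 | 0xc0);
                out.write(c & 0x3f | 0x80);
                i++;
            } else if (c < 0xd800 || c > 0xdfff) { // 3 bytes: a non-surrogate BMP char
                out.write(c >> 12 | 0xe0);
                out.write(c >> 6 & 0x3f | 0x80);
                out.write(c & 0x3f | 0x80);
                i++;
            } else {
                int low = i + 1 < s.length() ? s.charAt(i + 1) : 0;
                if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
                    out.write('?');                // unpaired surrogate: replacement char
                    i++;
                    continue;
                }
                // Combine the surrogate pair into a code point in [0x10000, 0x10FFFF].
                int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);
                out.write(codePoint >> 18 | 0xf0); // 4 bytes: 11110xxx then 3 x 10xxxxxx
                out.write(codePoint >> 12 & 0x3f | 0x80);
                out.write(codePoint >> 6 & 0x3f | 0x80);
                out.write(codePoint & 0x3f | 0x80);
                i += 2;
            }
        }
        return out.toByteArray();
    }
}
```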
Once the write is done, emitCompleteSegments() is called. Let's look at it.
// RealBufferedSink.java
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
// Buffer.java
public long completeSegmentByteCount() {
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
return result;
}
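After writing, this code computes how many bytes in the Buffer sit in complete segments and can be written out. The last Segment may not be full yet: if it is still writable (limit < SIZE and owned), its bytes are held back so later writes can keep filling it. The Sink then writes that many bytes to the FileOutputStream.
So RealBufferedSink does add buffering on top of Sink: data is first written into the Buffer, and only complete segments are pushed to the stream in bulk; a partially filled tail stays buffered until more data arrives or flush() or close() is called.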
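The calculation can be sketched on its own, modelling each segment only by its pos, limit and owner fields. CompleteCount and SegmentInfo are hypothetical names; SIZE is Okio's 8192.

```java
// Sketch of Buffer.completeSegmentByteCount over hypothetical (pos, limit, owner) records.
final class CompleteCount {
    static final int SIZE = 8192;

    static final class SegmentInfo {
        final int pos;
        final int limit;
        final boolean owner;

        SegmentInfo(int pos, int limit, boolean owner) {
            this.pos = pos;
            this.limit = limit;
            this.owner = owner;
        }
    }

    /** Bytes that can be emitted now: everything except a tail that could still take writes. */
    static long completeSegmentByteCount(SegmentInfo[] segments) {
        long result = 0;
        for (SegmentInfo s : segments) result += s.limit - s.pos;
        if (result == 0) return 0;
        SegmentInfo tail = segments[segments.length - 1];
        if (tail.limit < SIZE && tail.owner) {
            result -= tail.limit - tail.pos; // tail still writable: hold it back
        }
        return result;
    }
}
```

A full segment followed by a 100-byte writable tail emits 8192 bytes; if the tail is not owned (shared, read-only), all 8292 bytes are emitted because nothing more can be appended to it.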
That completes writing the String to the stream. Writing the int works in much the same way, so there is little to add.
// RealBufferedSink.java
@Override public BufferedSink writeInt(int i) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeInt(i);
return emitCompleteSegments();
}
// Buffer.java
@Override public Buffer writeInt(int i) {
Segment tail = writableSegment(4);
byte[] data = tail.data;
int limit = tail.limit;
data[limit++] = (byte) ((i >>> 24) & 0xff);
data[limit++] = (byte) ((i >>> 16) & 0xff);
data[limit++] = (byte) ((i >>> 8) & 0xff);
data[limit++] = (byte) (i & 0xff);
tail.limit = limit;
size += 4;
return this;
}
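The four shifts in writeInt produce big-endian bytes, the same order that ByteBuffer (big-endian by default) and DataOutputStream.writeInt use. The hypothetical BigEndian class below compares the two approaches.

```java
import java.nio.ByteBuffer;

// Sketch: Buffer.writeInt's shifts versus ByteBuffer.putInt (big-endian by default).
final class BigEndian {
    static byte[] writeInt(int i) {
        byte[] data = new byte[4];
        int limit = 0;
        data[limit++] = (byte) ((i >>> 24) & 0xff); // most significant byte first
        data[limit++] = (byte) ((i >>> 16) & 0xff);
        data[limit++] = (byte) ((i >>> 8) & 0xff);
        data[limit++] = (byte) (i & 0xff);          // least significant byte last
        return data;
    }

    static byte[] viaByteBuffer(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }
}
```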
Finally, RealBufferedSink.close() closes the stream.
// RealBufferedSink.java
@Override public void close() throws IOException {
if (closed) return;
Throwable thrown = null;
try {
if (buffer.size > 0) {
sink.write(buffer, buffer.size);
}
} catch (Throwable e) {
thrown = e;
}
try {
sink.close();
} catch (Throwable e) {
if (thrown == null) thrown = e;
}
closed = true;
if (thrown != null) Util.sneakyRethrow(thrown);
}
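close() first checks whether the Buffer still holds unwritten data and, if so, writes all of it to the stream; otherwise that data would never be written, and its Segments would never be returned to the pool. It then closes the Sink, which here means closing the FileOutputStream. If both steps fail, the first exception is the one rethrown, and closed is set either way.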
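The error handling in close() follows a general pattern: attempt every cleanup step, remember only the first failure, and rethrow it at the end. The sketch below shows that pattern with hypothetical names (CloseBoth, Flushing). Okio rethrows via Util.sneakyRethrow, which can rethrow any Throwable; this sketch instead distinguishes IOException, RuntimeException and Error, which covers everything the two steps here can throw.

```java
import java.io.Closeable;
import java.io.IOException;

// Sketch of the pattern in RealBufferedSink.close(): always attempt both the final
// write and the close, keep only the first failure, then rethrow it.
final class CloseBoth {
    interface Flushing {
        void writeRemaining() throws IOException;
    }

    static void close(Flushing buffer, Closeable sink) throws IOException {
        Throwable thrown = null;
        try {
            buffer.writeRemaining();
        } catch (Throwable e) {
            thrown = e;
        }
        try {
            sink.close(); // runs even if the final write failed
        } catch (Throwable e) {
            if (thrown == null) thrown = e;
        }
        // Only unchecked throwables and IOException can reach here.
        if (thrown instanceof IOException) throw (IOException) thrown;
        if (thrown instanceof RuntimeException) throw (RuntimeException) thrown;
        if (thrown instanceof Error) throw (Error) thrown;
    }
}
```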
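That completes the analysis of the blocking call. Okio's blocking IO is very similar to Java's; the main difference is the optimized buffering.
It is called blocking IO because an IO call blocks the thread, which resumes only when the IO completes.
Non-blocking calls
Replacing file with socket in the example above turns it into a non-blocking call in Okio's sense, one whose duration is bounded by an asynchronous timeout: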
Okio.buffer(Okio.sink(socket))
.writeUtf8("write string by utf-8.\n")
.writeInt(1234).close();
Again, start with Okio.sink(socket).
// Okio.java
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
private static AsyncTimeout timeout(final Socket socket) {
return new AsyncTimeout() {
@Override protected IOException newTimeoutException(@Nullable IOException cause) {
InterruptedIOException ioe = new SocketTimeoutException("timeout");
if (cause != null) {
ioe.initCause(cause);
}
return ioe;
}
@Override protected void timedOut() {
try {
socket.close();
} catch (Exception e) {
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) {
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} else {
throw e;
}
}
}
};
}
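sink first calls timeout() to create an AsyncTimeout whose timedOut() method is overridden to close the socket. It then calls sink(final OutputStream out, final Timeout timeout) to create the proxy for the socket's native stream, exactly as before. Finally, timeout.sink(sink) wraps that Sink so that, if a timeout or deadline has been set, each write, flush and close is registered in the timeout queue while it runs. The rest of the flow is identical to the blocking case, so there is nothing more to analyze.
This IO is non-blocking in the sense that the thread cannot stay stuck waiting on the network forever. The write itself still blocks the calling thread, but if a timeout has been set (for example via timeout().timeout(...)) and the write takes longer than that, the watchdog removes the node from the queue and calls timedOut(), which closes the socket; the pending write then fails, and exit() turns the failure into a SocketTimeoutException.
Summary
That concludes the analysis of the Okio framework. Space and time did not allow covering many features and modules, such as Pipe and the classes for compression and encoding, but that does little harm: we have seen Okio's core and its optimization ideas, summarized below:
- Ease of use. Compared with Java IO, Okio is more convenient, supports method chaining, and yields concise, elegant code. Buffering and similar features are transparent, so you can use the library without knowing its underlying structure.
- Integrated functionality. In Java IO, different read and write features require wrapping different decorators, whereas Okio integrates all the read and write operations, so there is no need to chain a string of decorator classes.
- CPU and memory optimization. Data is stored in a circular linked list, and Segments avoid many copies through splitting, merging, and sharing. SegmentPool keeps unused Segments for reuse, avoiding frequent GC. The watchdog waits or exits when there are no tasks, so it does not consume CPU. ByteString trades space for time and uses lazy computation to save CPU.
- Powerful features. Okio supports blocking and non-blocking IO and provides a range of convenient tools, such as transparent GZip handling and md5, sha1 and other digests, which make data verification easy.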
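Finally, here are some other good articles analyzing Okio; this article draws on them to some extent:
OKio - Redefining "Small but Mighty"
Probably the Most Complete Okio Source Code Analysis
Understanding Okio's Optimization Ideas in Depth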