Android开源项目原理系列
okio是okhttp的io部分,可以单独使用。知道它的存在后,我就爱不释手,因为它真的十分精彩。
按官方的介绍,okio是对java io和nio的补充,以便更加容易处理数据输入输出。
demo
private void write(File file) throws IOException {
try (Sink sink = Okio.sink(file);
BufferedSink bufferedSink = Okio.buffer(sink)) {
bufferedSink.writeUtf8("寥落古行宫").writeUtf8(System.lineSeparator());
bufferedSink.writeUtf8("宫花寂寞红").writeUtf8(System.lineSeparator());
bufferedSink.writeUtf8("白头宫女在").writeUtf8(System.lineSeparator());
bufferedSink.writeUtf8("闲坐说玄宗").writeUtf8(System.lineSeparator());
}
}
private void read(File file) throws IOException {
try (Source source = Okio.source(file);
BufferedSource bufferedSource = Okio.buffer(source)) {
for (String line; (line = bufferedSource.readUtf8Line()) != null; ) {
System.out.println(line);
}
}
}
先来看okio的基本使用,例子是一个写方法、一个读方法,Sink和Source对应java io的OutputStream和InputStream。
try后面加了个括号,这是java7对try-catch的改进。结束try里的代码时,自动执行括号对象的close方法(对象需要实现Closeable),io代码的福音。
okio的代码不多,借助idea的show diagrams功能,将okio全部类和继承关系打印出来:
ByteString
ByteString是一个不可变的字节串,引用官方对ByteString诙谐的介绍:
ByteString is String's long-lost brother, making it easy to treat binary data as a value.
final byte[] data;
transient String utf8; // Lazily computed.
数据在ByteString里同时保存为byte[]和String,所以两者的转换易如反掌(都存成两份了,哈),典型的空间换时间。注意到String标记成transient,所以在序列化中,只处理byte[],无必要浪费功夫多理String。
有了ByteString,各种字符转换和处理就很容易了(一些工具类可以干掉了),挑几个方法看看:
public String utf8() {
String result = utf8;
return result != null ? result : (utf8 = new String(data, Util.UTF_8));
}
public String base64() {
return Base64.encode(data);
}
public ByteString md5() {
return digest("MD5");
}
public ByteString sha1() {
return digest("SHA-1");
}
Source和Sink
okio对数据的处理和java io类似,使用了流的概念。Source是输入流,Sink是输出流,分别定义了基础的read/write方法。
我们知道,java io是“装饰者模式”的经典实现,在InputStream/OutputStream的基础上,有很多装饰类,动态地为流添加各种各样的功能。这是个很好的设计模式,但个人感觉类多了点,而Source/Sink的实现类只有几个,用得最多是buffer相关的类,提供大部分方法,简单些又不失功能。
网络上偷张《Head First设计模式》讲装饰者模式的图:
Buffer
和ByteString处理不可变字节串相对,Buffer处理的是可变字节串。定义在BufferedSource/BufferedSink,对应的实现类是RealBufferedSource/RealBufferedSink。回顾上图okio所有类,Buffer在Source/Sink的中间,打通两者的操作。
每个BufferedSource/BufferedSink都会创建自己的Buffer,一一对应:
public final Buffer buffer = new Buffer();
segment
缓存操作的数据结构是segment,可以想象成一块块片段组成了整个buffer。
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;
segment其实就是长度固定的数组(SIZE=8192),两个指针pos和limit分别指示开始和结束的index,next和prev将segment构成一个双向链表。segmen还有两个布尔标记共享:
- shared:是否和其他segment共享数据;
- owner:是否独享数据。
okio对缓存操作的核心思想就是:
在buffer之间共享segment。
在java io中,流与流的缓存是无联系的,如果输入流的数据要写入输出流,需要将数据复制过去。在okio中,相同的动作,只需要将segment从一个buffer分配给另一个buffer。好处,显而易见。
- push/pop
- compact
- spilt
- writeTo
上面几个是操作segment的函数,在后文会用到,早点准备。函数的逻辑实质是链表的操作,其中push/pop将segment加入或移出链表,小学水平,路过。
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can’t fit byteCount bytes at the sink’s current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
writeTo将当前segment中的数据写入另一个segment,写入前需要判断owner和sheared。目标segment的空间需要足够,但有可能是read导致pos向后移,这个时候pos会大于0。已读完的数据可以覆盖,所以先移一移数据。剩下的,就是当前segment数组内容复制到目标segment数组。
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn‘t writable.
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
为了节约内存空间,空闲的segment会尝试合并前一个segment。如果空间足够容纳,当前segment的内容writeTo前一个segment,然后回收当前segment。
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
// We have two competing performance goals:
// - Avoid copying data. We accomplish this by sharing segments.
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// To balance these goals we only share segments when the copy will be large.
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy();
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
split方法将一个segment分割成两个segment,前者保存的数据范围:pos..pos+byteCount,后者保存的数据范围:pos+byteCount..limit。在创建新segment时,会根据内容多少控制segment是否shared(size>=1024),避免共享太细的segment。
SegmentPool
为了提高性能,无用的segment不会白白丢弃,SegmentPool提供了回收机制(享元模式,另一个例子是Handler发送的Message)。
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
recycle方法将无用的segment放入SegmentPool的单链表存着,下次重复使用时,调用take取出来:
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don’t zero-fill while holding a lock.
}
有存货就从链表里取出来,无就新建。
write
okio的缓存实质就是对segment读写,停一分钟想想对segment的write/read需要考虑什么问题。
好了,segment里数组的长度固定,我们读写的数据可能是各种格式:Int、Long、String等。有可能读写发生在一个segment,也可能需要跨多个segment。
具体来看例子里调用BufferedSink的writeUtf8,里面调用buffer的writeUtf8。写入一段string会比较长,writeUtf8进行分拆处理,核心是对分拆的字符调用writeByte方法。
@Override public Buffer writeByte(int b) {
Segment tail = writableSegment(1);
tail.data[tail.limit++] = (byte) b;
size += 1;
return this;
}
写入操作就是两步:1、获取一个剩余空间足够的segment;2、数据写入segment里的数组。
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
if (head == null) {
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
Segment tail = head.prev;
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
writableSegment入参是最低要求的空间长度,如果head不存在,直接新建;否则通过head.prev取得链表尾,检查长度并返回。其他一堆write方法,根据数据类型,控制写入长度,大同小异。
最精华的地方来了,来看write(Buffer source, long byteCount) ,入参是另一个buffer:
@Override public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
//1
if (byteCount < (source.head.limit - source.head.pos)) {
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
//2
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
//3
source.head = source.head.split((int) byteCount);
}
}
// 4
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
从别的buffer读取内容写入当前buffer,正是前文提到的,okio利用segment共享提高性能的体现。来源buffer需要考虑跨segment,目标buffer也需要考虑跨segment。
进入循环,每次操作一段数据,从byteCount减去本次操作数据的长度,直到byteCount为零。mark1判断是否source.head包括所有要操作的数据,是的话就比较简单:
source:[aabbbb] -> [bbbb]
sink:[head]-[10%] -> [head]-[10%+aa]
mark2,source.head剩余[aa]需要输入,尾segment能够容纳,直接调用writeTo,最终[aa]复制进尾segment。
source:[aabbbb] -> [aa]-[bbbb]
sink:[head]-[80%] -> [head]-[80%]-[aa]
mark3,source[aabbbb]还剩一段[aa]需要写入,尾segment不能容纳,无法使用writeTo。这个时候,需要将source分割成[aa]-[bbbb]两个,在mark4中将[aa]接入尾segment。
mark4就是source的segment接入当前buffer的过程,当操作完成后,会尝试compact节约空间。
纵观上述的过程,只有mark2有复制数据的动作,而且也是必须的,节约了内存。其他情况都是将segment指向不同的buffer,非常节约性能,真是666。
read
再来看read方法,例子里调用的readUtf8Line最终调用了buffer的readString:
@Override public String readString(long byteCount, Charset charset) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
if (charset == null) throw new IllegalArgumentException("charset == null");
if (byteCount > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
}
if (byteCount == 0) return "";
Segment s = head;
if (s.pos + byteCount > s.limit) {
// If the string spans multiple segments, delegate to readBytes().
return new String(readByteArray(byteCount), charset);
}
String result = new String(s.data, s.pos, (int) byteCount, charset);
s.pos += byteCount;
size -= byteCount;
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return result;
}
首先获取head所在的segment,判断读取的长度是否大于limit:
- segment范围内:直接可以读完,如果刚好读完segment,将它移出链表并放入SegmentPool回收;
- 跨segment读取:委托readByteArray处理。
@Override public void readFully(byte[] sink) throws EOFException {
int offset = 0;
while (offset < sink.length) {
int read = read(sink, offset, sink.length - offset);
if (read == -1) throw new EOFException();
offset += read;
}
}
readByteArray创建了个字节数组存放结果,交由readFully处理。既然需要跨segment,少不了循环逐段读取,readFully的循环里调用了read方法。
@Override public int read(byte[] sink, int offset, int byteCount) {
checkOffsetAndCount(sink.length, offset, byteCount);
Segment s = head;
if (s == null) return -1;
int toCopy = Math.min(byteCount, s.limit - s.pos);
System.arraycopy(s.data, s.pos, sink, offset, toCopy);
s.pos += toCopy;
size -= toCopy;
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
return toCopy;
}
read方法和上面说的readString方法类似,从segment读取的结果复制到目标数组。
okhttp和okio
okhttp连接上的流Http1Codec的输入输出用的就是okio:
final BufferedSource source;
final BufferedSink sink;
Timeout
okio对数据的读写提供超时机制,例如从InputSream获取Source(适配器,Sink/Source和OutputStream/InputSteam可以随意切换),有个入参Timeout,表示对读取数据有时间要求。具体Timeout的代码不打算细说,讲个大概。
超时的指定有两种:
- Timeouts:最大等待时间,比如从网络读取数据,通过指定最大等待时间判断网络是否可用;
- Deadlines:截止时间,在这个时间之前要完成数据的操作。
Timeout类很常规,是个同步超时类,除此之外,AsyncTimeout类继承Timeout,提供异步超时机制。在适配Socket时用的是AsyncTimeout,因为socket在等待数据时会阻塞,需要异步处理。
AsyncTimeout互相之间形成链表,根据超时时间有序排序。有个唯一守护线程WatchDog,实现超时到达马上反馈。io阻塞时线程停顿,需要通过别人告知,这就是异步超时的原理。
第一次知道异步超时处理,wonderful。
总结
okio说:“java io很好,也有不足的地方,由我来优化”。
小而美,精巧强大,缓存部分对segment的操作非常值得学习。只言片语不能尽述,RTFSC。
不care有无人看我写的笔记,反正我会了,哈哈。