Introduction
The previous two articles analyzed Okio's core classes and its timeout mechanism, covering how Okio works under the hood. Today we walk through Okio's IO operation flow from the top down.
File Read/Write Demo
Below is typical file read/write code. Okio can of course do far more than this (serializing objects, encoding and decoding images, and so on), but the flow is always similar, so we start reading the source code from this File read/write example:
/**
* Test writing a file
*/
private void testWriteFile() {
boolean isCreate = false;
Sink sink;
BufferedSink bufferedSink = null;
String path = Environment.getExternalStorageDirectory().getPath();
File file = new File(path, fileName);
try {
//create the file if it does not exist
if (!file.exists()) {
isCreate = file.createNewFile();
} else {
isCreate = true;
}
if (isCreate) {
sink = Okio.sink(file);//build the output sink
bufferedSink = Okio.buffer(sink);//wrap the sink with a buffer
bufferedSink.writeInt(100);//write an int
bufferedSink.writeUtf8("aaa12352345234523452233as\r\ndfasdasdfas我是汉字字串");//write a UTF-8 string
bufferedSink.flush();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (null != bufferedSink) {
bufferedSink.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* Test reading a file
*/
public void testReadFile() {
Source source;
BufferedSource bufferedSource = null;
try {
String path = Environment.getExternalStorageDirectory().getPath();
File file = new File(path, fileName);
source = Okio.source(file);//build the input source
bufferedSource = Okio.buffer(source);//wrap the source with a buffer
int c = bufferedSource.readInt();//read an int
//read line by line
while (true){
String read = bufferedSource.readUtf8Line();
if(read == null){
break;
}
Log.e("Okio", read);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
Read Flow Analysis
Okio's source method
/** Returns a source that reads from {@code file}. */
public static Source source(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return source(new FileInputStream(file));
}
/** Returns a source that reads from {@code in}. */
public static Source source(InputStream in) {
return source(in, new Timeout());
}
//eventually delegates to the InputStream-wrapping overload
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
//the Source returned here is the one that actually talks to the InputStream
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
try {
timeout.throwIfReached();
//the code below reads (at most) byteCount bytes from in and writes them to the tail of the buffer
Segment tail = sink.writableSegment(1);//find a writable tail segment in the buffer
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);//never more than the remaining capacity of tail
//read bytes from in into the segment's backing array
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
//advance tail's write position and grow the buffer size
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
} catch (AssertionError e) {
if (isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
}
}
@Override public void close() throws IOException {
//close the underlying input stream
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
This chain of methods returns a Source object that wraps an InputStream; it is the object that ultimately performs reads through the InputStream. This Source is then wrapped again by Okio's buffer method.
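To make this concrete, here is a minimal sketch of my own (the class name SourceDemo is just for illustration): the same source(InputStream) overload accepts any InputStream, and a single read() call copies at most one tail segment's worth of data into the destination Buffer:
import java.io.ByteArrayInputStream;
import java.io.IOException;
import okio.Buffer;
import okio.Okio;
import okio.Source;
public class SourceDemo {
    public static void main(String[] args) throws IOException {
        byte[] data = "hello okio".getBytes("UTF-8");
        Source source = Okio.source(new ByteArrayInputStream(data));//wrap an arbitrary InputStream
        Buffer buffer = new Buffer();
        long read = source.read(buffer, 8192);//fills at most one tail segment
        System.out.println(read + " bytes: " + buffer.readUtf8());
        source.close();
    }
}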
Okio's buffer method
public static BufferedSource buffer(Source source) {
return new RealBufferedSource(source);
}
It returns a RealBufferedSource object. As covered in an earlier article, this is the implementation class of BufferedSource; it holds a Buffer and the wrapped Source, and all read operations go through these two objects: the Source reads data in from the input stream, and the Buffer caches that data:
final class RealBufferedSource implements BufferedSource {
public final Buffer buffer = new Buffer();
public final Source source;//the Source that talks to the input stream
boolean closed;
....
}
The read operations that follow are all implemented by this RealBufferedSource object.
Read operations in RealBufferedSource
1. The readInt method:
@Override
public int readInt() throws IOException {
//source first reads 4 bytes from the input stream into the buffer
require(4);
//then 4 bytes are read from the buffer, assembled into an int, and returned
return buffer.readInt();
}
It first calls the require method:
@Override
public void require(long byteCount) throws IOException {
if (!request(byteCount)) throw new EOFException();
}
@Override
public boolean request(long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
//keep reading from the InputStream into the buffer until the buffer size reaches byteCount
while (buffer.size < byteCount) {
//the InputStream has no more data
if (source.read(buffer, Segment.SIZE) == -1) return false;
}
return true;
}
As analyzed earlier, Source.read only reads bytes from the input stream into a single segment of the Buffer, so request loops, reading from the input stream into the buffer until the buffer's size reaches byteCount. If the input stream runs out of data before that, it returns false; if enough bytes are buffered, it returns true. After this method runs, the buffer holds the data and Buffer's readInt is executed; that method was analyzed in detail in the first article, so it is not repeated here.
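A tiny sketch of this contract (my own example, assuming Okio 1.x behavior; RequireDemo is a made-up name): request() reports whether enough bytes could be buffered, while require(), and therefore readInt(), throws EOFException when they cannot:
import java.io.EOFException;
import java.io.IOException;
import okio.Buffer;
import okio.BufferedSource;
import okio.Okio;
import okio.Source;
public class RequireDemo {
    public static void main(String[] args) throws IOException {
        Buffer data = new Buffer().writeByte(1).writeByte(2);//the "stream" holds only 2 bytes
        BufferedSource source = Okio.buffer((Source) data);//a Buffer is itself a Source
        System.out.println(source.request(2));//true: 2 bytes can be buffered
        try {
            source.readInt();//needs 4 bytes, so require(4) fails
        } catch (EOFException expected) {
            System.out.println("EOFException: fewer than 4 bytes available");
        }
    }
}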
Next, the method that reads strings:
@Override
public @Nullable String readUtf8Line() throws IOException {
//look for a newline character
long newline = indexOf((byte) '\n');
//if there is no newline, read whatever is left in the buffer
if (newline == -1) {
return buffer.size != 0 ? readUtf8(buffer.size) : null;
}
//read one line from the buffer
return buffer.readUtf8Line(newline);
}
First, the readUtf8 method:
@Override
public String readUtf8(long byteCount) throws IOException {
require(byteCount);//read data from the stream into the buffer
//read the string from the buffer
return buffer.readUtf8(byteCount);
}
And then Buffer's readUtf8 method:
@Override
public String readUtf8(long byteCount) throws EOFException {
return readString(byteCount, Util.UTF_8);
}
@Override
//read byteCount bytes and build a String
public String readString(long byteCount, Charset charset) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
if (charset == null) throw new IllegalArgumentException("charset == null");
if (byteCount > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
}
if (byteCount == 0) return "";
//start reading at the head of the segment list
Segment s = head;
//if this segment does not hold byteCount bytes, delegate to readByteArray to read across segments
if (s.pos + byteCount > s.limit) {
// If the string spans multiple segments, delegate to readBytes().
return new String(readByteArray(byteCount), charset);
}
//the segment holds enough data: read byteCount bytes and build the String
String result = new String(s.data, s.pos, (int) byteCount, charset);
//advance the segment's read position and shrink the buffer size
s.pos += byteCount;
size -= byteCount;
//if the segment has been read empty, recycle it
if (s.pos == s.limit) {
head = s.pop();//pop the head node; head now points to the next segment
SegmentPool.recycle(s);//return the popped segment to the pool
}
return result;
}
readUtf8 calls readString, whose flow is as follows:
1. Start reading from the head segment of the buffer; if the current segment does not hold byteCount bytes of data, delegate to readByteArray to read a byte array across segments;
2. If the current segment holds enough data, read byteCount bytes from it and construct the String;
3. After the data has been read, advance the segment's read position, shrink the Buffer's size, and recycle the segment if it has been read empty (a small sketch of the two paths follows this list).
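The sketch below is my own illustration (it assumes Segment.SIZE is 8 KiB, the value used in Okio 1.x): once the buffered data no longer fits in a single segment, readString has to take the readByteArray path:
import okio.Buffer;
public class ReadStringDemo {
    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10000; i++) sb.append('a');//10000 ASCII bytes, more than one 8 KiB segment
        Buffer buffer = new Buffer().writeUtf8(sb.toString());
        //the data now spans two segments, so the single-segment fast path
        //new String(s.data, s.pos, byteCount, charset) cannot be used
        String back = buffer.readUtf8(buffer.size());
        System.out.println(back.length());//10000
    }
}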
Now let's look at readByteArray, which can read data across segments:
//reads a byte array across segments; ultimately calls readFully
@Override
public byte[] readByteArray(long byteCount) throws EOFException {
checkOffsetAndCount(size, 0, byteCount);
if (byteCount > Integer.MAX_VALUE) {
throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
}
//the byte array that receives the data
byte[] result = new byte[(int) byteCount];
readFully(result);
return result;
}
//fills the byte array completely, reading across segments
@Override
public void readFully(byte[] sink) throws EOFException {
int offset = 0;
while (offset < sink.length) {
//read in a loop: the buffer is segmented, so a single read may return fewer bytes than requested
int read = read(sink, offset, sink.length - offset);
if (read == -1) throw new EOFException();
offset += read;
}
}
readFully in turn calls read, which is where the actual copying finally happens:
//a single read into sink; offset is the start position, byteCount the requested size
@Override
public int read(byte[] sink, int offset, int byteCount) {
checkOffsetAndCount(sink.length, offset, byteCount);
//start reading at the head segment
Segment s = head;
if (s == null) return -1;//the buffer is empty
//copy the smaller of the requested size and the bytes available in this segment (s.limit - s.pos)
int toCopy = Math.min(byteCount, s.limit - s.pos);
//copy the segment data into sink
System.arraycopy(s.data, s.pos, sink, offset, toCopy);
//advance the segment's read position and shrink the buffer size
s.pos += toCopy;
size -= toCopy;
//recycle the segment once it has been read empty
if (s.pos == s.limit) {
head = s.pop();
SegmentPool.recycle(s);
}
//return the number of bytes actually read
return toCopy;
}
If you are familiar with the Buffer source code, this is all easy to follow: read performs a single copy out of one segment, recycling each segment once it is fully consumed; readFully calls read in a loop to work across segments; and readByteArray calls readFully to obtain the byte array.
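The following sketch (again my own, assuming 8 KiB segments) shows why the readFully loop is necessary: a single read() stops at a segment boundary and may return fewer bytes than requested:
import okio.Buffer;
public class ReadLoopDemo {
    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        buffer.write(new byte[10000]);//spans two segments
        byte[] out = new byte[10000];
        int first = buffer.read(out, 0, out.length);//a single call reads from a single segment
        System.out.println(first);//typically 8192, not 10000
    }
}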
Okio's write operations
The write path mirrors the read path. For reads, data flows InputStream (written to the tail of the Buffer) -> Buffer -> caller (read from the head of the Buffer); for writes it is caller (written to the tail of the Buffer) -> Buffer -> OutputStream (which drains the head of the Buffer). With the read path already covered, a quick pass over the write operations is enough.
First, the sink method, which talks to the OutputStream:
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
//a single write operation
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
//unlike the read path, out is the reading end relative to the buffer, so data is taken from head
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
//write the buffered data to the output stream
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
Next, RealBufferedSink's writeInt method:
@Override
public BufferedSink writeInt(int i) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.writeInt(i);
return emitCompleteSegments();
}
@Override
public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
emitCompleteSegments calls sink.write to push buffered data to the output stream. buffer.completeSegmentByteCount() computes how many bytes currently sit in segments that are already full; only those bytes are handed to sink.write(buffer, byteCount), while any partially filled tail segment stays buffered until flush() or close() forces it out (see the sketch below).
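Here is a small sketch of that difference (my own example, assuming Okio 1.x semantics; EmitDemo is a made-up name): after writeInt the 4 bytes sit in an incomplete segment, so emitCompleteSegments writes nothing, and the data only reaches the OutputStream when flush() is called:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import okio.BufferedSink;
import okio.Okio;
public class EmitDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BufferedSink sink = Okio.buffer(Okio.sink(out));
        sink.writeInt(100);//4 bytes, far less than one full segment
        System.out.println(out.size());//0 - nothing has reached the stream yet
        sink.flush();//forces the incomplete segment out
        System.out.println(out.size());//4
    }
}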
Among the other methods, the most interesting one writes UTF-8 strings: it takes each character of the string and UTF-8 encodes it. UTF-8 is a variable-length encoding. A character encoded in a single byte has its most significant bit set to 0; for multi-byte encodings, the number of leading 1 bits in the first byte tells how many bytes the encoding uses, and every following byte starts with 10. The original specification allowed up to 6 bytes per character (modern UTF-8 is limited to 4). The encoding rules are shown in the table below:
1 byte  0xxxxxxx
2 bytes 110xxxxx 10xxxxxx
3 bytes 1110xxxx 10xxxxxx 10xxxxxx
4 bytes 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
5 bytes 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
6 bytes 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
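As a worked example of the table (my own illustration): the character '汉' is U+6C49, which falls in the three-byte range. Encoding it by hand with the same shifts used in writeUtf8 gives E6 B1 89, matching what Okio produces:
import okio.Buffer;
public class Utf8Demo {
    public static void main(String[] args) {
        int c = '汉';//U+6C49, needs the 3-byte form
        int b0 = c >> 12 | 0xe0;//1110xxxx -> 0xE6
        int b1 = c >> 6 & 0x3f | 0x80;//10xxxxxx -> 0xB1
        int b2 = c & 0x3f | 0x80;//10xxxxxx -> 0x89
        System.out.printf("%02x %02x %02x%n", b0, b1, b2);//e6 b1 89
        System.out.println(new Buffer().writeUtf8("汉").readByteString().hex());//e6b189
    }
}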
The actual code is as follows:
@Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
if (string == null) throw new IllegalArgumentException("string == null");
if (beginIndex < 0) throw new IllegalArgumentException("beginIndex < 0: " + beginIndex);
if (endIndex < beginIndex) {
throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex);
}
if (endIndex > string.length()) {
throw new IllegalArgumentException(
"endIndex > string.length: " + endIndex + " > " + string.length());
}
// Transcode a UTF-16 Java String to UTF-8 bytes.
for (int i = beginIndex; i < endIndex;) {
int c = string.charAt(i);
//UTF-8 encode this char
if (c < 0x80) {//ASCII: one-byte encoding
Segment tail = writableSegment(1);
byte[] data = tail.data;
int segmentOffset = tail.limit - i;
int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);
// Emit a 7-bit character with 1 byte.
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
// Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
// improvement over independent calls to writeByte().
//keep consuming consecutive ASCII characters
while (i < runLimit) {
c = string.charAt(i);
if (c >= 0x80) break;//not ASCII, stop the fast run
data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
}
//how many bytes the run produced
int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).
//advance the write position and grow the buffer size
tail.limit += runSize;
size += runSize;
} else if (c < 0x800) {//two-byte encoding
// Emit a 11-bit character with 2 bytes.
writeByte(c >> 6 | 0xc0); // 110xxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} else if (c < 0xd800 || c > 0xdfff) {//three-byte encoding
// Emit a 16-bit character with 3 bytes.
writeByte(c >> 12 | 0xe0); // 1110xxxx
writeByte(c >> 6 & 0x3f | 0x80); // 10xxxxxx
writeByte(c & 0x3f | 0x80); // 10xxxxxx
i++;
} else {//surrogate pair, the four-byte case; the low surrogate is at index i+1
// c is a surrogate. Make sure it is a high surrogate & that its successor is a low
// surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character.
int low = i + 1 < endIndex ? string.charAt(i + 1) : 0;
//the UTF-16 is invalid and cannot be encoded
if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
writeByte('?');
i++;
continue;
}
// UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)
// UTF-16 low surrogate: 110111yyyyyyyyyy (10 bits)
// Unicode code point: 00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)
//compute the Unicode code point
int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);
//emit four bytes
// Emit a 21-bit character with 4 bytes.
writeByte(codePoint >> 18 | 0xf0); // 11110xxx
writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
writeByte(codePoint >> 6 & 0x3f | 0x80); // 10xxyyyy
writeByte(codePoint & 0x3f | 0x80); // 10yyyyyy
i += 2;
}
}
return this;
}
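A short sketch of the surrogate branch above (my own example against Okio 1.x behavior; SurrogateDemo is a made-up name): a valid surrogate pair becomes one 4-byte sequence, while an unpaired surrogate falls into the writeByte('?') branch:
import okio.Buffer;
public class SurrogateDemo {
    public static void main(String[] args) {
        //U+1F600 is the UTF-16 pair \uD83D\uDE00 and encodes to four bytes
        System.out.println(new Buffer().writeUtf8("\uD83D\uDE00").readByteString().hex());//f09f9880
        //a lone high surrogate is invalid UTF-16, so it is replaced by '?'
        System.out.println(new Buffer().writeUtf8("\uD83D").readByteString().hex());//3f
    }
}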
Summary
At this point most of Okio's core source code has been analyzed and we understand how it works. Here is my own (admittedly shallow) take on Okio's design philosophy:
1. The traditional Java IO classes need a large number of decorator classes and are cumbersome to use, with a huge family of stream types. Okio has only two byte-stream types, Sink and Source, plus their buffered counterparts BufferedSink and BufferedSource. Their implementations delegate all reading and writing to the Buffer class, which does it all: it is both readable and writable. Using Okio takes just two steps: build a Source/Sink, then wrap it with a buffer. Whatever the client reads or writes, Okio funnels it through Buffer, which provides a complete set of operations;
2. Traditional Java IO reads and writes a byte at a time (readByte), and a buffer has to be wrapped on separately if you need one. Okio forces a Buffer onto every byte stream, and the Buffer copies byte arrays block by block (one Segment at a time), which is efficient;
3. The data structure underneath Buffer is a circular doubly linked list of fixed-length arrays, a compromise that keeps both reading data and inserting/removing/modifying nodes efficient;
4. When a segment has been read empty, Buffer does not simply discard it; the empty segment is returned to the SegmentPool for reuse, reducing GC pressure;
5. Segment provides data sharing and compaction: sharing is implemented by splitting a segment, so the copy is shallow, while compaction merges segments, saving CPU and memory;
6. IO operations get a timeout mechanism, and for network IO socket timeouts can be detected asynchronously;
7. The immutable ByteString class wraps both a byte[] and a String, so converting between bytes and String costs essentially nothing, at the price of holding two references: a classic space-for-time trade. It also provides plenty of encoding and decoding helpers such as Base64 and MD5.