node中Stream分为Readable(可读流)、Writable(可写流)、Duplex(可读写流)、Transform(读写过程中可以修改和变换数据的 Duplex 流)。
为了实现可写流,我们需要使用流模块中的Writable构造函数。 我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写。
chunk通常是一个buffer,除非我们配置不同的流。
encoding是在特定情况下需要的参数,通常我们可以忽略它。
callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数
下面代码用ES6对可写流进行代码简要的实现:
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
constructor(path, options) {
super();
let self = this;
Object.assign(self, options); //还需设置默认值
self.path = path; //文件路经
self.isWriting = false;
self.Buffer = []; //缓冲区
self.len = null;
self.pos = self.start; //初始化写入位置
self.fd = null;
self.open();
}
open() {//首先打开文件
let self = this;
fs.open(self.path, self.flags, self.mode, (err, fd) => {
self.fd = fd;
if (err) return self.destroy(err);
self.emit('open');
});
}
destroy(err) {
fs.close(this.fd, () => {
this.emit('error', err);
});
}
write(chunk, encoding, cb) {
let self = this
, ret = null;
encoding = encoding?encoding:self.encoding; //优先使用write传入的编码方式
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
self.len += chunk.length;
ret = self.highWaterMark > self.len; //判断当前最新的缓冲区是否已达到最高水位线
if (self.isWriting) { //说明正在调用底层方法真正写入文件,先写入Buffer
self.Buffer.push({
chunk
, cb
});
} else {
self.isWriting = true;
self._write(chunk, cb, () => self.clearBuffer());
}
return ret;
}
_write(chunk, cb, clear) {
let self = this;
if (!self.fd) return self.once('open', () => {
self._write(chunk, cb, clear)
});
fs.write(self.fd, chunk, 0, chunk.length, self.pos, (err, bytesWritten) => {
if (err) {
if (self.autoClose) {
self.destroy();
self.emit('error', err);
}
}
self.len -= bytesWritten;
self.pos += bytesWritten;
cb && cb();
clear && clear();
});
}
clearBuffer() {
let self = this
, data = null;
data = self.Buffer.shift();
if (data) {
self._write(data.chunk, data.cb, () => self.clearBuffer());
} else { //此时说明缓冲区已无数据
self.isWriting = false;
self.emit('drain');
}
}
}