fs模块就是基于stream的
可读流
// 官方用法
let rs = fs.createReadStream('./temp/1.txt', { // 官方的
flags: 'r',
encoding: null,
mode: 0o666,
autoClose: true,
start: 0, // 读的起点
highWaterMark: 2, // 每读一下的大小
end: 4 // 读的终点;包前又包后,读5个
})
rs.on('open', fd => {
console.info('触发open', fd)
})
let temp = setInterval(() => { clearInterval(temp); rs.resume(); }, 1000)
rs.on('data', data => {
console.info('data:', data.toString())
rs.pause()
})
rs.on('end', () => {
console.info('触发end')
})
rs.on('close', () => {
console.info('触发close')
})
自己实现一个readstream
- 有on方法,基于events事件
- 流一创建,马上触发open事件(马上打开文件)
- 订阅了data事件(且文件已打开)之后,再开始读操作。基于newListener
- 每次读出数据后,触发data事件
- 读取位置到达end(如果有的话)后,触发end事件
- 读完(到达end或文件结尾)后,如果autoClose==true,触发close事件
- 有pause事件,暂停读取
- 有resume事件,恢复读取
let events = require('events')
class myReadStream extends events {
constructor (dir, option) {
super()
this.flags = option.flags || 'r'
this.encoding = option.encoding
this.mode = option.mode || 0o666
this.autoClose = option.autoClose || true
this.start = option.start || 0
this.end = option.end || Infinity
this.highWaterMark = option.highWaterMark || 64 * 1024 // 默认16k
this.fd = option.fd
this.read_state = 0 // 1已打开 2已on-data 3可读 4暂停 5结束
this.position = this.start
if (!this.fd) {
this.open(dir)
} else {
this.read_state = 1
}
this.on('newListener', type => {
if (type == 'data') {
this.read_state += 2
this.read()
}
})
}
open (dir) {
fs.open(dir, this.flags, (err, fd) => {
if (err) return new Error('文件不存在')
this.fd = fd
this.emit('open', fd)
this.read_state += 1
this.read()
})
}
close () {
this.read_state = 5
this.emit('end')
if (this.autoClose) {
fs.close(this.fd, () => this.emit('close'))
}
}
read () {
if (this.read_state != 3) return
let buf = Buffer.alloc(this.highWaterMark)
let how_mach_to_read = Math.min(this.highWaterMark, this.end - this.position + 1)
fs.read(this.fd, buf, 0, how_mach_to_read, this.position, (err, bytesRead) => {
if (err) return new Error(err)
this.push(buf, bytesRead)
this.position += bytesRead
if (bytesRead == this.highWaterMark) {
this.read()
} else {
this.close()
}
// console.info(bytesRead, buf.toString('utf8'))
})
}
push (buf, bytesRead) {
if (!bytesRead) return
if (bytesRead != this.highWaterMark){
buf = buf.subarray(0, bytesRead)
}
this.emit('data', this.encoding ? buf.toString(this.encoding): buf)
}
pause () {
if (this.read_state == 3) this.read_state = 4
}
resume () {
console.log('resume', this.read_state)
if (this.read_state == 4) {
this.read_state = 3
this.read()
}
}
}
// 自己的
let rs = new myReadStream('./temp/1.txt', {})
可写流
let ws = fs.createWriteStream('./temp/1.txt', { // 官方的
flags: 'w',
encoding: null,
mode: 0o666,
autoClose: true,
start: 0,
highWaterMark: 2 // 所有write的累加
})
ws.on('open', fd => {
console.info('触发open', fd)
})
ws.on('drain', data => {
console.info('drain:', data)
})
ws.on('finish', () => {
console.info('触发finish')
})
ws.on('close', () => {
console.info('触发close')
})
// 返回值:写入字节数是否在highWaterMark内
let isunfull = ws.write('111', err => console.log('done 111'))
console.log(444444,isunfull)
isunfull = ws.write('222', err => console.log('done 222'))
console.log(444444,isunfull)
isunfull = ws.write('333', err => console.log('done 333'))
console.log(444444,isunfull)
// ws.end('结束') // 调用了end就不执行drain了
自己实现一个writestream
- 有on方法,基于events事件
- 流一创建,马上触发open事件(马上打开文件)
- write方法入参:内容、编码、回调
- write方法出参:(所有的write累加的)
调用
字节数 < highWaterMark - 当
写入
字节数 > highWaterMark,触发drain事件 - 有end方法,被调用后不再响应write方法,
所有内容写入文件后,触发finish事件
如果autoClose==true,触发close事件
let events = require('events')
class myWriteStream extends events {
constructor (dir, option) {
super()
this.flags = option.flags || 'w'
this.encoding = option.encoding || 'utf8'
this.mode = option.mode || 0o666
this.autoClose = option.autoClose || true
this.start = option.start || 0
this.highWaterMark = option.highWaterMark || 64 * 1024 // 默认16k
this.fd = option.fd
this._cache = [] // 等待写入的数据
this._cache_buf_length = 0 // 等待写入的字节数
this._isend = false // 是否已经调用end
this.position = this.start
if (!this.fd) {
this.open(dir)
}
}
open (dir) {
fs.open(dir, this.flags, (err, fd) => {
if (err) return new Error('文件不存在')
this.fd = fd
this.emit('open', fd)
})
}
finish () {
this.emit('finish')
if (this.autoClose) {
fs.close(this.fd, () => this.emit('close'))
}
}
write (chunk, encoding = 'utf8', callback = ()=>{}) {
if (this._isend) return
if (!Buffer.isBuffer(chunk)) chunk = Buffer.from(chunk)
this._cache.push({ chunk, callback })
if (this._cache.length == 1) {
this._write()
}
this._cache_buf_length += chunk.length
return this._cache_buf_length < this.highWaterMark
}
_write () {
if (!this.fd) {
return this.once('open', () => this._write() )
}
let {chunk, callback, isend} = this._cache.shift()
fs.write(this.fd, chunk, 0, chunk.length, this.position, (err, written) => {
if (err) return new Error(err)
callback()
this.position += chunk.length
if (this.position >= this.highWaterMark && this.highWaterMark > 0 && !this._isend) {
this.emit('drain')
this.highWaterMark = 0
}
if (this._cache.length) {
this._write()
} else if (this._isend) {
this.finish()
}
})
}
end (...args) {
this.write(...args)
this._isend = true
}
}
// 自己的
let ws = new myWriteStream('./temp/1.txt', {})
流的pipe 管道
// 读取1.txt写入2.txt
let rs = fs.createReadStream('./temp/1.txt', {})
let ws = fs.createWriteStream('./temp/2.txt', {})
rs.pipe(ws)
自己来实现,在myReadStream类中添加pipe方法
pipe (ws) {
this.on('data', data => { ws.write(data) })
}
官方createReadStream基于Readable 类;createWriteStream基于Writeable 类
双工流Duplex、转化流Transform与pipe的应用
// 读取1.txt并加密写入2.txt
let rs = fs.createReadStream('./temp/1.txt', {})
let ws = fs.createWriteStream('./temp/2.txt', {})
let crypto = require('crypto')
rs.pipe(crypto.createHash('md5')).pipe(ws)
.
crypto包
crypto.createHash('md5').update('123456').digest('base64')
// update可以直接加密buffer