Node进阶 ---- Stream(一)

1. 流的概念

  • 流是一组有序的,有起点和终点的字节数据传输手段
  • 它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。

2.可读流createReadStream

实现了stream.Readable接口的对象,将对象数据读取为流数据,当监听data事件后,开始发射数据

fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};
util.inherits(ReadStream, Readable);

2.1 创建可读流

var rs = fs.createReadStream(path,[options]);
  1. path读取文件的路径
  2. options
    • flags打开文件要做的操作,默认为'r'
    • encoding默认为null
    • start开始读取的索引位置
    • end结束读取的索引位置(包括结束位置)
    • highWaterMark读取缓存区默认的大小64kb

如果指定utf8编码highWaterMark要大于3个字节

2.2 监听data事件

流切换到流动模式,数据会被尽可能快的读出

rs.on('data', function (data) {
    console.log(data);
});

2.3 监听end事件

该事件会在读完数据后被触发

rs.on('end', function () {
    console.log('读取完成');
});

2.4 监听error事件

rs.on('error', function (err) {
    console.log(err);
});

2.5 监听open事件

rs.on('open', function () {
    console.log(err);
});

2.6 监听close事件

rs.on('close', function () {
    console.log(err);
});

2.7 设置编码

与指定{encoding:'utf8'}效果相同,设置编码

rs.setEncoding('utf8');

2.8 暂停和恢复触发data

通过pause()方法和resume()方法

rs.on('data', function (data) {
    rs.pause();
    console.log(data);
});
setTimeout(function () {
    rs.resume();
},2000);

3.可写流createWriteStream

实现了stream.Writable接口的对象来将流数据写入到对象中

fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options);
};

util.inherits(WriteStream, Writable);

3.1 创建可写流

var ws = fs.createWriteStream(path,[options]);
  1. path写入的文件路径
  2. options
    • flags打开文件要做的操作,默认为'w'
    • encoding默认为utf8
    • highWaterMark写入缓存区的默认大小16kb

3.2 write方法

ws.write(chunk,[encoding],[callback]);
  1. chunk写入的数据buffer/string
  2. encoding编码格式chunk为字符串时有用,可选
  3. callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为false,未满时为true

3.3 end方法

ws.end(chunk,[encoding],[callback]);

表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数

3.4 drain方法

  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发

  • 建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块

    let fs = require('fs');
    let ws = fs.createWriteStream('./2.txt',{
      flags:'w',
      encoding:'utf8',
      highWaterMark:3
    });
    let i = 10;
    function write(){
     let  flag = true;
     while(i&&flag){
          flag = ws.write("1");
          i--;
         console.log(flag);
     }
    }
    write();
    ws.on('drain',()=>{
      console.log("drain");
      write();
    });
    

3.5 finish方法

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发。

var writer = fs.createWriteStream('./2.txt');
for (let i = 0; i < 100; i++) {
  writer.write(`hello, ${i}!\n`);
}
writer.end('结束\n');
writer.on('finish', () => {
  console.error('所有的写入已经完成!');
});

4.pipe方法

4.1 pipe方法的原理

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});

4.2 pipe用法

readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);

将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存。

4.3 unpipe用法

  • readable.unpipe()方法将之前通过stream.pipe()方法绑定的流分离

  • 如果 destination 没有传入, 则所有绑定的流都会被分离.

    let fs = require('fs');
    var from = fs.createReadStream('./1.txt');
    var to = fs.createWriteStream('./2.txt');
    from.pipe(to);
    setTimeout(() => {
    console.log('关闭向2.txt的写入');
    from.unpipe(writable);
    console.log('手工关闭文件流');
    to.end();
    }, 1000);
    

4.4 cork

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出。

4.5 uncork

writable.uncork()将输出在stream.cork()方法被调用之后缓冲在内存中的所有数据。

stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());

5. 简单实现

5.1 可读流的简单实现 [#](#t235.1 可读流的简单实现)

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    start: 3,
    end: 7,
    highWaterMark: 3
});
rs.on('open', function () {
    console.log("open");
});
rs.on('data', function (data) {
    console.log(data);
});
rs.on('end', function () {
    console.log("end");
});
rs.on('close', function () {
    console.log("close");
});
/**
 open
 456
 789
 end
 close
 **/
let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.fd = options.fd;
        this.flags = options.flags || 'r';
        this.encoding = options.encoding;
        this.start = options.start || 0;
        this.pos = this.start;
        this.end = options.end;
        this.flowing = false;
        this.autoClose = true;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.buffer = Buffer.alloc(this.highWaterMark);
        this.length = 0;
        this.on('newListener', (type, listener) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
        });
        this.on('end', () => {
            if (this.autoClose) {
                this.destroy();
            }
        });
        this.open();
    }

    read() {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        let n = this.end ? Math.min(this.end - this.pos, this.highWaterMark) : this.highWaterMark;
        fs.read(this.fd,this.buffer,0,n,this.pos,(err,bytesRead)=>{
            if(err){
             return;
            }
            if(bytesRead){
                let data = this.buffer.slice(0,bytesRead);
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                this.pos += bytesRead;
                if(this.end && this.pos > this.end){
                  return this.emit('end');
                }
                if(this.flowing)
                    this.read();
            }else{
                this.emit('end');
            }
        })
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        })
    }
  
    end() {
          if (this.autoClose) {
              this.destroy();
          }
      }

      destroy() {
          fs.close(this.fd, () => {
              this.emit('close');
          })
      }

  }

module.exports = WriteStream;

5.2 可写流的简单实现

let fs = require('fs');
 let FileWriteStream = require('./FileWriteStream');
 let ws = FileWriteStream('./2.txt',{
     flags:'w',
     encoding:'utf8',
     highWaterMark:3
 });
 let i = 10;
 function write(){
     let  flag = true;
     while(i&&flag){
         flag = ws.write("1",'utf8',(function(i){
             return function(){
                 console.log(i);
             }
         })(i));
         i--;
         console.log(flag);
     }
 }
 write();
 ws.on('drain',()=>{
     console.log("drain");
     write();
 });
 /**
  10
  9
  8
  drain
  7
  6
  5
  drain
  4
  3
  2
  drain
  1
  **/
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends  EventEmitter{
    constructor(path, options) {
        super(path, options);
        this.path = path;
        this.fd = options.fd;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 0o666;
        this.encoding = options.encoding;
        this.start = options.start || 0;
        this.pos = this.start;
        this.writing = false;
        this.autoClose = true;
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.buffers = [];
        this.length = 0;
        this.open();
    }

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) return this.emit('error', err);
            this.fd = fd;
            this.emit('open', fd);
        })
    }

    write(chunk, encoding, cb) {
        if (typeof encoding == 'function') {
            cb = encoding;
            encoding = null;
        }

        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, this.encoding || 'utf8');
        let len = chunk.length;
        this.length += len;
        let ret = this.length < this.highWaterMark;
        if (this.writing) {
            this.buffers.push({
                chunk,
                encoding,
                cb,
            });
        } else {
            this.writing = true;
            this._write(chunk, encoding,this.clearBuffer.bind(this));
        }
        return ret;
    }

    _write(chunk, encoding, cb) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this._write(chunk, encoding, cb));
        }
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, written) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                }
                return this.emit('error', err);
            }
            this.length -= written;
            this.pos += written;
            cb && cb();
        });
    }

    clearBuffer() {
        let data = this.buffers.shift();
        if (data) {
            this._write(data.chunk, data.encoding, this.clearBuffer.bind(this))
        } else {
            this.writing = false;
            this.emit('drain');
        }
    }

    end() {
        if (this.autoClose) {
            this.emit('end');
            this.destroy();
        }
    }

    destroy() {
        fs.close(this.fd, () => {
            this.emit('close');
        })
    }

}

module.exports = WriteStream;

5.3 pipe [#](#t255.3 pipe)

let fs = require('fs');
let ReadStream = require('./ReadStream');
let rs = ReadStream('./1.txt', {
    flags: 'r',
    encoding: 'utf8',
    highWaterMark: 3
});
let FileWriteStream = require('./WriteStream');
let ws = FileWriteStream('./2.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3
});
rs.pipe(ws);
ReadStream.prototype.pipe = function (dest) {
    this.on('data', (data)=>{
        let flag = dest.write(data);
        if(!flag){
            this.pause();
        }
    });
    dest.on('drain', ()=>{
        this.resume();
    });
    this.on('end', ()=>{
        dest.end();
    });
}
ReadStream.prototype.pause = function(){
    this.flowing = false;

}
ReadStream.prototype.resume = function(){
    this.flowing = true;
    this.read();
}

5.4 暂停模式

let fs =require('fs');
let ReadStream2 = require('./ReadStream2');
let rs = new ReadStream2('./1.txt',{
    start:3,
    end:8,
    encoding:'utf8',
    highWaterMark:3
});
rs.on('readable',function () {
    console.log('readable');
    console.log('rs.buffer.length',rs.length);
    let d = rs.read(1);
    console.log(d);
    console.log('rs.buffer.length',rs.length);

    setTimeout(()=>{
        console.log('rs.buffer.length',rs.length);
    },500)
});

    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                if (this.autoClose) {
                    this.destroy();
                    return this.emit('error', err);
                }
            }
            this.fd = fd;
            this.emit('open');
        });
    }
    
    read(n) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => this.read());
        }
        n = parseInt(n,10);
        if(n != n){
            n = this.length;
        }
        if(this.length ==0)
            this.needReadable = true;
        let ret;
        if (0<n < this.length) {
            ret = Buffer.alloc(n);
            let b ;
            let index = 0;
            while(null != (b = this.buffers.shift())){
                for(let i=0;i<b.length;i++){
                    ret[index++] = b[i];
                    if(index == ret.length){
                        this.length -= n;
                        b = b.slice(i+1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }
    
        let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if(this.needReadable){
                            this.emit('readable');
                        }
    
                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if(this.needReadable){
                            this.emit('readable');
                            this.needReadable = false;
                        }
    
                    }
                } else {
                    if(this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
        if (this.length == 0 || (this.length < this.highWaterMark)) {
            _read(0);
        }
        return ret;
    }
    
    destroy() {
        fs.close(this.fd, (err) => {
            this.emit('close');
        });
    }
    
    pause() {
        this.flowing = false;
    }
    
    resume() {
        this.flowing = true;
        this.read();
    }
    
    pipe(dest) {
        this.on('data', (data) => {
            let flag = dest.write(data);
            if (!flag) this.pause();
        });
        dest.on('drain', () => {
            this.resume();
        });
        this.on('end', () => {
            dest.end();
        });
    }
    

}

module.exports = ReadStream; /**

  • if (n !== 0) state.emittedReadable = false; 只要要读的字节数不是0就需要触发readable事件 如果传入的NaN,则将n赋为缓区的长度,第一次就是0

    缓存区为0就开始读吧 如果n等于0就返回null,state.needReadable = true; 如果缓存区为0,是 state.needReadable = true; 需要触发readable

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,392评论 1 10
  • https://nodejs.org/api/documentation.html 工具模块 Assert 测试 ...
    KeKeMars阅读 6,305评论 0 6
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 539评论 0 0
  • 昨晚和以前的同事吃饭,随便聊了聊,他提起今年报了省考但是答不完题,加上岗位大热,所以名落孙山。同事开玩笑般的要我简...
    王滚滚打小怪兽阅读 143评论 0 0
  • 绽放 一·女,那么漂亮像春天里的花朵我采不到二·关键是那个女子叫人不能忘怀我想问问:是你吗三·如果真的是你我宁愿给...
    屈冰阅读 539评论 35 65