Node中stream的深度感知

背景

之前在开发ASP.NET的时候,根据源代码依次追踪整个程序的执行过程,发现最重要的过程是基于一个管道流的,像水管一样,依次处理管道流中的十几个事件,当时对流的认知是四个字,依次执行。那么现在做Node的开发,对于Node中的流是另四个字,那就是源源不断。本篇文章主要目的是带大家手写可读流与可写流。

简介

在Node中,请求流,响应流,文件流等都是基于stream模块封装的。简单的理解,流就是将大块的东西,分小块依次处理。就像你需要10kg的水,水管就一点点的源源不断的流出来给你。又如在程序中

fs.readFileSync('/demo.txt', {encoding:'utf8'});fs.writeFileSync('/demo.txt', data);

以上两个方法是把文件内容全部读入内存,然后再写入文件,但是如果文件过大就会出现问题了,内存容易爆掉。这里就需要用到流了,一点点的读取或者写入。

分类

  • Readable - 可读的流 (例如 fs.createReadStream()).
  • Writable - 可写的流 (例如 fs.createWriteStream()).
  • Duplex - 可读写的流 (例如 net.Socket).
  • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

Readable - 可读的流介绍与实现

可读流分为两种模式:flowing 和 paused,并且两种模式可以相互转换

1.在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。

2.在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。

3.所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:

  • 监听 'data' 事件
  • 调用 stream.resume() 方法
  • 调用 stream.pipe() 方法将数据发送到 Writable

4.可读流可以通过下面途径切换到 paused 模式:

  • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
  • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

5.为了便于理解,这里分开写两种模式,下面为flowing模式基本实现

  • 定义类首先要继承自EventEmitter,因为要发射监听事件,在构造函数中依次定义各个参数,其中需要说明的是this.flowing 用于切换模式,this.buffer并非是缓存,因为流动模式是默认不走缓存的,这个buffer是读取的时候fs.read的一个参数,所有的事件监听到都会执行newListener。当该类被构造的时候就打开文件,并监听事件,如果是data,默认走流动模式开始读取。
class ReadStream extends EventEmitter{
    constructor(path,options){
        super(path,options);
        this.path = path;//写入路径
        this.flags = options.flags || 'r'; //操作修饰符
        this.mode = options.mode || 0o666; //权限
        this.autoClose = options.autoClose;//是否自动关闭
        this.highWaterMark = options.highWaterMark || 64 * 1024; //默认64k
        this.pos = this.start = options.start || 0;//起始位置
        this.end = options.end;//结束位置
        this.encoding = options.encoding;//编码
        this.flowing = null;//流动模式
        this.buffer = Buffer.alloc(this.highWaterMark);//读取的buffer 不是缓存
        this.open();
        this.on('newListener',(type,listener)=>{
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
        })
    }
  • open方法 打开传入路径的文件 如果出错并且设置自动关闭属性,直接关闭打开,发射error事件。如果成功了,发射open事件,供读取方法接收。
 open() {
        fs.open(this.path,this.flags,this.mode,(err,fd)=>{
           if(err){
               if(this.autoClose){
                   this.destroy();
                   return this.emit('error',err);
               }
           }
           this.fd = fd;
           this.emit('open');
        })
    }
  • 最重要的方法Read,一进入就需要判断文件是否已经打开了,因为文件打开是异步的一个过程,此时可能并未打开,如果没有打开就监听发射的open事件,然后在回调函数中进行read方法的调用。其中核心是每次需要读多少,这个量由传入的开始结束位置已经最高水位线决定的。最高水位线代表依次最多读取多少,默认值是64kb。那么每次读取的值 howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
read(){
        if(typeof this.fd != 'number'){
            return this.once('open',()=>this.read());
        }
        let howMuchToRead = this.end?Math.min(this.end - this.pos + 1,this.highWaterMark):this.highWaterMark;
        fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytes)=>{//bytes是实际读到的字节数
            if(err){
                if(this.autoClose)
                    this.destroy();
                return this.emit('error',err);
            }
            if(bytes){
                let data = this.buffer.slice(0,bytes);
                this.pos += bytes;
                data = this.encoding?data.toString(this.encoding):data;
                this.emit('data',data);
                if(this.end && this.pos > this.end){
                   return this.endFn();
                }else{
                    if(this.flowing)
                      this.read();
                }
            }else{
                return this.endFn();
            }

        })
    }
  • pipe方法实现 pipe方法就是边读取边写入,控制读写速度,当可写流的写入返回false时,暂停读取,当可写流触发drain事件后,恢复读取。
pipe(ws){
        this.on('data',data =>{
            let flag = ws.write(data);
            if (!flag) {
                this.pause();
            }
        })
        ws.on('drain',()=>{
            this.resume();
        })
    }
    pause(){
        this.flowing  = false;
    }
    resume(){
        this.flowing  = true;
        this.read();
    }
  1. 暂停模式 暂停模式不同的是需要走缓存,并且监听的是readable事件(这里我只贴出具有差异性的代码)
  • 在构造函数中需要加入this.buffers = [];(源码中为了提高效率,使用的是链表结构,这里我用数组代替),以及readable事件的监听。
 this.on('newListener', (type) => {
            if (type == 'data') {
                this.flowing = true;
                this.read();
            }
            if (type == 'readable') {
                this.read(0);
            }
        });
  • 这里read方法需要传入一个n,表示需要读取的字节数。如果判断缓存的大小,即this.length,如果this.length == 0 || this.length < this.highWaterMark ,执行_read()方法,此时执行的n为0
 let _read = () => {
            let m = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
            fs.read(this.fd, this.buffer, 0, m, this.pos, (err, bytesRead) => {
                if (err) {
                    return
                }
                let data;
                if (bytesRead > 0) {
                    data = this.buffer.slice(0, bytesRead);
                    this.pos += bytesRead;
                    this.length += bytesRead;
                    if (this.end && this.pos > this.end) {
                        if (this.needReadable) {
                            this.emit('readable');
                        }

                        this.emit('end');
                    } else {
                        this.buffers.push(data);
                        if (this.needReadable) {
                            this.emit('readable');
                            this.needReadable = false;
                        }

                    }
                } else {
                    if (this.needReadable) {
                        this.emit('readable');
                    }
                    return this.emit('end');
                }
            })
        }
  • 如果传入的n值 0 < n < this.length,走以下逻辑,即从缓存区中读取相应的字节数进行读取
  if (0 < n < this.length) {
            ret = Buffer.alloc(n);
            let b;
            let index = 0;
            while (null != (b = this.buffers.shift())) {
                for (let i = 0; i < b.length; i++) {
                    ret[index++] = b[i];
                    if (index == ret.length) {
                        this.length -= n;
                        b = b.slice(i + 1);
                        this.buffers.unshift(b);
                        break;
                    }
                }
            }
            if (this.encoding) ret = ret.toString(this.encoding);
        }

Writable - 可读的流介绍与实现

  • 构造函数跟上面差距不大,有一个this.buffer的缓存区,并且最高水位线默认为16k
  //构造函数
    constructor(path,options){
        super(path,options);
        this.path = path; //写入路径
        this.flags = options.flags || 'w';//操作修饰符
        this.mode = options.mode || 0o666;//权限
        this.start = options.start || 0;//写入的起始位置
        this.pos = this.start;//文件的写入索引
        this.encoding = options.encoding || 'utf8';//编码
        this.autoClose = options.autoClose;//自动关闭
        this.highWaterMark = options.highWaterMark || 16 * 1024; //默认最高水位线16k
        this.buffers = [];//缓存区 源码里面是链表
        this.writing = false;//标识内部正在写入数据
        this.length = 0;//标识缓存区字节的长度
        this.open();//默认一创建就打开文件
    }
  • open即打开文件是一样的,这里不再描述。最重要的write方法,该方法有一个返回值,标识缓存区的长度是否超过了最高水位线,特别注意的是如果超过了也会写入进去的,因为会放入到缓存区中,等到drain事件触发,再接着写入,具体写入是执行的_write方法,其中有一个this.writing 标识是否正在写入,如果正在写入,则放入缓存区中,在清空缓存区的时候依次取出写入,即以下的clearBuffer方法,此方法中当缓存区清空了以后触发drain事件,这里需要特殊说明一下,如果缓存区从未满过,是不会触发这个事件的。
write(chunk,encoding,callback){
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk,this.encoding);//此方法只吸入buffer或者字符串
        this.length += chunk.length;//当前缓存区的长度
        if (this.writing) {//如果正在写入数据 则需要把数据放入缓存区里面
            this.buffers.push({
                chunk,
                encoding,
                callback
            })
        } else { //如果当前空闲 直接调用底层写入的方法进行写入 并且在写完以后 清空缓存区
            this.writing = true;
            this._write(chunk,encoding,()=>{
                callback&&callback();
                this.clearBuffer();
            })
        }

        //write方法有一个返回值 表示当前缓存区是否超过了最高水位线 即是否能继续写入
        return this.length < this.highWaterMark;
    }

    _write(chunk,encoding,callback){
        if (typeof this.fd != 'number') { //因为是异步的 文件可能在这个时候并未打开
            return this.once('open',()=>this._write(chunk, encoding, callback));
        }
        fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,bytesWritten)=>{
            if (err) {
                if (this.autoClose) {
                    this.destory();
                }
               return this.emit('error',err);
            }
            this.pos += bytesWritten;

            this.length -= bytesWritten;
            callback&&callback();
        })
    }
    clearBuffer(){
        let data = this.buffers.shift();
        if(data){
                this._write(data.chunk,data.encoding,()=>{
                    data.callback && data.callback();
                    this.clearBuffer();
                })
            }else{
                this.writing = false;
                //缓存区清空了 缓存区如果没有满过 是不会触发这个事件的
                this.emit('drain');
            }
    }

参考链接

  1. Node.js API文档
  2. 深入理解 Node Stream 内部机制
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342