Nodejs可以做什么
- 轻量级,高性能的Web服务
- 前后端JavaScript同构开发
- 便捷高效的前端工程化
Natives modules
- 当前层内容由JS实现
- 提供应用程序可直接调用库,例如 fs,path,http
- JS语言无法直接操作底层硬件设置
底层 - V8:执行JS代码,提供桥梁接口
- Libuv:事件循环,事件队列,异步IO
- 第三方模块:zlib,http,c-ares等
Nodejs更适用于IO密集型高并发请求
Nodejs异步IO
- 阻塞IO
-
非阻塞IO
- IO是应用程序的瓶颈所在
- 异步IO提高性能无需原地等待结果返回
- IO操作属于操作系统级别,平台都有对应实现
- Nodejs单线程配合事件驱动架构及libuv实现了异步IO
Nodejs单线程(主线程是单线程)
使用JS实现高效可伸缩的高性能Web服务
- 单线程机制配合异步非阻塞IO配合事件回调通知实现高并发请求
- Nodejs单线程指运行js代码主线程是单线程(v8引擎中专门执行js代码为单线程),在libuv库中存在一个存放多个线程的线程池。
- Nodejs单线程机制使得nodejs不适合处理cpu密集型的任务,比如代码中一些密集计算会阻塞后面代码。
Nodejs应用场景
-
IO密集型高并发请求
- 操作数据库提供API服务
- 实时聊天应用程序
Nodejs实现Api服务
- npm init -y
初始化项目 - npm install -g typescript
安装ts(npm view typescript version查看安装版本) - tsc --init
生成tsconfig.json配置文件 - npm i ts-node -D
安装node运行ts文件工具 - npm i express
安装express - npm i @types/express -D
expres包ts类型声明
//server.ts
import express from 'express'
import { DataStore } from './data'
const app = express()
app.get('/', (req, res) => {
// res.end('ddd')
res.json(DataStore.list)
})
app.listen(8080, () => {
console.log('服务开启')
})
Nodejs全局对象
- 与浏览器平台的window不完全相同
- Nodejs全局对象上挂载许多属性
Nodejs全局对象就是global,Global的根本作用就是作为全局对象的宿主
全局对象可以看做是全局变量的宿主
常见全局变量 - __filename:返回正在执行脚本文件的绝对路径
- __dirname:返回正在执行脚本所在目录
- timer类函数:执行顺序与事件循环间的关系
- process:提供与当前进程互动的接口
- require:实现模块的加载
- module,exports:处理模块的导出
全局变量process - 获取进程信息
- 执行进程操作
process.memoryUsage() //内存使用情况
process.cpuUsage() //cpu使用情况
process.cwd() //当前工作目录
process.versions //node信息
process.arch //操作系统信息
process.env //当前node环境
process.env.PATH //当前配置的系统环境变量
process.env,USERPROFILE //管理员目录 mac下(HOME)
process.platform //系统平台
process.argv //启动参数
process.pid //程序运行id
process.uptime() //运行消耗时间
//事件
process.on('beforeExit',(code)=>{
//即将退出
})
process.on('exit',(code)=>{
//退出 只能执行同步代码
})
process.exit() //程序主动退出
process.stdout //标准输出
process.stdin //标准输入
path模块
常用api
- basename()获取路径中基础名称
- dirname()获取路径中目录名称
- extname()获取路径中扩展名称
- isAbsolute()获取路径是否为绝对路径
- join()拼接多个路径片段
- resolve()返回绝对路径
- parse()解析路径
- format()序列化路径
- normalize()规范化路径
全局变量Buffer
Buffer缓冲区,Buffer让JavaScript 可以操作二进制
IO行为操作的就是二进制数据
流操作配合管道实现数据分段传输
Nodejs中Buffer是一片内存空间
- 无需require的一个全局变量
- 实现Nodejs平台下得二进制数据操作
- 不占据V8堆内存大小的内存空间
- 内存的使用由Node来控制,由V8的GC回收
- 一般配合Stream流使用,充当数据缓冲区
创建Buffer实例 - alloc:创建指定字节大小的buffer
- allocUnsafe:创建指定大小的buffer(不安全)
- from:接收数据,创建buffer
Buffer实例方法 - fill:使用数据填充buffer
- write:向buffer中写入数据
- toString:从buffer中提取数据
- slice:截取buffer
- indexOf:在buffer中查找数据
- copy:拷贝buffer中的数据
自定义Buffer之split
实现对Buffer拆分
ArrayBuffer.prototype.split = function (sep) {
let len = Buffer.from(sep).length
let ret = []
let start = 0
let offset = 0
while (offset = this.indexOf(sep, start) !== -1) {
ret.push(this.slice(start, offset))
start = offset + len
}
ret.push(this.slice(start))
return ret
}
let buf = '吃面,吃米,吃肉'
let arr = buf.split('吃')
FS
FS是内置的核心模块,提供文件系统操作的API
权限位,标识符,文件描述符
- 权限位:用户对于文件所具备的操作权限
- 标识符:Nodejs中flag表示对文件操作方式
- r:表示可读
- w:表示可写
- s:表示同步
- +:表示执行相反操作
- x:表示排他操作
- a:表示追加操作
- 文件描述符:fd就是操作系统分配给被打开文件的标识
文件读写与拷贝操作
文件操作API
- readFile:从指定文件中读取数据
- writeFile:向指定文件中写入数据(覆盖写操作)
- appendFile:追加的方式向指定文件中写入数据
- copyFile:将某个文件中的数据拷贝至另一个文件
- watchFile:对指定文件进行监控
- unwatchFile:取消对文件监控
const fs = require('fs')
const path = require('path')
fs.readFile(path.resolve('package.json'), 'utf8', (err, data) => {
console.log(data)
})
fs.writeFile('data.txt', 'abcdefg', (err) => {
})
文件打开与关闭
readFile,writeFile将文件一次性读取与写入,对于大体积的文件不合理。需要一种边读边写的操作方式
- open()
- close()
const fs = require('fs')
const path = require('path')
fs.open(path.resolve('data.txt'), 'r', (err, fd) => {
console.log(fd) //3
fs.close(fd, err => {
console.log('关闭成功')
})
})
大文件读写操作
const fs = require('fs')
const path = require('path')
//read:读操作就是将数据从磁盘文件中写入到 buffer 中
let buf = Buffer.alloc(10)
fs.open('data.txt', 'r', (err, rfd) => {
//read 将磁盘文件写入缓冲区
/**
* fs.read(fd,buf,offset,len,pos,cb)
* fd 定位当前被打开的文件
* buf 用于表示当前缓冲区
* offset 从buffer哪一个位置开始写
* len 当前次写入长度
* pos 当前从文件的哪个位置开始读
*/
fs.read(rfd, buf, 0, 3, 0, (err, readBytes, data) => {
})
//write 将缓冲区内容写入磁盘文件中
/**
* fs.write(fd,buf,offset,len,pos,cb)
* fd 当前操作的文件
* buf 用于表示当前缓冲区
* offset 从buffer哪一个位置开始读数据
* len 当前次写入长度
* pos 当前从文件的哪个位置开始写
*/
fs.write(rfd,buf,0,3,0,(err,written,buffer)=>{
fs.close(rfd)
})
})
文件拷贝自定义实现
const fs = require('fs')
//将a.txt 写入 b.txt中
// 打开 a 文件 利用read将数据保存在buffer中
// 打开 b 文件 用write将buffer中数据写入b文件中
let buf = Buffer.alloc(10)
fs.open('a.txt', 'r', (err, rfd) => {
fs.open('b.txt', 'w', (err, wfd) => {
fs.read(rfd, buf, 0, 10, 0, (err, readBytes, data) => {
fs.write(wfd, buf, 0, 10, 0, (werr, written) => {
console.log('写入成功')
// fs.close(rfd)
// fs.close(wfd)
})
})
})
})
优化处理
const fs = require('fs')
//将a.txt 写入 b.txt中
// 打开 a 文件 利用read将数据保存在buffer中
// 打开 b 文件 用write将buffer中数据写入b文件中
let buf = Buffer.alloc(10)
const bufSize = buf.length
let readOffset = 0
fs.open('a.txt', 'r', (err, rfd) => {
fs.open('b.txt', 'w', (err, wfd) => {
function next() {
fs.read(rfd, buf, 0, bufSize, readOffset, (err, readBytes, data) => {
if (!readBytes) {
//内容读取完
fs.close(rfd, () => { })
fs.close(wfd, () => { })
return
}
readOffset += readBytes
fs.write(wfd, buf, 0, readBytes, (werr, written) => {
next()
})
})
}
next()
})
})
FS目录操作API(异步)
- access:判断文件或目录是否具有操作权限
- stat:获取目录及文件信息
- mkdir:创建目录
- rmidr:删除目录
- readdir:读取目录中内容
- unlink:删除指定文件
创建目录(同步)
const fs = require('fs')
const path = require('path')
//接收a/b/c路径格式,使用/将路径拆分,放入数组['a','b','c']
//对数组遍历,拿到每一项,与前一项拼接 /
//判断一个当前对拼接后的路径是否具有可操作的权限,有则存在,没有则创建
function mkdirSync(dirpath) {
let pathArr = dirpath.split('/')
for (let i = 1; i <= pathArr.length; i++) {
let dir = pathArr.slice(0, i).join(path.sep)
try {
fs.accessSync(dir)
} catch (error) {
fs.mkdirSync(dir)
}
}
}
mkdirSync('a/b/c')
创建目录(异步)
const fs = require('fs')
const path = require('path')
function mkDir(dirPath, cb) {
let pathArr = dirPath.split('/')
let index = 1
function next() {
if (index > pathArr.length) return cb && cb()
let current = pathArr.slice(0, index++).join('/')
fs.access(current, (err) => {
if (err) {
fs.mkdir(current, next)
} else {
next()
}
})
}
next()
}
mkDir('a/b/c', () => {
console.log('创建成功')
})
promise写法
const fs = require('fs')
const path = require('path')
const { promisify } = require('util')
const access = promisify(fs.access)
const mkdir = promisify(fs.mkdir)
async function mkDir(dirPath, cb) {
let pathArr = dirPath.split('/')
for (let index = 1; index <= pathArr.length; index++) {
let current = pathArr.slice(0, index).join('/')
try {
await access(current)
} catch (error) {
await mkdir(current)
}
}
cb && cb()
}
mkDir('a/b/c', () => {
console.log('success')
})
目录删除异步实现
const fs = require('fs')
const path = require('path')
//定义一个函数,接收一个路径,删除操作
//判断当前路径是否为一个文件,是则直接删除当前文件
//如果传入的是一个目录 则需继续读取目录中内容 然后执行删除文件操作
function removeDir(dirPath, cb) {
//判断dirPath类型
fs.stat(dirPath, (err, statObj) => {
//判断是否是目录
if (statObj.isDirectory()) {
fs.readdir(dirPath, (derr, files) => {
let dirs = files.map(item => {
return path.join(dirPath, item)
})
let index = 0
function next() {
if (index === dirs.length) return fs.rmdir(dirPath, cb)
let current = dirs[index++]
removeDir(current, next)
}
next()
})
} else {
fs.unlink(dirPath, cb)
}
})
}
removeDir('a', () => {
console.log('delete all')
})
CommonJS规范
CommonJS规范主要应用于Nodejs
CommonJS规范起初是为了弥补JS语言模块化缺陷
CommonJS规范定义模块的加载是同步完成
- 模块引用 (require)
- 模块定义 (exports)
- 模块标识(路径)
Nodejs与CommonJS
- 任意一个文件就是一个模块,具有独立作用域
- 使用require导入其它模块
- 将模块ID传入require实现目标模块定位
module属性 - 任意js文件就是一个模块,可以直接使用module属性
- id:返回模块标识符,一般是一个绝对路径
- filename:返回文件模块的绝对路径
- loaded:返回布尔值,表示模块是否完成加载
- parent:返回对象存放调用当前模块的模块
- children:返回数组,存放当前模块调用的其它模块
- exports:返回当前模块需要暴露的内容
- paths:返回数组,存放不同目录下的node_modules位置
require属性 - 基本功能是读入并且执行一个模块文件,返回文件的exports对象
- resolve:返回模块文件绝对路径
- extensions:依据不同后缀名执行解析操作
- main:返回主模块对象(入口)
模块分类与加载流程
模块分类
- 内置模块:在Node源码编译时写入到二进制文件中
- 文件模块:代码运行时,动态加载
加载流程
- 路径分析:依据标识符确定模块位置
- 路径标识符
- 非路径标识符(见于核心模块fs,http,path以及第三方包等,都可以直接写名称)
- 文件定位:确定目标模块中具体的文件及文件类型
- 项目下存在a.js模块,导入时使用require('a')语法,会按照a.js->a.json->a.node方式补足扩展名
- 如果都没找到node会把a当成一个目录当成包处理,查找其中package.json文件,使用JSON.parse()解析,取出描述文件中的main属性值(如果值没有扩展名 main.js->main.json->main.node补充扩展查找)
- 如果都没有 则将index作为目标模块中的具体文件名称(index.js->index.json->index.node)在当前目录下查找,如果还没有则会在module.paths模块路径数组一层一层查找对应目录下的node_modules中查找 如果还没有则会抛出查找失败异常
- 编译执行:采用对应的方式完成文件的编译执行
- 将某个具体类型的文件按照相应的方式进行编译执行
- 创建新对象,按路径载入,完成编译执行
- JS文件编译执行
- 使用fs模块同步读入目标文件内容
- 对内容进行语法包装,生成可执行JS函数
- 调用函数时传入exports,module,require等属性值
- JSON文件编译执行
- 将读取到的内容通过JSON.parse()进行解析,将结果返回给exports对象
缓存优先原则
- 提高模块加载速度(通过路径标识符查找当前缓存中是否存在该模块,存在则直接返回当前模块)
- 当前模块不存在,则经历一次完整加载流程
- 模块加载完成后,使用将当前模块路径作为索引进行缓存
自己实现一个简单的require方法
核心模块Events
nodejs事件管理 通过EventEmitter类实现事件统一管理
events与EventEmitter
- nodejs是基于事件驱动的异步操作架构,内置events模块
- events模块提供了EventEmitter类
- nodejs中很多内置核心模块继承EventEmitter(fs,net,http模块等)
EventEmitter常见API - on:添加当事件被触发时调用的回调函数
- emit:触发事件,按照注册的序同步调用每个事件监听器
- once:添加当事件在注册之后首次被触发时调用的回调函数
- off:移除特定的监听器
const EventEmitter = require('events')
const ev = new EventEmitter()
ev.on('test', () => {
console.log('1')
})
ev.on('test', () => {
console.log('2')
})
ev.emit('test') //1 2
ev.once('one', () => {
})
ev.off('one',()=>{})
ev.on('args',(...args)=>{
console.log(args)
})
ev.emit('args','1','2') //[1,2]
发布订阅模式
定义对象间一对多的依赖关系
订阅者将想要订阅的事件监听注册在调度中心,事件被触发的时候,发布者将事件发布在调度中心,之后调度中心统一调度之前订阅者注册的事件
发布订阅要素
- 缓存队列,存放订阅者信息
- 发布者具有增加,删除订阅的能力
- 当发布者状态改变时通知所有订阅者执行监听
class Pubsub {
constructor() {
this._events = {}
}
//订阅
subscribe(event, cb) {
if (this._events[event]) {//已经存在 添加监听操作
this._events[event].push(cb)
} else {
this._events[event] = [cb]
}
}
//发布
publish(event, ...args) {
const evns = this._events[event]
if (evns && evns.length) {
evns.forEach(cb => cb(...args))
}
}
}
let pub = new Pubsub()
pub.subscribe('test', (a, b) => {
console.log(a, b)
})
pub.subscribe('test', (a, b) => {
console.log('2', a, b)
})
pub.publish('test', 1, 2)
EventEmitter类模拟实现
function MyEvent() {
//缓存订阅者信息
this._events = Object.create(null)
}
MyEvent.prototype.on = function (event, cb) {
//判断当前事件是否存在
if (this._events[event]) {
this._events[event].push(cb)
} else {
this._events[event] = [cb]
}
}
MyEvent.prototype.emit = function (event, ...args) {
let emits = this._events[event]
if (emits && emits.length) {
emits.forEach(cb => cb(...args))
}
}
MyEvent.prototype.off = function (event, cb) {
let events = this._events[event]
if (events) {
this._events = events.filters(item => item !== cb && item.link !== cb)
}
}
MyEvent.prototype.once = function (event, cb) {
let foo = function (...args) {
cb.call(this, ...args)
this.off(event, foo)
}
foo.link = cb
this.on(event, foo)
}
let ev = new MyEvent()
let fn = function (data, d) {
console.log('1', data, d)
}
ev.on('test', fn)
ev.on('test', (data) => {
console.log('2', data)
})
ev.emit('test', '参数', '参数2')
浏览器中的Eventloop
完整事件环执行顺序
- 从上至下执行所有的同步代码
- 执行过程中将遇到的宏任务与微任务添加至相应的队列
- 同步代码执行完毕之后,执行满足条件的微任务回调
- 微任务队列执行完毕后执行所有满足需求的宏任务回调
- 循环事件环操作
- 每次执行一个宏任务之后就会立刻检查微任务队列
Nodejs事件循环机制
在浏览器下有两个任务队列,宏任务,微任务
在nodejs中有六个事件队列。
timers:执行setTimeout与setInterval回调
pending callbacks:执行系统操作的回调,例如tcp udp
idle,prepare:只在系统内部进行使用
poll:执行I/O相关的回调
check:执行setImmediate中的回调
-
close callbacks:执行close事件的回调
定时器:本阶段执行已经被 setTimeout() 和 setInterval() 的调度回调函数。
待定回调:执行延迟到下一个循环迭代的 I/O 回调。
idle, prepare:仅系统内部使用。
轮询:检索新的 I/O 事件;执行与 I/O 相关的回调(几乎所有情况下,除了关闭的回调函数,那些由计时器和 setImmediate() 调度的之外),其余情况 node 将在适当的时候在此阻塞。
检测:setImmediate() 回调函数在这里执行。
关闭的回调函数:一些关闭的回调函数,如:socket.on('close', ...)。
Nodejs完整事件环
- 执行同步代码,将不同的任务添加到相应的队列
- 所有同步代码执行后会去执行满足条件的微任务
- 所有微任务代码执行后会执行timer队列中满足的宏任务
- timer中的所有宏任务执行完成后就会依次切换队列(如上图顺序)
- 在完成队列切换前会先清空微任务代码
- 微任务中 process.nextTick 优先级高于Promise.then
Node与浏览器事件环不同
- 任务队列数不同
- 浏览器中只有二个任务队列
- Nodejs中有6个事件队列
- Nodejs微任务执行时机不同
- 二者都会在同步代码执行完毕后执行微任务
- 浏览器每当一个宏任务执行完毕后就清空微任务
- Nodejs平台每个宏任务执行完毕,事件队列切换时都会去清空微任务
- 微任务优先级不同
- 浏览器事件环中,微任务存放在事件队列,先进先出
- Nodejs中 process.nextTick 先于 promise.then
Stream模块
Nodejs诞生之初就是为了提高IO性能
文件操作系统和网络模块实现了流接口
Nodejs中的流就是处理流式数据的抽象接口
流处理数据的优势
- 事件效率:流的分段处理可以同时操作多个数据chunk
- 空间效率:同一时间流无须占据大内存空间
- 使用方便:流配合管理,扩展程序变得简单
Nodejs内置了stream,它实现了流操作对象
nodejs中流的分类
- Readable:可读流,能够实现数据的读取
- Writeable:可写流,能够实现数据的写操作
- Duplex:双工流,既可读又可写
- Tranform:转换流,可读可写,还能实现数据转换(既可读又可写)
Nodejs流特点 - Stream模块实现了四个具体的抽象
- 所有流都继承了EventEmiiter
可读流
生产供程序消费数据的流
自定可读流- 继承stream里的Readable
-
重写_read方法调用push产出数据
const { Readable } = require('stream')
//自定义类 继承Readable
class MyRead extends Readable {
constructor(source) {
super()
this.source = source
}
_read() {
let data = this.source.shift() || null
this.push(data)
}
}
//模拟底层数据
let source = ['one', 'two', 'three']
let mr = new MyRead(source)
//手动调用read()方式
mr.on('readable', () => {
let data = null
//需要主动调用read从中读取数据 传参 每次读几个字节 当读到null时说明读取完毕
while ((data = mr.read(2)) !== null) {
console.log(data.toString())
}
})
//流动模式
mr.on('data', (chunk) => {
console.log(chunk.toString())
})
可写流
用于消费数据的流
const fs = require('fs')
//创建一个可读流
let rs = fs.createReadStream('data.txt')
rs.setEncoding('utf-8')
//创建一个可写流
let ws = fs.createWriteStream('data1.txt')
//监听事件调用方法完成具体数据消费
rs.on('data', chunk => {
//数据写入
ws.write(chunk)
})
自定义可写流
- 继承stream模块的Writeable
- 重写_write方法,调用write执行写入
const { Writable } = require('stream')
class MyWrite extends Writable {
constructor() {
super()
}
_write(chunk, en, done) {//chunk:数据 en:设置编码集 done:回调
process.stdout.write(chunk.toString() + '->')
process.nextTick(done)//在write执行完后执行回调
}
}
let mw = new MyWrite()
//手动调用write()方式
mw.write('test', 'utf-8', () => {
console.log('done')
})
可写流数据
- pipe事件:可读流调用pipe()方法时触发
- unpipe事件:可读流调用unpipe()方法时触发(切断数据流)
Duplex&&Tranform
自定义双工流
- 继承Duplex类
- 重写_read方法,调用push生产数据
- 重写_write方法,调用write消费数据
自定义转换流(中间可以对数据进行转换操作,可读与可写流之间是打通的)
- 继承Transform类
- 重写_transform方法,调用push和callback
- 重写_flush方法,处理剩余数据
//自定义双工流
const { Duplex } = require('stream')
class MyDuplex extends Duplex {
constructor(source) {
super()
this.source = source
}
_read() {
let data = this.source.shift() || null
this.push(data)
}
_write(chunk, en, next) {
process.stdout.write(chunk)
process.nextTick(next)
}
}
let source = ['1', '2', '3']
let md = new MyDuplex(source)
md.on('data', chunk => {
console.log(chunk.toString())
})
md.write('sss', () => {
console.log('done')
})
//自定义转换流
const { Transform } = require('stream')
class MyTransform extends Transform {
constructor() {
super()
}
_transform(chunk, en, cb) {
this.push(chunk.toString().toUpperCase())
cb(null)
}
}
let mt = new MyTransform()
mt.write('a')
mt.on('data', chunk => {
console.log(chunk.toString()) //A
})
文件可读流创建与消费
const fs = require('fs')
let rs = fs.createReadStream('data.txt', {
flags: 'r',//方式 r:可读方式
encoding: null,//数据流格式默认buffer
fd: null,//文件标识符 默认3开始 0,1,2(标准输入,输出,错误占用)
mode: 438,//权限位 十进制438
autoClose: true,//自动关闭文件
start: 0,//默认从哪个位置读取
// end: 3,//文件结束位置
highWaterMark: 4,//每次最多读多少字节数据 每次开辟的缓存区大小
})
//data方式
rs.on('data', chunk => {
console.log(chunk.toString())
rs.pause()//暂停模式
rs.resume()//再次打开
})
//readable方式
rs.on('readable', () => {
let data
while ((data = rs.read(1)) !== null) {
console.log(data.toString())
// console.log('::::', rs._readableState.length)
}
})
//可读流一些事件
//监听文件是否被打开 调用createReadStream便触发
rs.on('open',fd => {
//fd 当前文件操作符
})
//文件关闭 数据被消费完成后触发 在end后
rs.on('close',() => {
})
//数据被清空之后触发 在close之前
rs.on('end',() => {
})
//出错触发
rs.on('error',(err) => {
})
文件可写流
const fs = require('fs')
const ws = fs.createWriteStream('test.txt', {
flags: 'w',
mode: 438,
fd: null,
encoding: 'utf-8',
start: 0,
highWaterMark: 3
})
//写入数据
ws.write('abcd', () => {
console.log('done')
})
//createWriteStream 就触发
ws.on('open', (fd) => {
})
ws.end('end') //表示写操作已经完成 end之后不能执行写操作 否则报错 进入error事件 可传参 将参数数据也写入
//close 在数据写入操作全部完成之后执行 也就是在 ws.end() 触发后
ws.on('close', () => {
})
//写入错误触发
ws.on('error', (err) => {
})
Node.js背压机制
nodejs的stream已经实现了可以保证数据平滑流动的背压机制(pipe方法)
文件读取速度大于写入速度,缓存大小有限。可能会导致内存溢出,GC频繁调用,其它进程变慢
//背压机制简单实现
const fs = require('fs')
let rs = fs.createReadStream('data.txt',{
highWaterMark:4
})
let ws = fs.createWriteStream('copy.txt',{
highWaterMark:1
})
let flag = true
rs.on('data',chunk=>{
flag = ws.write(chunk)
if(!flag){ //flag为false则当前缓存超限
rs.pause() //读入暂停
}
})
ws.on('drain',()=>{
rs.resume() //当前缓存读取完 再次打开读入
})
文件可读流实现
//文件可读流实现
const fs = require('fs')
const EventEmitter = require('events')
class MyFileReadStream extends EventEmitter {
constructor(path, options = {}) {
super()
this.path = path
this.flags = options.flags || 'r'
this.mode = options.mode || 438
this.autoClose = options.autoClose || true
this.start = options.start || 0
this.end = options.end
this.highWaterMark = options.highWaterMark || 64 * 1024
this.readOffset = 0
this.open()
this.on('newListener', type => {
if (type === 'data') {
this.read()
}
})
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
this.emit('error', err)
}
this.fd = fd
this.emit('open', fd)
})
}
read() {
if (typeof this.fd !== 'number') {
return this.once('open', this.read) //为了先拿到读取的文件fd
}
let buf = Buffer.alloc(this.highWaterMark)
let howMuchToRead = this.highWaterMark
if (this.end) {
howMuchToRead = Math.min(this.end - this.readOffset, this.highWaterMark)
}
fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
if (readBytes) {
this.readOffset += readBytes
this.emit('data', buf.slice(0, readBytes))
this.read()
} else {
this.emit('end')
this.close()
}
})
}
close() {
fs.close(this.fd, () => {
this.emit('close')
})
}
}
let rs = new MyFileReadStream('test.txt', {
end: 6,
highWaterMark: 3
})
// rs.on('open', fd => {
.
// console.log(fd)
// })
rs.on('data', chunk => {
console.log(chunk)
})
文件可写流实现
pipe方法
const fs = require('fs')
const rs = fs.createReadStream('data.txt')
const ws = fs.createWriteStream('copy.txt')
rs.pipe(ws)
创建TCP通信
Net模块实现了底层通信接口
通信过程
- 创建服务端:接收和回写客户端数据
- 创建客户端:发送和接收服务端数据
- 数据传输:内置服务事件和方法读写数据
通信事件&方法 - listening事件:调用server.listen方法之后触发
- connection事件:新的连接建立时触发
- close事件:当server关闭时触发
- error事件:当错误出现时触发
- data事件:当接收到数据的时候触发该事件
- write方法:在socket上发送数据,默认UT8编码
- end操作:当socket的一端发送FIN包时触发,结束可读端
//server.js
const net = require('net')
//创建服务端实例
const server = net.createServer()
const PORT = 1111
const HOST = 'localhost'
server.listen(PORT, HOST)
server.on('listening', () => {
console.log(`服务开启 ${HOST}:${PORT}`)
})
//接收消息 回消息
server.on('connection', (socket) => {
socket.on('data', (chunk) => {
const msg = chunk.toString()
console.log(msg)
//回消息
socket.write(Buffer.from('回复'))
})
})
server.on('close', () => {
console.log('服务关闭')
})
server.on('error', (err) => {
console.log(err)
})
//client.js
const net = require('net')
const client = net.createConnection({
port: 1111,
host: '127.0.0.1'
})
client.on('connect', () => {
client.write('发送数据')
})
client.on('data', (chunk) => {
console.log('接收数据:', chunk.toString())
})
client.on('error', (err) => {
console.log('client error', err)
})
client.on('close', () => {
console.log('client clsoe')
})
http协议
const http = require('http')
const url = require('url') //处理url路径
//创建服务端
let server = http.createServer((req, res) => {
//请求路径获取
let { pathname, query } = url.parse(req.url, true)
//请求方式获取
let method = req.method
//版本号获取
let httpVersion = req.httpVersion
//请求头
let hearders = req.headers
//请求体获取 请求体是一个数据流 按照数据流处理
let arr = []
req.on('data', data => {
arr.push(data)
})
req.on('end', () => {
let allData = Buffer.concat(arr).toString()
})
//设置响应
res.statusCode = 302 //设置状态码
res.setHeader('Content-type', 'text/html;charset=utf-8')//设置响应头
res.end('返回数据')
})
server.listen(8080, () => {
console.log('服务开启')
})
客户端代理
//server.js
const http = require('http')
const url = require('url')
const querystring = require('querystring')
const server = http.createServer((req, res) => {
let { pathname, query } = url.parse(req.url)
let headers = req.headers //请求头
//post
let arr = []
req.on('data', (data) => {
arr.push(data)
})
req.on('end', () => {
let endData = Buffer.concat(arr).toString //post请求数据
if (headers['Content-type'] == 'application/json') {
let d = JSON.parse(endData)
d.age = 40
res.end(JSON.stringify(d))
} else if (headers['Content-type'] == 'application/x-www-form-urlencoded') {
let fromdata = querystring(endData)
res.end(JSON.stringify(fromdata))
}
})
})
server.listen(1111, () => {
})
//client.js
const http = require('http')
// let options = {
// host: 'localhost',
// port: 1111,
// path: '/?a=1'
// }
// http.get(options, (res) => {
// })
let options = {
host: 'localhost',
port: 1111,
path: '/?a=1',
method: 'POST',
headers: {
'Content-type': 'application/json' //设置数据格式 json
// 'Content-type': 'application/x-www-form-urlencoded' //form表单格式
}
}
let req = http.request(options, (res) => {
let arr = []
req.on('data', (data) => {
arr.push(data)
})
req.on('end', () => {
let endData = Buffer.concat(arr).toString //post请求数据
})
})
// req.end('post请求数据') //字符串格式
req.end('{"name":"jay"}') //json格式
// req.end('name=jay&age=40') //form表单格式