Node.js Stream

芝麻凛 2021年10月31日 364次浏览

Node.js Stream

Stream - 流

image20211031150055491.png

stream 是水流,但默认没有水

stream.write 可以让水流中有水(数据)

每次写的小数据叫做chunk (块)

产生数据的一段叫做source (源头)

得到数据的一段叫做sink(水池)

管道

image20211031152132838.png

两个流可以用一个管道相连,stream1的末尾连接上stream2的开端,只要stream1有数据,就会流到stream2

常用代码:stream1.pipe(stream2)

链式操作:a.pipe(b).pipe(c) === a.pipe(b) b.pipe(c)

例子一:

步骤

  • 打开流,多次往里面写入内容,关闭流
const fs = require('fs')
const stream = fs.createWrteStream('./big_file.txt')
for(let i = 0;i < 10000; i++){
    stream.write(`这是第 ${i} 行内容,需要很多很多内容,要不停的写文件1111111111回车\n`)
}
stream.end() //关掉 stream
console.log('done')

例子二:

const fs = require('fs')
const http = require('http')

const server = http.createServer()
server.on('request', (request, response) => {
  fs.readFile('./big_file.txt', (error, data) => {
    if (error) throw error
    response.end(data)
    console.log('done')
  })
})
server.listen(8888)

可以使用任务管理器查看 Node.js 内存占用。

例子三:

  • 用Stream改写第二个例子
  • 查看Node.js 内存占用,基本不会高于 30Mb,流可以使我们内存占用降的比较低
  • 文件stream 和 response stream 通过管道相连
const http = require('http');
const fs = require('fs');
const server = http.createServer()
server.on('request', (request, response) => {
  const stream = fs.createWriteStream('./big_file.txt')
  stream.pipe(response)
  stream.on('end',() => console.log('done'))
})
server.listen(8888)

管道也可以通过事件实现

// stream1 一有数据就塞给 stream2
stream1.on('data',(chunk)=>{
    stream2.write(chunk)
})
// stream1 停了 就停掉 stream2
stream1.on('end',()=>{
    stream2.end()
})

调试:

  • 运行 node --inspect-brk test.js
  • 打开浏览器,F12打开控制台,等待Node.js LOGO出现
  • 点击控制台的Node 图标,就可以在Dev Tools - Node.js 中调试代码了

Stream对象的原型链

s = fs.createReadStream(path)

  • 那么它的对象层级为
  • 自身属性(由fs.ReadStream构造)
  • 原型:stream.Readable.prototype
  • 二级原型:streamStream.prototype
  • 三级原型:events.EventEmitter.prototype
  • 四级原型:Object.prototype

Stream 对象都继承了EventEmitter

支持的事件和方法

image20211031154226249.png

Stream 分类

名称特点
Readable可读
Writable可写
Duplex可读可写(双向)
Transform可读可写(变化)

image20211031160329994.png

Readable Stream

静止态 paused 和 流动态 flowing

  • 默认处于 paused 态
  • 添加 data 事件监听,它就变为 flowing 态
  • 删除 data 事件监听, 它就变为 paused 态
  • pause() 可以将它变为 paused
  • resume() 可以将它变为 flowing

Writable Stream

drain 流干了事件

  • 表示可以加点水了
  • 调用 stream.write(chunk) 的时候,可能会得到false
  • FALSE的意思是写的太快了,数据积压了
  • 等drain 事件触发,才能继续write

finish 事件

  • 调用 stream.end() 之后 ,而且缓冲区数据都已经传给底层系统之后
  • 触发 finish 事件

自定义Stream

创建一个Writable Stream

const {Writable} = require("stream")

const outStream = new Writable({
    write(chunk,encoding,callback){
        console.log(chunk.toString());
        callback();
    }
});

process.stdin.pipe(outStream);
// 保存文件为 writable.js 然后用 node 运行
// 不管输入什么 都会得到相同的结果

创建一个 Readable Stream

const {Readable} = require('stream')

const inStream = new Readable();

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.on('data', (chunk) => {
  process.stdout.write(chunk)
  console.log('写数据了')
})
// 保存文件为 readable.js 然后用 node运行
// 先把所有数据都push进去了,然后pipe

const {Readable} = require('stream')

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  }
});

inStream.currentCharCode = 65
inStream.pipe(process.stdout)
// 保存文件为 readable2.js 然后用 node 运行
// 这次的数据是按需提供的,对方调用 read 我们才会给一次数据

Duplex Stream

const { Duplex } = require('stream')
const inoutStream = new Duplex({
    write(chunk,encoding,callback){
        console.log(chunk.toString())
        callback()
    },
    read(size){
        this.push(String.fromCharCode(this.currentCharCode++));
        if (this.currentCharCode > 90) {
      		this.push(null)
    	}
    }
});
inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform Stream

const { Transform } = require('stream')
const upperCaseTr = new Transform({
    transform(chunk,encoding,callback){
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

Node.js 内置的 Transform Stream

const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.gz'));

Node.js 中的 Stream

image20211031170923780.png

数据流中的积压问题 背压 Back Pressure