原标题:Node.js 中流操作实践

Stream 是 Node.js 中的基础概念,类似于 EventEmitter,专注于 IO 管道中事件驱动的数据处理方式;类比于数组或者映射,Stream 也是数据的集合,只不过其代表了不一定正在内存中的数据。。Node.js 的 Stream 分为以下类型:

Stream 本身提供了一套接口规范,很多 Node.js 中的内建模块都遵循了该规范,譬如著名的 fs 模块,即是使用 Stream 接口来进行文件读写;同样的,每个 HTTP 请求是可读流,而 HTTP 响应则是可写流。

const stream = require('stream'); const fs = require('fs'); const readableStream = fs.createReadStream(process.argv[2], { encoding: 'utf8' }); // 手动设置流数据编码 // readableStream.setEncoding('utf8'); let wordCount = 0; readableStream.on('data', function(data) { wordCount += data.split(/\s{1,}/).length; }); readableStream.on('end', function() { // Don't count the end of the file. console.log('%d %s', --wordCount, process.argv[2]); });

当我们创建某个可读流时,其还并未开始进行数据流动;添加了 data 的事件监听器,它才会变成流动态的。在这之后,它就会读取一小块数据,然后传到我们的回调函数里面。 data 事件的触发频次同样是由实现者决定,譬如在进行文件读取时,可能每行都会触发一次;而在 HTTP 请求处理时,可能数 KB 的数据才会触发一次。可以参考 nodejs/readable-stream/_stream_readable 中的相关实现,发现 on 函数会触发 resume 方法,该方法又会调用 flow 函数进行流读取:

我们还可以监听 readable 事件,然后手动地进行数据读取:

Readable Stream 还包括如下常用的方法:

在日常开发中,我们可以用 stream-wormhole 来模拟消耗可读流:

当 end() 被调用时,所有数据会被写入,然后流会触发一个 finish 事件。注意在调用 end() 之后,你就不能再往可写流中写入数据了。

Writable Stream 中同样包含一些与 Readable Stream 相关的重要事件:

多个管道顺序调用,即是构建了链接(Chaining):

管道也常用于 Web 服务器中的文件处理,以 Egg.js 中的应用为例,我们可以从 Context 中获取到文件流并将其传入到可写文件流中:

参照分布式系统导论,可知在典型的流处理场景中,我们不可以避免地要处理所谓的背压(Backpressure)问题。无论是 Writable Stream 还是 Readable Stream,实际上都是将数据存储在内部的 Buffer 中,可以通过 writable.writableBuffer或者 readable.readableBuffer 来读取。当要处理的数据存储超过了 highWaterMark 或者当前写入流处于繁忙状态时,write 函数都会返回 false。pipe 函数即会自动地帮我们启用背压机制:

当 Node.js 的流机制监测到 write 函数返回了 false,背压系统会自动介入;其会暂停当前 Readable Stream 的数据传递操作,直到消费者准备完毕。

| | +> emit .resume(); +------------+ | |

| | +------------+ add chunk to queue | | Duplex Stream 可以看做读写流的聚合体,其包含了相互独立、拥有独立内部缓存的两个读写流, 读取与写入操作也可以异步进行: Duplex Stream ------------------| Read External Sink ------------------| 我们可以使用 Duplex 模拟简单的套接字操作: const { Duplex } = require('stream'); class Duplexer extends Duplex { constructor(props) { super(props); this.data = []; } _read(size) { const chunk = this.data.shift(); if (chunk == 'stop') { this.push(null); } else { if (chunk) { this.push(chunk); } } } _write(chunk, encoding, cb) { this.data.push(chunk); cb(); }} const d = new Duplexer({ allowHalfOpen: true });d.on('data', function(chunk) { console.log('read: ', chunk.toString());});d.on('readable', function() { console.log('readable');});d.on('end', function() { console.log('Message Complete');});d.write('....'); 在开发中我们也经常需要直接将某个可读流输出到可写流中,此时也可以在其中引入 PassThrough,以方便进行额外地监听: const { PassThrough } = require('stream');const fs = require('fs'); const duplexStream = new PassThrough(); // can be piped from reaable streamfs.createReadStream('tmp.md').pipe(duplexStream); // can pipe to writable streamduplexStream.pipe(process.stdout); // 监听数据,这里直接输出的是 BufferduplexStream.on('data', console.log); Transform Stream Transform Stream 则是实现了 _transform 方法的 Duplex Stream,其在兼具读写功能的同时,还可以对流进行转换: Transform Stream --------------|-------------- You Write ----> ----> Read You --------------|-------------- 这里我们实现简单的 Base64 编码器: const util = require('util');const Transform = require('stream').Transform; function Base64Encoder(options) { Transform.call(this, options);} util.inherits(Base64Encoder, Transform); Base64Encoder.prototype._transform = function(data, encoding, callback) { callback(null, data.toString('base64'));}; process.stdin.pipe(new Base64Encoder()).pipe(process.stdout);

查看原文 >>
相关文章