Streams 是驱动 node.js 程序的核心概念。它提供了处理对文件的读写,网络传输,或者其他端到端的数据交换的更加高效的方式。

streams 流并不是 node.js 首先引入的概念,unix 操作系统在很久之前就在使用了,一个程序可以通过 pipe 管道操作符 | 来传递 streams 流给其他程序。

下面的示例是在 Linux 中,通过 pipe 管道将 cat 读取的文件数据传递给 grep 进行过滤,test.txt 文件内容如下:

aaa bbb
bbb ccc
aaa ccc
$ cat test.txt | grep aaa
aaa bbb
aaa ccc

stream 优点

在传统方法中,当程序读取一个文件内容时会先将文件内容全部读取到内存中,然后再去使用它。而使用 stream 时会一段段的读取文件内容并进行处理,而不需要整体读取到内存。

node.js 的 stream 模块是构建所有 streaming API 的底层。所有的 streams 流都是 eventEmitter 的实例。每种类型的 stream 都有各自的 event 事件,在数据流变化时触发对应事件。

相比于其他数据处理方法,streams 有两个优势:

  • 内存效率:在使用数据前不需要将大量数据存入内存中。
  • 时间效率:能够更快的进入数据处理阶段,在获取到文件后能够很快的使用它而不需要等待它全部加载完毕。

使用方法

下面我们从一个示例来说明 stream 的使用方法。我们建立一个 http server,当收到请求时读取一个本地文件的内容并作为 response 发送给客户端。

首先传统实现方式如下:

const http = require('http')
const fs = require('fs')

const hostname = '127.0.0.1'
const port = 3000

const server = http.createServer((req, res) => {
    fs.readFile(__dirname + '/test.txt', (err, data) => {
        if (err) {
            console.log('read error')
        }
        res.end(data)
    })
})

server.listen(port, hostname, () => {
    console.log(`Server running at http://${hostname}:${port}/`)
})

以上方式在收到请求后,通过 readFile() 读取文件内容到内存中,当成功后会调用 callback 通过 res.end(data) 发送读取的数据给客户端。如果文件较大时以上过程会花费一些时间。

__dirname 返回当前执行文件的路径。

下面介绍使用 stream 实现上面的过程:

const http = require('http')
const fs = require('fs')

const hostname = '127.0.0.1'
const port = 3000

const server = http.createServer((req, res) => {
    const stream = fs.createReadStream(__dirname + '/test.txt')
    stream.pipe(res)
})

server.listen(port, hostname, () => {
    console.log(`Server running at http://${hostname}:${port}/`)
})

通过 fs.createReadStream() 返回一个文件的 stream。

不同于第一种,以上方式会在 stream 中只要有了 data chunk 块就立刻作为 response 数据传输给客户端。这样就会一边读取文件一边传输数据。

pipe()

以上示例中调用 streampipe() method。它的功能是建立一个 source stream 源流到 destination stream 目标流的 pipe 管道。这样文件的 stream 流通向了 http response。

pipe() 的返回是 destination stream 目标流,这样就可以很方便的链接多个 pipe:

src.pipe(dest1).pipe(dest2)

以下写法和上面示例效果一样:

src.pipe(dest1)
dest1.pipe(dest2)

基于 stream 的 node.js API

由于 stream 的巨大优势,很多 node.js 核心模块提供了 stream 的原生支持,以下是常用的部分:

  • process.stdin 返回一个链接到 stdin 的 stream
  • process.stdout 返回一个链接到 stdout 的 stream
  • process.stderr 返回一个链接到 stderr 的 stream
  • fs.createReadStream() 创建一个文件的可读的 stream
  • fs.createWriteStream() 创建一个文件的可写的 stream
  • net.connect() 初始化一个基于 stream 的连接
  • http.request() 返回一个 http.ClientRequest 实例,它是一个可写的 stream

streams 的类型

有四个 streams 的 calsses:

  • Readable: 一个可以作为管道源头的 stream,不能作为管道的目标也就是不能写入数据
  • Writable: 一个可以作为管道目标的 stream,不能作为管道源头也就是不能从中获取数据
  • Duplex: 一个既可作为管道源头也可以作为目标的 stream
  • Transform: 类似于 Duplex

创建 readable stream

从 stream 模块定义一个可读的 stream 然后通过定义 readable._read() method 内容完成初始化:

const Stream = require('stream')
const readableStream = new Stream.Readable()

readableStream._read = () => {}

初始化也可以这样写:

const readableStream = new Stream.Readable({
    read() {}
})

以上就创建了一个可读的 stream,可以给其传输数据:

readableStream.push('hi')
readableStream.push('hello')

可以将这个 stream 和一个可写的 stream 之间建立管道:

readableStream.pipe(process.stdout)

创建 writable stream

通过实例化一个 Writable object 然后通过定义 _write() method 内容完成初始化:

const Stream = require('stream')
const writeableStream = new Stream.Writable()

writeableStream._write = (chunk, encoding, callback) => {
    console.log(chunk.toString())
    callback()
}

接收到的数据块传入 chunk,encoding 定义数据类型,callback function 是当一个 chunk 数据块传输完成后调用,此 callback 可在 writeableStream.write 内定义,一般可以是 error 处理,下面会介绍。

可以将定义的可写 stream 同一个可读 stream pipe 管道连接:

process.stdin.pipe(writeableStream)

这样就可以在 stdin 和 writeableStream 间建立 pipe,此时在 stdin 输入数据就会立刻将数据显示在终端。

从可读的 stream 读取数据

下面的示例中,我们建立一个可读的 stream 和一个 可写的 stream,并建立 pipe 管道连接:

const Stream = require('stream')

const readableStream = new Stream.Readable()
readableStream._read = () => {}

const writeableStream = new Stream.Writable()
writeableStream._write = (chunk, encoding, callback) => {
    console.log(chunk.toString())
    callback()
}

readableStream.pipe(writeableStream)

readableStream.push('hi\n')
readableStream.push('hello\n')

也通过 readable event 事件来处理可读 stream。当 stream 中有准备好的数据块时会触发 readable event:

readableStream.on('readable', () => {
    console.log(readableStream.read().toString())
})

给可写的 stream 写入数据

使用 write method 来写入数据:

writeableStream.write('hello world\n')

可以定义写入数据的编码格式和 callback function:

writeableStream.write('hello world\n', 'utf-8', err => {console.log(err)})

实际中 callback function 是否会执行要看 stream 初始化中在 writeableStream._write 是否调用了 callback。

如果数据写入已经完成,可以使用 end method 来告诉可写的 stream:

const Stream = require('stream')
const writeableStream = new Stream.Writable()

writeableStream._write = (chunk, encoding, next) => {
    console.log(chunk.toString())
    next()
}

process.stdin.pipe(writeableStream)

writeableStream.write('hello world\n')
writeableStream.end()

以上示例中如果在最后一句不调用 end method 则 writeableStream 会持续保持接收来自 stdin 的数据状态。

创建 transform stream

和可写的 stream 创建方式类似,可读也可写:

const Stream = require('stream')
const transformStream = new Stream.Transform()

transformStream._transform = (chunk, encoding, callback) => {
    console.log('transform' + chunk.toString())
    transformStream.push(chunk)
    callback()
}

const writeableStream = new Stream.Writable()
writeableStream._write = (chunk, encoding, callback) => {
    console.log('write' + chunk.toString())
    callback()
}

process.stdin.pipe(transformStream).pipe(writeableStream)

transformStream._transform 定义了当 transform stream 接收到数据后通过 transformStream.push 将数据发到 readable stream 中,这样其他 stream 就可以读取到它接收到的数据了。

更多 stream 使用方法参考:https://nodejs.org/api/stream.html

标签: none

添加新评论