[TOC]
# 流(Stream)是什么?

> 一種處理讀寫文件、網絡通信或任何端到端信息交換的有效方式。
> 從早先的 unix 開始,stream 便開始進入了人們的視野,在過去的幾十年的時間里,它被證明是一種可依賴的編程方式,它可以將一個大型的系統拆成一些很小的部分,并且讓這些部分之間完美地進行合作。在 **unix 中,我們可以使用 `|` 符號來實現流**。
> 在 node 中,node 內置的 stream 模塊已經被多個核心模塊使用,同時也可以被用戶自定義的模塊使用。和 unix 類似,node 中的流模塊的基本操作符叫做 `pipe ()`,同時你也可以使用一個**后壓機制來應對那些對數據消耗較慢的對象**。
應用程序中,流是一組有序的,有起點和終點的字節數據傳輸方式,在應用程序中傳輸數據的時候,將該對象中**所包含的數據轉換為各種形式的流數據(字節數據)**,在通過流的傳輸到達目的對象后再將流數據轉換為該對象中可以使用的數據。
流的獨特之處在于,它不像傳統的程序那樣一次將一個文件讀入內存,而是逐塊讀取數據、處理其內容,而不是將其全部保存在內存中。
這使得流在處理大量數據時非常強大,例如,文件可能大于你的空閑內存,不可能將整個文件讀入內存來處理,這時候流就發揮作用了。
我們以 YouTube 或 Netflix 等流媒體服務為例:這些服務不會讓你立即下載完整的視頻和音頻,而是瀏覽器將視頻作為連續流的數據塊,可以做到用戶立即收看。
然而,流并不僅僅用來處理媒體或大數據,它還賦予了代碼的“可組合性”。在設計時考慮到可組合性意味著幾個組件可以以某種方式組合以產生相同類型的結果。在 Node.js 中,通過使用流將數據從其他更小的代碼段中導入或導出,可以組成功能強大的代碼段。
# 為什么要用流
與其他數據處理方法相比,流有兩個主要優勢:
1. 內存效率:不需要加載大量的數據到內存就可以處理
2. 時間效率:一旦有了數據就開始處理,而不必等待傳輸完所有數據
# Node.js 中的 4 種流(Stream)
Stream 模塊是 Node.js 中默認提供的內建模塊(是所有流 API 的基礎)。?
Stream 是 EventEmitter 類的實例,流本質上是基于事件的,該類在 Node 中用于異步處理事件。
stream 模塊對于創建新型流實例非常有用。通常沒有必要使用 stream 模塊來消費流。
1. Readable?(可讀流):
* 對可讀取和可消費的數據源的抽象
* 示例:客戶端上的HTTP響應、服務器上的HTTP請求、`fs.createReadStream()`、process.stdin等。
2. Writable?(可寫流):
* 對可以寫入數據的目標的抽象
* 示例:服務器上的HTTP響應、客戶端上的HTTP請求、`fs.createWriteStream()`?、process.stout、process.stderr等。
3. Duplex?(雙向流):
* 同時實現可讀(Readable)和可寫(Writable)接口的流
* 示例:TCP套接字(`net.Socket`)。
4. Transform?(轉換流):
* 類似于雙向流的流,具有在讀寫數據時修改或轉換數據的能力例如,在文件壓縮操作中,可以向文件寫入壓縮數據,并從文件中讀取解壓數據。
* 示例:壓縮流(zlib.createGzip)。
例如,在基于 Node.js 的 HTTP 服務器中,`request`?是可讀流,`response`?是可寫流。還有`fs`?模塊,能同時處理可讀和可寫文件流。只要你用 Express,就是在使用流與客戶端進行交互,流也被用于各種數據庫連接驅動程序中,因為 TCP 套接字、TLS 堆棧和其他連接都是基于 Node.js 流的。
# 異步迭代器(async iterator)
強烈建議在處理流時使用異步迭代器。異步迭代是一種異步檢索數據容器內容的協議,意味著當前的“任務”可能在檢索數據項之前暫停。另外,值得一提的是,流的異步迭代器的內部實現使用了?`readable`事件。
當從可讀的流讀取數據時,可以使用 async iterator:
```
import * as fs from 'fs';
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'
```
也可以在字符串中收集可讀流的內容:
```
import { Readable } from 'stream';
async function readableToString2(readable) {
let result = '';
for await (const chunk of readable) {
result += chunk;
}
return result;
}
const readable = Readable.from('Good morning!', { encoding: 'utf8' });
assert.equal(await readableToString2(readable), 'Good morning!');
```
注意,在本例中,我們必須使用異步函數,因為我們希望返回一個 Promise。
記得不要將異步函數與?`EventEmitter`?搞混了,因為目前無法捕獲從事件處理程序中發出的 rejection,從而導致難以跟蹤 bug 和內存泄漏。當前的最佳實踐是始終將異步函數的內容封裝在?`try/catch`?塊中并處理錯誤,但這很容易出錯。[這個 pull request](https://github.com/nodejs/node/pull/27867) 就是為了解決這個問題,如果能加入到 Node 核心代碼的話。
# Readable.from(): 從 iterables 創建可讀流
`stream.Readable.from(iterable, [options])`?是一個實用方法,用于從迭代器創建可讀流,其中的 iterable 包含了數據。iterable 可以是同步迭代的,也可以是異步迭代的。`options`?是可選的,可以用于指定文本編碼。
```
const { Readable } = require('stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
```
# 兩種讀取模式
根據?Streams API,可讀流有兩種操作模式:?flowing?和 paused。 無論流是處于流模式還是暫停模式,可讀流都可以用對象模式或非對象模式。
1. 在 flowing 模式中,數據從底層系統自動讀取,并通過 `EventEmitter` 接口以盡可能快的速度使用事件提供給應用程序。
2. 在 paused 模式中,必須顯式地調用?`stream.read()`?方法來從流中讀取數據塊。
在 flowing 模式中,要從流中讀取數據,可以監聽 data 事件并綁定回調。當數據塊可用時,可讀流發出 data 事件并執行回調。代碼如下:
```
var fs = require("fs");
var data = '';
var readerStream = fs.createReadStream('file.txt'); //Create a readable stream
readerStream.setEncoding('UTF8'); // Set the encoding to be utf8\.
// 處理 stream 事件 --> data, end, 和 error
readerStream.on('data', function(chunk) {
data += chunk;
});
readerStream.on('end',function() {
console.log(data);
});
readerStream.on('error', function(err) {
console.log(err.stack);
});
console.log("Program Ended");
```
函數調用?fs.createReadStream() 提供了一個可讀流。一開始,流處于靜止狀態。
**只要監聽 data 事件并綁定回調,它就開始流動。**
然后,讀取數據塊并將其傳遞給回調。流的實現者可以決定 data 事件發出的頻率。例如,HTTP 請求可以在每讀取幾 KB 數據時發出一個 data 事件。
當你從文件中讀取數據時,你可能會采取每讀取一行就發出 data 事件。
當沒有更多的數據要讀取 (到達尾部) 時,流就會發出 end 事件。
在上面的代碼中,我們監聽了這個事件,以便在結束時得到通知。
另外,如果出現錯誤,流將發出錯誤并通知。
在 paused 模式下,你只需要反復調用流實例上的 read(),直到每一塊數據都被讀取,如下所示:
```
var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;
readableStream.on('readable', function() {
while ((chunk=readableStream.read()) != null) {
data += chunk;
}
});
readableStream.on('end', function() {
console.log(data)
});
```
`read() `函數從內部緩沖區讀取一些數據并返回。
當沒有要讀取的內容時,它返回 `null`。
因此,在 `while` 循環中,我們檢查 `null` 并終止循環。請注意,readable 事件是在可以從流中讀取數據塊時發出的。
所有 Readable 數據流都以 paused 模式開始,但可以通過以下方式切換到 flowing 模式:
* 添加 `data` 事件處理器
* 調用?`stream.resume()` 方法
* 調用?`stream.pipe()`?方法發送數據到一個 `Writable`
Readable 可以使用以下幾種方式切換回 paused 模式:
如果沒有管道(pipe)目標,調用 `stream.pause()` 方法
如果有管道(pipe)目標,刪除所有管道目標。可以通過調用?`stream.unpipe()`?方法來刪除多個管道目標。
要記住的重要概念是,除非提供了一種用于消費或忽略該數據的機制,否則 Readable?將不會生成數據。如果消費機制被禁用或取消,Readable 將嘗試停止生成數據。?添加一個 readable?事件處理程序會自動使流停止流動,并通過 `readable.read() `消費數據。如果刪除了 readable 事件處理程序,那么如果存在 data 事件處理程序,則流就會再次開始流動。
# 如何創建可寫流
要將數據寫入可寫流,你需要在流實例上調用 `write()`。?如下所示:
```
var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');
readableStream.on('data', function(chunk) {
writableStream.write(chunk);
});
```
上面的代碼簡單直白。它只是簡單地從輸入流中讀取數據塊,并使用 `write()` 寫入目標位置。
該函數返回一個布爾值,表明操作是否成功。如果為 true,則寫入成功,你可以繼續寫入更多數據。 如果返回 false,則表示出了點問題,目前無法寫入任何內容。
可寫流將通過發出 `drain 事件`來通知你何時可以開始寫入更多數據。
調用 `writable.end()` 方法表明沒有更多數據將被寫入 Writable。?如果提供可選的回調函數,將作為 finish 事件的監聽器函數。
```
// 寫入 'hello, ' 然后以 'world!' 結束
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 不允許寫更多內容!
```
使用可寫流,你可以從可讀流中讀取數據:
```
const Stream = require('stream')
const readableStream = new Stream.Readable()
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('ping!')
readableStream.push('pong!')
writableStream.end()
```
你還可以使用異步迭代器寫入可寫流,這也是建議的做法:
```
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // (B)
// 處理反壓
await once(writable, 'drain');
}
}
writable.end(); // (C)
// 等待完成,如果有錯誤則拋出
await finished(writable);
}
await writeIterableToFile(
['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
'One line of text.\n');
```
`stream.finished()` 的默認版本是基于回調的,但是可以通過 `util.promisify()` 轉換為基于 Promise 的版本(A 行)。
在此示例中,使用了以下兩種模式:
寫入可寫流,同時處理反壓(短時負載高峰導致系統接收數據的速率遠高于它處理數據的速率)(B 行):
> 譯者注:**在數據流從上游生產者向下游消費者傳輸的過程中,上游生產速度大于下游消費速度,導致下游的 Buffer 溢出,這種現象就叫做 Backpressure(背壓;回壓;反向壓力;反壓) 出現**
> [數據流中的積壓問題](https://nodejs.org/zh-cn/docs/guides/backpressuring-in-streams/)
```
if (!writable.write(chunk)) {
await once(writable, 'drain');
}
```
關閉可寫流,并等待寫入完成(C 行):
```
writable.end();
await finished(writable);
```
# 管道
管道是一種機制,是將一個流的輸出作為另一流的輸入。它通常用于從一個流中獲取數據并將該流的輸出傳遞到另外的流。管道操作沒有限制,換句話說,管道用于分步驟處理流數據。
Node 10.x 引入了 `stream.pipeline()`。 這是一種模塊方法,用于在流之間進行管道傳輸,轉發錯誤信息和數據清理,并在管道完成后提供回調。
下面是使用 pipeline 的一個例子:
```
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// 使用 pipeline API 輕松管理多個管道流,并且在管道全部完成時得到通知
// 一個用來高效壓縮超大視頻文件的管道
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
(err) => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
```
應該使用 pipeline?而不是?pipe,因為 pipe 是不安全的。
# 基于流的 Node.js API
由于它們的優點,Node.js 許多核心模塊提供了原生流處理功能,最值得注意的是這些:
| API | 描述 |
| --- | --- |
|?net.Socket | 基于流的主要 node api,是以下大部分 API 的基礎|
|?process.stdin | 返回連接到 stdin 的流|
|?process.stdou | 返回連接到?stdout 的流 |
|?process.stderr | 返回連接到?stderr 的流 |
|?fs.createReadStream() |?創建一個文件可讀流 |
|?fs.createWriteStream() |?創建一個文件可寫流 |
|?net.connect() |?初始化一個基于流的連接 |
|?http.request()? |?返回 http.ClientRequest 類的一個實例,是一個可寫流 |
|?zlib.createGzip() |?用 gzip (一種壓縮算法) 將數據壓縮到流 |
|?zlib.createGunzip() |?解壓 gzip 流 |
|?zlib.createDeflate() |?用 deflate (一種壓縮算法) 將數據壓縮到流 |
|?zlib.createInflate() |?解壓 deflate 流 |
# Streams 備忘單
| 類型 | 功能 |
| --- | --- |
| Readable | 數據提供者 |
| Writable | 數據接收者 |
| Transform | 提供者和接收者 |
| Duplex | 提供者和接收者(獨立的) |
更多內容請查閱文檔:?[Stream](https://nodejs.org/api/stream.html#stream_stream)?(nodejs.org)
Streams
```
const Readable = require('stream').Readable
const Writable = require('stream').Writable
const Transform = require('stream').Transform
```
管道 Piping
```
clock() // 可讀流
.pipe(xformer()) // 轉換流
.pipe(renderer()) // 可寫流
```
方法
```
stream.push(/*...*/) // Emit a chunk
stream.emit('error', error) // Raise an error
stream.push(null) // Close a stream
```
事件
```
const st = source() // 假設 source()?是可讀流
st.on('data', (data) => { console.log('<-', data) })
st.on('error', (err) => { console.log('!', err.message) })
st.on('close', () => { console.log('** bye') })
st.on('finish', () => { console.log('** bye') })
```
Flowing 模式
```
// 開啟和關閉 flowing 模式
st.resume()
st.pause()
// 自動開啟 flowing 模式
st.on('data', /*...*/)
```
可讀流
```
function clock () {
const stream = new Readable({
objectMode: true,
read() {} // 自己實現 read() 方法,如果要按需讀取
})
setInterval(() => {
stream.push({ time: new Date() })
}, 1000)
return stream
}
```
可讀流是數據生成器,用 `stream.push()` 寫入數據。
轉換流
```
function xformer () {
let count = 0
return new Transform({
objectMode: true,
transform: (data, _, done) => {
done(null, { ...data, index: count++ })
}
})
}
```
將轉換后的數據塊傳給?`done(null, chunk)`.
可寫流
```
function renderer () {
return new Writable({
objectMode: true,
write: (data, _, done) => {
console.log('<-', data)
done()
}
})
}
```
全部串起來
```
clock() // 可讀流
.pipe(xformer()) // 轉換流
.pipe(renderer()) // 可寫流
```
以下是與可寫流相關的一些重要事件:
* `error`?– 在寫入 / 管道操作發生了錯誤時發送
* `pipeline`?– 當將可讀流傳遞到可寫流中時,可寫流會發出此事件。
* `unpipe`?– 當你在可讀流上調用 `unpipe` 并停止將其輸送到目標流中時發出。
# 總結
這就是所有關于流的基礎知識。?流、管道和鏈式操作是 Node.js 的核心和最強大的功能。流確實可以幫助你編寫簡潔而高效的代碼來操作 I/O。
此外,還有一個值得期待的 [Node.js 戰略計劃](https://github.com/nodejs/TSC/blob/master/Strategic-Initiatives.md#current-initiatives)叫做 [BOB](https://github.com/Fishrock123/bob),目標是改善 Node.js 的流數據接口,既可應用于 Node.js 內部核心,將來還有希望用于公開 API
# 參考
[簡單了解 node stream](https://www.cnblogs.com/luoxiaoer/p/11846386.html)
[說Node.js做后端開發,stream有必要了解下](https://mp.weixin.qq.com/s/HQmualyyEV4t7lG0PZf8zQ)
[Node.js Stream - 實戰篇](https://zhuanlan.zhihu.com/p/21681134)
[Node Stream-qianniuer毛豆前端團隊](https://mp.weixin.qq.com/s/SmIZE6y7sI2j4GKbVHVrtA)
[【譯】Node.js Stream API:理解與運用](https://mp.weixin.qq.com/s/AirnM9fJdKydCTwdzxbk7w)
[Understanding Streams in Node.js - NodeSource](https://nodesource.com/blog/understanding-streams-in-nodejs/)
[Node.js 中的一股清流:理解 Stream(流)的基本概念](https://juejin.im/post/5de9f4fa6fb9a016323d6f50#heading-1)