# 流
~~~
穩定度: 2 - 不穩定
~~~
流是一個抽象接口,被 Node 中的很多對象所實現。比如[對一個 HTTP 服務器的請求](#)是一個流,[stdout](#) 也是一個流。流是可讀、可寫或兼具兩者的。所有流都是 [EventEmitter](#) 的實例。
您可以通過 `require('stream')` 加載 Stream 基類,其中包括了 [Readable](#) 流、[Writable](#) 流、[Duplex](#) 流和 [Transform](#) 流的基類。
本文檔分為三個章節。第一章節解釋了您在您的程序中使用流時需要了解的那部分 API,如果您不打算自己實現一個流式 API,您可以只閱讀這一章節。
第二章節解釋了當您自己實現一個流時需要用到的那部分 API,這些 API 是為了方便您這么做而設計的。
第三章節深入講解了流的工作方式,包括一些內部機制和函數,除非您明確知道您在做什么,否則盡量不要改動它們。
### 面向流消費者的 API
流可以是可讀([Readable](#))或可寫([Writable](#)),或者兼具兩者([Duplex](#),雙工)的。
所有流都是 EventEmitter,但它們也具有其它自定義方法和屬性,取決于它們是 Readable、Writable 或 Duplex。
如果一個流既可讀(Readable)也可寫(Writable),則它實現了下文所述的所有方法和事件。因此,這些 API 同時也涵蓋了 [Duplex](#) 或 [Transform](#) 流,即便它們的實現可能有點不同。
為了消費流而在您的程序中自己實現 Stream 接口是沒有必要的。如果您**確實**正在您自己的程序中實現流式接口,請同時參考下文[面向流實現者的 API](#)。
幾乎所有 Node 程序,無論多簡單,都在某種途徑用到了流。這里有一個使用流的 Node 程序的例子:
~~~
var http = require('http');
<!-- endsection -->
<!-- section:5dd53fb86ef5aa2fb0a6e831e46cc135 -->
var server = http.createServer(function (req, res) {
// req 為 http.IncomingMessage,是一個可讀流(Readable Stream)
// res 為 http.ServerResponse,是一個可寫流(Writable Stream)
<!-- endsection -->
<!-- section:fd5e086becb475ded97300c6e8b1f889 -->
var body = '';
// 我們打算以 UTF-8 字符串的形式獲取數據
// 如果您不設置編碼,您將得到一個 Buffer 對象
req.setEncoding('utf8');
<!-- endsection -->
<!-- section:bb5a4bf69e5c71de2331fe85918ed96b -->
// 一旦監聽器被添加,可讀流會觸發 'data' 事件
req.on('data', function (chunk) {
body += chunk;
})
<!-- endsection -->
<!-- section:5768f3afd395c860ba272f79026a6799 -->
// 'end' 事件表明您已經得到了完整的 body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('錯誤: ' + er.message);
}
<!-- endsection -->
<!-- section:812496c72ef4682c63a7ba8837f9610a -->
// 向用戶回寫一些有趣的信息
res.write(typeof data);
res.end();
})
})
<!-- endsection -->
<!-- section:3bbc30d951532659ecc70a505ea1e985 -->
server.listen(1337);
<!-- endsection -->
<!-- section:f0dea661693acf21ed203ec804a4f05a -->
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// 錯誤: Unexpected token o
~~~
### 類: stream.Readable
Readable(可讀)流接口是對您正在讀取的數據的*來源*的抽象。換言之,數據*出自*一個 Readable 流。
在您表明您就緒接收之前,Readable 流并不會開始發生數據。
Readable 流有兩種“模式”:**流動模式**和**暫停模式**。當處于流動模式時,數據由底層系統讀出,并盡可能快地提供給您的程序;當處于暫停模式時,您必須明確地調用 `stream.read()` 來取出若干數據塊。流默認處于暫停模式。
**注意**:如果沒有綁定 data 事件處理器,并且沒有 [`pipe()`](#) 目標,同時流被切換到流動模式,那么數據會流失。
您可以通過下面幾種做法切換到流動模式:
- 添加一個 [`'data'` 事件](#)處理器來監聽數據。
- 調用 [`resume()`](#) 方法來明確開啟數據流。
- 調用 [`pipe()`](#) 方法將數據發送到一個 [Writable](#)。
您可以通過下面其中一種做法切換回暫停模式:
- 如果沒有導流目標,調用 [`pause()`](#) 方法。
- 如果有導流目標,移除所有 [`'data'` 事件][] 處理器、調用 [`unpipe()`](#) 方法移除所有導流目標。
請注意,為了向后兼容考慮,移除 `'data'` 事件監聽器并**不會**自動暫停流。同樣的,當有導流目標時,調用 `pause()` 并不能保證流在那些目標排空并請求更多數據時*維持*暫停狀態。
一些可讀流的例子:
- [客戶端上的 HTTP 響應](#)
- [服務器上的 HTTP 請求](#)
- [fs 讀取流](#)
- [zlib 流](#)
- [crypto 流](#)
- [TCP 嵌套字](#)
- [子進程的 stdout 和 stderr](#)
- [process.stdin](#)
#### 事件: 'readable'
當一個數據塊可以從流中被讀出時,它會觸發一個 `'readable'` 事件。
在某些情況下,假如未準備好,監聽一個 `'readable'` 事件會使得一些數據從底層系統被讀出到內部緩沖區中。
~~~
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
// 現在有數據可以讀了
})
~~~
當內部緩沖區被排空后,一旦更多數據時,一個 `readable` 事件會被再次觸發。
#### 事件: 'data'
- `chunk` {Buffer | String} 數據塊。
綁定一個 `data` 事件監聽器到一個未被明確暫停的流會將流切換到流動模式,數據會被盡可能地傳遞。
如果您想從流盡快取出所有數據,這是最理想的方式。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('得到了 %d 字節的數據', chunk.length);
})
~~~
#### 事件: 'end'
該事件會在沒有更多數據能夠提供時被觸發。
請注意,`end` 事件在數據被完全消費之前**不會被觸發**。這可通過切換到流動模式,或者在到達末端前不斷調用 `read()` 來實現。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('得到了 %d 字節的數據', chunk.length);
})
readable.on('end', function() {
console.log('讀取完畢。');
});
~~~
#### 事件: 'close'
當底層數據源(比如,源頭的文件描述符)被關閉時觸發。并不是所有流都會觸發這個事件。
#### 事件: 'error'
當數據接收時發生錯誤時觸發。
#### readable.read([size])
- `size` {Number} 可選參數,指定要讀取多少數據。
- 返回 {String | Buffer | null}
`read()` 方法從內部緩沖區中拉取并返回若干數據。當沒有更多數據可用時,它會返回 `null`。
若您傳入了一個 `size` 參數,那么它會返回相當字節的數據;當 `size` 字節不可用時,它則返回 `null`。
若您沒有指定 `size` 參數,那么它會返回內部緩沖區中的所有數據。
該方法僅應在暫停模式時被調用。在流動模式中,該方法會被自動調用直到內部緩沖區排空。
~~~
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('得到了 %d 字節的數據', chunk.length);
}
});
~~~
當該方法返回了一個數據塊,它同時也會觸發 [`'data'` 事件](#)。
#### readable.setEncoding(encoding)
- `encoding` {String} 要使用的編碼。
- 返回: `this`
調用此函數會使得流返回指定編碼的字符串而不是 Buffer 對象。比如,當您 `readable.setEncoding('utf8')`,那么輸出數據會被作為 UTF-8 數據解析,并以字符串返回。如果您 `readable.setEncoding('hex')`,那么數據會被編碼成十六進制字符串格式。
該方法能正確處理多字節字符。假如您不這么做,僅僅直接取出 Buffer 并對它們調用 `buf.toString(encoding)`,很可能會導致字節錯位。因此如果您打算以字符串讀取數據,請總是使用這個方法。
~~~
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('得到了 %d 個字符的字符串數據', chunk.length);
})
~~~
#### readable.resume()
- 返回: `this`
該方法讓可讀流繼續觸發 `data` 事件。
該方法會將流切換到流動模式。如果您*不想*從流中消費數據,但您*想*得到它的 `end` 事件,您可以調用 [`readable.resume()`](#) 來啟動數據流。
~~~
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
console.log('到達末端,但并未讀取任何東西');
})
~~~
#### readable.pause()
- 返回: `this`
該方法會使一個處于流動模式的流停止觸發 `data` 事件,切換到非流動模式,并讓后續可用數據留在內部緩沖區中。
~~~
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('取得 %d 字節數據', chunk.length);
readable.pause();
console.log('接下來 1 秒內不會有數據');
setTimeout(function() {
console.log('現在數據會再次開始流動');
readable.resume();
}, 1000);
})
~~~
#### readable.pipe(destination, [options])
- `destination` {[Writable](#) Stream} 寫入數據的目標
- `options` {Object} 導流選項
- `end` {Boolean} 在讀取者結束時結束寫入者。缺省為 `true`
該方法從可讀流中拉取所有數據,并寫入到所提供的目標。該方法能自動控制流量以避免目標被快速讀取的可讀流所淹沒。
可以導流到多個目標。
~~~
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 所有來自 readable 的數據會被寫入到 'file.txt'
readable.pipe(writable);
~~~
該函數返回目標流,因此您可以建立導流鏈:
~~~
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
~~~
例如,模擬 Unix 的 `cat` 命令:
~~~
process.stdin.pipe(process.stdout);
~~~
缺省情況下當來源流觸發 `end` 時目標的 [`end()`](#) 會被調用,所以此時 `destination` 不再可寫。傳入 `{ end: false }` 作為 `options` 可以讓目標流保持開啟狀態。
這將讓 `writer` 保持開啟,因此最后可以寫入 "Goodbye"。
~~~
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
~~~
請注意 `process.stderr` 和 `process.stdout` 在進程結束前都不會被關閉,無論是否指定選項。
#### readable.unpipe([destination])
- `destination` {[Writable](#) Stream} 可選,指定解除導流的流
該方法會移除之前調用 `pipe()` 所設定的鉤子。
如果不指定目標,所有導流都會被移除。
如果指定了目標,但并沒有與之建立導流,則什么事都不會發生。
~~~
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// 來自 readable 的所有數據都會被寫入 'file.txt',
// 但僅發生在第 1 秒
readable.pipe(writable);
setTimeout(function() {
console.log('停止寫入到 file.txt');
readable.unpipe(writable);
console.log('自行關閉文件流');
writable.end();
}, 1000);
~~~
#### readable.unshift(chunk)
- `chunk` {Buffer | String} 要插回讀取隊列開頭的數據塊
該方法在許多場景中都很有用,比如一個流正在被一個解析器消費,解析器可能需要將某些剛拉取出的數據“逆消費”回來源,以便流能將它傳遞給其它消費者。
如果您發現您需要在您的程序中頻繁調用 `stream.unshift(chunk)`,請考慮實現一個 [Transform](#) 流。(詳見下文面向流實現者的 API。)
~~~
// 取出以 \n\n 分割的頭部并將多余部分 unshift() 回去
// callback 以 (error, header, stream) 形式調用
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// 找到頭部邊界
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// 現在可以從流中讀取消息的主體了
callback(null, header, stream);
} else {
// 仍在讀取頭部
header += str;
}
}
}
}
~~~
#### readable.wrap(stream)
- `stream` {Stream} 一個“舊式”可讀流
Node v0.10 版本之前的流并未實現現今所有流 API。(更多信息詳見下文“兼容性”章節。)
如果您正在使用早前版本的 Node 庫,它觸發 `'data'` 事件并且有一個