# 流模塊基礎
在node中,一共有五種類型的流:readable,writable,transform,duplex以及"classic"
## [](https://github.com/jabez128/stream-handbook#pipe)pipe
無論哪一種流,都會使用`.pipe()`方法來實現輸入和輸出。
`.pipe()`函數很簡單,它僅僅是接受一個源頭`src`并將數據輸出到一個可寫的流`dst`中:
~~~
src.pipe(dst)
~~~
`.pipe(dst)`將會返回`dst`因此你可以鏈式調用多個流:
~~~
a.pipe(b).pipe(c).pipe(d)
~~~
上面的代碼也可以等價為:
~~~
a.pipe(b);
b.pipe(c);
c.pipe(d);
~~~
這和你在unix中編寫流代碼很類似:
~~~
a | b | c | d
~~~
只不過此時你是在node中編寫而不是在shell中!
## [](https://github.com/jabez128/stream-handbook#readable流)readable流
Readable流可以產出數據,你可以將這些數據傳送到一個writable,transform或者duplex流中,只需要調用`pipe()`方法:
~~~
readableStream.pipe(dst)
~~~
### [](https://github.com/jabez128/stream-handbook#創建一個readable流)創建一個readable流
現在我們就來創建一個readable流!
~~~
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
~~~
下面運行代碼:
~~~
$ node read0.js
beep boop
~~~
在上面的代碼中`rs.push(null)`的作用是告訴`rs`輸出數據應該結束了。
需要注意的一點是我們在將數據輸出到`process.stdout`之前已經將內容推送進readable流`rs`中,但是所有的數據依然是可寫的。
這是因為在你使用`.push()`將數據推進一個readable流中時,一直要到另一個東西來消耗數據之前,數據都會存在一個緩存中。
然而,在更多的情況下,我們想要的是當需要數據時數據才會產生,以此來避免大量的緩存數據。
我們可以通過定義一個`._read`函數來實現按需推送數據:
~~~
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
~~~
代碼的運行結果如下所示:
~~~
$ node read1.js
abcdefghijklmnopqrstuvwxyz
~~~
在這里我們將字母`a`到`z`推進了rs中,但是只有當數據消耗者出現時,數據才會真正實現推送。
`_read`函數也可以獲取一個`size`參數來指明消耗者想要讀取多少比特的數據,但是這個參數是可選的。
需要注意到的是你可以使用`util.inherit()`來繼承一個Readable流。
為了說明只有在數據消耗者出現時,`_read`函數才會被調用,我們可以將上面的代碼簡單的修改一下:
~~~
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);
~~~
運行上面的代碼我們可以發現如果我們只請求5比特的數據,那么`_read`只會運行5次:
~~~
$ node read2.js | head -c5
abcde
_read() called 5 times
~~~
在上面的代碼中,`setTimeout`很重要,因為操作系統需要花費一些時間來發送程序結束信號。
另外,`process.stdout.on('error',fn)`處理器也很重要,因為當`head`不再關心我們的程序輸出時,操作系統將會向我們的進程發送一個`SIGPIPE`信號,此時`process.stdout`將會捕獲到一個`EPIPE`錯誤。
上面這些復雜的部分在和操作系統相關的交互中是必要的,但是如果你直接和node中的流交互的話,則可有可無。
如果你創建了一個readable流,并且想要將任何的值推送到其中的話,確保你在創建流的時候指定了objectMode參數,`Readable({ objectMode: true })`。
### [](https://github.com/jabez128/stream-handbook#消耗一個readable流)消耗一個readable流
大部分時候,將一個readable流直接pipe到另一種類型的流或者使用through或者concat-stream創建的流中,是一件很容易的事情。但是有時我們也會需要直接來消耗一個readable流。
~~~
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
~~~
代碼運行結果如下所示:
~~~
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null
~~~
當數據可用時,`readable`事件將會被觸發,此時你可以調用`.read()`方法來從緩存中獲取這些數據。
當流結束時,`.read()`將返回`null`,因為此時已經沒有更多的字節可以供我們獲取了。
你也可以告訴`.read()`方法來返回`n`個字節的數據。雖然所有核心對象中的流都支持這種方式,但是對于對象流來說這種方法并不可用。
下面是一個例子,在這里我們制定每次讀取3個字節的數據:
~~~
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});
~~~
運行上面的例子,我們將獲取到不完整的數據:
~~~
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
~~~
這是因為多余的數據都留在了內部的緩存中,因此這個時候我們需要告訴node我們還對剩下的數據感興趣,我們可以使用`.read(0)`來完成這件事:
~~~
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});
~~~
到現在為止我們的代碼和我們所期望的一樣了!
~~~
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>
~~~
我們也可以使用`.unshift()`方法來放置多余的數據。
使用`unshift()`方法能夠放置我們進行不必要的緩存拷貝。在下面的代碼中我們將創建一個分割新行的可讀解析器:
~~~
var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
~~~
代碼的運行結果如下所示:
~~~
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'
~~~
當然,已經有很多這樣的模塊比如split來幫助你完成這件事情,你完全不需要自己寫一個。
## [](https://github.com/jabez128/stream-handbook#writable流)writable流
一個writable流指的是只能流進不能流出的流:
~~~
src.pipe(writableStream)
~~~
### [](https://github.com/jabez128/stream-handbook#創建一個writable流)創建一個writable流
只需要定義一個`._write(chunk,enc,next)`函數,你就可以將一個readable流的數據釋放到其中:
~~~
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
~~~
代碼運行結果如下所示:
~~~
$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
~~~
第一個參數,`chunk`代表寫進來的數據。
第二個參數`enc`代表編碼的字符串,但是只有在`opts.decodeString`為`false`的時候你才可以寫一個字符串。
第三個參數,`next(err)`是一個回調函數,使用這個回調函數你可以告訴數據消耗者可以寫更多的數據。你可以有選擇性的傳遞一個錯誤對象`error`,這時會在流實體上觸發一個`emit`事件。
在從一個readable流向一個writable流傳數據的過程中,數據會自動被轉換為`Buffer`對象,除非你在創建writable流的時候制定了`decodeStrings`參數為`false`,`Writable({decodeStrings: false})`。
如果你需要傳遞對象,需要指定`objectMode`參數為`true`,`Writable({ objectMode: true })`。
### [](https://github.com/jabez128/stream-handbook#向一個writable流中寫東西)向一個writable流中寫東西
如果你需要向一個writable流中寫東西,只需要調用`.write(data)`即可。
~~~
process.stdout.write('beep boop\n');
~~~
為了告訴一個writable流你已經寫完畢了,只需要調用`.end()`方法。你也可以使用`.end(data)`在結束前再寫一些數據。
~~~
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
~~~
運行結果如下所示:
~~~
$ node writing1.js
$ cat message.txt
beep boop
~~~
如果你在創建writable流時指定了`highWaterMark`參數,那么當沒有更多數據寫入時,調用`.write()`方法將會返回false。
如果你想要等待緩存情況,可以監聽`drain`事件。
## [](https://github.com/jabez128/stream-handbook#transform流)transform流
你可以將transform流想象成一個流的中間部分,它可以讀也可寫,但是并不保存數據,它只負責處理流經它的數據。
## [](https://github.com/jabez128/stream-handbook#duplex流)duplex流
Duplex流是一個可讀也可寫的流,就好像一個電話,可以接收也可以發送語音。一個rpc交換是一個duplex流的最好的例子。如果你看到過下面這樣的代碼:
~~~
a.pipe(b).pipe(a)
~~~
那么你需要處理的就是一個duplex流對象。
## [](https://github.com/jabez128/stream-handbook#classic流)classic流
Classic流是一個古老的接口,最早出現在node 0.4中。雖然現在不怎么用,但是我們最好還是來了解一下它的工作原理。
無論何時,只要一個流對象注冊了一個`data`監聽器,它就會自動的切換到`classic`模式,并且根據舊API的方式運行。
### [](https://github.com/jabez128/stream-handbook#classic-readable流)classic readable流
Classic readable流只是一個事件發射器,當有數據消耗者出現時發射`emit`事件,當輸出數據完畢時發射`end`事件。
我們可以同構檢查`stream.readable`來檢查一個classic流對象是否可讀。
下面是一個簡單的readable流對象的例子,程序的運行結果將會輸出`A`到`J`:
~~~
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
~~~
運行結果如下所示:
~~~
$ node classic0.js
ABCDEFGHIJ
~~~
為了從一個classic readable流中讀取數據,你可以注冊`data`和`end`監聽器。下面是一個使用舊readable流方式從`process.stdin`中讀取數據的例子:
~~~
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
~~~
運行結果如下所示:
~~~
$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
~~~
需要注意的一點是當你在一個流對象上注冊了一個`data`監聽器,你就將這個流放在了兼容模式下,此時你不能使用兩個stream2的api。
如果你自己創建流對象,永遠不要綁定`data`和`end`監聽器。如果你需要和舊版本的流兼容,最好使用第三方庫來實現`.pipe()`方法。
例如,你可以使用through模塊來避免顯式的使用`data`和`end`監聽器:
~~~
var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
~~~
程序運行結果如下所示:
~~~
$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__
~~~
你也可以使用concat-stream模塊來將整個流的內容緩存起來:
~~~
var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
~~~
程序運行結果如下所示:
~~~
$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }
~~~
Classic readable流擁有`.pause()`和`.resume()`邏輯來暫停一個流,但是這都是可選的。如果你想要使用`.pause()`和`.resume()`方法,你應該使用through模塊來幫助你處理緩存。
### [](https://github.com/jabez128/stream-handbook#classic-writable流)classic writable流
Classic writable流非常簡單。其中只定義了`.write(buf)`,`.end(buf)`,以及`.desctory()`方法。其中`.end(buf)`的參數buf是可選參數,但是一般來說node程序員還是喜歡使用`.end(buf)`這種寫法。