<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] ## Stream是什么? Unix操作系統從很早以前,就有Stream(流)這個概念,它是不同進程之間傳遞數據的一種方式。管道命令Pipe就起到在不同命令之間,連接Stream的作用。 Stream把較大的數據,拆成很小的部分。只要命令部署了Stream接口,就可以把一個流的輸出接到另一個流的輸入。Node引入了這個概念,通過Stream為異步讀寫數據提供的統一接口。無論是硬盤數據、網絡數據,還是內存數據,都可以采用這個接口讀寫。 讀寫數據有兩種方式。一種方式是同步處理,即先將數據全部讀入內存,然后處理。它的優點是符合直覺,流程非常自然,缺點是如果遇到大文件,要花很長時間,可能要過很久才能進入數據處理的步驟。另一種方式就是Stream方式,它是系統讀取外部數據實際上的方式,即每次只讀入數據的一小塊,像“流水”一樣。所以,Stream方式就是每當系統讀入了一小塊數據,就會觸發一個事件,發出“新數據塊”的信號,只要監聽這個事件,就能掌握進展,做出相應處理,這樣就提高了程序的性能。 Stream接口最大特點就是通過事件通信,具有readable、writable、drain、data、end、close等事件,既可以讀取數據,也可以寫入數據。讀寫數據時,每讀入(或寫入)一段數據,就會觸發一次data事件,全部讀取(或寫入)完畢,觸發end事件。如果發生錯誤,則觸發error事件。 一個對象只要部署了Stream接口,就可以從讀取數據,或者寫入數據。Node內部很多涉及IO處理的對象,都部署了Stream接口,比如HTTP連接、文件讀寫、標準輸入輸出等。 ## 基本用法 Node的I/O操作都是異步的,所以與磁盤和網絡的交互,都要通過回調函數。一個典型的寫文件操作,可能像下面這樣。 ~~~ var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { fs.readFile(__dirname + '/data.txt', function (err, data) { res.end(data); }); }); server.listen(8000); ~~~ 上面的代碼有一個問題,那就是它必須將整個data.txt文件讀入內存,然后再輸入。如果data.txt非常大,就會占用大量的內容。一旦有多個并發請求,操作就會變得非常緩慢,用戶不得不等很久,才能得到結果。 由于參數req和res都部署了Stream接口,可以使用`fs.createReadStream()`替代`fs.readFile()`,就能解決這個問題。 ~~~ var http = require('http'); var fs = require('fs'); var server = http.createServer(function (req, res) { var stream = fs.createReadStream(__dirname + '/data.txt'); stream.pipe(res); }); server.listen(8000); ~~~ Stream接口的最大特點,就是數據會發出node和data事件,內置的pipe方法會處理這兩個事件。 數據流通過pipe方法,可以方便地導向其他具有Stream接口的對象。 ~~~ var fs = require('fs'); var zlib = require('zlib'); fs.createReadStream('wow.txt') .pipe(zlib.createGzip()) .pipe(process.stdout); ~~~ 上面代碼先打開文本文件wow.txt,然后壓縮,再導向標準輸出。 ~~~ fs.createReadStream('wow.txt') .pipe(zlib.createGzip()) .pipe(fs.createWriteStream('wow.gz')); ~~~ 上面代碼壓縮文件wow.txt以后,又將其寫回壓縮文件。 下面代碼新建一個Stream實例,然后指定寫入事件和終止事件的回調函數,再將其接到標準輸入之上。 ~~~ var stream = require('stream'); var Stream = stream.Stream; var ws = new Stream; ws.writable = true; ws.write = function(data) { console.log("input=" + data); } ws.end = function(data) { console.log("bye"); } process.stdin.pipe(ws); ~~~ 調用上面的腳本,會產生以下結果。 ~~~ $ node pipe_out.js hello input=hello ^d bye ~~~ 上面代碼調用腳本下,鍵入hello,會輸出`input=hello`。然后按下ctrl-d,會輸出bye。使用管道命令,可以看得更清楚。 ~~~ $ echo hello | node pipe_out.js input=hello bye ~~~ Stream接口分成三類。 * 可讀數據流接口,用于讀取數據。 * 可寫數據流接口,用于寫入數據。 * 雙向數據流接口,用于讀取和寫入數據,比如Node的tcp sockets、zlib、crypto都部署了這個接口。 ## 可讀數據流 “可讀數據流”表示數據的來源,只要一個對象提供“可讀數據流”,就表示你可以從其中讀取數據。 “可讀數據流”有兩種狀態:流動態和暫停態。處于流動態時,數據會盡快地從數據源導向用戶的程序;處于暫停態時,必須顯式調用`stream.read()`等指令,“可讀數據流”才會釋放數據。剛剛新建的時候,“可讀數據流”處于暫停態。 三種方法可以讓暫停態轉為流動態。 * 添加data事件的監聽函數 * 調用resume方法 * 調用pipe方法將數據送往一個可寫數據流 如果轉為流動態時,沒有data事件的監聽函數,也沒有pipe方法的目的地,那么數據將遺失。 以下兩種方法可以讓流動態轉為暫停態。 * 不存在pipe方法的目的地時,調用pause方法 * 存在pipe方法的目的地時,移除所有data事件的監聽函數,并且調用unpipe方法,移除所有pipe方法的目的地 注意,只移除data事件的監聽函數,并不會自動引發數據流進入“暫停態”。另外,存在pipe方法的目的地時,調用pause方法,并不能保證數據流總是處于暫停態,一旦那些目的地發出數據請求,數據流有可能會繼續提供數據。 每當系統有新的數據,該接口可以監聽到data事件,從而回調函數。 ~~~ var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { data+=chunk; }); readableStream.on('end', function() { console.log(data); }); ~~~ 上面代碼中,fs模塊的createReadStream方法,是部署了Stream接口的文件讀取方法。該方法對指定的文件,返回一個對象。該對象只要監聽data事件,回調函數就能讀到數據。 除了data事件,監聽readable事件,也可以讀到數據。 ~~~ var fs = require('fs'); var readableStream = fs.createReadStream('file.txt'); var data = ''; var chunk; readableStream.setEncoding('utf8'); readableStream.on('readable', function() { while ((chunk=readableStream.read()) !== null) { data += chunk; } }); readableStream.on('end', function() { console.log(data) }); ~~~ readable事件表示系統緩沖之中有可讀的數據,使用read方法去讀出數據。如果沒有數據可讀,read方法會返回null。 “可讀數據流”除了read方法,還有以下方法。 * Readable.pause() :暫停數據流。已經存在的數據,也不再觸發data事件,數據將保留在緩存之中,此時的數據流稱為靜態數據流。如果對靜態數據流再次調用pause方法,數據流將重新開始流動,但是緩存中現有的數據,不會再觸發data事件。 * Readable.resume():恢復暫停的數據流。 * readable.unpipe():從管道中移除目的地數據流。如果該方法使用時帶有參數,會阻止“可讀數據流”進入某個特定的目的地數據流。如果使用時不帶有參數,則會移除所有的目的地數據流。 ### read() read方法從系統緩存讀取并返回數據。如果讀不到數據,則返回null。 該方法可以接受一個整數作為參數,表示所要讀取數據的數量,然后會返回該數量的數據。如果讀不到足夠數量的數據,返回null。如果不提供這個參數,默認返回系統緩存之中的所有數據。 只在“暫停態”時,該方法才有必要手動調用。“流動態”時,該方法是自動調用的,直到系統緩存之中的數據被讀光。 ~~~ var readable = getReadableStreamSomehow(); readable.on('readable', function() { var chunk; while (null !== (chunk = readable.read())) { console.log('got %d bytes of data', chunk.length); } }); ~~~ 如果該方法返回一個數據塊,那么它就觸發了data事件。 ### setEncoding() 調用該方法,會使得數據流返回指定編碼的字符串,而不是緩存之中的二進制對象。比如,調用`setEncoding('utf8')`,數據流會返回UTF-8字符串,調用`setEncoding('hex')`,數據流會返回16進制的字符串。 該方法會正確處理多字節的字符,而緩存的方法`buf.toString(encoding)`不會。所以如果想要從數據流讀取字符串,應該總是使用該方法。 ~~~ var readable = getReadableStreamSomehow(); readable.setEncoding('utf8'); readable.on('data', function(chunk) { assert.equal(typeof chunk, 'string'); console.log('got %d characters of string data', chunk.length); }); ~~~ ### resume() resume方法會使得“可讀數據流”繼續釋放data事件,即轉為流動態。 ~~~ var readable = getReadableStreamSomehow(); readable.resume(); readable.on('end', function(chunk) { console.log('數據流到達尾部,未讀取任務數據'); }); ~~~ 上面代碼中,調用resume方法使得數據流進入流動態,只定義end事件的監聽函數,不定義data事件的監聽函數,表示不從數據流讀取任何數據,只監聽數據流到達尾部。 ### pause() pause方法使得流動態的數據流,停止釋放data事件,轉而進入暫停態。任何此時已經可以讀到的數據,都將停留在系統緩存。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('讀取%d字節的數據', chunk.length); readable.pause(); console.log('接下來的1秒內不讀取數據'); setTimeout(function() { console.log('數據恢復讀取'); readable.resume(); }, 1000); }); ~~~ ### isPaused() 該方法返回一個布爾值,表示“可讀數據流”被客戶端手動暫停(即調用了pause方法),目前還沒有調用resume方法。 ~~~ var readable = new stream.Readable readable.isPaused() // === false readable.pause() readable.isPaused() // === true readable.resume() readable.isPaused() // === false ~~~ ### pipe() pipe方法是自動傳送數據的機制,就像管道一樣。它從“可讀數據流”讀出所有數據,將其寫出指定的目的地。整個過程是自動的。 ~~~ var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.pipe(writableStream); ~~~ 上面代碼使用pipe方法,將file1的內容寫入file2。整個過程由pipe方法管理,不用手動干預,所以可以將傳送數據寫得很簡潔。 pipe方法返回目的地的數據流,因此可以使用鏈式寫法,將多個數據流操作連在一起。 ~~~ var fs = require('fs'); var zlib = require('zlib'); fs.createReadStream('input.txt.gz') .pipe(zlib.createGunzip()) .pipe(fs.createWriteStream('output.txt')); ~~~ 上面代碼采用鏈式寫法,先讀取文件,然后進行壓縮,最后輸出。 下面的寫法模擬了Unix系統的cat命令,將標準輸出寫入標準輸入。 ~~~ process.stdin.pipe(process.stdout); ~~~ 當來源地的數據流讀取完成,默認會調用目的地的end方法,就不再能夠寫入。對pipe方法傳入第二個參數`{ end: false }`,可以讓目的地的數據流保持打開。 ~~~ reader.pipe(writer, { end: false }); reader.on('end', function() { writer.end('Goodbye\n'); }); ~~~ 上面代碼中,目的地數據流默認不會調用end方法,只能手動調用,因此“Goodbye”會被寫入。 ### unpipe() 該方法移除pipe方法指定的數據流目的地。如果沒有參數,則移除所有的pipe方法目的地。如果有參數,則移除該參數指定的目的地。如果沒有匹配參數的目的地,則不會產生任何效果。 ~~~ var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); readable.pipe(writable); setTimeout(function() { console.log('停止寫入file.txt'); readable.unpipe(writable); console.log('手動關閉file.txt的寫入數據流'); writable.end(); }, 1000); ~~~ 上面代碼寫入file.txt的時間,只有1秒鐘,然后就停止寫入。 ### 事件 (1)readable readable事件在數據流能夠向外提供數據時觸發。 ~~~ var readable = getReadableStreamSomehow(); readable.on('readable', function() { // there is some data to read now }); ~~~ (2)data 對于那些沒有顯式暫停的數據流,添加data事件監聽函數,會將數據流切換到流動態,盡快向外提供數據。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); }); ~~~ (3)end 無法再讀取到數據時,會觸發end事件。也就是說,只有當前數據被完全讀取完,才會觸發end事件,比如不停地調用read方法。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); }); readable.on('end', function() { console.log('there will be no more data.'); }); ~~~ (4)close 數據源關閉時,close事件被觸發。并不是所有的數據流都支持這個事件。 (5)error 當讀取數據發生錯誤時,error事件被觸發。 ## 可寫數據流 “可寫數據流”允許你將數據寫入某個目的地。它是數據寫入的一種抽象,不同的數據目的地部署了這個接口以后,就可以用統一的方法寫入。 以下是部署了可寫數據流的一些場合。 * 客戶端的http requests * 服務器的http responses * fs write streams * zlib streams * crypto streams * tcp sockets * child process stdin * process.stdout, process.stderr 下面是fs模塊的可寫數據流的例子。 ~~~ var fs = require('fs'); var readableStream = fs.createReadStream('file1.txt'); var writableStream = fs.createWriteStream('file2.txt'); readableStream.setEncoding('utf8'); readableStream.on('data', function(chunk) { writableStream.write(chunk); }); ~~~ 上面代碼中,fs模塊的createWriteStream方法針對特定文件,創建了一個“可寫數據流”,本質上就是對寫入操作部署了Stream接口。然后,“可寫數據流”的write方法,可以將數據寫入文件。 ### write() write方法用于向“可寫數據流”寫入數據。它接受兩個參數,一個是寫入的內容,可以是字符串,也可以是一個stream對象(比如可讀數據流),另一個是寫入完成后的回調函數。 它返回一個布爾值,表示本次數據是否處理完成。如果返回true,就表示可以寫入新的數據了。如果等待寫入的數據被緩存了,就返回false。不過,在返回false的情況下,也可以繼續傳入新的數據等待寫入。只是這時,新的數據不會真的寫入,只會緩存在內存中。為了避免內存消耗,比較好的做法還是等待該方法返回true,然后再寫入。 ### cork(),uncork() cork方法可以強制等待寫入的數據進入緩存。當調用uncork方法或end方法時,緩存的數據就會吐出。 ### setDefaultEncoding() setDefaultEncoding方法用于將寫入的數據編碼成新的格式。它返回一個布爾值,表示編碼是否成功,如果返回false就表示編碼失敗。 ### end() end方法用于終止“可寫數據流”。該方法可以接受三個參數,全部都是可選參數。第一個參數是最后所要寫入的數據,可以是字符串,也可以是stream對象;第二個參數是寫入編碼;第三個參數是一個回調函數,finish事件觸發時,會調用這個回調函數。 ~~~ var file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); ~~~ 上面代碼會在數據寫入結束時,在尾部寫入“world!”。 調用end方法之后,再寫入數據會報錯。 ~~~ var file = fs.createWriteStream('example.txt'); file.end('world!'); file.write('hello, '); // 報錯 ~~~ ### 事件 (1)drain事件 `writable.write(chunk)`返回false以后,當緩存數據全部寫入完成,可以繼續寫入時,會觸發drain事件。 ~~~ function writeOneMillionTimes(writer, data, encoding, callback) { var i = 1000000; write(); function write() { var ok = true; do { i -= 1; if (i === 0) { writer.write(data, encoding, callback); } else { ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { writer.once('drain', write); } } } ~~~ 上面代碼是一個寫入100萬次的例子,通過drain事件得到可以繼續寫入的通知。 (2)finish事件 調用end方法時,所有緩存的數據釋放,觸發finish事件。該事件的回調函數沒有參數。 ~~~ var writer = getWritableStreamSomehow(); for (var i = 0; i < 100; i ++) { writer.write('hello, #' + i + '!\n'); } writer.end('this is the end\n'); writer.on('finish', function() { console.error('all writes are now complete.'); }); ~~~ (3)pipe事件 “可寫數據流”調用pipe方法,將數據流導向寫入目的地時,觸發該事件。 該事件的回調函數,接受發出該事件的“可讀數據流”對象作為參數。 ~~~ var writer = getWritableStreamSomehow(); var reader = getReadableStreamSomehow(); writer.on('pipe', function(src) { console.error('something is piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer); ~~~ (4)unpipe事件 “可讀數據流”調用unpipe方法,將可寫數據流移出寫入目的地時,觸發該事件。 該事件的回調函數,接受發出該事件的“可讀數據流”對象作為參數。 ~~~ var writer = getWritableStreamSomehow(); var reader = getReadableStreamSomehow(); writer.on('unpipe', function(src) { console.error('something has stopped piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer); reader.unpipe(writer); ~~~ (5)error事件 如果寫入數據或pipe數據時發生錯誤,就會觸發該事件。 該事件的回調函數,接受一個Error對象作為參數。 ## HTTP請求 HTTP對象使用Stream接口,實現網絡數據的讀寫。 ~~~ var http = require('http'); var server = http.createServer(function (req, res) { // req is an http.IncomingMessage, which is a Readable Stream // res is an http.ServerResponse, which is a Writable Stream var body = ''; // we want to get the data as utf8 strings // If you don't set an encoding, then you'll get Buffer objects req.setEncoding('utf8'); // Readable streams emit 'data' events once a listener is added req.on('data', function (chunk) { body += chunk; }); // the end event tells you that you have entire body req.on('end', function () { try { var data = JSON.parse(body); } catch (er) { // uh oh! bad json! res.statusCode = 400; return res.end('error: ' + er.message); } // write back something interesting to the user: res.write(typeof data); res.end(); }); }); server.listen(1337); // $ curl localhost:1337 -d '{}' // object // $ curl localhost:1337 -d '"foo"' // string // $ curl localhost:1337 -d 'not json' // error: Unexpected token o ~~~ data事件表示讀取或寫入了一塊數據。 ~~~ req.on('data', function(buf){ // Do something with the Buffer }); ~~~ 使用req.setEncoding方法,可以設定字符串編碼。 ~~~ req.setEncoding('utf8'); req.on('data', function(str){ // Do something with the String }); ~~~ end事件,表示讀取或寫入數據完畢。 ~~~ var http = require('http'); http.createServer(function(req, res){ res.writeHead(200); req.on('data', function(data){ res.write(data); }); req.on('end', function(){ res.end(); }); }).listen(3000); ~~~ 上面代碼相當于建立了“回聲”服務,將HTTP請求的數據體,用HTTP回應原樣發送回去。 system模塊提供了pump方法,有點像Linux系統的管道功能,可以將一個數據流,原封不動得轉給另一個數據流。所以,上面的例子也可以用pump方法實現。 ~~~ var http = require('http'), sys = require('sys'); http.createServer(function(req, res){ res.writeHead(200); sys.pump(req, res); }).listen(3000); ~~~ ## fs模塊 fs模塊的createReadStream方法用于新建讀取數據流,createWriteStream方法用于新建寫入數據流。使用這兩個方法,可以做出一個用于文件復制的腳本copy.js。 ~~~ // copy.js var fs = require('fs'); console.log(process.argv[2], '->', process.argv[3]); var readStream = fs.createReadStream(process.argv[2]); var writeStream = fs.createWriteStream(process.argv[3]); readStream.on('data', function (chunk) { writeStream.write(chunk); }); readStream.on('end', function () { writeStream.end(); }); readStream.on('error', function (err) { console.log("ERROR", err); }); writeStream.on('error', function (err) { console.log("ERROR", err); });d all your errors, you wouldn't need to use domains. ~~~ 上面代碼非常容易理解,使用的時候直接提供源文件路徑和目標文件路徑,就可以了。 ~~~ node cp.js src.txt dest.txt ~~~ Streams對象都具有pipe方法,起到管道作用,將一個數據流輸入另一個數據流。所以,上面代碼可以重寫成下面這樣: ~~~ var fs = require('fs'); console.log(process.argv[2], '->', process.argv[3]); var readStream = fs.createReadStream(process.argv[2]); var writeStream = fs.createWriteStream(process.argv[3]); readStream.on('open', function () { readStream.pipe(writeStream); }); readStream.on('end', function () { writeStream.end(); }); ~~~ ## 錯誤處理 下面是壓縮后發送文件的代碼。 ~~~ http.createServer(function (req, res) { // set the content headers fs.createReadStream('filename.txt') .pipe(zlib.createGzip()) .pipe(res) }) ~~~ 上面的代碼沒有部署錯誤處理機制,一旦發生錯誤,就無法處理。所以,需要加上error事件的監聽函數。 ~~~ http.createServer(function (req, res) { // set the content headers fs.createReadStream('filename.txt') .on('error', onerror) .pipe(zlib.createGzip()) .on('error', onerror) .pipe(res) function onerror(err) { console.error(err.stack) } }) ~~~ 上面的代碼還是存在問題,如果客戶端中斷下載,寫入的數據流就會收不到close事件,一直處于等待狀態,從而造成內存泄漏。因此,需要使用[on-finished模塊](https://github.com/jshttp/on-finished)用來處理這種情況。 ~~~ http.createServer(function (req, res) { var stream = fs.createReadStream('filename.txt') // set the content headers stream .on('error', onerror) .pipe(zlib.createGzip()) .on('error', onerror) .pipe(res) onFinished(res, function () { // make sure the stream is always destroyed stream.destroy() }) }) ~~~ ## 參考鏈接 * James Halliday,?[cs294-101-streams-lecture](https://github.com/substack/cs294-101-streams-lecture)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看