<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ### 穩定度: 2 - 穩定 流是一個被`io.js`內部的許多對象所實現的抽象接口。例如一個發往HTTP服務器的請求是一個留,`stdout`也是一個流。流可以是可讀的,可寫的或雙向的。所有的流都是`EventEmitter`實例。 你可以通過`require('stream')`來取貨`Stream`的基類。其中包括了`Readable`流,`Writable`流,`Duplex`流和`Transform`流的基類。 此文檔分為三個章節。第一章節解釋了在你的編程中使用流時需要的API。如果你不需要實現你自己的流式API,你可以在這里停止。 第二章節解釋了你在構建你自己的流時需要的API,這些API是為了方便你這么做而設計的。 第三章節深入講述了流的工作機制,包括一些內部的機制和函數,你不應該去改動它們除非你知道你在做什么。 ### 面向流消費者的API 流可以是可讀的,可寫的,或雙工的。 所有的流都是`EventEmitters`。但是它們也各自有一些獨特的方法和屬性,這取決于它們是可讀流,可寫流或雙工流。 如果一個流同時是可讀的和可寫的,那么表示它實現了以下所有的方法和事件。所以,這些API同時也涵蓋`Duplex`或`Transform`流,即使它們的實現可能有些不同。 在你程序中,為了消費流而去實現流接口不是必須的。如果你確實正在你的程序中實現流接口,請參考下一章節`面向流實現者的API`。 幾乎所有`io.js`程序,不論多簡單,都使用了流。下面是一個在`io.js`是使用流的例子: ~~~ var http = require('http'); var server = http.createServer(function (req, res) { // req is an http.IncomingMessage, which is a Readable Stream // res is an http.ServerResponse, which is a Writable Stream var body = ''; // we want to get the data as utf8 strings // If you don't set an encoding, then you'll get Buffer objects req.setEncoding('utf8'); // Readable streams emit 'data' events once a listener is added req.on('data', function (chunk) { body += chunk; }); // the end event tells you that you have entire body req.on('end', function () { try { var data = JSON.parse(body); } catch (er) { // uh oh! bad json! res.statusCode = 400; return res.end('error: ' + er.message); } // write back something interesting to the user: res.write(typeof data); res.end(); }); }); server.listen(1337); // $ curl localhost:1337 -d '{}' // object // $ curl localhost:1337 -d '"foo"' // string // $ curl localhost:1337 -d 'not json' // error: Unexpected token o ~~~ #### Class: stream.Readable 可讀流接口是一個你可以從之讀取數據的數據源的抽象。換句話說,數據從可讀流而來。 除非你指示已經準備好接受數據,否則可讀流不會開始發生數據。 可讀流有兩個“模式”:流動模式和暫停模式。當在流動模式時,數據由底層系統讀出,并且會盡快地提供給你的程序。當在暫停模式時,你必須調用`stream.read()`方法來獲取數據塊。流默認是暫停模式。 注意:如果`data`事件沒有被綁定監聽器,并且沒有導流(pipe)目標,并且流被切換到了流動模式,那么數據將會被丟失。 你可以通過下面任意一個做法切換到流動模式: - 添加一個`data`事件的監聽器來監聽數據。 - 調用`resume()`方法來明確開啟流動模式。 - 調用`pipe()`方法將數據導入一個可寫流。 你可以同意下面任意一種方法切換回暫停模式: - 如果沒有導流(pipe)目標,調用`pause()`方法。 - 如果有導流(pipe)目標,移除所有的`data`事件監聽器,并且通過`unpipe()`方法移除所有導流目標。 注意,由于為了向后兼任的原因,移除`data`事件的監聽器將不會自動暫停流。同樣的,如果有導流目標,調用`pause()`方法將不會保證目標流排空并請求更多數據時保持暫停。 一些內置的可讀流例子: - 客戶端的HTTP請求 - 服務端的HTTP響應 - 文件系統讀取流 - `zlib`流 - `crypto`流 - tcp sockets - 子進程的stdout和stderr - `process.stdin` #### Event: 'readable' 當一個數據塊能可以從流中被讀出時,會觸發一個`readable`事件。 某些情況下,監聽一個`readable`事件會導致一些將要被讀出的數據從底層系統進入內部緩沖,如果它沒有準備好。 ~~~ var readable = getReadableStreamSomehow(); readable.on('readable', function() { // there is some data to read now }); ~~~ 當內部緩沖被排空時,一旦有更多數據,`readable`事件會再次觸發。 #### Event: 'data' - chunk Buffer | String 數據塊 為一個沒有被暫停的流添加一個`data`事件的監聽器會使其切換到流動模式。之后數據會被盡快得傳遞給用戶。 如果你只是想盡快得從流中取得所有數據,這是最好的方式。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); }); ~~~ #### Event: 'end' 當沒有更多可讀的數據時這個事件會被觸發。 注意,除非數據被完全消費,`end`事件才會觸發。這可以通過切換到流動模式,或重復調用`read()`方法。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); }); readable.on('end', function() { console.log('there will be no more data.'); }); ~~~ #### Event: 'close' 當底層資源(如源頭的文件描述符)被關閉時觸發。不是所有的流都會觸發這個事件。 #### Event: 'error' - Error Object 當接受數據時有錯誤發生,會觸發此事件。 #### readable.read([size]) - size Number 可選,指定讀取數據的數量 - Return String | Buffer | null `read()`方法從內部緩沖中取出數據并返回它。如果沒有可用數據,那么將返回`null`。 如果你傳遞了一個`size`參數,那么它將返回指定字節的數據。如果`size`參數的字節數不可用,那么將返回`null`。 如果你不指定`size`參數,那么將會返回內部緩沖中的所有數據。 這個方法只能在暫定模式中被調用。在流動模式下,這個方法會被自動地重復調用,知道內部緩沖被排空。 ~~~ var readable = getReadableStreamSomehow(); readable.on('readable', function() { var chunk; while (null !== (chunk = readable.read())) { console.log('got %d bytes of data', chunk.length); } }); ~~~ 如果這個方法返回一個數據塊,那么它也會觸發`data`事件。 #### readable.setEncoding(encoding) - encoding String 使用的編碼 - Return: this 調用這個函數會導致流返回指定編碼的字符串而不是`Buffer`對象。例如,如果你調用`readable.setEncoding('utf8')`,那么輸出的數據將被解釋為UTF-8數據,并且作為字符串返回。如果你調用了`readable.setEncoding('hex')`,那么數據將被使用十六進制字符串的格式編碼。 該方法可以正確地處理多字節字符。如果你只是簡單地直接取出緩沖并且對它們調用`buf.toString(encoding)`,將會導致錯位。如果你想使用字符串讀取數據,請使用這個方法。 ~~~ var readable = getReadableStreamSomehow(); readable.setEncoding('utf8'); readable.on('data', function(chunk) { assert.equal(typeof chunk, 'string'); console.log('got %d characters of string data', chunk.length); }); ~~~ #### readable.resume() - Return: this 這個方法將會讓可讀流繼續觸發`data`事件。 這個方法將會使流切換至流動模式。如果你不想消費流中的數據,但你想監聽它的`end`事件,你可以通過調用`readable.resume()`來打開數據流。 ~~~ var readable = getReadableStreamSomehow(); readable.resume(); readable.on('end', function() { console.log('got to the end, but did not read anything'); }); ~~~ #### readable.pause() - Return: this 這個方法會使一個處于流動模式的流停止觸發`data`事件,并切換至暫停模式。所有可用的數據將仍然存在于內部緩沖中。 ~~~ var readable = getReadableStreamSomehow(); readable.on('data', function(chunk) { console.log('got %d bytes of data', chunk.length); readable.pause(); console.log('there will be no more data for 1 second'); setTimeout(function() { console.log('now data will start flowing again'); readable.resume(); }, 1000); }); ~~~ #### readable.isPaused() - Return: Boolean 這個方法會返回流是否被客戶端代碼所暫停(調用`readable.pause()`,并且沒有在之后調用`readable.resume()`)。 ~~~ var readable = new stream.Readable readable.isPaused() // === false readable.pause() readable.isPaused() // === true readable.resume() readable.isPaused() // === false ~~~ #### readable.pipe(destination[, options]) - destination Writable Stream 寫入數據的目標 - **options Object** - end Boolean 當讀取者結束時結束寫入者。默認為`true`。 這個方法會取出可讀流中所有的數據,并且將之寫入指定的目標。這個方法會自動調節流量,所以當快速讀取可讀流時目標不會溢出。 可以將數據安全地導流至多個目標。 ~~~ var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt' readable.pipe(writable); ~~~ 這個函數返回目標流,所以你可以鏈式調用`pipe()`: ~~~ var r = fs.createReadStream('file.txt'); var z = zlib.createGzip(); var w = fs.createWriteStream('file.txt.gz'); r.pipe(z).pipe(w); ~~~ 例子,模仿UNIX的`cat`命令: ~~~ process.stdin.pipe(process.stdout); ~~~ 默認情況下,當源流觸發`end`事件時,目標流會被調用`end()`方法,然后目標就不再是可寫的了。將傳遞`{ end: false }`作為`options`參數,將保持目標流開啟。 例子,保持被寫入的流開啟,所以“Goodbye”可以在末端被寫入: ~~~ reader.pipe(writer, { end: false }); reader.on('end', function() { writer.end('Goodbye\n'); }); ~~~ 注意,不論指定任何`options`參數,`process.stderr`和`process.stdout`在程序退出前永遠不會被關閉。 #### readable.unpipe([destination]) - destination Writable Stream 可選,指定解除導流的流 這方法會移除之前調用`pipe()`方法所設置的鉤子。 如果沒有指定目標,那么所有的導流都會被移除。 如果指定了目標,但是并沒有為目標設置導流,那么什么都不會發生。 ~~~ var readable = getReadableStreamSomehow(); var writable = fs.createWriteStream('file.txt'); // All the data from readable goes into 'file.txt', // but only for the first second readable.pipe(writable); setTimeout(function() { console.log('stop writing to file.txt'); readable.unpipe(writable); console.log('manually close the file stream'); writable.end(); }, 1000); ~~~ #### readable.unshift(chunk) - chunk Buffer | String 要插回讀取隊列開頭的數據塊。 該方法在許多場景中都很有用,比如一個流正在被一個解析器消費,解析器可能需要將某些剛拉取出的數據“逆消費”回來源,以便流能將它傳遞給其它消費者。 如果你發現你必須經常在你的程序中調用`stream.unshift(chunk)`,你應該考慮實現一個`Transform`流(參閱下文的面向流實現者的API)。 ~~~ // Pull off a header delimited by \n\n // use unshift() if we get too much // Call the callback with (error, header, stream) var StringDecoder = require('string_decoder').StringDecoder; function parseHeader(stream, callback) { stream.on('error', callback); stream.on('readable', onReadable); var decoder = new StringDecoder('utf8'); var header = ''; function onReadable() { var chunk; while (null !== (chunk = stream.read())) { var str = decoder.write(chunk); if (str.match(/\n\n/)) { // found the header boundary var split = str.split(/\n\n/); header += split.shift(); var remaining = split.join('\n\n'); var buf = new Buffer(remaining, 'utf8'); if (buf.length) stream.unshift(buf); stream.removeListener('error', callback); stream.removeListener('readable', onReadable); // now the body of the message can be read from the stream. callback(null, header, stream); } else { // still reading the header. header += str; } } } } ~~~ #### readable.wrap(stream) - stream Stream 一個“舊式”可讀流 `Node.js` v0.10 以及之前版本的流沒有完全包含如今的所有的流API(更多的信息請參閱下文的“兼容性”)。 如果你正在使用一個老舊的`io.js`庫,它觸發`data`時間并且有一個僅作查詢用途的`pause()`方法,那么你可以調用`wrap()`方法來創建一個使用“舊式”流作為數據源的可讀流。 你幾乎不會用到這個函數,它的存在僅是為了老舊的`io.js`程序和庫交互。 例子: ~~~ var OldReader = require('./old-api-module.js').OldReader; var oreader = new OldReader; var Readable = require('stream').Readable; var myReader = new Readable().wrap(oreader); myReader.on('readable', function() { myReader.read(); // etc. }); ~~~ #### Class: stream.Writable 可寫流接口是一個你可以向其寫入數據的目標的抽象。 一些內部的可寫流例子: - 客戶端的http請求 - 服務端的http響應 - 文件系統寫入流 - `zlib`流 - `crypto`流 - tcp `socket` - 子進程`stdin` - `process.stdout`,`process.stderr` #### writable.write(chunk[, encoding][, callback]) - chunk String | Buffer 要寫入的數據 - encoding String 編碼,如果數據塊是字符串 - callback Function 當數據塊寫入完畢后調用的回調函數 - Returns: Boolean 如果被全部處理則返回`true` 該方法向底層系統寫入數據,并且當數據被全部處理后調用指定的回調函數。 返回值指示了你是否可以立刻寫入數據。如果數據需要被內部緩沖,會返回`false`。否則返回`true`。 返回值經供參考。即使返回`false`,你仍可以繼續寫入數據。但是,寫入的數據將會被緩沖在內存里,所以最好不要這樣做。應該在寫入更多數據前等待`drain`事件。 #### Event: 'drain' 如果一個`writable.write(chunk)`調用返回了`false`,那么`drain`事件會指示出可以繼續向流寫入數據的時機。 ~~~ // Write the data to the supplied writable stream 1MM times. // Be attentive to back-pressure. function writeOneMillionTimes(writer, data, encoding, callback) { var i = 1000000; write(); function write() { var ok = true; do { i -= 1; if (i === 0) { // last time! writer.write(data, encoding, callback); } else { // see if we should continue, or wait // don't pass the callback, because we're not done yet. ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { // had to stop early! // write some more once it drains writer.once('drain', write); } } } ~~~ #### writable.cork() 強制滯留所有寫入。 滯留的數據會在調用`.uncork()`或`.end()`方法后被寫入。 #### writable.uncork() 寫入在調用`.cork()`方法所有被滯留的數據。 #### writable.setDefaultEncoding(encoding) - encoding String 新的默認編碼 設置一個可寫流的默認編碼。 #### writable.end([chunk][, encoding][, callback]) - chunk String | Buffer 可選,寫入的數據 - encoding String 編碼,如果數據塊是字符串 - callback Function 可選,回調函數 當沒有更多可寫的數據時,調用這個方法。如果指定了回調函數,那么會被添加為`finish`事件的監聽器。 在調用了`end()`后調用`write()`會導致一個錯誤。 ~~~ // write 'hello, ' and then end with 'world!' var file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!'); // writing more now is not allowed! ~~~ #### Event: 'finish' 當調用了`end()`方法,并且所有的數據都被寫入了底層系統,這個事件會被觸發。 ~~~ var writer = getWritableStreamSomehow(); for (var i = 0; i < 100; i ++) { writer.write('hello, #' + i + '!\n'); } writer.end('this is the end\n'); writer.on('finish', function() { console.error('all writes are now complete.'); }); ~~~ #### Event: 'pipe' - src Readable Stream 對這個可寫流進行導流的源可讀流 這個事件將會在可讀流被一個可寫流使用`pipe()`方法進行導流時觸發。 ~~~ var writer = getWritableStreamSomehow(); var reader = getReadableStreamSomehow(); writer.on('pipe', function(src) { console.error('something is piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer); ~~~ #### Event: 'unpipe' - src Readable Stream 對這個可寫流停止導流的源可讀流 當可讀流對其調用`unpipe()`方法,在源可讀流的目標集合中刪除這個可寫流,這個事件將會觸發。 ~~~ var writer = getWritableStreamSomehow(); var reader = getReadableStreamSomehow(); writer.on('unpipe', function(src) { console.error('something has stopped piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer); reader.unpipe(writer); ~~~ #### Event: 'error' - Error object 在寫入數據或導流發生錯誤時觸發。 #### Class: stream.Duplex 雙工是同時實現了可讀流與可寫流的借口。它的用處請參閱下文。 內部雙工流的例子: - tcp `socket` - `zlib`流 - `crypto`流 #### Class: stream.Transform 轉換流是一種輸出由輸入計算所得的栓共流。它們同時集成了可讀流與可寫流的借口。它們的用處請參閱下文。 內部轉換流的例子: - `zlib`流 - `crypto`流 ### 面向流實現者的API 實現所有種類的流的模式都是一樣的: 1. 為你的子類繼承合適的父類(`util.inherits`非常合適于做這個)。 1. 為了保證內部機制被正確初始化,在你的構造函數中調用合適的父類構造函數。 1. 實現一個或多個特定的方法,參閱下文。 被擴展的類和要實現的方法取決于你要編寫的流類的類型: | 用途 | 類 | 需要實現的方法 | |-----|-----|-----| | 只讀 | Readable | _read | | 只寫 | Writable | _write, _writev | | 可讀以及可寫 | Duplex | _read, _write, _writev | | 操作被寫入數據,然后讀出結果 | Transform | _transform, _flush | 在你的實現代碼中,非常重要的一點是永遠不要調用上文的面向流消費者的API。否則,你在程序中消費你的流接口時可能有潛在的副作用。 #### Class: stream.Readable `stream.Readable`是一個被設計為需要實現底層的`_read(size)`方法的抽象類。 請參閱上文的面向流消費者的API來了解如何在程序中消費流。以下解釋了如果在你的程序中實現可讀流。 例子:一個計數流 這是一個可讀流的基礎例子。它從1到1,000,000遞增數字,然后結束。 ~~~ var Readable = require('stream').Readable; var util = require('util'); util.inherits(Counter, Readable); function Counter(opt) { Readable.call(this, opt); this._max = 1000000; this._index = 1; } Counter.prototype._read = function() { var i = this._index++; if (i > this._max) this.push(null); else { var str = '' + i; var buf = new Buffer(str, 'ascii'); this.push(buf); } }; ~~~ 例子:簡單協議 v1 (次優) 這類似于上文中提到的`parseHeader`函數,但是使用一個自定義流實現。另外,注意這個實現不將流入的數據轉換為字符串。 更好地實現是作為一個轉換流實現,請參閱下文更好地實現。 ~~~ // A parser for a simple data protocol. // The "header" is a JSON object, followed by 2 \n characters, and // then a message body. // // NOTE: This can be done more simply as a Transform stream! // Using Readable directly for this is sub-optimal. See the // alternative example below under the Transform section. var Readable = require('stream').Readable; var util = require('util'); util.inherits(SimpleProtocol, Readable); function SimpleProtocol(source, options) { if (!(this instanceof SimpleProtocol)) return new SimpleProtocol(source, options); Readable.call(this, options); this._inBody = false; this._sawFirstCr = false; // source is a readable stream, such as a socket or file this._source = source; var self = this; source.on('end', function() { self.push(null); }); // give it a kick whenever the source is readable // read(0) will not consume any bytes source.on('readable', function() { self.read(0); }); this._rawHeader = []; this.header = null; } SimpleProtocol.prototype._read = function(n) { if (!this._inBody) { var chunk = this._source.read(); // if the source doesn't have data, we don't have data yet. if (chunk === null) return this.push(''); // check if the chunk has a \n\n var split = -1; for (var i = 0; i < chunk.length; i++) { if (chunk[i] === 10) { // '\n' if (this._sawFirstCr) { split = i; break; } else { this._sawFirstCr = true; } } else { this._sawFirstCr = false; } } if (split === -1) { // still waiting for the \n\n // stash the chunk, and try again. this._rawHeader.push(chunk); this.push(''); } else { this._inBody = true; var h = chunk.slice(0, split); this._rawHeader.push(h); var header = Buffer.concat(this._rawHeader).toString(); try { this.header = JSON.parse(header); } catch (er) { this.emit('error', new Error('invalid simple protocol data')); return; } // now, because we got some extra data, unshift the rest // back into the read queue so that our consumer will see it. var b = chunk.slice(split); this.unshift(b); // and let them know that we are done parsing the header. this.emit('header', this.header); } } else { // from there on, just provide the data to our consumer. // careful not to push(null), since that would indicate EOF. var chunk = this._source.read(); if (chunk) this.push(chunk); } }; // Usage: // var parser = new SimpleProtocol(source); // Now parser is a readable stream that will emit 'header' // with the parsed header data. ~~~ #### new stream.Readable([options]) - **options Object** - highWaterMark Number 在停止從底層資源讀取之前,在內部緩沖中存儲的最大字節數。默認為16kb,對于`objectMode`則是16 - encoding String 如果被指定,那么緩沖將被利用指定編碼解碼為字符串,默認為`null` - objectMode Boolean 是否該流應該表現如一個對象的流。意思是說`stream.read(n)`返回一個單獨的對象而不是一個大小為`n`的`Buffer`,默認為`false` 在實現了`Readable`類的類中,請確保調用了`Readable`構造函數,這樣緩沖設置才能被正確的初始化。 #### readable._read(size) - size Number 異步讀取數據的字節數 注意:實現這個函數,而不要直接調用這個函數。 這個函數不應該被直接調用。它應該被子類實現,并且僅被`Readable`類的內部方法調用。 所有的可讀流都必須實現這個方法用來從底層資源中獲取數據。 這個函數有一個下劃線前綴,因為它對于類是內部的,并應該直接被用戶的程序調用。你應在你的拓展類里覆蓋這個方法。 當數據可用時,調用`readable.push(chunk)`方法將之推入讀取隊列。如果方法返回`false`,那么你應當停止讀取。當`_read`方法再次被調用,你應當推入更多數據。 參數`size`僅作查詢。“read”調用返回數據的實現可以通過這個參數來知道應當抓取多少數據;其余與之無關的實現,比如TCP或TLS,則可忽略這個參數,并在可用時返回數據。例如,沒有必要“等到”`size`個字節可用時才調用`stream.push(chunk)`。 #### readable.push(chunk[, encoding]) - chunk Buffer | null | String 被推入讀取隊列的數據塊 - encoding String 字符串數據塊的編碼。必須是一個合法的`Buffer`編碼,如'utf8'或'ascii' - return Boolean 是否應該繼續推入 注意:這個函數應該被`Readable`流的實現者調用,而不是消費者。 `_read()`函數在至少調用一次`push(chunk)`方法前,不會被再次調用。 `Readable`類通過在`readable`事件觸發時,調用`read()`方法將數據推入 之后用于讀出數據的讀取隊列 來工作。 `push()`方法需要明確地向讀取隊列中插入數據。如果它的參數為`null`,那么它將發送一個數據結束信號(`EOF`)。 這個API被設計為盡可能的靈活。例如,你可能正在包裝一個有`pause/resume`機制和一個數據回調函數的低級別源。那那些情況下,你可以通過以下方式包裝這些低級別源: ~~~ // source is an object with readStop() and readStart() methods, // and an `ondata` member that gets called when it has data, and // an `onend` member that gets called when the data is over. util.inherits(SourceWrapper, Readable); function SourceWrapper(options) { Readable.call(this, options); this._source = getLowlevelSourceObject(); var self = this; // Every time there's data, we push it into the internal buffer. this._source.ondata = function(chunk) { // if push() returns false, then we need to stop reading from source if (!self.push(chunk)) self._source.readStop(); }; // When the source ends, we push the EOF-signaling `null` chunk this._source.onend = function() { self.push(null); }; } // _read will be called when the stream wants to pull more data in // the advisory size argument is ignored in this case. SourceWrapper.prototype._read = function(size) { this._source.readStart(); }; ~~~ #### Class: stream.Writable `stream.Writable`是一個被設計為需要實現底層的`_write(chunk, encoding, callback)`方法的抽象類。 請參閱上文的面向流消費者的API來了解如何在程序中消費流。以下解釋了如果在你的程序中實現可寫流。 #### new stream.Writable([options]) - **options Object** - highWaterMark Number `write()`方法開始返回`false`的緩沖級別。默認為16kb,對于`objectMode`流則是`16` - decodeStrings Boolean 是否在傳遞給`write()`方法前將字符串解碼成`Buffer`。默認為`true` - objectMode Boolean 是否`write(anyObj)`為一個合法操作。如果設置為`true`你可以寫入任意數據而不僅是`Buffer`或字符串數據。默認為`false` 在實現了`Writable`類的類中,請確保調用了`Writable`構造函數,這樣緩沖設置才能被正確的初始化。 #### writable._write(chunk, encoding, callback) - chunk Buffer | String 將要被寫入的數據塊。除非`decodeStrings`配置被設置為`false`,否則將一直是一個`buffer` - encoding String 如果數據塊是一個字符串,那么這就是編碼的類型。如果是一個`buffer`,那么則會忽略它 - callback Function 當你處理完給定的數據塊后調用這個函數 所有的`Writable`流的實現都必須提供一個`_write()`方法來給底層資源傳輸數據。 這個函數不應該被直接調用。它應該被子類實現,并且僅被`Writable`類的內部方法調用。 回調函數使用標準的`callback(error)`模式來表示這個寫操作成功或發生了錯誤。 如果構造函數選項中設置了`decodeStrings`標志,那么數據塊將是一個字符串而不是一個`Buffer`,編碼將會決定字符串的類型。這個是為了幫助處理編碼字符串的實現。如果你沒有明確地將`decodeStrings`選項設為`false`,那么你會安全地忽略`encoding`參數,并且數據塊是`Buffer`形式。 這個函數有一個下劃線前綴,因為它對于類是內部的,并應該直接被用戶的程序調用。你應在你的拓展類里覆蓋這個方法。 #### writable._writev(chunks, callback) - chunks Array 將被寫入的數據塊數組。其中每一個數據都有如下格式:`{ chunk: ..., encoding: ... }` - callback Function 當你處理完給定的數據塊后調用這個函數 注意:這個函數不應該被直接調用。它應該被子類實現,并且僅被`Writable`類的內部方法調用。 這個函數對于你的實現是完全可選的。大多數情況下它是不必的。如果實現,它會被以所有滯留在寫入隊列中的數據塊調用。 #### Class: stream.Duplex 一個“雙工”流既是可讀的,又是可寫的。如TCP`socket`連接。 注意,和你實現`Readable`或`Writable`流時一樣,`stream.Duplex`是一個被設計為需要實現底層的`_read(size)`和`_write(chunk, encoding, callback)`方法的抽象類。 由于`JavaScript`并不具備多繼承能力,這個類是繼承于`Readable`類,并寄生于`Writable`類。所以為了實現這個類,用戶需要同時實現低級別的`_read(n)`方法和低級別的`_write(chunk, encoding, callback)`方法。 #### new stream.Duplex(options) - **options Object** 同時傳遞給`Writable`和`Readable`構造函數。并且包含以下屬性: - allowHalfOpen Boolean 默認為`true`。如果設置為`false`,那么流的可讀的一端結束時可寫的一端也會自動結束,反之亦然。 - readableObjectMode Boolean 默認為`false`,為流的可讀的一端設置`objectMode`。當`objectMode`為`true`時沒有效果。 - writableObjectMode Boolean 默認為`false`,為流的可寫的一端設置`objectMode`。當`objectMode`為`true`時沒有效果。 在實現了`Duplex`類的類中,請確保調用了`Duplex`構造函數,這樣緩沖設置才能被正確的初始化。 #### Class: stream.Transform “轉換”流是一個輸出于輸入存在對應關系的雙工流,如一個`zilib`流或一個`crypto`流。 輸出和輸出并不需要有相同的大小,相同的數據塊數或同時到達。例如,一個哈希流只有一個單獨數據塊的輸出當輸入結束時。一個`zlib`流的輸出比其輸入小得多或大得多。 除了實現`_read()`方法和`_write()`方法,轉換流還必須實現`_transform()`方法,并且可選地實現`_flush()`方法(參閱下文)。 #### new stream.Transform([options]) - options Object 同時傳遞給`Writable`和`Readable`構造函數。 在實現了`Transform`類的類中,請確保調用了`Transform`構造函數,這樣緩沖設置才能被正確的初始化。 #### transform._transform(chunk, encoding, callback) - chunk Buffer | String 將要被寫入的數據塊。除非`decodeStrings`配置被設置為`false`,否則將一直是一個`buffer` - encoding String 如果數據塊是一個字符串,那么這就是編碼的類型。如果是一個buffer,那么則會忽略它 - callback Function 當你處理完給定的數據塊后調用這個函數 這個函數不應該被直接調用。它應該被子類實現,并且僅被`Transform`類的內部方法調用。 所有`Transform`流的實現都必須提供一個`_transform`方法來接受輸入和產生輸出。 在`Transform`類中,`_transform`可以做需要做的任何事,如處理需要寫入的字節,將它們傳遞給可寫端,異步I/O,等等。 調用`transform.push(outputChunk)`0次或多次來從輸入的數據塊產生輸出,取決于你想從這個數據塊中輸出多少數據作為結果。 僅當目前的數據塊被完全消費后,才會調用回調函數。注意,對于某些特殊的輸入可能會沒有輸出。如果你將數據作為第二個參數傳入回調函數,那么數據將被傳遞給`push`方法。換句話說,下面的兩個例子是相等的: ~~~ transform.prototype._transform = function (data, encoding, callback) { this.push(data); callback(); } transform.prototype._transform = function (data, encoding, callback) { callback(null, data); } ~~~ 這個函數有一個下劃線前綴,因為它對于類是內部的,并應該直接被用戶的程序調用。你應在你的拓展類里覆蓋這個方法。 #### transform._flush(callback) - callback Function 當你排空了所有剩余數據后,這個回調函數會被調用 注意:這個函數不應該被直接調用。它應該被子類實現,并且僅被`Transform`類的內部方法調用。 在一些情景中,你的轉換操作需要在流的末尾多發生一點點數據。例如,一個`Zlib`壓縮流會存儲一些內部狀態以便它能優化壓縮輸出。但是在最后,它需要盡可能好得處理這些留下的東西來使數據完整。 在這種情況中,您可以實現一個`_flush`方法,它會在最后被調用,在所有寫入數據被消費、但在觸發`end`表示可讀端到達末尾之前。和`_transform`一樣,只需在寫入操作完成時適當地調用`transform.push(chunk)`零或多次。 這個函數有一個下劃線前綴,因為它對于類是內部的,并應該直接被用戶的程序調用。你應在你的拓展類里覆蓋這個方法。 #### Events: 'finish' 和 'end' `finish`和`end`事件分別來自于父類`Writable`和`Readable`。`finish`事件在`end()`方法被調用以及所有的輸入被`_transform`方法處理后觸發。`end`事件在所有的在`_flush`方法的回調函數被調用后的數據被輸出后觸發。 #### Example: SimpleProtocol 解釋器 v2 上文中的簡單協議解釋器可以簡單地通過高級別的`Transform`流更好地實現。與上文例子中的`parseHeader`和`SimpleProtocol v1`相似。 在這個例子中,沒有從參數中提供輸入,然后將它導流至解釋器中,這更符合`io.js`的使用習慣。 ~~~ var util = require('util'); var Transform = require('stream').Transform; util.inherits(SimpleProtocol, Transform); function SimpleProtocol(options) { if (!(this instanceof SimpleProtocol)) return new SimpleProtocol(options); Transform.call(this, options); this._inBody = false; this._sawFirstCr = false; this._rawHeader = []; this.header = null; } SimpleProtocol.prototype._transform = function(chunk, encoding, done) { if (!this._inBody) { // check if the chunk has a \n\n var split = -1; for (var i = 0; i < chunk.length; i++) { if (chunk[i] === 10) { // '\n' if (this._sawFirstCr) { split = i; break; } else { this._sawFirstCr = true; } } else { this._sawFirstCr = false; } } if (split === -1) { // still waiting for the \n\n // stash the chunk, and try again. this._rawHeader.push(chunk); } else { this._inBody = true; var h = chunk.slice(0, split); this._rawHeader.push(h); var header = Buffer.concat(this._rawHeader).toString(); try { this.header = JSON.parse(header); } catch (er) { this.emit('error', new Error('invalid simple protocol data')); return; } // and let them know that we are done parsing the header. this.emit('header', this.header); // now, because we got some extra data, emit this first. this.push(chunk.slice(split)); } } else { // from there on, just provide the data to our consumer as-is. this.push(chunk); } done(); }; // Usage: // var parser = new SimpleProtocol(); // source.pipe(parser) // Now parser is a readable stream that will emit 'header' // with the parsed header data. ~~~ #### Class: stream.PassThrough 這是一個`Transform`流的實現。將輸入的流簡單地傳遞給輸出。它的主要目的是用來演示和測試,但它在某些需要構建特殊流的情況下可能有用。 ### 簡化的構造器API 可以簡單的構造流而不使用繼承。 這可以通過調用合適的方法作為構造函數和參數來實現: 例子: #### Readable ~~~ var readable = new stream.Readable({ read: function(n) { // sets this._read under the hood } }); ~~~ #### Writable ~~~ var writable = new stream.Writable({ write: function(chunk, encoding, next) { // sets this._write under the hood } }); // or var writable = new stream.Writable({ writev: function(chunks, next) { // sets this._writev under the hood } }); ~~~ #### Duplex ~~~ var duplex = new stream.Duplex({ read: function(n) { // sets this._read under the hood }, write: function(chunk, encoding, next) { // sets this._write under the hood } }); // or var duplex = new stream.Duplex({ read: function(n) { // sets this._read under the hood }, writev: function(chunks, next) { // sets this._writev under the hood } }); ~~~ #### Transform ~~~ var transform = new stream.Transform({ transform: function(chunk, encoding, next) { // sets this._transform under the hood }, flush: function(done) { // sets this._flush under the hood } }); ~~~ ### 流:內部細節 #### 緩沖 `Writable`流和`Readable`流都會分別在一個內部的叫`_writableState.buffer`或`_readableState.buffer`的對象里緩沖數據。 潛在的被緩沖的數據量取決于被傳遞給構造函數的`highWaterMark`參數。 在`Readable`流中,當其的實現調用`stream.push(chunk)`時就會發生緩沖。如果流的消費者沒有調用`stream.read()`,那么數據就會保留在內部隊列中直到它被消費。 在`Writable`流中,當用戶重復調用`stream.write(chunk)`時就會發生緩沖,甚至是當`write()`返回`false`時。 流,尤其是`pipe()`方法的初衷,是限制數據的滯留量在一個可接受的水平,這樣才使得不同傳輸速度的來源和目標不會淹沒可用的內存。 #### stream.read(0) 在一些情況下,你想不消費任何數據而去觸發一次底層可讀流機制的刷新。你可以調用`stream.read(0)`,它總是返回`null`。 如果內部的讀緩沖量在`highWaterMark`之下,并且流沒有正在讀取,那么調用`read(0)`將會觸發一次低級別的`_read`調用。 幾乎永遠沒有必須這么做。但是,你可能會在`io.js`的`Readable`流類的內部代碼的幾處看到這個。 #### stream.push('') 推入一個0字節的字符串或`Buffer`(不處于對象模式)有一個有趣的副作用。因為這是一個`stream.push()`的調用,它將會結束讀取進程。但是,它不添加任何數據到可讀緩沖中,所以沒有任何用戶可消費的數據。 在極少的情況下,你當下沒有數據可以提供,但你的消費者同過調用`stream.read(0)`來得知合適再次檢查。在這樣的情況下,你可以調用`stream.push('')`。 至今為止,這個功能的唯一使用之處是在`tls.CryptoStream`類中,它將在`io.js`的1.0版本中被廢棄。如果你發現你不得不使用`stream.push('')`,請考慮使用另外的方式。因為這幾乎表示發生了某些可怕的錯誤。 ### 與舊版本的`Node.js`的兼容性 在`Node.js`的0.10版本之前,可讀流接口非常簡單,并且功能和功用都不強。 - `data`事件會立刻觸發,而不是等待你調用`read()`方法。如果你需要進行一些`I/O`操作來決定是否處理數據,那么你只能將數據存儲在某些緩沖區中以防數據流失。 - `pause()`僅供查詢,并不保證生效。這意味著你還是要準備接收`data`事件在流已經處于暫停模式中時。 在`io.js` v1.0 和`Node.js` v0.10中,下文所述的`Readable`類添加進來。為了向后兼容性,當一個`data`事件的監聽器被添加時或`resume()`方法被調用時,可讀流切換至流動模式。其作用是,即便您不使用新的`read()`方法和`readable`事件,您也不必擔心丟失數據塊。 大多數程序都會保持功能正常,但是,以下有一些邊界情況: - 沒有添加任何`data`事件 - 從未調用`resume()`方法 - 流沒有被導流至任何可寫的目標 例如,考慮以下代碼: ~~~ // WARNING! BROKEN! net.createServer(function(socket) { // we add an 'end' method, but never consume the data socket.on('end', function() { // It will never get here. socket.end('I got your message (but didnt read it)\n'); }); }).listen(1337); ~~~ 在`Node.js` v0.10前,到來的信息數據會被簡單地丟棄。但是在`io.js` v1.0 和`Node.js` v0.10后,`socket`會被永遠暫停。 解決方案是調用`resume()`方法來開啟數據流: ~~~ // Workaround net.createServer(function(socket) { socket.on('end', function() { socket.end('I got your message (but didnt read it)\n'); }); // start the flow of data, discarding it. socket.resume(); }).listen(1337); ~~~ 除了新的`Readable`流切換至流動模式之外,在v0.10之前的流可以被使用`wrap()`方法包裹。 #### 對象模式 通常情況下,流僅操作字符串和`Buffer`。 處于對象模式中的流除了`Buffer`和字符串外,還能讀出普通的`JavaScirpt`值。 處于對象模式中的可讀流在調用`stream.read(size)`后只會返回單個項目,不論`size`參數是什么。 處于對象模式中的可寫流總是忽略`stream.write(data, encoding)`中的`encoding`參數。 對于處于對象模式中的流,特殊值`null`仍然保留它的特殊意義。也就是說,對于對象模式的可讀流,`stream.read()`返回一個`null`仍意味著沒有更多的數據了,并且`stream.push(null)`會發送一個文件末端信號(`EOF`)。 核心`io.js`中沒有流是對象模式的。這個模式僅僅供用戶的流庫使用。 你應當在子類的構造函數的`options`參數對象中設置對象模式。在流的過程中設置對象模式時不安全的。 對于雙工流,可以分別得通過`readableObjectMode`和`writableObjectMode`設置可讀端和可寫端。這些配置可以被用來通過轉換流實現解釋器和序列化器。 ~~~ var util = require('util'); var StringDecoder = require('string_decoder').StringDecoder; var Transform = require('stream').Transform; util.inherits(JSONParseStream, Transform); // Gets \n-delimited JSON string data, and emits the parsed objects function JSONParseStream() { if (!(this instanceof JSONParseStream)) return new JSONParseStream(); Transform.call(this, { readableObjectMode : true }); this._buffer = ''; this._decoder = new StringDecoder('utf8'); } JSONParseStream.prototype._transform = function(chunk, encoding, cb) { this._buffer += this._decoder.write(chunk); // split on newlines var lines = this._buffer.split(/\r?\n/); // keep the last partial line buffered this._buffer = lines.pop(); for (var l = 0; l < lines.length; l++) { var line = lines[l]; try { var obj = JSON.parse(line); } catch (er) { this.emit('error', er); return; } // push the parsed object out to the readable consumer this.push(obj); } cb(); }; JSONParseStream.prototype._flush = function(cb) { // Just handle any leftover var rem = this._buffer.trim(); if (rem) { try { var obj = JSON.parse(rem); } catch (er) { this.emit('error', er); return; } // push the parsed object out to the readable consumer this.push(obj); } cb(); }; ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看