- pre-notify
- 可寫流實現
- 初始化參數
- write()
- 流程圖
- 注意事項
- _write()
- 流程圖
- 注意事項
- end()
- 可讀流實現
- 初始化參數
- 流動模式和暫停模式
- 流動模式
- 暫停模式
- case1
- case2
- 流程圖
- 實現難點
- howMuchToRead
- pipe
- 源碼
[TOC]
## pre-notify
可讀可寫流的簡明實現,以求加深對可讀流可寫流的印象與理解。
本文用流程圖概括了整個源碼的實現,著重講述了比較重要以及難實現的點,推薦打開尾巴處的倉庫地址,對照實際的代碼來閱讀。
(づ ̄ 3 ̄)づ Let`s go!
## 可寫流實現
### 初始化參數
```
class WriteStream extends EventEmitter{
constructor(path,options){
this.path = path;
this.flags = options.flags||'w';
this.encoding = options.encoding||'utf8';
this.highWaterMark = options.highWaterMark||16*1024;
this.mode = options.mode||0o666;
this.autoClose = options.autoClose||true;
this.fd = options.fd||null;
// 之所以需要pos,是因為還可以將flags設置成a 追加嘛
this.pos = options.start||0;
// 用來標識是否正在真正寫入文件
this.writing = false;
// 用于緩存正在寫入時write進的東東
this.buffers = [];
// 用來標識緩存區的大小
this.leng = 0;
// 用來標識寫入文件完畢后是否需要觸發drain事件
this.needDrain = false;
// 用來標識是否已經調用過end()方法
this.isEnd = false;
// --- --- ---
this.open(); // 打開文件,緩存fd
}
}
```
### write()
#### 流程圖

`_write`部分沒有詳細注釋,其中有一點需要格外注意的是,如果此次`write`方法是通過調用`end`間接調用的,那么在`_write`寫入文件完畢后會關閉文件。
#### 注意事項
- 寫入的必須是 `buffer` 或則 `字符串`,數字是**不行的**
```
let flag = ws.write(1+'','utf8',()=>{
console.log('ok');
});
```
- 只有當攢存的數據大于 `hightWaterMark` **且** 緩存的數據被清空時,才會觸發 `drain` 事件。
- `needDrain` 和 `isEnd` 都是針對于**整個寫入對象**來說的。
### \_write()
#### 流程圖

#### 注意事項
- 這里判斷是否為 `end()` 方法調用并不是依靠 `isEnd` 而是依據調用 `_write` 方法時的第三個參數 `end` ,因為 isEnd 的改變是在**本輪執行**時就改變了,而我們要關閉文件的話必須確保的是在調用完 end 以后。
```
// write 方法中
...
else{ // isWriting
this.push({
chunk
,end
,callback
})
}
...
// --- --- ---
// clearBuffer 方法中
...
if(buf){
this._write(buf.chunk,()=>{
buf.callback();
this.clearBuffer();
},buf.end);
...
```
### end()
```
end(chunk,encoding=this.encoding,callback=()=>{}){
this.write(chunk,encoding,callback,true);
}
```
第三個參數為內部使用,用來標識是通過end調用的write方法,調用之后不再允許使用write繼續寫入,并且在end實際寫入文件后關閉文件
## 可讀流實現
### 初始化參數
```
class ReadStream extends EventEmitter{
constructor(path,options){
this.path = path;
this.flags = options.flags||'r';
this.highWaterMark = options.highWaterMark||64*1024;
this.encoding = options.encoding||null;
this.mode = options.mode||0o666;
this.autoClose = options.autoClose||true;
this.fd = options.fd||null;
this.pos = options.start||0;
this.end = options.end||null;
// 標識可讀流此刻的模式 流動||暫停
this.flowing = false;
// 每一次讀取的buffer的大小
this.buffer = Buffer.alloc(this.highWaterMark);
// 用于暫停模式時緩存讀取的數據
this.buffers = [];
// 相當于rs._readableState.length
this.length = 0;
// 是否需要發射readable事件
// 只有緩存區被讀取干凈時才會發射事件
this.emittedReadable = false;
// --- --- ---
this.open();
this.on('newListener',(eventName)=>{ // 切換為流動模式讀取
if(eventName === 'data'){
this.flowing = true;
this.read();
}
if(eventName === 'readable'){ // 切換為暫停模式讀取
this.flowing = false;
this.read();
}
});
}
}
```
### 流動模式和暫停模式
從上面的參數初始化可知,可讀流可以通過監聽兩種不同的事件來獲取數據。
#### 流動模式
監聽的第一種 `data` 事件被稱之為可讀流的 `流動模式` 讀取,監聽之后它會框框的不停發射它所讀取到的data,每次讀取到的data大小取決于 `highWaterMark` 。另外我們可以在data的回調中通過`.pause()` 方法暫停文件的讀取和data的發射,什么時候想恢復了還可以通過 `.resume()` 來恢復文件的讀取和data的發射。
#### 暫停模式
監聽的第二種 `readable` 事件被稱之為可讀流的 `暫停模式` 讀取。
不同于流動模式的讀取,暫停模式下,首先當我們一旦監聽readable事件,它會先去讀取 `highWaterMark` 個字節到緩存中**并且會觸發一次 `readable` 事件來通知我們**,而我們想要拿到這些緩存中的數據需要通過 `read(n)` 。
并且這個模式下,它很智能,只要我們從緩存中拿取了數據且剩下的數據小于 `highWaterMark` 時,它就**會自動續杯**,往緩沖區再填充 `highWaterMark` 這么多字節的數據。
> **注意**,它每次填充的數據都是剛好 `hightWaterMark` 這么多,不會多也不會少。
那,readable 事件除了剛開始那一次觸發,什么時候會再觸發呢?
答案是當緩存區被抽干,嗯。。。**完全抽干再續上杯**的時候就會再一次觸發的 `readable` 事件。
> **注意:** 續杯并不一定等于會觸發readable,只有緩沖區被抽干,并且還續了杯,才會觸發readable
##### case1
讓我們看如下這么個栗子
```
// 假設hightWaterMark為3
rs.on('readable',()=>{
let result = rs.read(1);
console.log(result);
result = rs.read(1);
console.log(result);
result = rs.read(1);
console.log(result);
})
<<<
會一直打印,直到整個文件被讀取完
```
之所以產生這樣的結果,就是因為我們在readable回調了剛好讀取了 `highWaterMark` 這么多字節的數據,每一次剛好把緩沖區讀完,這意味著它續杯的時候就會再一次觸發 readable,這樣就形成了遞歸,不斷觸發readable。
##### case2
還有一種情況會不斷觸發 readable
```
rs.on('readable',()=>{
let result = rs.read(); // 什么都不填
console.log(result);
})
<<<
每次會打印hightWaterMark個字節
```
實際上這個栗子是上面栗子的簡寫形式,`rs.read()` 就相當于 `rs.read(rs.highWaterMark)`
#### 流程圖

上面的流程圖中有一點是沒有詳細注釋的,就是當要讀取的字節數大于緩沖區中存儲的字節數時,Node.js源碼中是會將 `hightWaterMark` 先擴充(擴充的大小是按照**2的N次方**的方式來擴充的),再去讀取數據。嗯。。。讀一個比你設置的hightWaterMark還大的,有蝦米意義?早知如此,當初就該把highWaterMark設置大點不就好咯?我們這里的實現略過這種情況。
#### 實現難點
##### howMuchToRead
讀不像寫,讀的時候不僅可以設置 `start` 還能設置 `end`。
So,當我們設置了 `end` 時,我們每次讀取的大小可能就不再是 `highWaterMark` 個了,準確來說我們最后一次讀取的量應該是 `this.end-this.pos+1` 這么多個。
> **注意:** 之所以要+1,是因為流的API是全Node中最奇葩的,它的索引位置是包前又包后的!
所以每次讀取前,我們需要先計算先讀取的字節數
```
let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.hightWaterMark;
```
### pipe
pipe實現就很簡單咯,就是利用可寫流的 `flag` 和 可讀流的流動模式 以及 `pause` 和 `resume` 方法。
```
pipe(ws){
this.on('data',(data)=>{
let flag = ws.write(data);
if(!flag)this.pause();
});
ws.on('drain',()=>{
this.resume();
});
this.on('end',()=>{
ws.end();
});
}
```
## 源碼
> 倉庫地址: [點我點我!](https://github.com/fancierpj0/iStream)