## Pro
一個`createWriteStream`的簡單實現,以求能增加對可寫流的理解與應用。
## 參數配置
```
/**
* createWriteStream
* @param1 path
* @param2 options
*/
let fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
flags:'w'//文件的打開模式
,mode:0o666//文件的權限設置
,encoding:'utf8'//寫入文件的字符的編碼
,highWaterMark:3//最高水位線
,start:0 //寫入文件的起始索引位置
,autoClose:true//是否自動關閉文檔
})
```
## createWriteStream類的實例化
- 實例化一個`createWriteStream`類
- 將`path`,`options`掛載在`createWriteStream`的實例上,除此之外再在實例上掛載以下屬性
- `self.fd=null`:文件打開后返回的文件描述符
- `self.pos=self.start`:用于表示文件真正寫入時的指針位置
- `self.Buffer=[]`:用來表示文件的緩沖區
- `self.len=null`:用來表示緩沖區此時的大小
- `self.isWriting=false`:用來表示是否正在真正寫入文件
- 調用`open`方法,打開文件(發射open事件)
## 實例write方法的執行流程
- `wirte`方法接收三個參數,`chunk`要寫入的內容,`encoding`要進行的,`cb`回調函數。
- `write`執行流程:
- 判斷傳入的`chunk`是否為buffer,如果不是,則轉換成buffer,用于轉化編碼依據傳入的`encoding`參數。
- 更新`Buffer`緩沖區的`len`長度,讓len加上該次chunk的長度
- 判斷`len`是否已經超過`highWaterMark`,將值存入`flag`
- 判斷是否處于`isWriting`狀態:
- 是,則先加`chunk`寫入實例對象下的`Buffer緩沖區`。
- 否,更新`isWriting`,接將參數傳遞給實例下的`_write`方法寫入文件
- 返回`flag`
## 實例_write方法的執行流程
此方法用于真正寫入文件
- 查看實例的`fd`屬性是否存在(文件是否打開成功)
- 成功,調用`fs`模塊的`write`方法正式寫入數據
- 更新實例對象下的`len`以及`pos`屬性
- 調用`clearBuffer`方法將緩沖區的內容寫入
- 調用write方法傳入的回調函數`cb`
- 失敗,訂閱一個`open事件`(open事件將會在open方法中被發射),在訂閱中的回調方法中再次以相同的參數調用_write方法
## 實例clearBuffer方法
- 從緩沖區中取出一個數據
- 如果數據存在,調用`_write`方法
- 如果數據不存在,將`isWriting`更改為false,發射`drain`事件
## 注意事項
- 若寫入沒有暫停過,即寫入本地文件的速度遠大于write()調用的速度,那么不會走寫入緩存,也就不會觸發`drain`事件。
## 實現源碼以及測試文件
```
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
constructor(path, options) {
super();
let self = this;
Object.assign(self, options); //還需設置默認值
self.path = path;
self.isWriting = false;
self.Buffer = []; //源碼中為鏈表實現的緩沖區
self.len = null;
self.pos = self.start; //初始化寫入位置
self.fd = null;
self.open();
}
open() {
let self = this;
fs.open(self.path, self.flags, self.mode, (err, fd) => {
self.fd = fd;
if (err) return self.destroy(err);
self.emit('open');
});
}
destroy(err) {
fs.close(this.fd, () => {
this.emit('error', err);
});
}
write(chunk, encoding, cb) {
let self = this
, ret = null;
encoding = encoding?encoding:self.encoding; //優先使用write傳入的編碼方式
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
self.len += chunk.length;
ret = self.highWaterMark > self.len; //判斷當前最新的緩沖區是否已達到最高水位線
if (self.isWriting) { //說明正在調用底層方法真正寫入文件,先寫入Buffer
self.Buffer.push({
chunk
, cb
});
} else {
self.isWriting = true;
self._write(chunk, cb, () => self.clearBuffer());
}
return ret;
}
_write(chunk, cb, clear) {
let self = this;
if (!self.fd) return self.once('open', () => {
self._write(chunk, cb, clear)
});
fs.write(self.fd, chunk, 0, chunk.length, self.pos, (err, bytesWritten) => {
if (err) {
if (self.autoClose) {
self.destroy();
self.emit('error', err);
}
}
self.len -= bytesWritten;
self.pos += bytesWritten;
cb && cb();
clear && clear();
});
}
clearBuffer() {
let self = this
, data = null;
data = self.Buffer.shift();
if (data) {
self._write(data.chunk, data.cb, () => self.clearBuffer());
} else { //此時說明緩沖區已無數據
self.isWriting = false;
self.emit('drain');
}
}
}
module.exports = WriteStream;
```
測試文件:
```// let fs = require('fs');
let WriteStream = require('./practice');
let ws = new WriteStream('./1.txt',{
flags:'w'
,mode:0o666
,start:0
,encoding:'utf8'
,autoClose:true //當流寫完之后自動關閉文件
,highWaterMark:3
});
let n = 9;
ws.on('error',(err)=>{
console.log(err)
})
function write(){
let flag = true;
while(flag&&n>0){
flag = ws.write(n+"",'utf8',()=>{
console.log('ok');
});
n--;
console.log('flag=',flag)
}
ws.once('drain',()=>{
console.log('drain');
write();
})
}
// ws.on('drain',()=>{
// console.log('drain');
// write();
// })
write();
```
---
參考資料:
[https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams](https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams)