# 附錄B: 高級異步模式
為了了解主要基于 Promise 與 Generator 的面向序列異步流程控制,附錄A介紹了?*asynquence*?庫。
現在我們將要探索其他建立在既存理解與功能之上的高級異步模式,并看看?*asynquence*?是如何在不需要許多分離的庫的情況下,使得這些精巧的異步技術與我們的程序進行混合與匹配的。
## 可迭代序列
我們在前一篇附錄中介紹過?*asynquence*?的可迭代序列,我們將更加詳細地重溫它們。
為了復習,回憶一下:
```source-js
var domready = ASQ.iterable();
// ..
domready.val( function(){
// DOM 準備好了
} );
// ..
document.addEventListener( "DOMContentLoaded", domready.next );
```
現在,讓我們定義將一個多步驟序列定義為一個可迭代序列:
```source-js
var steps = ASQ.iterable();
steps
.then( function STEP1(x){
return x * 2;
} )
.then( function STEP2(x){
return x + 3;
} )
.then( function STEP3(x){
return x * 4;
} );
steps.next( 8 ).value; // 16
steps.next( 16 ).value; // 19
steps.next( 19 ).value; // 76
steps.next().done; // true
```
如你所見,一個可迭代序列是一個標準兼容的?*iterator*(見第四章)。所以,就像一個 generator(或其他任何?*可迭代對象*)那樣,它是可以使用ES6`for..of`循環進行迭代的,
```source-js
var steps = ASQ.iterable();
steps
.then( function STEP1(){ return 2; } )
.then( function STEP2(){ return 4; } )
.then( function STEP3(){ return 6; } )
.then( function STEP4(){ return 8; } )
.then( function STEP5(){ return 10; } );
for (var v of steps) {
console.log( v );
}
// 2 4 6 8 10
```
除了在前一篇附錄中展示的事件觸發的例子之外,可迭代序列的有趣之處還因為它們實質上可以被視為 generator 和 Promise 鏈的替代品,但具備更多靈活性。
考慮一個多Ajax請求的例子 —— 我們已經在第三章和第四章中看到過同樣的場景,分別使用一個 Promise 鏈和一個 generator —— 表達為一個可迭代序列:
```source-js
// 兼容序列的 ajax
var request = ASQ.wrap( ajax );
ASQ( "http://some.url.1" )
.runner(
ASQ.iterable()
.then( function STEP1(token){
var url = token.messages[0];
return request( url );
} )
.then( function STEP2(resp){
return ASQ().gate(
request( "http://some.url.2/?v=" + resp ),
request( "http://some.url.3/?v=" + resp )
);
} )
.then( function STEP3(r1,r2){ return r1 + r2; } )
)
.val( function(msg){
console.log( msg );
} );
```
可迭代序列表達了一系列順序的(同步的或異步的)步驟,它看起來與一個 Promise 鏈極其相似 —— 換言之,它要比單純嵌套的回調看起來干凈的多,但沒有 generator 的基于`yield`的順序化語法那么好。
但我們將可迭代序列傳入`ASQ#runner(..)`,它將可迭代序列像一個 generator 那樣運行至完成。由于幾個原因,一個可迭代序列的行為實質上與一個 generator 相同的事實是值得注意的:
首先,對于ES6 generator 的特定子集來說,可迭代對象是它的一種前ES6等價物,這意味著你既可以直接編寫它們(為了在任何地方都能運行),也可以編寫ES6 generator 并將它們轉譯/轉換成可迭代序列(或者 Promise 鏈!)。
將一個異步運行至完成的 generator 考慮為一個 Promise 鏈的語法糖,是對它們之間的同構關系的一種重要認識。
在我們繼續之前,我們應當注意到,前一個代碼段本可以用?*asynquence*?表達為:
```source-js
ASQ( "http://some.url.1" )
.seq( /*STEP 1*/ request )
.seq( function STEP2(resp){
return ASQ().gate(
request( "http://some.url.2/?v=" + resp ),
request( "http://some.url.3/?v=" + resp )
);
} )
.val( function STEP3(r1,r2){ return r1 + r2; } )
.val( function(msg){
console.log( msg );
} );
```
進一步,步驟2本可以被表達為:
```source-js
.gate(
function STEP2a(done,resp) {
request( "http://some.url.2/?v=" + resp )
.pipe( done );
},
function STEP2b(done,resp) {
request( "http://some.url.3/?v=" + resp )
.pipe( done );
}
)
```
那么,為什么我們要在一個簡單/扁平的?*asyquence*?鏈看起來可以很好地工作的情況下,很麻煩地將自己的控制流在一個`ASQ#runner(..)`步驟中表達為一個可迭代序列呢?
因為可迭代序列的形式有一種重要的技巧可以給我們更多的力量。繼續讀。
### 擴展可迭代序列
Generator,普通的?*asynquence*?序列,和 Promise 鏈,都是被?急切求值?的 —— 控制流程最初要表達的的內容?*就是*?緊跟在后面的固定流程。
然而,可迭代序列是?懶惰求值?的,這意味著在可迭代序列執行期間,如果有需要的話你可以用更多的步驟擴展這個序列。
注意:?你只能在一個可迭代序列的末尾連接,而不是在序列的中間插入。
為了熟悉這種能力,首先讓我們看一個比較簡單(同步)的例子:
```source-js
function double(x) {
x *= 2;
// 我們應當繼續擴展嗎?
if (x < 500) {
isq.then( double );
}
return x;
}
// 建立單步可迭代序列
var isq = ASQ.iterable().then( double );
for (var v = 10, ret;
(ret = isq.next( v )) && !ret.done;
) {
v = ret.value;
console.log( v );
}
```
這個可迭代序列開始時只有一個定義好的步驟(`isq.then(double)`),但是這個序列會在特定條件下(`x < 500`)持續擴展自己。*asynquence*?序列和 Promise 鏈在技術上都?*可以*?做相似的事情,但是我們將看到它們的這種能力不足的一些原因。
這個例子意義不大,而且本可以使用一個 generator 中的`while`循環來表達,所以我們將考慮更精巧的情況。
例如,你可以檢查一個Ajax請求的應答,看它是否指示需要更多的數據,你可以條件性地向可迭代序列插入更多的步驟來發起更多的請求。或者你可以條件性地在Ajax處理器的末尾加入一個格式化步驟。
考慮如下代碼:
```source-js
var steps = ASQ.iterable()
.then( function STEP1(token){
var url = token.messages[0].url;
// 有額外的格式化步驟被提供嗎?
if (token.messages[0].format) {
steps.then( token.messages[0].format );
}
return request( url );
} )
.then( function STEP2(resp){
// 要為序列增加另一個Ajax請求嗎?
if (/x1/.test( resp )) {
steps.then( function STEP5(text){
return request(
"http://some.url.4/?v=" + text
);
} );
}
return ASQ().gate(
request( "http://some.url.2/?v=" + resp ),
request( "http://some.url.3/?v=" + resp )
);
} )
.then( function STEP3(r1,r2){ return r1 + r2; } );
```
你可以在兩個地方看到我們使用`steps.then(..)`條件性地擴展了`step`。為了運行這個`steps`可迭代序列,我們只要使用`ASQ#runner(..)`將它與一個?*asynquence*?序列(這里稱為`main`)鏈接進我們的主程序流程中:
```source-js
var main = ASQ( {
url: "http://some.url.1",
format: function STEP4(text){
return text.toUpperCase();
}
} )
.runner( steps )
.val( function(msg){
console.log( msg );
} );
```
`steps`可迭代序列的靈活性可以使用一個 generator 來表達嗎?某種意義上可以,但我們不得不以一種有些尷尬的方式重新安排邏輯:
```source-js
function *steps(token) {
// **步驟 1**
var resp = yield request( token.messages[0].url );
// **步驟 2**
var rvals = yield ASQ().gate(
request( "http://some.url.2/?v=" + resp ),
request( "http://some.url.3/?v=" + resp )
);
// **步驟 3**
var text = rvals[0] + rvals[1];
// **步驟 4**
// 有額外的格式化步驟被提供嗎?
if (token.messages[0].format) {
text = yield token.messages[0].format( text );
}
// **步驟 5**
// 要為序列增加另一個Ajax請求嗎?
if (/foobar/.test( resp )) {
text = yield request(
"http://some.url.4/?v=" + text
);
}
return text;
}
// 注意:`*steps()`可以向先前的`step`一樣被相同的`ASQ`序列運行
```
先把我們已經知道的序列的好處,以及看起來同步的 generator 語法(見第四章)放在一邊,`steps`邏輯不得不在`*steps()`generator 形式中重排,來假冒可擴展的可迭代序列`steps`的動態機制。
那么,使用 Promise 或者序列如何表達這種功能呢?你?*可以*?這么做:
```source-js
var steps = something( .. )
.then( .. )
.then( function(..){
// ..
// 擴展這個鏈條,對吧?
steps = steps.then( .. );
// ..
})
.then( .. );
```
這里要抓住的問題很微妙但很重要。那么,考慮試著將我們的`stpes`?Promise 鏈連接到我們的主程序流程中 —— 這次使用 Promise 代替?*asynquence*?來表達:
```source-js
var main = Promise.resolve( {
url: "http://some.url.1",
format: function STEP4(text){
return text.toUpperCase();
}
} )
.then( function(..){
return steps; // 提示!
} )
.val( function(msg){
console.log( msg );
} );
```
現在你能發現問題嗎?仔細觀察!
對于序列步驟的順序來說,這里有一個競合狀態。當你`return steps`時,`steps`在那個時刻?*可能*?是原本定義好的 promise 鏈了,或者它現在可能通過`steps = steps.then(..)`調用正指向擴張的 promise 鏈,這要看事情以什么順序發生。
這里有兩種可能的結果:
* 如果`steps`仍然是原來的 Promise 鏈,一旦它稍后通過`steps = steps.then(..)`“擴展”,這個位于鏈條末尾的擴展過的 promise 是?不會?被`main`流程考慮的,因為它已經通過這個`steps`鏈了。這就是不幸的?急切求值?限制。
* 如果`steps`已經是擴展過的 promise 鏈了,那么由于這個擴展過的 promise 正是`main`要通過的東西,所以它會如我們期望的那樣工作。
第一種情況除了展示競合狀態不可容忍的明顯事實,它還展示了 promise 鏈的?急切求值。相比之下,我們可以很容易地擴展可迭代序列而沒有這樣的問題,因為可迭代序列是?懶惰求值?的。
你越需要自己的流程控制動態,可迭代序列就越顯得強大。
提示:?在?*asynquence*?的網站([https://github.com/getify/asynquence/blob/master/README.md#iterable-sequences)上可以看到更多關于可迭代序列的信息與示例。](https://github.com/getify/asynquence/blob/master/README.md#iterable-sequences)
## 事件響應式
(至少!)從第三章看來這應當很明顯:Promise 是你異步工具箱中的一種非常強大的工具。但它們明顯缺乏處理事件流的能力,因為一個 Promise 只能被解析一次。而且坦白地講,對于?*asynquence*?序列來說這也正是它的一個弱點。
考慮這樣一個場景:你想要在一個特定事件每次被觸發時觸發一系列步驟。一個單獨的 Promise 或序列不能表示這個事件全部的發生狀況。所以,你不得不為每一個事件的發生創建一個全新的 Promise 鏈(或序列),比如:
```source-js
listener.on( "foobar", function(data){
// 創建一個新的事件處理 Promise 鏈
new Promise( function(resolve,reject){
// ..
} )
.then( .. )
.then( .. );
} );
```
在這種方式擁有我們需要的基本功能,但是對于表達我們意圖中的邏輯來說遠不能使人滿意。兩種分離的能力混雜在這個范例中:事件監聽,與事件應答;而關注點分離原則懇求我們將這些能力分開。
細心的讀者會發現,這個問題與我們在第二章中詳細講解過的問題是有些對稱的;它是一種控制反轉問題。
想象一下非反轉這個范例,就像這樣:
```source-js
var observable = listener.on( "foobar" );
// 稍后
observable
.then( .. )
.then( .. );
// 在其他的地方
observable
.then( .. )
.then( .. );
```
值`observable`不是一個真正的 Promise,但你可以像監聽一個 Promise 那樣?*監聽*?它,所以它們是有密切關聯的。事實上,它可以被監聽很多次,而且它會在每次事件(`"foobar"`)發生時都發送通知。
提示:?我剛剛展示過的這個模式,是響應式編程(reactive programming,也稱為 RP)背后的概念和動機的?大幅度簡化,響應式編程已經由好幾種了不起的項目和語言實現/詳細論述過了。RP 的一個變種是函數響應式編程(functional reactive programming,FRP),它指的是在數據流之上實施函數式編程技術(不可變性,參照完整性,等等)。“響應式”指的是隨著事件的推移散布這種功能,以對事件進行應答。對此感興趣的讀者應當考慮學習“響應式可監聽對象”,它源于由微軟開發的神奇的“響應式擴展”庫(對于 JavaScript 來說是 “RxJS”,[http://rxjs.codeplex.com/);它可要比我剛剛展示過的東西精巧和強大太多了。另外,Andre](http://rxjs.codeplex.com/)CAndre)?Staltz 寫過一篇出色的文章([https://gist.github.com/staltz/868e7e9bc2a7b8c1f754),用具體的例子高效地講解了](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754)RP。
### ES7 可監聽對象
在本書寫作時,有一個早期ES7提案,一種稱為“Observable(可監聽對象)”的新數據類型([https://github.com/jhusain/asyncgenerator#introducing-observable),它在精神上與我們在這里講解過的相似,但是絕對更精巧。](https://github.com/jhusain/asyncgenerator#introducing-observable)
這種可監聽對象的概念是,你在一個流上“監聽”事件的方法是傳入一個 generator —— 其實?*迭代器*?才是有趣的部分 —— 它的`next(..)`方法會為每一個事件而調用。
你可以想象它是這樣一種東西:
```source-js
// `someEventStream` 是一個事件流,來自于鼠標點擊之類
var observer = new Observer( someEventStream, function*(){
while (var evt = yield) {
console.log( evt );
}
} );
```
你傳入的 generator 將會`yield`而暫停`while`循環,來等待下一個事件。添附在 generator 實例上的?*迭代器*?的`next(..)`將會在每次`someEventStream`發布一個新事件時被調用,因此這個事件將會使用`evt`數據推進你的 generator/*迭代器*。
在這里的監聽事件功能中,重要的是?*迭代器*?的部分,而不是 generator。所以從概念上講,你實質上可以傳入任何可迭代對象,包括`ASQ.iterable()`可迭代序列。
有趣的是,還存在一些被提案的適配方案,使得從特定類型的流中構建可監聽對象變得容易,例如為DOM事件提案的`fromEvent(..)`。如果你去看看`fromEvent(..)`在早期ES7提案中推薦的實現方式,你會發現它與我們將要在下一節中看到的`ASQ.react(..)`極其相似。
當然,這些都是早期提案,所以最終脫穎而出的東西可能會在外觀/行為上與這里展示的有很大的不同。但是看到在不同的庫與語言提案在概念上的早期統一還是很激動人心的!
### 響應式序列
將這種可監聽對象(和F/RP)的超級簡要的概覽作為我們的啟發與動機,我們現在將展示一種“響應式可監聽對象”的很小的子集的適配方案,我稱之為“響應式序列”。
首先,讓我們從如何創建一個可監聽對象開始,使用一個稱為`react(..)`的?*asynquence*?插件工具:
```source-js
var observable = ASQ.react( function setup(next){
listener.on( "foobar", next );
} );
```
現在,讓我們看看如何為這個`observable`定義一個“響應的”序列 —— 在F/RP中,這通常稱為“監聽”:
```source-js
observable
.seq( .. )
.then( .. )
.val( .. );
```
所以,你只需要通過在這個可監聽對象后面進行鏈接就可以了。很容易,是吧?
在F/RP中,事件流經常會通過一組函數式的變形,比如`scan(..)`,`map(..)`,`reduce(..)`,等等。使用響應式序列,每個事件會通過一個序列的新的實例。讓我們看一個更具體的例子:
```source-js
ASQ.react( function setup(next){
document.getElementById( "mybtn" )
.addEventListener( "click", next, false );
} )
.seq( function(evt){
var btnID = evt.target.id;
return request(
"http://some.url.1/?id=" + btnID
);
} )
.val( function(text){
console.log( text );
} );
```
響應式序列的“響應式”部分來源于分配一個或多個事件處理器來調用事件觸發器(調用`next(..)`)。
響應式序列的“序列”部分正是我們已經探索過的:每一個步驟都可以是任何合理的異步技術 —— 延續回調,Promise 或者 generator。
一旦擬建立了一個響應式序列,只要事件被持續地觸發,它就會一直初始化序列的實例。如果你想停止一個響應式序列,你可以調用`stop()`。
如果一個響應式序列被`stop()`了,你可能還想注銷事件處理器;為此你可以注冊一個拆卸處理器:
```source-js
var sq = ASQ.react( function setup(next,registerTeardown){
var btn = document.getElementById( "mybtn" );
btn.addEventListener( "click", next, false );
// 只要`sq.stop()`被調用,它就會被調用
registerTeardown( function(){
btn.removeEventListener( "click", next, false );
} );
} )
.seq( .. )
.then( .. )
.val( .. );
// 稍后
sq.stop();
```
注意:?在`setup(..)`處理器內部的`this`綁定引用是`sq`響應式序列,所以你可以在響應式序列的定義中使用`this`引用,比如調用`stop()`之類的方法,等等。
這是一個來自 Node.js 世界的例子,使用響應式序列處理到來的HTTP請求:
```source-js
var server = http.createServer();
server.listen(8000);
// 響應式監聽
var request = ASQ.react( function setup(next,registerTeardown){
server.addListener( "request", next );
server.addListener( "close", this.stop );
registerTeardown( function(){
server.removeListener( "request", next );
server.removeListener( "close", request.stop );
} );
});
// 應答請求
request
.seq( pullFromDatabase )
.val( function(data,res){
res.end( data );
} );
// 關閉 node
process.on( "SIGINT", request.stop );
```
`next(..)`觸發器還可以很容易地適配 node 流,使用`onStream(..)`和`unStream(..)`:
```source-js
ASQ.react( function setup(next){
var fstream = fs.createReadStream( "/some/file" );
// 將流的 "data" 事件導向 `next(..)`
next.onStream( fstream );
// 監聽流的結束
fstream.on( "end", function(){
next.unStream( fstream );
} );
} )
.seq( .. )
.then( .. )
.val( .. );
```
你還可以使用序列組合來構成多個響應式序列流:
```source-js
var sq1 = ASQ.react( .. ).seq( .. ).then( .. );
var sq2 = ASQ.react( .. ).seq( .. ).then( .. );
var sq3 = ASQ.react(..)
.gate(
sq1,
sq2
)
.then( .. );
```
這里的要點是,`ASQ.react(..)`是一個F/RP概念的輕量級適配,使得將一個事件流與一個序列的連接成為可能,因此得名“響應式序列”。對于基本的響應式用法,響應式序列的能力通常是足夠的。
注意:?這里有一個使用`ASQ.react(..)`來管理UI狀態的例子([http://jsbin.com/rozipaki/6/edit?js,output),和另一個使用`ASQ.react(..)`來處理HTTP請求/應答流的例子(https://gist.github.com/getify/bba5ec0de9d6047b720e)。](http://jsbin.com/rozipaki/6/edit?js,output)(https://gist.github.com/getify/bba5ec0de9d6047b720e)
## Generator 協程
希望第四章幫助你很好地熟悉了ES6 generator。特別地,我們將重溫并更加深入“Generator 并發性”的討論。
我們想象了一個`runAll(..)`工具,它可以接收兩個或更多的 generator 并且并發地運行它們,讓它們協作地將控制權從一個`yield`到下一個,并帶有可選的消息傳遞。
除了能夠將一個 generator 運行至完成之外,我們在附錄A中談論過的`AQS#runner(..)`是一個`runAll(..)`概念的近似實現,它可以將多個 generator 并發地運行至完成。
那么讓我們看看如何實現第四章的并發Ajax場景:
```source-js
ASQ(
"http://some.url.2"
)
.runner(
function*(token){
// 轉移控制權
yield token;
var url1 = token.messages[0]; // "http://some.url.1"
// 清空消息重新開始
token.messages = [];
var p1 = request( url1 );
// 轉移控制權
yield token;
token.messages.push( yield p1 );
},
function*(token){
var url2 = token.messages[0]; // "http://some.url.2"
// 傳遞消息并轉移控制權
token.messages[0] = "http://some.url.1";
yield token;
var p2 = request( url2 );
// 移控制權
yield token;
token.messages.push( yield p2 );
// 講結果傳遞給下一個序列步驟
return token.messages;
}
)
.val( function(res){
// `res[0]` comes from "http://some.url.1"
// `res[1]` comes from "http://some.url.2"
} );
```
以下是`ASQ#runner(..)`和`runAll(..)`之間的主要不同:
* 每個 generator(協程)都被提供了一個稱為`token`的參數值,它是一個當你想要明確地將控制權傳遞給下一個協程時`yield`用的特殊值。
* `token.messages`是一個數組,持有從前一個序列步驟中傳入的任何消息。它也是一種數據結構,你可以用來在協程之間分享消息。
* `yield`一個 Promise(或序列)值不會傳遞控制權,但會暫停這個協程處理直到這個值準備好。
* 這個協程處理運行到最后`return`或`yield`的值將會傳遞給序列中的下一個步驟。
為了適應不同的用法,在`ASQ#runner(..)`功能的基礎上包裝一層幫助函數也很容易。
### 狀態機
許多程序員可能很熟悉的一個例子是狀態機。在一個簡單包裝工具的幫助下,你可以創一個易于表達的狀態機處理器。
讓我們想象一個這樣的工具。我們稱之為`state(..)`,我們將傳遞給它兩個參數值:一個狀態值和一個處理這個狀態的 generator。`state(..)`將擔負起創建并返回一個適配器 generator 的臟活,并把它傳遞給`ASQ#runner(..)`。
考慮如下代碼:
```source-js
function state(val,handler) {
// 為這個狀態制造一個協程處理器
return function*(token) {
// 狀態轉換處理器
function transition(to) {
token.messages[0] = to;
}
// 設置初始狀態(如果還沒有設置的話)
if (token.messages.length < 1) {
token.messages[0] = val;
}
// 持續運行直到最終狀態(false)
while (token.messages[0] !== false) {
// 當前的狀態匹配這個處理器嗎?
if (token.messages[0] === val) {
// 委托到狀態處理器
yield *handler( transition );
}
// 要把控制權轉移給另一個狀態處理器嗎?
if (token.messages[0] !== false) {
yield token;
}
}
};
}
```
如果你仔細觀察,你會發現`state(..)`返回了一個接收`token`的 generator,然后它建立一個`while`循環,這個循環會運行到狀態機直到到達它的最終狀態(我們隨意地將它選定為`false`值)為止;這正是我們想要傳遞給`ASQ#runner(..)`的那種 generator!
我們還隨意地保留了`token.messages[0]`值槽,放置我們的狀態機將要追蹤的當前狀態,這意味著我們甚至可以指定初始狀態,作為序列中前一個步驟傳遞來的值。
我們如何將`state(..)`幫助函數與`ASQ#runner(..)`一起使用呢?
```source-js
var prevState;
ASQ(
/* 可選的:初始狀態值 */
2
)
// 運行我們的狀態機
// 轉換是:2 -> 3 -> 1 -> 3 -> false
.runner(
// 狀態 `1` 處理器
state( 1, function *stateOne(transition){
console.log( "in state 1" );
prevState = 1;
yield transition( 3 ); // 前往狀態 `3`
} ),
// 狀態 `2` 處理器
state( 2, function *stateTwo(transition){
console.log( "in state 2" );
prevState = 2;
yield transition( 3 ); // 前往狀態 `3`
} ),
// 狀態 `3` 處理器
state( 3, function *stateThree(transition){
console.log( "in state 3" );
if (prevState === 2) {
prevState = 3;
yield transition( 1 ); // 前往狀態 `1`
}
// 完成了!
else {
yield "That's all folks!";
prevState = 3;
yield transition( false ); // 終止狀態
}
} )
)
// 狀態機運行完成,所以繼續
.val( function(msg){
console.log( msg ); // That's all folks!
} );
```
重要的是,`*stateOne(..)`,`*stateTwo(..)`,和`*stateThree(..)`?generator 本身會在每次進入那種狀態時被調用,它們會在你`transition(..)`到另一個值時完成。雖然沒有在這里展示,但是這些狀態 generator 處理器理所當然地可以通過`yield`Promise/序列/thunk 來異步地暫停。
隱藏在底層的 generator 是由`state(..)`幫助函數產生的,實際上被傳遞給`ASQ#runner(..)`的 generator 是持續并發運行至狀態機長度的那一個,它們的每一個都協作地將控制權`yield`給下一個,如此類推。
注意:?看看這個“乒乓”的例子([http://jsbin.com/qutabu/1/edit?js,output),它展示了由`ASQ#runner(..)`驅動的](http://jsbin.com/qutabu/1/edit?js,output)generator 的協作并發的用法。
## 通信序列化處理(CSP)
“通信序列化處理(Communicating Sequential Processes —— CSP)”是由 C. A. R. Hoare 在1978年的一篇學術論文([http://dl.acm.org/citation.cfm?doid=359576.359585)中首先被提出的,后來在1985年的一本同名書籍中被描述過。CSP描述了一種并發“進程”在處理期間進行互動(也就是“通信”)的形式方法。](http://dl.acm.org/citation.cfm?doid=359576.359585)
你可能會回憶起我們在第一章檢視過的并發“進程”,所以我們對CSP的探索將會建立在那種理解之上。
就像大多數計算機科學中的偉大概念一樣,CSP深深地沉浸在學術形式主意中,被表達為一種代數處理。然而,我懷疑滿是符號的代數定理不會給讀者帶來太多實際意義,所以我們將找其他的方法將CSP帶進我們的大腦。
我會將很多CSP的形式描述和證明留給 Hoare 的文章,與其他許多美妙的相關作品。取而代之的是,我們將盡可能以一種非學院派的、但愿是可以直接理解的方法,來試著簡要地講解CSP的思想。
### 消息傳遞
CSP的核心原則是,在獨立進程之間的通信/互動都必須通過正式的消息傳遞。也許與你的期望背道而馳,CSP的消息傳遞是作為同步行為進行描述的,發送進程與接收進程都不得不為消息的傳遞做好準備。
這樣的同步消息怎么會與 JavaScript 中的異步編程有聯系?
這種聯系具體來自于 ES6 generator 的性質 —— generator 被用于生產看似同步的行為,而這些行為的內部既可以是同步的也可以(更可能)是異步的。
換言之,兩個或更多并發運行的 generator 可能看起來像是在互相同步地傳遞消息,而同時保留了系統的異步性基礎,因為每個 generator 的代碼都會被暫停(也就是“阻塞”)來等待一個異步動作的運行。
這是如何工作的?
想象一個稱為“A”的 generator,它想要給 generator “B” 發送一個消息。首先,“A”?`yield`出要發送給“B”的消息(因此暫停了“A”)。當“B”準備好并拿走這個消息時,“A”才會繼續(解除阻塞)。
與此對稱的,想象一個 generator “A”想要?從?“B”接收一個消息。“A”?`yield`出一個從“B”取得消息的請求(因此暫停了“A”),一旦“B”發送了一個消息,“A”就拿來這個消息并繼續。
對于這種CSP消息傳遞理論來說,一個更廣為人知的表達形式是 ClojureScript 的 core.async 庫,以及?*go*?語言。它們將CSP中描述的通信語義實現為一種在進程之間打開的管道,稱為“頻道(channel)”。
注意:?*頻道*?這個術語描述了問題的一部分,因為存在一種模式,會有多于一個的值被一次性發送到這個頻道的“緩沖”中;這與你對流的認識相似。我們不會在這里深入這個問題,但是對于數據流的管理來說它可能是一個非常強大的技術。
在CSP最簡單的概念中,一個我們在“A”和“B”之間建立的頻道會有一個稱為`take(..)`的阻塞方法來接收一個值,以及一個稱為`put(..)`的阻塞方法來發送一個值。
它看起來可能像這樣:
```source-js
var ch = channel();
function *foo() {
var msg = yield take( ch );
console.log( msg );
}
function *bar() {
yield put( ch, "Hello World" );
console.log( "message sent" );
}
run( foo );
run( bar );
// Hello World
// "message sent"
```
將這種結構化的、(看似)同步的消息傳遞互動,與`ASQ#runner(..)`通過`token.messages`數組與協作的`yield`提供的、非形式化與非結構化的消息共享相比較。實質上,`yield put(..)`是一種可以同時發送值并為了傳遞控制權而暫停執行的單一操作,而前一個例子中我們將這兩個步驟分開實施。
另外CSP強調,你不會真正明確地“傳遞控制權”,而是這樣設計你的并發過程:要么為了從頻道中接收值而阻塞,要么為了試著向這個頻道中發送值而阻塞。這種圍繞著消息的發送或接收的阻塞,就是你如何在協程之間協調行為序列的方法。
注意:?預先奉告:這種模式非常強大,但要習慣它有些燒腦。你可能會需要實踐它一下,來習慣這種協調并發性的新的思考方式。
有好幾個了不起的庫已經用 JavaScript 實現了這種風格的CSP,最引人注目的是“js-csp”([https://github.com/ubolonton/js-csp),由](https://github.com/ubolonton/js-csp)James Long ([http://twitter.com/jlongster)開出的分支(https://github.com/jlongster/js-csp),以及他特意撰寫的作品(http://jlongster.com/Taming-the-Asynchronous-Beast-with-CSP-in-JavaScript)。另外,關于將](http://twitter.com/jlongster)(https://github.com/jlongster/js-csp)(http://jlongster.com/Taming-the-Asynchronous-Beast-with-CSP-in-JavaScript)ClojureScript 中 go 風格的 core.async CSP 適配到 JS generator 的話題,無論怎么夸贊 David Nolen ([http://twitter.com/swannodette](http://twitter.com/swannodette)) 的許多作品很精彩都不為過 ([http://swannodette.github.io/2013/08/24/es6-generators-and-csp)。](http://swannodette.github.io/2013/08/24/es6-generators-and-csp)
### asynquence 的 CSP 模擬
因為我們是在我的?*asynquence*?庫的上下文環境中討論異步模式的,你可能會對這個話題很感興趣:我們可以很容易地在`ASQ#runner(..)`?generator 處理上增加一個模擬層,來近乎完美地移植CSP的API和行為。這個模擬層放在與?*asynquence*?一起發放的 “asynquence-contrib”包的可選部分。
與早先的`state(..)`幫助函數非常類似,`ASQ.csp.go(..)`接收一個 generator —— 用 go/core.async 的術語來講,它稱為一個 goroutine —— 并將它適配為一個可以與`ASQ#runner(..)`一起使用的新 generator。
與被傳入一個`token`不同,你的 goroutine 接收一個創建好的頻道(下面的`ch`),這個頻道會被本次運行的所有 goroutine 共享。你可以使用`ASQ.csp.chan(..)`創建更多頻道(這通常十分有用)。
在CSP中,我們使用頻道消息傳遞上的阻塞作為所有異步性的模型,而不是為了等待 Promise/序列/thunk 的完成而發生的阻塞。
所以,與`yield`從`request(..)`中返回的 Promise 不同的是,`request(..)`應當返回一個頻道,你從它那里`take(..)`一個值。換句話說,一個單值頻道在這種上下文環境/用法上大致上與一個 Promise/序列是等價的。
讓我們先制造一個兼容頻道版本的`request(..)`:
```source-js
function request(url) {
var ch = ASQ.csp.channel();
ajax( url ).then( function(content){
// `putAsync(..)` 是 `put(..)` 的另一個版本,
// 它可以在一個 generator 的外部使用。它為操作
// 的完成返回一個 promise。我們不在這里使用這個
// promise,但如果有需要的話我們可以在值被
// `taken(..)` 之后收到通知。
ASQ.csp.putAsync( ch, content );
} );
return ch;
}
```
在第三章中,“promisory”是一個生產 Promise 的工具,第四章中“thunkory”是一個生產thunk的工具,最后,在附錄A中我們發明了“sequory”表示一個生產序列的工具。
很自然地,我們需要為一個生產頻道的工具杜撰一個對稱的術語。所以就讓我們不出意料地稱它為“chanory”(“channel” + “factory”)吧。作為一個留給讀者的練習,請試著親手定義一個`channelify(..)`的工具,就像?`Promise.wrap(..)`/`promisify(..)`(第三章),`thunkify(..)`(第四章),和`ASQ.wrap(..)`(附錄A)一樣。
先考慮這個使用?*asyquence*?風格CSP的并發Ajax的例子:
```source-js
ASQ()
.runner(
ASQ.csp.go( function*(ch){
yield ASQ.csp.put( ch, "http://some.url.2" );
var url1 = yield ASQ.csp.take( ch );
// "http://some.url.1"
var res1 = yield ASQ.csp.take( request( url1 ) );
yield ASQ.csp.put( ch, res1 );
} ),
ASQ.csp.go( function*(ch){
var url2 = yield ASQ.csp.take( ch );
// "http://some.url.2"
yield ASQ.csp.put( ch, "http://some.url.1" );
var res2 = yield ASQ.csp.take( request( url2 ) );
var res1 = yield ASQ.csp.take( ch );
// 講結果傳遞給序列的下一個步驟
ch.buffer_size = 2;
ASQ.csp.put( ch, res1 );
ASQ.csp.put( ch, res2 );
} )
)
.val( function(res1,res2){
// `res1` comes from "http://some.url.1"
// `res2` comes from "http://some.url.2"
} );
```
消息傳遞在兩個 goroutines 之間進行的 URL 字符串交換是非常直接的。第一個 goroutine 向第一個URL發起一個Ajax請求,它的應答被放進`ch`頻道。第二個 goroutine 想第二個URL發起一個Ajax請求,然后從`ch`頻道取下第一個應答`res1`。在這個時刻,應答`res1`和`res2`都被完成且準備好了。
如果在 goroutine 運行的末尾`ch`頻道還有什么剩余價值的話,它們將被傳遞進序列的下一個步驟中。所以,為了從最后的 goroutine 中傳出消息,把它們`put(..)`進`ch`。就像展示的那樣,為了避免最后的那些`put(..)`阻塞,我們通過把`ch`的`buffer_size`設置為`2`(默認是`0`)來將它切換到緩沖模式。
注意:?更多使用?*asynquence*?風格CSP的例子可以參見這里([https://gist.github.com/getify/e0d04f1f5aa24b1947ae)。](https://gist.github.com/getify/e0d04f1f5aa24b1947ae)%E3%80%82)
## 復習
Promise 和 generator 為我們能夠創建更加精巧和強大的異步性提供了基礎構建塊。
*asynquence*?擁有許多工具,用于實現?*的迭代序列*,*響應式序列*(也就是“可監聽對象”),*并發協程*,甚至?*CSP goroutines*。
將這些模式,與延續回調和 Promise 能力相組合,使得?*asynquence*?擁有了混合不同異步處理的強大功能,一切都整合進一個干凈的異步流程控制抽象:序列。