# 第5章 并行編程
| 翻譯: | 張馳原 |
|-----|-----|
| 校訂: | 連城 |
進程和進程間通信都是Erlang以及所有并行編程中最基本的概念,進程的創建和進程間的通信都是顯式進行的。
### 進程的創建
一個進程是一個獨立自治的計算單元,它與系統中其他的進程并行地存在。進程之間沒有繼承的層次關系,不過應用程序的設計者也可以顯式地創建這樣一個層次關系。
BIF spawn/3創建并開始執行一個新的進程,它的參數和apply/3是一樣的:
~~~
Pid = spawn(Module, FunctionName, ArgumentList)
~~~
和apply不同的是,spawn并不是直接對函數進行求值并返回其結果,而是啟動一個新的并行進程用于對函數進行求值,返回值是這個新創建的進程的Pid(進程標識符)。和一個進程的所有形式的交互都是通過Pid來進行的。spawn/3會在啟動新進程之后**立即**返回,而**不會**等待它對函數完成求值過程。
在圖5.1(a)中,我們有一個標識符為Pid1的進程調用了如下函數:
~~~
Pid2 = spawn(Mod, Func, Args)
~~~
在spawn返回之后,會有兩個進程Pid1和Pid2并行地存在,狀態如圖5.1(b)所示。現在只有進程Pid1知道新進程的標識符,亦即Pid2。由于Pid是一切進程間通訊的必要元素,一個Erlang系統中的安全性也是建立在限制進程Pid分發的基礎上的。

圖5.1
當傳遞給spawn的函數執行完畢之后,進程會自動退出。這個頂層函數的返回值將被丟棄[[1]](#)。
進程標識符是一個普通的數據對象,可以像操作其他對象那樣對其進行處理。例如,它可以被放到一個列表或元組中,可以與其他標識符進行比較,也可以當做消息發送給其他進程。
### 進程間通信
在Erlang中進行進程間通信的唯一方法就是消息傳遞。一個消息通過原語!(send)發送給另一個進程:
~~~
Pid ! Message
~~~
Pid是要向其發送消息的進程的標識符。任何合法的Erlang表達式都可以作為一個消息發送。send是一個會對其參數進行求值的原語。它的返回值是發送的消息。因此:
~~~
foo(12) ! bar(baz)
~~~
會分別對foo(12)和bar(baz)進行求值得到進程標識符和要發送的消息。如同其他的Erlang函數一樣,send對其參數的求值順序是不確定的。它會將消息參數求值的結果作為返回值返回。發送消息是一個異步操作,因此send既不會等待消息送達目的地也不會等待對方收到消息。就算發送消息的目標進程已經退出了,系統也不會通知發送者。這是為了保持消息傳遞的異步性──應用程序必須自己來實現各種形式的檢查(見下文)。消息一定會被傳遞到接受者那里,并且保證是按照其發送的順序進行傳遞的。
原語receive被用于接收消息。它的語法如下:
~~~
receive
Message1 [when Guard1] ->
Actions1 ;
Message2 [when Guard2] ->
Actions2 ;
...
end
~~~
每個進程都有一個郵箱,所有發送到該進程的消息都被按照它們到達的順序依次存儲在郵箱里。在上面的例子中,Message1和Message2是用于匹配進程郵箱中的消息的**模式**。當找到一個匹配的消息并且對應的保護式(Guard)滿足的時候,這個消息就被選中,并從郵箱中刪除,同時對應的ActionsN會被執行。receive會返回ActionosN中最后一個表達式求值的結果。就如同Erlang里其他形式的模式匹配一樣,消息模式中未綁定(unbound)量會被綁定(bound)。未被receive選中的消息會按照原來的順序繼續留在郵箱中,用于下一次recieve的匹配。調用receive的進程會一直阻塞,直到有匹配的消息為止。
Erlang有一種選擇性接收消息的機制,因此意外發送到一個進程的消息不會阻塞其它正常的消息。不過,由于所有未匹配的消息會被留在郵箱中,保證系統不要完全被這樣的無關消息填滿就變成了程序員的責任。
### 消息接收的順序
當receive嘗試尋找一個匹配的消息的時候,它會依次對郵箱中的每一個消息嘗試用給定的每個模式去進行匹配。我們用下面的例子來解釋其工作原理。
圖5.2(a)給出了一個進程的郵箱,郵箱里面有四個消息,依次是msg_1、msg_2、msg_3和msg_4。運行
~~~
receive
msg_3 ->
...
end
~~~

圖5.2
會匹配到郵箱中的msg_3并導致它被從郵箱中刪除。然后郵箱的狀態會變成如圖5.2(b)所示。當我們再運行
~~~
receive
msg_4 ->
...
msg_2 ->
...
end
~~~
的時候,receive會依次對郵箱中的每一個消息,首先嘗試與msg_4匹配,然后嘗試與msg_2匹配。結果是msg_2匹配成功并被從郵箱中刪除,郵箱的狀態變成圖5.2(c)那樣。最后,運行
~~~
receive
AnyMessage ->
...
end
~~~
其中AnyMessage是一個未綁定(unbound)的變量,receive會匹配到郵箱里的msg_1并將其刪除,郵箱中最終只剩下msg_4,如圖5.2(d)所示。
這說明receive里的模式的順序并不能直接用來實現消息的優先級,不過這可以通過超時的機制來實現,詳見第??小節。
### 只接收來自某個特定進程的消息
有時候我們會只希望接收來自某一個特定進程的消息。要實現這個機制,消息發送者必須顯式地在消息中包含自己的進程標識符:
~~~
Pid | {self(),abc}
~~~
BIF self()返回當前進程的標識符。這樣的消息可以通過如下方式來接收:
~~~
receive
{Pid,Msg} ->
...
end
~~~
如果Pid已經預先綁定(bound)到發送者的進程標識符上了,那么如上所示的receive就能實現只接收來自該進程[[2]](#)的消息了。
### 一些例子
程序5.1中的模塊實現了一個簡單的計數器,可以用來創建一個包含計數器的進程并對計數器進行遞增操作。
程序 5.1
~~~
-module(counter).
-export([start/0,loop/1]).
start() ->
spawn(counter, loop, [0]).
loop(Val) ->
receive
increment ->
loop(Val + 1)
end.
~~~
這個例子展示了一些基本概念:
- 每個新的計數器進程都通過調用counter:start/0來創建。每個進程都會以調用counter:loop(0)啟動。
- 用于實現一個**永久**的進程的遞歸函數調用在等待輸入的時候會被掛起。loop是一個**尾遞歸**函數,這讓計數器進程所占用的空間保持為一個常數。
- 選擇性的消息接收,在這個例子中,僅接收increment消息。
不過,在這過例子中也有不少缺陷,比如:
- 由于計數器的值是一個進程的局部變量,只能被自己訪問到,卻其他進程沒法獲取這個值。
- 消息協議是顯式的,其他進程需要顯式地發送increment消息給計數器進程。
程序5.2
~~~
-module(counter).
-export([start/0,loop/1,increment/1,value/1,stop/1]).
%% First the interface functions.
start() ->
spawn(counter, loop, [0]).
increment(Counter) ->
Counter ! increment.
value(Counter) ->
Counter ! {self(),value}
receive
{Counter,Value} ->
Value
end.
stop(Counter) ->
Counter ! stop.
%% The counter loop.
loop(Val) ->
receive
increment ->
loop(Val + 1);
{From,value} ->
From ! {self(),Val},
loop(Val);
stop -> % No recursive call here
true;
Other -> % All other messages
loop(Val)
end.
~~~
下一個例子展示了如何修正這些缺陷。程序5.2是counter模塊的改進版,允許對計數器進行遞增、訪問計數器的值以及停止計數器。
同前一個例子中一樣,在這里一個新的計數器進程通過調用counter::start()啟動起來,返回值是這個計數器的進程標識符。為了隱藏消息傳遞的協議,我們提供了接口函數increment、value和stop來操縱計數器。
計數器進程使用選擇性接收的機制來處理發送過來的請求。它同時展示了一種處理未知消息的方法。通過在receive的最后一個子句中使用未綁定(unbound)的變量Other作為模式,任何未被之前的模式匹配到的消息都會被匹配到,此時我們直接忽略這樣的未知消息并繼續等待下一條消息。這是處理未知消息的標準方法:通過receive把它們從郵箱中刪除掉。
為了訪問計數器的值,我們必須將自己的Pid作為消息的一部分發送給計數器進程,這樣它才能將回復發送回來。回復的消息中也包含了發送方的進程標識符(在這里也就是計數器進程的Pid),這使得接收進程可以只接收包含回復的這個消息。簡單地等待一個包含未知值(在這個例子中是一個數字)的消息是不安全的做法,任何不相關的碰巧發送到該進程的消息都會被匹配到。因此,在進程之間發送的消息通常都會包含某種標識自己的機制,一種方法是通過內容進行標識,就像發送給計數器進程的請求消息一樣,另一種方法是通過在消息中包含某種“唯一”并且可以很容易識別的標識符,就如同計數器進程發回的包含計數器值的回復消息一樣。

圖5.3
現在我們再來考慮對一個有窮自動機(FSM)進行建模。圖5.3展示了一個4狀態的簡單FSM以及可能的狀態轉移和相應的觸發事件。一種編寫這樣的“狀態-事件”機器的方法如程序5.3所示。在這段代碼中,我們只專注于如何表示狀態以及管理狀態之間的轉移。每個狀態由一個單獨的函數表示,而事件則表示為消息。
程序 5.2
~~~
s1() ->
receive
msg_a ->
s2();
msg_c ->
s3()
end.
s2() ->
receive
msg_x ->
s3();
msg_h ->
s4()
end.
s3() ->
receive
msg_b ->
s1();
msg_y ->
s2()
end.
s4() ->
receive
msg_i ->
s3()
end.
~~~
轉臺函數通過receive來等待事件所對應的消息。當收到消息時,FSM通過調用相應的狀態函數轉移到指定的狀態。通過保證每次對于新狀態的函數的調用都是最后一個語句(參見第??小節),FSM進程可以在一個常數大小的空間中進行求值。
狀態數據可以通過為狀態函數添加參數的方式來處理。需要在進入狀態的時候執行的動作在調用receive之前完成,而需要在離開狀態時執行的動作可以放在對應的receive子句中調用新的狀態函數之前。
### 超時
Erlang中用于接收消息的基本原語receive可以通過添加一個可選的超時子句來進行增強,完整的語法變成這樣:
~~~
receive
Message1 [when Guard1] ->
Actions1 ;
Message2 [when Guard2] ->
Actions2 ;
...
after
TimeOutExpr ->
ActionsT
end
~~~
TimeOutExpr是一個整數值表達式,表示**毫秒**數。時間的精確程度受到具體Erlang實現的底層操作系統以及硬件的限制——這是一個局部性問題(local issue)。如果在指定的時間內沒有任何消息被匹配到,超時將會發生,ActionsT會被執行,而具體什么時候執行則是依賴與很多因素的,比如,和系統當前的負載有關系。
例如,對于一個窗口系統,類似于下面的代碼可能會出現在處理事件的進程中:
~~~
get_event() ->
receive
{mouse, click} ->
receive
{mouse, click} ->
double_click
after double_click_interval() ->
single_click
end
...
end.
~~~
在這個模型中,事件由消息來表示。get_event函數會等待一個消息,然后返回一個表示對應事件的原子式。我們希望能檢測鼠標雙擊,亦即在某一個較短時間段內的連續兩次鼠標點擊。當接收到一個鼠標點擊事件時我們再通過receive試圖接收下一個鼠標點擊事件。不過,我們為這個receive添加了一個超時,如果在指定的時間內(由double_click_interval指定)沒有發生下一次鼠標點擊事件,receive就會超時,此時get_event會返回single_click。如果第二個鼠標點擊事件在給定的超時時限之內被接收到了,那么get_event將會返回double_click。
在超時表達式的參數中有兩個值有特殊意義:
infinity
> 原子式infinity表示超時**永遠**也不會發生。如果超時時間需要在運行時計算的話,這個功能就很有用。我們可能會希望通過對一個表達式進行求值來得到超時長度:如果返回值是infinity的話,則永久等待。
0
> 數值0表示超時會立即發生,不過在那之前系統仍然會首先嘗試對郵箱中已有的消息進行匹配。
在receive中使用超時比一下子想象到的要有用得多。函數sleep(Time)將當前進程掛起Time毫秒:
~~~
sleep(Time) ->
receive
after Time ->
true
end.
~~~
flush_buffer()清空當前進程的郵箱:
~~~
flush_buffer() ->
receive
AnyMessage ->
flush_buffer()
after 0 ->
true
end.
~~~
只要郵箱中還有消息,第一個消息會被匹配到(未綁定變量AnyMessage會匹配到任何消息,在這里就是第一個消息),然后flush_buffer會再次被調用,但是如果郵箱已經為空了,那么函數會從超時子句中返回。
消息的優先級也可以通過使用0作為超時長度來實現:
~~~
priority_receive() ->
receive
interrupt ->
interrupt
after 0 ->
receive
AnyMessage ->
AnyMessage
end
end
~~~
函數priority_receive會返回郵箱中第一個消息,**除非**有消息interrupt發送到了郵箱中,此時將返回interrupt。通過首先使用超時時長0來調用receive去匹配interrupt,我們可以檢查郵箱中是否已經有了這個消息。如果是,我們就返回它,否則,我們再通過模式AnyMessage去調用receive,這將選中郵箱中的第一條消息。
程序 5.4
~~~
-module(timer).
-export([timeout/2,cancel/1,timer/3]).
timeout(Time, Alarm) ->
spawn(timer, timer, [self(),Time,Alarm]).
cancel(Timer) ->
Timer ! {self(),cancel}.
timer(Pid, Time, Alarm) ->
receive
{Pid,cancel} ->
true
after Time ->
Pid ! Alarm
end.
~~~
在receive中的超時純粹是在receive語句內部的,不過,要創建一個全局的超時機制也很容易。在程序5.4中的timer模塊中的timer::timeout(Time,Alarm)函數就實現了這個功能。
調用timer:timeout(Time,Alarm)會導致消息Alarm在時間Time之后被發送到調用進程。該函數返回計時器進程的標識符。當進程完成自己的任務之后,可以使用該計時器進程標識符來等待這個消息。通過調用timer::cancel(Timer),進程也可以使用這個標識符來撤銷計時器。需要注意的是,調用timer:cancel并不能**保證**調用進程不會收到Alarm消息,這是由于cancel消息有可能在Alarm消息被發送出去之后才被收到的。
### 注冊進程
為了向一個進程發送消息,我們需要事先知道它的進程標識符(Pid)。在某些情況下,這有些不切實際甚至不太合理。比如,在一個大型系統中通常存在許多全局服務器,或者某個進程由于安全方面的考慮希望隱藏它自己的標識符。為了讓一個進程在并不事先知道對方的進程標識符的情況下向其發送消息,我們提供了**注冊**進程的機制,換句話說,給進程一個名字。注冊進程的名字必須是一個原子式。
### 基本原語
Erlang提供了四個BIF來操縱注冊進程的名字:
register(Name,Pid)
> 將原子式Name關聯到進程Pid。
unregister(Name)
> 刪除原子式Name與對應進程的關聯。
whereis(Name)
> 返回關聯到注冊名Name的進程標識符,如果沒有任何進程關聯到這個名字,則返回原子式undefined。
registered()
> 返回一個包含所有當前已注冊過的名字。
消息發送的原語“!” 允許直接使用一個注冊進程的名字作為目標,例如:
~~~
number_analyzer ! {self(), {analyse,[1,2,3,4]}}
~~~
表示將消息{Pid,{analyse,[1,2,3,4]}}發送到注冊為numeber_analyser的進程那里。Pid是調用send的進程的標識符。
### “客戶端-服務端”模型
注冊進程的一個主要用途就是用于支持**“客戶端-服務端”模型**編程。在這個模型中有一個**服務端**管理著一些資源,一些**客戶端**通過向服務端發送請求來訪問這些資源,如圖5.4所示。要實現這個模型,我們需要三個基本組件——一個**服務端**,一個**協議**和一個**訪問庫**。我們將通過幾個例子來闡明基本原則。
在先前的程序5.2中展示的counter模塊里,每一根計數器都是一個服務端。客戶端通過調用模塊所定義的函數來訪問服務端。

圖5.4
程序5.5中展示的例子是一個可以用于電話交換機系統里分析用戶所撥打的號碼的服務端。start()會調用spawn并將新建的進程注冊為number_analyser,這就完成了號碼分析服務端的創建。之后服務端進程會在server函數中不斷循環并等待服務請求。如果收到了一個形如{add_number,Seq,Dest}的請求,該號碼序列(Seq)以及對應的目標進程(Dest),以及分析出結果之后將會發送的目的地,會被添加到查找表中。這是由函數insert完成的。之后消息ack將會被發送到請求的進程。如果服務端收到了形如{analyse,Seq}的消息,那么它將通過調用lookup完成號碼序列Seq的分析,并將包含分析結果的消息發回發送請求的進程。我們在這里沒有給出函數insert和lookup的具體定義,因為那對于我們目前討論的問題而言并不重要。
客戶端發送到服務端的請求消息包含了自己的進程標識符。這讓服務端可以向客戶端發送回復。發回的回復消息中也包含了一個“發送者”的標識,在這里就是服務端的注冊名字,這使得客戶端可以選擇性地接收回復消息。這比簡單地等待第一個消息到達要更加安全一些——因為客戶端的郵箱中也許已經有了一些消息,或者其他進程也許會在服務端回復**之前**給客戶端發送一些消息。
程序 5.5
~~~
-module(number_analyser).
-export([start/0,server/1]).
-export([add_number/2,analyse/1]).
start() ->
register(number_analyser,
spawn(number_analyser, server, [nil])).
%% The interface functions.
add_number(Seq, Dest) ->
request({add_number,Seq,Dest}).
analyse(Seq) ->
request({analyse,Seq}).
request(Req) ->
number_analyser ! {self(), Req},
receive
{number_analyser, Reply} ->
Reply
end.
%% The server.
server(AnalTable) ->
receive
{From, {analyse,Seq}} ->
Result = lookup(Seq, AnalTable),
From ! {number_analyser, Result},
server(AnalTable);
{From, {add_number, Seq, Dest}} ->
From ! {number_analyser, ack},
server(insert(Seq, Dest, AnalTable))
end.
~~~
現在我們已經實現了服務端并定義了協議。我們在這里使用了一個異步協議,每個發送到服務端的請求都會有一個回復。在服務端的回復中我們使用number_analyser(亦即服務端的注冊名字)作為發送者標識,這樣做是因為我們不希望暴露服務端的Pid。
接下來我們定義一些**接口**函數用于以一種標準的方式訪問服務端。函數add_number和analyse按照上面描述的方式實現了客戶端的協議。它們都使用了局部函數request來發送請求并接收回復。
程序5.6
~~~
-module(allocator).
-export([start/1,server/2,allocate/0,free/1]).
start(Resources) ->
Pid = spawn(allocator, server, [Resources,[]]),
register(resource_alloc, Pid).
% The interface functions.
allocate() ->
request(alloc).
free(Resource) ->
request({free,Resource}).
request(Request) ->
resource_alloc ! {self(),Request},
receive
{resource_alloc,Reply} ->
Reply
end.
% The server.
server(Free, Allocated) ->
receive
{From,alloc} ->
allocate(Free, Allocated, From);
{From,{Free,R}} ->
free(Free, Allocated, From, R)
end.
allocate([R|Free], Allocated, From) ->
From ! {resource_alloc,{yes,R}},
server(Free, [{R,From}|Allocated]);
allocate([], Allocated, From) ->
From ! {resource_alloc,no},
server([], Allocated).
free(Free, Allocated, From, R) ->
case lists:member({R,From}, Allocated) of
true ->
From ! {resource_alloc,ok},
server([R|Free], lists:delete({R,From}, Allocated));
false ->
From ! {resource_alloc,error},
server(Free, Allocated)
end.
~~~
下一個例子是如程序5.6中所示的一個簡單的資源分配器。服務端通過一個需要管理的初始的資源列表來啟動。其他進程可以向服務端請求分配一個資源或者將不再使用的資源釋放掉。
服務端進程維護兩個列表,一個是未分配的資源列表,另一個是已分配的資源列表。通過將資源在兩個列表之間移動,服務端可以追蹤每個資源的分配情況。
當服務端收到一個請求分配資源的消息時,函數allocate/3會被調用,它會檢查是否有未分配的資源存在,如果是則將資源放在回復給客戶端的yes消息中發送回去,否則直接發回no消息。未分配資源列表是一個包含所有未分配資源的列表,而已分配資源列表是一個二元組{Resource,AllocPid}的列表。在一個資源被釋放之前,亦即從已分配列表中刪除并添加到未分配列表中去之前,我們首先會檢查它是不是一個已知的資源,如果不是的話,就返回error。
### 討論
**接口函數**的目的是創建一個抽象層并隱藏客戶端和服務端之間使用的協議的細節。一個服務的用戶在使用服務的時候并不需要知道協議的細節或者服務端所使用的內部數據結構以及算法。一個服務的**具體實現**可以在保證外部用戶接口一致性的情況下自由地更改這些**內部細節**。
此外,回復服務請求的進程還有可能并不是實際的服務器進程,而是一個不同的進程——所有的請求都被委轉發到它那里。實際上,“一個”服務器可能會是一個巨大的進程網絡,這些互通的進程一起實現了給定的服務,但是卻被接口函數隱藏起來。應當**發布**的是接口函數的集合,它們應當被暴露給用戶,因為這些函數提供了唯一**合法**的訪問服務端提供的服務的方式。
在Erlang中實現的“客戶端-服務端”模型是非常靈活的。*monitor*或*remote procedure call*之類的機制可以很容易地實現出來。在某些特殊的情況下,**具體實現**也可以繞過接口函數直接與服務端進行交互。由于Erlang并沒有**強制**創建或使用這樣的接口函數,因此需要由系統設計師來保證在需要的時候創建它們。Erlang并沒有提供用于遠程過程調用之類的現成解決方案,而是提供了一些基本原語用于構造這樣的解決方案。
### 進程調度,實時性以及優先級
到目前為止我們還沒有提到過一個Erlang系統中的進程是如何調度的。雖然這是一個實現相關的問題,但是也有一些所有實現都需要滿足的準則:
- 調度算法必須是**公平**的,換句話說,任何可以運行的進程都會被執行,并且(如果可能的話)按照它們變得可以運行的順序來執行。
- 不允許任意一個進程長期阻塞整個系統。一個進程被分配一小段運行時間(稱為**時間片**),再那之后它將被掛起并等待下一次運行調度,以使得其他可運行的進程也有機會運行。
典型情況下,時間片被設為可以讓當前進程完成500次規約(reduction)[[3]](#)的時間。
Erlang語言實現的一個要求是要保證讓它能夠適用于編寫**軟實時**的應用程序,也就是說,系統的反應時間必須至少是毫秒級別的。一個滿足以上準則的調度算法通常對于一個這樣的Erlang實現來說已經足夠了。
要讓Erlang系統能應用于實時應用程序的另一個重要的特性是內存管理。Erlang對用戶隱藏了所有的內存管理工作。內存在需要的時候被自動分配并在不需要之后一段時間內會被自動回收。內存的分配和回收的實現必須要保證不會長時間地阻塞系統的運行,最好是比一個時間片更短,以保證不會影響其實時性。
### 進程優先級
所有新創建的進程都在運行在同一個優先級上。不過有時候也許會希望一些進程以一個比其他進程更高或更低的優先級運行:例如,一個用于跟蹤系統狀態的進程也許只需要偶爾運行一下。BIF process_flag可以用來改變進程的優先級:
~~~
process_flag(priority, Pri)
~~~
Pri是進程的新的優先級,可以是normal或者low,這將改變調用該BIF的進程的運行優先級。優先級為normal的進程會比優先級為low的進程運行得更加頻繁一些。所有進程默認的優先級都是normal。
### 進程組
所有Erlang進程都有一個與其相關聯的Pid,稱作進程的組長。當一個新進程被創建時,它會被自動歸屬到調用spawn語句的那個進程所屬的進程組中。一開始,系統中的第一關進程是它自身的組長,因此也是所有后來創建的進程的組長。這表示所有的Erlang進程被組織為一個樹形結構,第一個進程是樹根。
以下的BIF可以被用于操控進程組:
group_leader()
> 返回調用該BIF的進程的組長Pid。
group_leader(Leader,Pid)
> 將進程Pid的組長設置為Leader。
Erlang的輸入輸出系統中用到了進程組的概念,詳見第??章的描述。
腳注
| [[1]](#) | 因為并沒有專門用于存放這些計算結果的地方。 |
|-----|-----|
| [[2]](#) | 或者其他知道該進程標識符的進程。 |
|-----|-----|
| [[3]](#) | 一次規約(reduction)等價于一次函數調用。 |
|-----|-----|