4-GenEvent
============
[事件管理器]()
[注冊表進程的事件]()
[事件流]()
>注:Elixir v1.1 發布后本章內容被從官方入門手冊中拿掉了。
這里留存,如果仍需使用GenEvent,可以查閱。大家可以暫時跳過這一章。
本章探索GenEvent,Elixir和OTP提供的又一個行為抽象。它允許我們派生一個事件管理器,用來向多個處理者發布事件消息。
我們會激發兩種事件:一個是每次bucket被加到注冊表,另一個是從注冊表中移除。
## 4.1-事件管理器
打開一個新```iex -S mix```對話,玩弄一下GenEvent的API:
```elixir
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> GenEvent.sync_notify(manager, :hello)
:ok
iex> GenEvent.notify(manager, :world)
:ok
```
函數```GenEvent.start_link/0```啟動了一個新的事件管理器。不需額外的參數。
管理器創建好后,我們就可以調用```GenEvent.notify/2```函數和```GenEvent.sync_notify/2```函數來發送通知。
但是,當前還沒有任何消息處理者綁定到該管理器,因此不管它發啥通知,叫破喉嚨都不會有事兒發生。
現在就在iex對話里創建第一個事件處理器:
```elixir
iex> defmodule Forwarder do
...> use GenEvent
...> def handle_event(event, parent) do
...> send parent, event
...> {:ok, parent}
...> end
...> end
iex> GenEvent.add_handler(manager, Forwarder, self())
:ok
iex> GenEvent.sync_notify(manager, {:hello, :world})
:ok
iex> flush
{:hello, :world}
:ok
```
我們創建了一個處理器(handler),并通過函數```GenEvent.add_handler/3```把它“綁定”到事件管理器上,傳遞的三個參數是:
1. 剛啟動的那個時間管理器
2. 定義事件處理者的模塊(如這里的```Forwarder```)
3. 事件處理者的狀態:在這里,使用當前進程的id
加上這個處理器之后,可以看到,調用了```sync_notify/2```之后,```Forwarder```處理器成功地把事件轉給了它的父進程(IEx),因此那個消息進入了我們的收件箱。
這里有幾點需要注意:
1. 事件處理器運行在事件管理器的同一個進程里
2. ```sync_notify/2```同步地運行事件處理器處理請求
3. ```notify/2```使事件處理器異步處理請求
這里```sync_notify/2```和```notify/2```類似于GenServer里面的```call/2```和```cast/2```。推薦使用```sync_notify/2```。
它以反向壓力的機制工作,減少了“發消息速度快過消息被成功分發的速度”的可能性。
記得去[GenServer的模塊文檔](http://elixir-lang.org/docs/stable/elixir/GenEvent.html)閱讀其它函數。
目前我們的程序就用提到的這些知識就可以了。
## 4.2-注冊表進程的事件
為了能發出事件消息,我們要稍微修改一下我們的注冊表進程,使之與一個事件管理器進行協作。
我們需要在注冊表進程啟動的時候,事件管理器也能自動啟動。
比如在```init/1```回調里面,最好能傳遞事件處理器的pid或名字什么的作為參數來```start_link```,以此將啟動事件管理器與注冊表進程分解開。
但是,首先讓我們修改測試中注冊表進程的行為。打開```test/kv/registry_text.exs```,修改目前的```setup```回調,然后再加上新的測試:
```elixir
defmodule Forwarder do
use GenEvent
def handle_event(event, parent) do
send parent, event
{:ok, parent}
end
end
setup do
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(manager)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry}
end
test "sends events on create and crash", %{registry: registry} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(registry, "shopping")
assert_receive {:create, "shopping", ^bucket}
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket}
end
```
為了測試我們即將添加的功能,我們首先定義了一個```Forwarder```事件處理器,類似剛才在IEx中創建的那樣。
在```Setup```中,我們啟動了事件管理器,把它作為參數傳遞給了注冊表進程,并且向該管理器添加了我們定義的```Forwarder```處理器。
至此,事件可以發向待測進程了。
在測試中,我們創建、停止了一個bucket進程,并且使用```assert_receive```斷言來檢查是否收到了```:create```和```:exit```事件消息。
斷言```assert_receive```默認是500毫秒超時時間,這對于測試足夠了。
同樣要指出的是,```assert_receive```期待接收一個模式,而不是一個值。
這就是為啥我們用```^bucket```來匹配bucket的pid(參考《入門》關于變量的匹配內容)。
最終,注意我們調用了```GenEvent.add_mon_handler/3```來代替```GenEvent.add_handler/3```。該函數不但可以添加一個處理器,它還告訴事件管理器來監視當前進程。如果當前進程掛了,事件處理器也一并抹去。
這個很有道理,因為對于這里的```Forwarder```,如果消息的接收方(```self()```/測試進程)終止,我們理所應當停止轉發消息。
好了,現在來修改注冊表進程代碼來讓測試pass。打開```lib/kv/registry.ex```,輸入以下新的內容(一些關鍵語句的解釋寫在注釋里):
```elixir
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link(event_manager, opts \\ []) do
# 1. start_link now expects the event manager as argument
GenServer.start_link(__MODULE__, event_manager, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `server`.
Returns `{:ok, pid}` in case a bucket exists, `:error` otherwise.
"""
def lookup(server, name) do
GenServer.call(server, {:lookup, name})
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
def init(events) do
# 2. The init callback now receives the event manager.
# We have also changed the manager state from a tuple
# to a map, allowing us to add new fields in the future
# without needing to rewrite all callbacks.
names = HashDict.new
refs = HashDict.new
{:ok, %{names: names, refs: refs, events: events}}
end
def handle_call({:lookup, name}, _from, state) do
{:reply, HashDict.fetch(state.names, name), state}
end
def handle_cast({:create, name}, state) do
if HashDict.get(state.names, name) do
{:noreply, state}
else
{:ok, pid} = KV.Bucket.start_link()
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
names = HashDict.put(state.names, name, pid)
# 3. Push a notification to the event manager on create
GenEvent.sync_notify(state.events, {:create, name, pid})
{:noreply, %{state | names: names, refs: refs}}
end
end
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
{name, refs} = HashDict.pop(state.refs, ref)
names = HashDict.delete(state.names, name)
# 4. Push a notification to the event manager on exit
GenEvent.sync_notify(state.events, {:exit, name, pid})
{:noreply, %{state | names: names, refs: refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end
```
這些改變很直觀。我們給```GenServer```初始化過程傳遞一個事件管理器,該管理器是我們用```start_link```啟動進程時作為參數收到的。
我們還改了cast和info兩個回調,在里面調用了```GenEvent.sync_notify/2```。
最后,我們借這個機會還把服務器的狀態改成了一個圖,方便我們以后改進注冊表進程。
執行測試,都是綠的。
## 4.3-事件流
最后一個值得探索的```GenEvent```的功能點是像處理流一樣處理事件:
```elixir
iex> {:ok, manager} = GenEvent.start_link
{:ok, #PID<0.83.0>}
iex> spawn_link fn ->
...> for x <- GenEvent.stream(manager), do: IO.inspect(x)
...> end
:ok
iex> GenEvent.notify(manager, {:hello, :world})
{:hello, :world}
:ok
```
上面的例子中,我們創建了一個```GenEvent.stream(manager)```,返回一個事件的流(即一個enumerable),并隨即處理了它。
處理事件是一個_阻塞_的行為,我們派生新進程來處理事件消息,把消息打印在終端上。這一系列的操作,就像看到的那樣,如實地執行了。
每次調用```sync_notify/2```或者```notify/2```,事件都被打印在終端上,后面跟著一個```:ok```(IEx輸出語句的執行結果)。
通常事件流提供了足夠多的內置功能來處理事件,使我們不必實現我們自己的處理器。
但是,若是需要某些自定義的功能,或是在測試時,定義自己的事件處理器回調才是正道。
至此,我們有了一個事件處理器,一個注冊表進程以及可能會同時執行的許多bucket進程,是時候開始擔心這些進程會不會掛掉了。