6-ETS
======
[ETS當緩存用]()
[競爭條件?]()
[ETS當持久存儲用]()
每次我們要找一個bucket時,都要發消息給注冊表進程。在某些情況下,這意味著注冊表進程會變成性能瓶頸!
本章我們將學習ETS(Erlang Term Storage),以及如何把它當成緩存使用。
之后我們會拓展它的功能,把數據從監督者保存到其孩子上。這樣即使崩潰,數據也能存續。
>嚴重注意!絕對不要冒失地把ETS當緩存用。仔細分析你的程序,看看到底哪里才是瓶頸。這樣來決定是否需要緩存以及緩存什么。
本章僅僅講解ETS是如何工作的一個例子,具體怎么做得由你自己決定。
## 6.1-ETS當緩存用
ETS可以把Erlang/Elixir的詞語(term)存儲在內存表中。
使用[Erlang的```:ets```模塊](http://www.erlang.org/doc/man/ets.html)來操作:
```elixir
iex> table = :ets.new(:buckets_registry, [:set, :protected])
8207
iex> :ets.insert(table, {"foo", self})
true
iex> :ets.lookup(table, "foo")
[{"foo", #PID<0.41.0>}]
```
在創建一個ETS表時,需要兩個參數:表名和一組選項。對于在上面的例子,在可選的選項中我們傳遞了表類型和訪問規則。
我們選擇了```:set```類型,意思是鍵不能有重復(集合論)。
我們選擇的訪問規則是```:protected```,意思是對于這個表,只有創建該表的進程可以修改,而其它進程只能讀取。
這兩個選項是默認的,這里就不多說了。
ETS表可以被命名,可以通過名字訪問:
```elixir
iex> :ets.new(:buckets_registry, [:named_table])
:buckets_registry
iex> :ets.insert(:buckets_registry, {"foo", self})
true
iex> :ets.lookup(:buckets_registry, "foo")
[{"foo", #PID<0.41.0>}]
```
好了,現在我們使用ETS表,修改```KV.Registry```。
我們對事件管理器和bucket的監督者使用相同的技術,顯式傳遞ETS表名給```start_link```。
記住,有了服務器以及ETS表的名字,本地進程就可以訪問那個表。
打開```lib/kv/registry.ex```,修改里面的實現。加上注釋來標明我們的修改:
```elixir
defmodule KV.Registry do
use GenServer
## Client API
@doc """
Starts the registry.
"""
def start_link(table, event_manager, buckets, opts \\ []) do
# 1. We now expect the table as argument and pass it to the server
GenServer.start_link(__MODULE__, {table, event_manager, buckets}, opts)
end
@doc """
Looks up the bucket pid for `name` stored in `table`.
Returns `{:ok, pid}` if a bucket exists, `:error` otherwise.
"""
def lookup(table, name) do
# 2. lookup now expects a table and looks directly into ETS.
# No request is sent to the server.
case :ets.lookup(table, name) do
[{^name, bucket}] -> {:ok, bucket}
[] -> :error
end
end
@doc """
Ensures there is a bucket associated with the given `name` in `server`.
"""
def create(server, name) do
GenServer.cast(server, {:create, name})
end
## Server callbacks
def init({table, events, buckets}) do
# 3. We have replaced the names HashDict by the ETS table
ets = :ets.new(table, [:named_table, read_concurrency: true])
refs = HashDict.new
{:ok, %{names: ets, refs: refs, events: events, buckets: buckets}}
end
# 4. The previous handle_call callback for lookup was removed
def handle_cast({:create, name}, state) do
# 5. Read and write to the ETS table instead of the HashDict
case lookup(state.names, name) do
{:ok, _pid} ->
{:noreply, state}
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
:ets.insert(state.names, {name, pid})
GenEvent.sync_notify(state.events, {:create, name, pid})
{:noreply, %{state | refs: refs}}
end
end
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
# 6. Delete from the ETS table instead of the HashDict
{name, refs} = HashDict.pop(state.refs, ref)
:ets.delete(state.names, name)
GenEvent.sync_notify(state.events, {:exit, name, pid})
{:noreply, %{state | refs: refs}}
end
def handle_info(_msg, state) do
{:noreply, state}
end
end
```
注意,修改前的```KV.Registry.lookup/2```給服務器發送請求;修改后,它就直接從ETS表里面讀取數據了。該表是對各進程都共享的。
這就是我們實現的緩存機制的大體想法。
為了讓緩存機制工作,新建的ETS起碼需要```:protected```訪問規則(默認的),這樣客戶端才能從中讀取數據。
否則就只有```KV.Registry```進程才能訪問。
我們還在啟動ETS表時設置了```:read_concurrency```,為表的并發訪問稍作優化。
我們以上的改動導致測試都掛了。一個重要原因是我們在啟動注冊表進程時,需要多傳遞一個參數給```KV.Registry.start_link/3```。
讓我們重寫```setup```回調來修復測試代碼```test/kv/registry_test.exs```:
```elixir
setup do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry, ets: :registry_table}
end
```
注意我們傳遞了一個表名```:registry_table```給```KV.Registry.start_link/3```,
其后返回了```ets: :registry_table```,成為了測試的上下文。
修改了這個回調后,測試仍有fail,差不多都是這個樣子:
```
1) test spawns buckets (KV.RegistryTest)
test/kv/registry_test.exs:38
** (ArgumentError) argument error
stacktrace:
(stdlib) :ets.lookup(#PID<0.99.0>, "shopping")
(kv) lib/kv/registry.ex:22: KV.Registry.lookup/2
test/kv/registry_test.exs:39
```
這是因為我們傳遞了注冊表進程的pid給函數```KV.Registry.lookup/2```,而它期待的卻是ETS的表名。
為了修復我們要把所有的:
```elixir
KV.Registry.lookup(registry, ...)
```
都改為:
```elixir
KV.Registry.lookup(ets, ...)
```
其中獲取```ets```的方法跟我們獲取注冊表一個樣子:
```elixir
test "spawns buckets", %{registry: registry, ets: ets} do
```
像這樣,我們對測試進行修改,把```ets```傳遞給```lookup/2```。一旦我們完成這些修改,有些測試還是會失敗。
你還會觀察到,每次執行測試,成功和失敗不是穩定的。例如,對于“派生bucket進程”這個測試來說:
```elixir
test "spawns buckets", %{registry: registry, ets: ets} do
assert KV.Registry.lookup(ets, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
```
有可能會在這行失敗:
```elixir
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
```
但是假如我們在這行之前創建一個bucket,還會失敗嗎?
原因在于(嗯哼!基于教學目的),我們犯了兩個錯誤:
1. 我們過于冒進地使用緩存來優化
2. 我們使用的是```cast/2```,它應該是```call/2```
## 6.2-競爭條件?
用Elixir編程不會讓你避免競爭狀態。但是Elixir關于“沒啥是共享”的這個特點可以幫助你很容易找到導致競爭狀態的根本原因。
我們測試中發生的事兒是__延遲__---介于我們操作和我們觀察到ETS表被改動之間。下面是我們期望發生的:
1. 我們執行```KV.Registry.create(registry, "shopping")```
2. 注冊表進程創建了bucket,并且更新了緩存表
3. 我們用```KV.Registry.lookup(ets, "shopping")```從表中獲取信息
4. 上面的命令返回```{:ok, bucket}```
但是,因為```KV.Registry.create/2```使用cast操作,命令在真正修改表之前先返回了結果!換句話說,其實發生了下面的事:
1. 我們執行```KV.Registry.create(registry, "shopping")```
2. 我們用```KV.Registry.lookup(ets, "shopping")```從表中獲取信息
3. 命令返回```:error```
4. 注冊表進程創建了bucket,并且更新了緩存表
要修復這個問題,只需要讓```KV.Registry.create/2```同步操作,使用```call/2```而不是```cast/2```。
這就能保證客戶端只會在表被修改后才能繼續下面的操作。讓我們來修改相應函數和回調:
```elixir
def create(server, name) do
GenServer.call(server, {:create, name})
end
def handle_call({:create, name}, _from, state) do
case lookup(state.names, name) do
{:ok, pid} ->
{:reply, pid, state} # Reply with pid
:error ->
{:ok, pid} = KV.Bucket.Supervisor.start_bucket(state.buckets)
ref = Process.monitor(pid)
refs = HashDict.put(state.refs, ref, name)
:ets.insert(state.names, {name, pid})
GenEvent.sync_notify(state.events, {:create, name, pid})
{:reply, pid, %{state | refs: refs}} # Reply with pid
end
end
```
我們只是簡單地把回調里的```handle_cast/2```改成了```handle_call/3```,并且返回創建的bucket的pid。
現在執行下測試。這次,我們要使用```--trace```選項:
```elixir
$ mix test --trace
```
如果你的測試中有死鎖或者競爭條件時,```--trace```選項非常有用。因為它可以同步執行所有測試(而```async: true```沒啥效果),并且顯式每條測試的詳細信息。這次我們應該只有一條失敗(可能也是間歇性的):
```
1) test removes buckets on exit (KV.RegistryTest)
test/kv/registry_test.exs:48
Assertion with == failed
code: KV.Registry.lookup(ets, "shopping") == :error
lhs: {:ok, #PID<0.103.0>}
rhs: :error
stacktrace:
test/kv/registry_test.exs:52
```
根據錯誤信息,我們期望表中沒有bucket,但是它卻有。
這個問題和我們剛剛解決的相反:之前的問題是創建bucket的命令與更新表之間的延遲,而現在是bucket處理退出操作與清除它在表中的記錄之間的延遲。
不幸的是,這次我們無法簡單地把```handle_info/2```改成一個同步的操作。但是我們可以用事件管理器的通知來修復該失敗。
先來看看我們```handle_info/2```的實現:
```elixir
def handle_info({:DOWN, ref, :process, pid, _reason}, state) do
# 5. Delete from the ETS table instead of the HashDict
{name, refs} = HashDict.pop(state.refs, ref)
:ets.delete(state.names, name)
GenEvent.sync_notify(state.event, {:exit, name, pid})
{:noreply, %{state | refs: refs}}
end
```
注意我們在發通知__之前__就從ETS表中進行刪除操作。這是有意為之的。
這意味著當我們收到```{:exit, name, pid}```通知的時候,表即已經是最新了。讓我們更新剩下的代碼:
```elixir
test "removes buckets on exit", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket} # Wait for event
assert KV.Registry.lookup(ets, "shopping") == :error
end
```
我們對測試稍作調整,保證先收到```{:exit, name, pid}消息,再執行```KV.Registry.lookup/2```。
你看,我們能夠通過修改程序邏輯來使測試通過,而不是使用諸如```:timer.sleep/1```或者其它小技巧。這很重要。
大部分時間里,我們依賴于事件,監視以及消息機制來確保系統處在期望狀態,在執行測試斷言之前。
為方便,下面給出能通過的測試全文:
```elixir
defmodule KV.RegistryTest do
use ExUnit.Case, async: true
defmodule Forwarder do
use GenEvent
def handle_event(event, parent) do
send parent, event
{:ok, parent}
end
end
setup do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(:registry_table, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
{:ok, registry: registry, ets: :registry_table}
end
test "sends events on create and crash", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
assert_receive {:create, "shopping", ^bucket}
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket}
end
test "spawns buckets", %{registry: registry, ets: ets} do
assert KV.Registry.lookup(ets, "shopping") == :error
KV.Registry.create(registry, "shopping")
assert {:ok, bucket} = KV.Registry.lookup(ets, "shopping")
KV.Bucket.put(bucket, "milk", 1)
assert KV.Bucket.get(bucket, "milk") == 1
end
test "removes buckets on exit", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
Agent.stop(bucket)
assert_receive {:exit, "shopping", ^bucket} # Wait for event
assert KV.Registry.lookup(ets, "shopping") == :error
end
test "removes bucket on crash", %{registry: registry, ets: ets} do
KV.Registry.create(registry, "shopping")
{:ok, bucket} = KV.Registry.lookup(ets, "shopping")
# Kill the bucket and wait for the notification
Process.exit(bucket, :shutdown)
assert_receive {:exit, "shopping", ^bucket}
assert KV.Registry.lookup(ets, "shopping") == :error
end
end
```
隨著測試通過,我們只需更新監督者```init/1```回調函數的代碼(文件```lib/kv/supervisor.ex```),傳遞ETS表的名字作為參數給注冊表工人:
```elixir
@manager_name KV.EventManager
@registry_name KV.Registry
@ets_registry_name KV.Registry
@bucket_sup_name KV.Bucket.Supervisor
def init(:ok) do
children = [
worker(GenEvent, [[name: @manager_name]]),
supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
worker(KV.Registry, [@ets_registry_name, @manager_name,
@bucket_sup_name, [name: @registry_name]])
]
supervise(children, strategy: :one_for_one)
end
```
注意我們仍使用```KV.Registry```作為ETS表的名字,好讓debug方便些,因為它指明了使用它的模塊。ETS名和進程名分別存儲在不同的注冊表,以避免沖突。
## 6.3-ETS當持久存儲用
到目前為止,我們在初始化注冊表的時候創建了一個ETS表,而沒有操心在注冊表結束時關閉該ETS表。
這是因為ETS表是“連接”(某種修辭上說)著創建它的進程的。如果那進程掛了,表也會自動關閉。
這作為默認行為實在是太方便了,我們可以在將來更多地利用這個特點。
記住,注冊表和bucket監督者之間有依賴。注冊表掛,我們希望bucket監督者也掛。
因為一旦注冊表掛,所有連接bucket進程的信息都會丟失。
但是,假如我們能保存注冊表的數據怎么樣?
如果我們能做到這點,就可以去除注冊表和bucket監督者之間的依賴了,讓```:one_for_one```成為監督者最合適的策略。
要做到這點需要些小改動。首先我們需要在監督者內啟動ETS表。其次,我們需要把表的訪問類型從```:protected```改成```:public```。
因為表的所有者是監督者,但是進行修改操作的仍然是時間管理者。
讓我們從修改```KV.Supervisor```的```init/1```回調開始:
```elixir
def init(:ok) do
ets = :ets.new(@ets_registry_name,
[:set, :public, :named_table, {:read_concurrency, true}])
children = [
worker(GenEvent, [[name: @manager_name]]),
supervisor(KV.Bucket.Supervisor, [[name: @bucket_sup_name]]),
worker(KV.Registry, [ets, @manager_name,
@bucket_sup_name, [name: @registry_name]])
]
supervise(children, strategy: :one_for_one)
end
```
接下來,我們修改```KV.Registry```的```init/1```回調,因為它不再需要創建一個表,而是需要一個表作為參數:
```elixir
def init({table, events, buckets}) do
refs = HashDict.new
{:ok, %{names: table, refs: refs, events: events, buckets: buckets}}
end
```
最終,我們修改```test/kv/registry_test.exs```中的```setup```回調,來顯式地創建ETS表。
我們還將用這個機會分離```setup```的功能,放到一個方便的私有函數中:
```elixir
setup do
ets = :ets.new(:registry_table, [:set, :public])
registry = start_registry(ets)
{:ok, registry: registry, ets: ets}
end
defp start_registry(ets) do
{:ok, sup} = KV.Bucket.Supervisor.start_link
{:ok, manager} = GenEvent.start_link
{:ok, registry} = KV.Registry.start_link(ets, manager, sup)
GenEvent.add_mon_handler(manager, Forwarder, self())
registry
end
```
這之后,我們的測試應該都綠啦!
現在只剩下一個場景需要考慮:一旦我們收到了ETS表,可能有現存的bucket的pid在這個表中。
這是我們這次改動的目的。
但是,新啟動的注冊表進程沒有監視這些bucket,因為它們是作為之前的注冊表的一部分創建的,現在那些注冊表已經不存在了。
這意味著表將被嚴重拖累,因為我們都不去清除已經掛掉的bucket。
來增加一個測試來暴露這個bug:
```elixir
test "monitors existing entries", %{registry: registry, ets: ets} do
bucket = KV.Registry.create(registry, "shopping")
# Kill the registry. We unlink first, otherwise it will kill the test
Process.unlink(registry)
Process.exit(registry, :shutdown)
# Start a new registry with the existing table and access the bucket
start_registry(ets)
assert KV.Registry.lookup(ets, "shopping") == {:ok, bucket}
# Once the bucket dies, we should receive notifications
Process.exit(bucket, :shutdown)
assert_receive {:exit, "shopping", ^bucket}
assert KV.Registry.lookup(ets, "shopping") == :error
end
```
執行這個測試,它將失敗:
```
1) test monitors existing entries (KV.RegistryTest)
test/kv/registry_test.exs:72
No message matching {:exit, "shopping", ^bucket}
stacktrace:
test/kv/registry_test.exs:85
```
這是我們期望的。如果bucket不被監視,在它掛的時候,注冊表將得不到通知,因此也沒有事件發生。
我們可以修改```KV.Registry```的```init/1```回調來修復這個問題。給所有表中的現存條目設置監視器:
```elixir
def init({table, events, buckets}) do
refs = :ets.foldl(fn {name, pid}, acc ->
HashDict.put(acc, Process.monitor(pid), name)
end, HashDict.new, table)
{:ok, %{names: table, refs: refs, events: events, buckets: buckets}}
end
```
我們用```:ets.foldl/3```來遍歷表中所有條目,類似于```Enum.reduce/3```。它為每個條目執行提供的函數,并且用一個累加器累加結果。
在函數回調中,我們監視每個表中的pid,并相應地更新存放引用信息的字典。
如果有某個條目是掛掉的,我們還能收到```:DOWN```消息,稍后可以清除它們。
本章讓監督者擁有ETS表,并且使其將表作為參數傳遞給注冊表進程。通過這樣的方法,我們讓程序變得更加健壯。
我們還探索了把ETS當作緩存,并且討論了如果在客戶端和服務器共享數據時會進入的競爭狀態。