8-Task模塊和通用TCP服務器(gen_tcp)
================
* [Echo服務器]()
* [Tasks]()
* [Task的監督者]()
本章我們學習如何使用[Erlang的:gen_tcp模塊](http://erlang.org/doc/man/gen_tcp.html)來處理請求。
在未來幾章中我們還會擴展我們的服務器,使之能夠服務于真正的命令。
這也是我們探索Elixir的Task模塊的大好機會。
## 8.1-Echo服務器
我們首先實現一個Echo(回聲)服務器來開始我們的TCP服務器之旅。它只是簡單地返回從請求中收到的文字。
我們會慢慢地改進這個服務器,使它有監督者來監督,并且可以處理大量連接。
一個TCP服務器,總的來說,實現以下幾步:
1. 在可用端口建立socket連接,監聽這個端口
2. 等待這個端口的客戶端連接,有了就接受它
3. 讀取客戶端請求并且寫回復
我們來實現這些步驟。在```apps/kv_server```程序中,打開文件```lib/kv_server.ex```,添加以下函數:
```elixir
def accept(port) do
# The options below mean:
#
# 1. `:binary` - receives data as binaries (instead of lists)
# 2. `packet: :line` - receives data line by line
# 3. `active: false` - block on `:gen_tcp.recv/2` until data is available
#
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false])
IO.puts "Accepting connections on port #{port}"
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
defp serve(client) do
client
|> read_line()
|> write_line(client)
serve(client)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
```
我們通過調用```KVServer.accept(4040)```來啟動服務器,其中4040是端口號。在```accept/1```中,第一步是去監聽這個端口,知道socket變成可用狀態,然后調用```loop_acceptor/1```。函數```loop_acceptor/1```只是一個循環,來接受客戶端的連接。
對于每次接受的客戶端連接,我們調用```serve/1```函數。
函數```serve/1```也是個循環,它一次從socket中讀取一行,并將其寫進給socket的回復。
注意```serve/1```使用了[管道運算符 ```|>```](http://elixir-lang.org/docs/stable/elixir/Kernel.html#%7C%3E/2)來表達操作流程。
管道運算符計算左側函數計算的結果,并將其作為第一個參數傳遞給右側函數調用。如:
```elixir
socket |> read_line() |> write_line(socket)
```
相當于:
```elixir
write_line(read_line(socket), socket)
```
>當使用```|>```運算符時,是否給函數調用加上括號是很重要的。舉個例子:
```elixir
1..10 |> Enum.filter &(&1 <= 5) |> Enum.map &(&1 * 2)
```
會被翻譯為:
```elixir
1..10 |> Enum.filter(&(&1 <= 5) |> Enum.map(&(&1 * 2)))
```
這個不是我們想要的,因為本應傳給```Enum.filter/2```的那個匿名函數```&(&1<=5)```成了傳給```Enum.map/2```的第一個參數。
解決方法就是加上括號:
```elixir
1..10 |> Enum.filter(&(&1 <= 5)) |> Enum.map(&(&1 * 2))
```
函數```read_line/2```中使用```:gen_tcp.recv/2```接收從socket傳來的數據。
而```write_line/2```中使用```:gen_tcp.send/2```向socket寫入數據。
這差不多就是我們為實現這個回聲服務器所要做的。讓我們試一試。
用```iex -S mix```在```kv_server```應用程序中啟動對話,執行:
```elixir
iex> KVServer.accept(4040)
```
服務器就運行了,注意到此時該命令行會被阻塞。現在我們使用一個[telnet客戶端](http://en.wikipedia.org/wiki/Telnet)
來訪問這個服務器。基本上每個操作系統都有telnet客戶端程序,命令也都差不多:
```
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?
```
輸入“hello”,按回車,你就會得到“hello”字樣的回復。好牛逼!
退出telnet客戶端方法不一,有些用```ctrl + ]```,有些是```quit```按回車。
一旦你退出telnet客戶端,你會發現IEx會話中打印出一個錯誤信息:
```
** (MatchError) no match of right hand side value: {:error, :closed}
(kv_server) lib/kv_server.ex:41: KVServer.read_line/1
(kv_server) lib/kv_server.ex:33: KVServer.serve/1
(kv_server) lib/kv_server.ex:27: KVServer.loop_acceptor/1
```
這是因為我們還期望從```:gen_tcp.recv/2```拿數據,但是客戶端斷了。我們將來要處理這個問題才行。
目前還有個更重要的bug要修:假如TCP接收者掛了怎么辦?意為它沒有監督者,不會自己重啟,要是掛了我們將不能在處理更多的請求。
這就是為啥我們要將它挪進監督樹。
## 8.2-Tasks
我們已經學習了Agent,通用服務器以及事件管理器。它們都可以進行多消息協作,或者管理狀態。
但是,若是只需要處理一些任務,選什么呢?
[Task模塊](http://elixir-lang.org/docs/stable/elixir/Task.html)為此提供了所需的功能。
例如,它有```start_link/3```函數,接受一個模塊名、一個函數和函數的參數,從而執行這個傳入的函數,并且還是作為監督樹的一部分。
我們來試試。打開```lib/kv_server.ex```,修改下里```start/2```函數里的監督者:
```elixir
def start(_type, _args) do
import Supervisor.Spec
children = [
worker(Task, [KVServer, :accept, [4040]])
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
```
改動的意思是要讓```KVServer.accept(4040)```成為一個工人來運行。目前我們暫時hardcode這個端口號,之后再討論如何修改。
現在,這個服務器是監督樹的一部分了,它應該會隨著應用程序啟動而自動運行。
在終端中輸入```mix run --no-halt```,然后再次用telnet客戶端來試試看是否還一切正常:
```
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me
```
看,它還是好使!這回就算退了客戶端,服務器掛了,你會看到又一個立馬起來了。嗯,不錯。。。不過它可伸縮性如何?
試著打開兩個telnet客戶端一起連接,你會注意到,第二個客戶端根本不能回聲:
```
$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?
```
看起來根本不工作嘛。這是因為處理請求和接受請求是在同一個進程。一個客戶端連上來,就沒法處理第二個了。
## 8.3-Task的監督者
為了讓我們的服務器能夠處理并發連接,我們需要讓一個進程來當接收者,然后派生其它的進程來服務接收到的連接。
一個方案是:
```elixir
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
serve(client)
loop_acceptor(socket)
end
```
函數```Task.start_link/1```類似```Task.start_link/3```,但是它可以接受一個匿名函數而不是(模塊,函數,參數)的組合:
```elixir
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Task.start_link(fn -> serve(client) end)
loop_acceptor(socket)
end
```
我們翻過這個錯了,記得嗎?
和我們當時在注冊表進程中調用```KV.Bucket.start_link/0```犯的錯差不多。它意味著一個bucket掛會導致整個注冊表進程掛。
上面的代碼頁犯了相同的錯誤:如果我們把```serve(client)```這個任務和接收者連接起來,那么在處理請求時發生的小事故就會導致請求接收者掛,繼而導致連接都掛掉。
當時我們解決這個問題是用了一個簡單的一對一監督者。這里我們也將使用相同的辦法,除了一點:這個模式在Task中實在是太通用了,
所有Task已經為之提供了一個解決方案---一個簡單的一對一監督者加上臨時工(臨時的工人),這個我們在之前的監督樹中就是這么用的。
讓我們再次修改下```start/2```函數,加個監督者:
```elixir
def start(_type, _args) do
import Supervisor.Spec
children = [
supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
worker(Task, [KVServer, :accept, [4040]])
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
```
我們簡單地啟動了一個[```Task.Supervisor```](http://elixir-lang.org/docs/stable/elixir/Task.Supervisor.html)進程,
名字叫```Task.Supervisor```。記住,因為接收者任務依賴于這個監督者,因此該監督者必須先啟動。
現在我們只需修改```loop_acceptor/2```,使用```Task.Supervisor```來處理每個請求:
```elixir
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
loop_acceptor(socket)
end
```
用命令```mix run --no-halt```啟動新的服務器,現在就可以打開多個客戶端來連接了。而且你會發現一個客戶端退出不會讓接收者掛掉。
好棒!
一下是完整的服務器實現,在單個模塊中:
```elixir
defmodule KVServer do
use Application
@doc false
def start(_type, _args) do
import Supervisor.Spec
children = [
supervisor(Task.Supervisor, [[name: KVServer.TaskSupervisor]]),
worker(Task, [KVServer, :accept, [4040]])
]
opts = [strategy: :one_for_one, name: KVServer.Supervisor]
Supervisor.start_link(children, opts)
end
@doc """
Starts accepting connections on the given `port`.
"""
def accept(port) do
{:ok, socket} = :gen_tcp.listen(port,
[:binary, packet: :line, active: false])
IO.puts "Accepting connections on port #{port}"
loop_acceptor(socket)
end
defp loop_acceptor(socket) do
{:ok, client} = :gen_tcp.accept(socket)
Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
loop_acceptor(socket)
end
defp serve(socket) do
socket
|> read_line()
|> write_line(socket)
serve(socket)
end
defp read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
defp write_line(line, socket) do
:gen_tcp.send(socket, line)
end
end
```
因為我們修改了監督者的需求,我們會問:我們的監督者策略還適用嗎?
這里答案是Yes:如果接收者掛了,現存的連接是沒理由一起掛的。另一方面,如果task監督者掛了,同樣也沒必要讓接收者掛掉。
這和注冊表進程那種情況相反,那種情況我們在一開始必須在注冊表進程掛掉時讓監督者也掛掉,直到后來我們用上了ETS來持久化保存狀態。
而task是沒有狀態什么的,掛掉一個兩個也不會拖誰的后腿。
下一章我們將開始解析客戶請求,然后發送回復,從而完成我們的服務器。