我們要使用Scala和先前介紹的?[Finagle](http://github.com/twitter/finagle)?框架構建一個簡單的分布式搜索引擎。
[TOC=3,3]
### 設計目標:大圖景
從廣義上講,我們的設計目標包括?*抽象*?(abstraction:在不知道其內部的所有細節的前提下,利用該系統功能的能力)、?*模塊化*?(modularity:把系統分解為小而簡單的片段,從而更容易被理解和/或被更換的能力)和?*擴展性*?(scalability:用簡單直接的方法給系統擴容的能力)。
我們要描述的系統有三個部分: (1)?*客戶端*?發出請求,(2)?*服務端*?接收請求并應答,和(3)?*傳送*?機制來這些通信包裝起來。通常情況下,客戶端和服務器位于不同的機器上,通過網絡上的一個特定的?[*端口*](http://en.wikipedia.org/wiki/Port_(computer_networking))?進行通信,但在這個例子中,它們將運行在同一臺機器上(而且仍然使用端口進行通信) 。在我們的例子中,客戶端和服務器將用Scala編寫,傳送協議將使用?[Thrift](http://thrift.apache.org/)?處理。本教程的主要目的是展示一個簡單的具有良好可擴展性的服務器和客戶端。
### 探索默認的引導程序項目
首先,使用?[scala-bootstrapper](https://github.com/twitter/scala-bootstrapper)?創建一個骨架項目( “ Searchbird ” )。這將創建一個簡單的基于?[Finagle](http://twitter.github.com/finagle/)?和key-value內存存儲的Scala服務。我們將擴展這個工程以支持搜索值,并進而支持多進程多個內存存儲的搜索。
~~~
$ mkdir searchbird ; cd searchbird
$ scala-bootstrapper searchbird
writing build.sbt
writing config/development.scala
writing config/production.scala
writing config/staging.scala
writing config/test.scala
writing console
writing Gemfile
writing project/plugins.sbt
writing README.md
writing sbt
writing src/main/scala/com/twitter/searchbird/SearchbirdConsoleClient.scala
writing src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala
writing src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala
writing src/main/scala/com/twitter/searchbird/Main.scala
writing src/main/thrift/searchbird.thrift
writing src/scripts/searchbird.sh
writing src/scripts/config.sh
writing src/scripts/devel.sh
writing src/scripts/server.sh
writing src/scripts/service.sh
writing src/test/scala/com/twitter/searchbird/AbstractSpec.scala
writing src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala
writing TUTORIAL.md
~~~
首先,來看下?`scala-bootstrapper`?為我們創建的默認項目。這是一個模板。雖然最終將替換它的大部分內容,不過作為支架它還是很方便的。它定義了一個簡單(但完整)的key-value存儲,并包含了配置、thrift接口、統計輸出和日志記錄。
在我們看代碼之前,先運行一個客戶端和服務器,看看它是如何工作的。這里是我們構建的:

這里是我們的服務輸出的接口。由于Searchbird服務是一個?[Thrift](http://thrift.apache.org/)?服務(和我們大部分服務一樣),因而其外部接口使用Thrift IDL(“接口描述語言”)定義。
##### src/main/thrift/searchbird.thrift
~~~
service SearchbirdService {
string get(1: string key) throws(1: SearchbirdException ex)
void put(1: string key, 2: string value)
}
~~~
這是非常直觀的:我們的服務?`SearchbirdService`?輸出兩個RPC方法?`get`?和?`put`?。他們組成了一個到key-value存儲的簡單接口。
現在,讓我們運行默認的服務,啟動客戶端連接到這個服務,并通過這個接口來探索他們。打開兩個窗口,一個用于服務器,一個用于客戶端。
在第一個窗口中,用交互模式啟動SBT(在命令行中運行?`./sbt`[1](http://twitter.github.io/scala_school/zh_cn/searchbird.html#fn1)),然后構建和運行項目內SBT。這會運行?`Main.scala`?定義的?`主`?進程。
~~~
$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala
~~~
配置文件 (`development.scala`) 實例化一個新的服務,并監聽9999端口。客戶端可以連接到9999端口使用此服務。
現在,我們將使用?`控制臺`?shell腳本初始化和運行一個客戶端實例,即?`SearchbirdConsoleClient`?實例 (`SearchbirdConsoleClient.scala`) 。在另一個窗口中運行此腳本:
~~~
$ ./console 127.0.0.1 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.
finagle-client>
~~~
客戶端對象?`client`?現在連接到本地計算機上的9999端口,并可以跟服務交互了。接下來我們發送一些請求:
~~~
scala> client.put("marius", "Marius Eriksen")
res0: ...
scala> client.put("stevej", "Steve Jenson")
res1: ...
scala> client.get("marius")
res2: com.twitter.util.Future[String] = ...
scala> client.get("marius").get()
res3: String = Marius Eriksen
~~~
(第二個?`get()`?調用解析?`client.get()`?返回的?`Future`?類型值,阻塞直到該值準備好。)
該服務器還輸出運行統計(配置文件中指定這些信息在9900端口)。這不僅方便對各個服務器進行檢查,也利于聚集全局的服務統計(以機器可讀的JSON接口)。打開第三個窗口來查看這些統計:
~~~
$ curl localhost:9900/stats.txt
counters:
Searchbird/connects: 1
Searchbird/received_bytes: 264
Searchbird/requests: 3
Searchbird/sent_bytes: 128
Searchbird/success: 3
jvm_gc_ConcurrentMarkSweep_cycles: 1
jvm_gc_ConcurrentMarkSweep_msec: 15
jvm_gc_ParNew_cycles: 24
jvm_gc_ParNew_msec: 191
jvm_gc_cycles: 25
jvm_gc_msec: 206
gauges:
Searchbird/connections: 1
Searchbird/pending: 0
jvm_fd_count: 135
jvm_fd_limit: 10240
jvm_heap_committed: 85000192
jvm_heap_max: 530186240
jvm_heap_used: 54778640
jvm_nonheap_committed: 89657344
jvm_nonheap_max: 136314880
jvm_nonheap_used: 66238144
jvm_num_cpus: 4
jvm_post_gc_CMS_Old_Gen_used: 36490088
jvm_post_gc_CMS_Perm_Gen_used: 54718880
jvm_post_gc_Par_Eden_Space_used: 0
jvm_post_gc_Par_Survivor_Space_used: 1315280
jvm_post_gc_used: 92524248
jvm_start_time: 1345072684280
jvm_thread_count: 16
jvm_thread_daemon_count: 7
jvm_thread_peak_count: 16
jvm_uptime: 1671792
labels:
metrics:
Searchbird/handletime_us: (average=9598, count=4, maximum=19138, minimum=637, p25=637, p50=4265, p75=14175, p90=19138, p95=19138, p99=19138, p999=19138, p9999=19138, sum=38393)
Searchbird/request_latency_ms: (average=4, count=3, maximum=9, minimum=0, p25=0, p50=5, p75=9, p90=9, p95=9, p99=9, p999=9, p9999=9, sum=14)
~~~
除了我們自己的服務統計信息以外,還有一些通用的JVM統計。
現在,讓我們來看看配置、服務器和客戶端的實現代碼。
##### …/config/SearchbirdServiceConfig.scala
配置是一個Scala的特質,有一個方法?`apply: RuntimeEnvironment => T`?來創建一些?`T`?。在這個意義上,配置是“工廠” 。在運行時,配置文件(通過使用Scala編譯器庫)被取值為一個腳本,并產生一個配置對象。?`RuntimeEnvironment`?是一個提供各種運行參數(命令行標志, JVM版本,編譯時間戳等)查詢的一個對象。
`SearchbirdServiceConfig`?類就是這樣一個配置類。它使用其默認值一起指定配置參數。 (Finagle 支持一個通用的跟蹤系統,我們在本教程將不會介紹:[Zipkin](https://github.com/twitter/zipkin)?一個集合/聚合軌跡的 分布式跟蹤系統。)
~~~
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
var thriftPort: Int = 9999
var tracerFactory: Tracer.Factory = NullTracer.factory
def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this)
}
~~~
在我們的例子中,我們要創建一個?`SearchbirdService.ThriftServer`。這是由thrift代碼生成器生成的服務器類型[2](http://twitter.github.io/scala_school/zh_cn/searchbird.html#fn2)。
##### …/Main.scala
在SBT控制臺中鍵入“run”調用?`main`?,這將配置和初始化服務器。它讀取配置(在?`development.scala`?中指定,并會作為參數傳給“run”),創建`SearchbirdService.ThriftServer`?,并啟動它。?`RuntimeEnvironment.loadRuntimeConfig`?執行配置賦值,并把自身作為一個參數來調用?`apply`[3](http://twitter.github.io/scala_school/zh_cn/searchbird.html#fn3)。
~~~
object Main {
private val log = Logger.get(getClass)
def main(args: Array[String]) {
val runtime = RuntimeEnvironment(this, args)
val server = runtime.loadRuntimeConfig[SearchbirdService.ThriftServer]
try {
log.info("Starting SearchbirdService")
server.start()
} catch {
case e: Exception =>
log.error(e, "Failed starting SearchbirdService, exiting")
ServiceTracker.shutdown()
System.exit(1)
}
}
}
~~~
##### …/SearchbirdServiceImpl.scala
這是實質的服務:我們用自己的實現擴展?`SearchbirdService.ThriftServer`?。回憶一下thrift為我們生成的?`SearchbirdService.ThriftServer`?。它為每一個thrift方法生成一個Scala方法。到目前為止,在我們的例子中生成的接口是:
~~~
trait SearchbirdService {
def put(key: String, value: String): Future[Void]
def get(key: String): Future[String]
}
~~~
返回值是?`Future[Value]`?而不是直接返回值,可以推遲它們的計算(finagle的?[文檔](http://twitter.github.io/scala_school/zh_cn/finagle.html)?有?`Future`?更多的細節)。對本教程的目的來說,你唯一需要知道的有關?`Future`?的知識點是,可以通過?`get()`?獲取其值。
`scala-bootstrapper`?默認實現的key-value存儲很簡單:它提供了一個通過?`get`?和?`put`?訪問的?`數據庫`?數據結構。
~~~
class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdService.ThriftServer {
val serverName = "Searchbird"
val thriftPort = config.thriftPort
override val tracerFactory = config.tracerFactory
val database = new mutable.HashMap[String, String]()
def get(key: String) = {
database.get(key) match {
case None =>
log.debug("get %s: miss", key)
Future.exception(SearchbirdException("No such key"))
case Some(value) =>
log.debug("get %s: hit", key)
Future(value)
}
}
def put(key: String, value: String) = {
log.debug("put %s", key)
database(key) = value
Future.Unit
}
def shutdown() = {
super.shutdown(0.seconds)
}
}
~~~
其結果是構建在 Scala?`HashMap`?上的一個簡單thrift接口。
## 一個簡單的搜索引擎
現在,我們將擴展現有的例子,來創建一個簡單的搜索引擎。然后,我們將進一步擴展它成為由多個分片組成的?*分布式*?搜索引擎,使我們能夠適應比單臺機器內存更大的語料庫。
為了簡單起見,我們將最小化擴展目前的thrift服務,以支持搜索操作。使用模型是用?`put`?把文件加入搜索引擎,其中每個文件包含了一系列的記號(詞),那么我們就可以輸入一串記號,然后搜索會返回包含這個串中所有記號的所有文件。該體系結構是與前面的例子相同,但增加了一個新的@search@調用。

要實現這樣一個搜索引擎需要修改以下兩個文件:
##### src/main/thrift/searchbird.thrift
~~~
service SearchbirdService {
string get(1: string key) throws(1: SearchbirdException ex)
void put(1: string key, 2: string value)
list<string> search(1: string query)
}
~~~
我們增加了一個?`search`?方法來搜索當前哈希表,返回其值與查詢匹配的鍵列表。實現也很簡單直觀:
##### …/SearchbirdServiceImpl.scala
大部分修改都在這個文件中。
現在的?`數據庫`?HashMap保存一個正向索引來持有到文檔的鍵映射。我們重命名它為?`forward`?并增加一個?`倒排(reverse)`?索引(映射記號到所有包含該記號的文件)。所以在?`SearchbirdServiceImpl.scala`?中,更換?`database`?定義:
~~~
val forward = new mutable.HashMap[String, String]
with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
with mutable.SynchronizedMap[String, Set[String]]
~~~
在?`get`?調用中,使用?`forward`?替換?`數據庫`?即可,在其他方面?`get`?保持不變(僅執行正向查找)。不過?`put`?還需要改變:我們還需要為文件中的每個令牌填充反向索引,把文件的鍵附加到令牌關聯的列表中。用下面的代碼替換?`put`?調用。給定一個特定的搜索令牌,我們現在可以使用?`反向`?映射來查找文件。
~~~
def put(key: String, value: String) = {
log.debug("put %s", key)
forward(key) = value
// serialize updaters
synchronized {
value.split(" ").toSet foreach { token =>
val current = reverse.getOrElse(token, Set())
reverse(token) = current + key
}
}
Future.Unit
}
~~~
需要注意的是(即使?`HashMap`?是線程安全的)同時只能有一個線程可以更新?`倒排`?索引,以確保對映射條目的 讀-修改-寫 是一個原子操作。 (這段代碼過于保守;在進行 檢索-修改-寫 操作時,它鎖定了整個映射,而不是鎖定單個條目。)。另外還要注意使用?`Set`?作為數據結構;這可以確保即使一個文件中兩次出現同樣的符號,它也只會被?`foreach`?循環處理一次。
這個實現仍然有一個問題,作為留給讀者的一個練習:當我們用一個新文檔覆蓋的一個鍵的時候,我們誒有刪除任何倒排索引中引用的舊文件。
現在進入搜索引擎的核心:新的?`search`?方法。他應該解析查詢,尋找匹配的文檔,然后對這些列表做相交操作。這將產生包含所有查詢中的標記的文件列表。在Scala中可以很直接地表達;添加這段代碼到?`SearchbirdServiceImpl`?類中:
~~~
def search(query: String) = Future.value {
val tokens = query.split(" ")
val hits = tokens map { token => reverse.getOrElse(token, Set()) }
val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
intersected.toList
}
~~~
在這段短短的代碼中有幾件事情是值得關注的。在構建命中列表時,如果鍵(?`token`?)沒有被發現,?`getOrElse`?會返回其第二個參數(在這種情況下,一個空?`Set`?)。我們使用left-reduce執行實際的相交操作。特別是當?`reduceLeftOption`?發現?`hits`?為空時將不會繼續嘗試執行reduce操作。這使我們能夠提供一個默認值,而不是拋出一個異常。其實這相當于:
~~~
def search(query: String) = Future.value {
val tokens = query.split(" ")
val hits = tokens map { token => reverse.getOrElse(token, Set()) }
if (hits.isEmpty)
Nil
else
hits reduceLeft { _ & _ } toList
}
~~~
使用哪種方式大多是個人喜好的問題,雖然函數式風格往往會避開帶有合理默認值的條件語句。
現在,我們可以嘗試在控制臺中實驗我們新的實現。重啟服務器:
~~~
$ ./sbt
...
> compile
> run -f config/development.scala
...
[info] Running com.twitter.searchbird.Main -f config/development.scala
~~~
然后再從searchbird目錄,啟動客戶端:
~~~
$ ./console 127.0.0.1 9999
...
[info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999
'client' is bound to your thrift client.
finagle-client>
~~~
粘貼以下說明到控制臺:
~~~
client.put("basics", " values functions classes methods inheritance try catch finally expression oriented")
client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern")
client.put("collections", " lists maps functional combinators (map foreach filter zip")
client.put("pattern", " more functions! partialfunctions more pattern")
client.put("type", " basic types and type polymorphism type inference variance bounds")
client.put("advanced", " advanced types view bounds higher kinded types recursive types structural")
client.put("simple", " all about sbt the standard scala build")
client.put("more", " tour of the scala collections")
client.put("testing", " write tests with specs a bdd testing framework for")
client.put("concurrency", " runnable callable threads futures twitter")
client.put("java", " java interop using scala from")
client.put("searchbird", " building a distributed search engine using")
~~~
現在,我們可以執行一些搜索,返回包含搜索詞的文件的鍵。
~~~
> client.search("functions").get()
res12: Seq[String] = ArrayBuffer(basics)
> client.search("java").get()
res13: Seq[String] = ArrayBuffer(java)
> client.search("java scala").get()
res14: Seq[String] = ArrayBuffer(java)
> client.search("functional").get()
res15: Seq[String] = ArrayBuffer(collections)
> client.search("sbt").get()
res16: Seq[String] = ArrayBuffer(simple)
> client.search("types").get()
res17: Seq[String] = ArrayBuffer(type, advanced)
~~~
回想一下,如果調用返回一個?`Future`?,我們必須使用一個阻塞的?`get()`?來獲取其中包含的值。我們可以使用?`Future.collect`?命令來創建多個并發請求,并等待所有請求成功返回:
~~~
> import com.twitter.util.Future
...
> Future.collect(Seq(
client.search("types"),
client.search("sbt"),
client.search("functional")
)).get()
res18: Seq[Seq[String]] = ArrayBuffer(ArrayBuffer(type, advanced), ArrayBuffer(simple), ArrayBuffer(collections))
~~~
## 分發我們的服務
單臺機器上一個簡單的內存搜索引擎將無法搜索超過內存大小的語料庫。現在,我們要大膽改進,用一個簡單的分片計劃來構建分布式節點。下面是框圖:

### 抽象
為了幫助我們的工作,我們會先介紹另一個抽象?`索引`?來解耦?`SearchbirdService`?對索引實現的依賴。這是一個直觀的重構。我們首先添加一個索引文件到構建 (創建文件?`searchbird/src/main/scala/com/twitter/searchbird/Index.scala`?):
##### …/Index.scala
~~~
package com.twitter.searchbird
import scala.collection.mutable
import com.twitter.util._
import com.twitter.conversions.time._
import com.twitter.logging.Logger
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.thrift.ThriftClientFramedCodec
trait Index {
def get(key: String): Future[String]
def put(key: String, value: String): Future[Unit]
def search(key: String): Future[List[String]]
}
class ResidentIndex extends Index {
val log = Logger.get(getClass)
val forward = new mutable.HashMap[String, String]
with mutable.SynchronizedMap[String, String]
val reverse = new mutable.HashMap[String, Set[String]]
with mutable.SynchronizedMap[String, Set[String]]
def get(key: String) = {
forward.get(key) match {
case None =>
log.debug("get %s: miss", key)
Future.exception(SearchbirdException("No such key"))
case Some(value) =>
log.debug("get %s: hit", key)
Future(value)
}
}
def put(key: String, value: String) = {
log.debug("put %s", key)
forward(key) = value
// admit only one updater.
synchronized {
(Set() ++ value.split(" ")) foreach { token =>
val current = reverse.get(token) getOrElse Set()
reverse(token) = current + key
}
}
Future.Unit
}
def search(query: String) = Future.value {
val tokens = query.split(" ")
val hits = tokens map { token => reverse.getOrElse(token, Set()) }
val intersected = hits reduceLeftOption { _ & _ } getOrElse Set()
intersected.toList
}
}
~~~
現在,我們把thrift服務轉換成一個簡單的調度機制:為每一個?`索引`?實例提供一個thrift接口。這是一個強大的抽象,因為它分離了索引實現和服務實現。服務不再知道索引的任何細節;索引可以是本地的或遠程的,甚至可能是許多索引的組合,但服務并不關心,索引實現可能會更改但是不用修改服務。
將?`SearchbirdServiceImpl`?類定義更換為以下(簡單得多)的代碼(其中不再包含索引實現細節)。注意初始化服務器現在需要第二個參數?`Index`?。
##### …/SearchbirdServiceImpl.scala
~~~
class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer {
val serverName = "Searchbird"
val thriftPort = config.thriftPort
def get(key: String) = index.get(key)
def put(key: String, value: String) =
index.put(key, value) flatMap { _ => Future.Unit }
def search(query: String) = index.search(query)
def shutdown() = {
super.shutdown(0.seconds)
}
}
~~~
##### …/config/SearchbirdServiceConfig.scala
相應地更新?`SearchbirdServiceConfig`?的?`apply`?調用:
~~~
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
var thriftPort: Int = 9999
var tracerFactory: Tracer.Factory = NullTracer.factory
def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex)
}
~~~
我們將建立一個簡單的分布式系統,一個主節點組織查詢其子節點。為了實現這一目標,我們將需要兩個新的?`Index`?類型。一個代表遠程索引,另一種是其他多個?`Index`?實例的組合索引。這樣我們的服務就可以實例化多個遠程索引的復合索引來構建分布式索引。請注意這兩個?`Index`?類型具有相同的接口,所以服務器不需要知道它們所連接的索引是遠程的還是復合的。
##### …/Index.scala
在?`Index.scala`?中定義了?`CompositeIndex`?:
~~~
class CompositeIndex(indices: Seq[Index]) extends Index {
require(!indices.isEmpty)
def get(key: String) = {
val queries = indices.map { idx =>
idx.get(key) map { r => Some(r) } handle { case e => None }
}
Future.collect(queries) flatMap { results =>
results.find { _.isDefined } map { _.get } match {
case Some(v) => Future.value(v)
case None => Future.exception(SearchbirdException("No such key"))
}
}
}
def put(key: String, value: String) =
Future.exception(SearchbirdException("put() not supported by CompositeIndex"))
def search(query: String) = {
val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } }
Future.collect(queries) map { results => (Set() ++ results.flatten) toList }
}
}
~~~
組合索引構建在一組相關?`Index`?實例的基礎上。注意它并不關心這些實例實際上是如何實現的。這種組合類型在構建不同查詢機制的時候具有極大的靈活性。我們沒有定義拆分機制,所以復合索引不支持?`put`?操作。這些請求被直接交由子節點處理。?`get`?的實現是查詢所有子節點,并提取第一個成功的結果。如果沒有成功結果的話,則拋出一個異常。注意因為沒有結果是通過拋出一個異常表示的,所以我們?`處理Future`?,是將任何異常轉換成?`None`?。在實際系統中,我們很可能會為遺漏值填入適當的錯誤碼,而不是使用異常。異常在構建原型時是方便和適宜的,但不能很好地組合。為了把真正的例外和遺漏值區分開,必須要檢查異常本身。相反,把這種區別直接嵌入在返回值的類型中是更好的風格。
`search`?像以前一樣工作。和提取第一個結果不同,我們把它們組合起來,通過使用?`Set`?確保其唯一性。
`RemoteIndex`?提供了到遠程服務器的一個?`Index`?接口。
~~~
class RemoteIndex(hosts: String) extends Index {
val transport = ClientBuilder()
.name("remoteIndex")
.hosts(hosts)
.codec(ThriftClientFramedCodec())
.hostConnectionLimit(1)
.timeout(500.milliseconds)
.build()
val client = new SearchbirdService.FinagledClient(transport)
def get(key: String) = client.get(key)
def put(key: String, value: String) = client.put(key, value) map { _ => () }
def search(query: String) = client.search(query) map { _.toList }
}
~~~
這樣就使用一些合理的默認值,調用代理,稍微調整類型,就構造出一個finagle thrift客戶端。
### 全部放在一起
現在我們擁有了需要的所有功能。我們需要調整配置,以便能夠調用一個給定的節點,不管是主節點亦或是數據分片節點。為了做到這一點,我們將通過創建一個新的配置項來在系統中枚舉分片。我們還需要添加?`Index`?參數到我們的?`SearchbirdServiceImpl`?實例。然后,我們將使用命令行參數(還記得?`Config`是如何做到的嗎)在這兩種模式中啟動服務器。
##### …/config/SearchbirdServiceConfig.scala
~~~
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] {
var thriftPort: Int = 9999
var shards: Seq[String] = Seq()
def apply(runtime: RuntimeEnvironment) = {
val index = runtime.arguments.get("shard") match {
case Some(arg) =>
val which = arg.toInt
if (which >= shards.size || which < 0)
throw new Exception("invalid shard number %d".format(which))
// override with the shard port
val Array(_, port) = shards(which).split(":")
thriftPort = port.toInt
new ResidentIndex
case None =>
require(!shards.isEmpty)
val remotes = shards map { new RemoteIndex(_) }
new CompositeIndex(remotes)
}
new SearchbirdServiceImpl(this, index)
}
}
~~~
現在,我們將調整配置:添加“分片”初始化到?`SearchbirdServiceConfig`?的初始化中(我們可以通過端口9000訪問分片0,9001訪問分片1,依次類推)。
##### config/development.scala
~~~
new SearchbirdServiceConfig {
// Add your own config here
shards = Seq(
"localhost:9000",
"localhost:9001",
"localhost:9002"
)
...
~~~
注釋掉?`admin.httpPort`?的設置(我們不希望在同一臺機器上運行多個服務,而不注釋的話這些服務都會試圖打開相同的端口):
~~~
// admin.httpPort = 9900
~~~
現在,如果我們不帶任何參數調用我們的服務器程序,它會啟動一個主節點來和所有分片通信。如果我們指定一個分片參數,它會在指定端口啟動一個分片服務器。
讓我們試試吧!我們將啟動3個服務:2個分片和1個主節點。首先編譯改動:
~~~
$ ./sbt
> compile
...
> exit
~~~
然后啟動三個服務:
~~~
$ ./sbt 'run -f config/development.scala -D shard=0'
$ ./sbt 'run -f config/development.scala -D shard=1'
$ ./sbt 'run -f config/development.scala'
~~~
您可以在3個不同的窗口中分別運行,或在同一窗口開始依次逐個運行,等待其啟動后,只用ctrl-z懸掛這個命令,并使用?`bg`?將它放在后臺執行。
然后,我們將通過控制臺與它們進行互動。首先,讓我們填充一些數據在兩個分片節點。從searchbird目錄運行:
~~~
$ ./console localhost 9000
...
> client.put("fromShardA", "a value from SHARD_A")
> client.put("hello", "world")
~~~
~~~
$ ./console localhost 9001
...
> client.put("fromShardB", "a value from SHARD_B")
> client.put("hello", "world again")
~~~
一旦完成就可以退出這些控制臺會話。現在通過主節點查詢我們的數據庫(9999端口):
~~~
$ ./console localhost 9999
[info] Running com.twitter.searchbird.SearchbirdConsoleClient localhost 9999
'client' is bound to your thrift client.
finagle-client> client.get("hello").get()
res0: String = world
finagle-client> client.get("fromShardC").get()
SearchbirdException(No such key)
...
finagle-client> client.get("fromShardA").get()
res2: String = a value from SHARD_A
finagle-client> client.search("hello").get()
res3: Seq[String] = ArrayBuffer()
finagle-client> client.search("world").get()
res4: Seq[String] = ArrayBuffer(hello)
finagle-client> client.search("value").get()
res5: Seq[String] = ArrayBuffer(fromShardA, fromShardB)
~~~
這個設計有多個數據抽象,允許更加模塊化和可擴展的實現:
* `ResidentIndex`?數據結構對網絡、服務器或客戶端一無所知。
* `CompositeIndex`?對其索引構成的底層數據結構和組合方式一無所知;它只是簡單地把請求分配給他們。
* 服務器相同的?`search`?接口(特質)允許服務器查詢其本地數據結構(`ResidentIndex`) ,或分發到其他服務器(`CompositeIndex`) 查詢,而不需要知道這個區別,這是從調用隱藏的。
* `SearchbirdServiceImpl`?和?`Index`?現在是相互獨立的模塊,這使服務實現變得簡單,同時實現了服務和其數據結構之間的分離。
* 這個設計靈活到允許一個或多個遠程索引運行在本地機器或遠程機器上。
這個實現的可能改進將包括:
* 當前的實現將?`put()`?調用發送到所有節點。取而代之,我們可以使用一個哈希表,將?`put()`調用只發送到一個節點,而在所有節點之間分配存儲。
* 但是值得注意的是,在這個策略下我們失去了冗余。我們怎樣在不需要完全復制的前提下保持一定的冗余度呢?
* 當系統出錯時我們沒有做任何有趣的處理(例如我們沒有處理任何異常)。
[1](http://twitter.github.io/scala_school/zh_cn/searchbird.html#fnr1)?本地?`./sbt`?腳本只是保證該SBT版本和我們知道的所有庫是一致的。
[2](http://twitter.github.io/scala_school/zh_cn/searchbird.html#fnr2)?在?`target/gen-scala/com/twitter/searchbird/SearchbirdService.scala`?。
[3](http://twitter.github.io/scala_school/zh_cn/searchbird.html#fnr3)?更多信息見Ostrich’s?[README](https://github.com/twitter/ostrich/blob/master/README.md)?。
Built at?[@twitter](http://twitter.com/twitter)?by?[@stevej](http://twitter.com/stevej),?[@marius](http://twitter.com/marius), and?[@lahosken](http://twitter.com/lahosken)?with much help from?[@evanm](http://twitter.com/evanm),?[@sprsquish](http://twitter.com/sprsquish),?[@kevino](http://twitter.com/kevino),?[@zuercher](http://twitter.com/zuercher),?[@timtrueman](http://twitter.com/timtrueman),?[@wickman](http://twitter.com/wickman), and[@mccv](http://twitter.com/mccv); Russian translation by?[appigram](https://github.com/appigram); Chinese simple translation by?[jasonqu](https://github.com/jasonqu); Korean translation by?[enshahar](https://github.com/enshahar);
Licensed under the?[Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0).