Twitter最重要的標準庫是?[Util](http://github.com/twitter/util)?和?[Finagle](https://github.com/twitter/finagle)。Util 可以理解為Scala和Java的標準庫擴展,提供了標準庫中沒有的功能或已有功能的更合適的實現。Finagle 是我們的RPC系統,核心分布式系統組件。
### Future
Futures已經在并發一節中簡單討論過。它是調異步處理的中心機制,滲透在我們代碼庫中,也是Finagle的核心。Futures允許組合并發事件,簡化了高并發操作。也是JVM上異步并發的一種高效的實現。
Twitter的future是*異步*的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的線程的執行;網絡IO和磁盤IO是就是例子——必須由系統處理,它為結果提供future。Finagle為網絡IO提供了這樣一種系統。
Futures清晰簡單:它們持有一個尚未完成運算結果的 promise 。它們是一個簡單的容器——一個占位符。一次計算當然可能會失敗,這種狀況必須被編碼:一個Future可以是三種狀態之一: pending, failed, completed。
### 閑話:?*組合(composition)*
讓我們重新審視我們所說的組合:將簡單的組件合成一個更復雜的。函數組合的一個權威的例子:給定函數 f 和 g,組合函數 (g°f)(x) = g(f(x)) ——結果先對 x使用f函數,然后在使用g函數——用Scala來寫:
~~~
val f = (i: Int) => i.toString
val g = (s: String) => s+s+s
val h = g compose f // : Int => String
scala> h(123)
res0: java.lang.String = 123123123
~~~
復合函數h,是個新的函數,由之前定義的f和g函數合成。
Futures是一種集合類型——它是個包含0或1個元素的容器——你可以發現他們有標準的集合方法(eg:map, filter, foreach)。因為Future的值是延遲的,結果應用這些方法中的任何一種必然也延遲;在
~~~
val result: Future[Int]
val resultStr: Future[String] = result map { i => i.toString }
~~~
函數 { i => i.toString } 不會被調用,直到int值可用;轉換集合的resultStr在可用之前也一直是待定狀態。
List可以被扁平化(flattened):
~~~
val listOfList: List[List[Int]] = ..
val list: List[Int] = listOfList.flatten
~~~
這對future也是有意義的:
~~~
val futureOfFuture: Future[Future[Int]] = ..
val future: Future[Int] = futureOfFuture.flatten
~~~
因為future是延遲的,flatten的實現——立即返回——不得不返回一個等待外部future (`**Future[**Future[Int]**]**`) 完成的future (`Future[**Future[Int]**]`).如果外部future失敗,內部flattened future也將失敗。
Future (類似List) 也定義了flatMap;Future[A] 定義方法flatMap的簽名
~~~
flatMap[B](f: A => Future[B]): Future[B]
~~~
如同組合 map 和 flatten,我們可以這樣實現:
~~~
def flatMap[B](f: A => Future[B]): Future[B] = {
val mapped: Future[Future[B]] = this map f
val flattened: Future[B] = mapped.flatten
flattened
}
~~~
這是一種有威力的組合!使用flatMap我們可以定義一個 Future 作為兩個Future序列的結果。第二個future 的計算基于第一個的結果。想象我們需要2次RPC調用來驗證一個用戶身份,我們可以用下面的方式組合操作:
~~~
def getUser(id: Int): Future[User]
def authenticate(user: User): Future[Boolean]
def isIdAuthed(id: Int): Future[Boolean] =
getUser(id) flatMap { user => authenticate(user) }
~~~
這種組合類型的一個額外的好處是錯誤處理是內置的:如果getUser(..)或authenticate(..)失敗,future 從 isAuthred(..)返回時將會失敗。這里我們沒有額外的錯誤處理的代碼。
#### 風格
Future回調方法(respond, onSuccess, onFailure, ensure) 返回一個新的Future,并鏈接到調用者。這個Future被保證只有在它調用者完成后才完成,使用模式如下:
~~~
acquireResource()
future onSuccess { value =>
computeSomething(value)
} ensure {
freeResource()
}
~~~
freeResource() 被保證只有在 computeSomething之后才執行,這樣就模擬了try-finally 模式。
使用 onSuccess替代 foreach —— 它與 onFailure 方法對稱,命名的意圖更明確,并且也允許 chaining。
永遠避免直接創建Promise實例: 幾乎每一個任務都可以通過使用預定義的組合子完成。這些組合子確保錯誤和取消是可傳播的, 通常鼓勵的數據流風格的編程,不再需要同步和volatility聲明。
用尾遞歸風格編寫的代碼不再導致堆棧空間泄漏,并使得以數據流風格高效的實現循環成為可能:
~~~
case class Node(parent: Option[Node], ...)
def getNode(id: Int): Future[Node] = ...
def getHierarchy(id: Int, nodes: List[Node] = Nil): Future[Node] =
getNode(id) flatMap {
case n@Node(Some(parent), ..) => getHierarchy(parent, n :: nodes)
case n => Future.value((n :: nodes).reverse)
}
~~~
Future定義很多有用的方法: 使用 Future.value() 和 Future.exception() 來創建未滿意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了組合子將多個future合成一個(例如:scatter-gather操作的gather部分)。
#### Cancellation
Future實現了一種弱形式的取消。調用Future#cancel 不會直接終止運算,而是發送某個級別的可被任何處理查詢的觸發信號,最終滿足這個future。Cancellation信號流向相反的方向:一個由消費者設置的cancellation信號,會傳播到它的生產者。生產者使用 Promise的onCancellation來監聽信號并執行相應的動作。
這意味這cancellation語意上依賴生產者,沒有默認的實現。cancellation只是一個提示。
#### Local
Util的[Local](https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Local.scala#L40)提供了一個位于特定的future派發樹(dispatch tree)的引用單元(cell)。設定一個local的值,使這個值可以用于被同一個線程的Future 延遲的任何計算。有一些類似于thread locals(注:Java中的線程機制),不同的是它們的范圍不是一個Java線程,而是一個 future 線程樹。在
~~~
trait User {
def name: String
def incrCost(points: Int)
}
val user = new Local[User]
...
user() = currentUser
rpc() ensure {
user().incrCost(10)
}
~~~
在 ensure塊中的 user() 將在回調被添加的時候引用 user local的值。
就thread locals來說,我們的Locals非常的方便,但要盡量避免使用:除非確信通過顯式傳遞數據時問題不能被充分的解決,哪怕解決起來有些繁重。
Locals有效的被核心庫使用在非常常見的問題上——線程通過RPC跟蹤,傳播監視器,為future的回調創建stack traces——任何其他解決方法都使得用戶負擔過度。Locals在幾乎任何其他情況下都不適合。
### Offer/Broker
并發系統由于需要協調訪問數據和資源而變得復雜。[Actor](http://www.scala-lang.org/api/current/scala/actors/Actor.html)提出一種簡化的策略:每一個actor是一個順序的進程(process),保持自己的狀態和資源,數據通過消息的方式與其它actor共享。 共享數據需要actor之間通信。
Offer/Broker 建立于Actor之上,以這三種重要的方式表現:1,通信通道(Brokers)是first class——即發送消息需要通過Brokers,而非直接到actor。2, Offer/Broker 是一種同步機制:通信會話是同步的。 這意味我們可以用 Broker做為協調機制:當進程a發送一條信息給進程b;a和b都要對系統狀態達成一致。3, 最后,通信可以選擇性地執行:一個進程可以提出幾個不同的通信,其中的一個將被獲取。
為了以一種通用的方式支持選擇性通信(以及其他組合),我們需要將通信的描述和執行解耦。這正是Offer做的——它是一個持久數據用于描述一次通信;為了執行這個通信(offer執行),我們通過它的sync()方法同步
~~~
trait Offer[T] {
def sync(): Future[T]
}
~~~
返回 Future[T] 當通信被獲取的時候生成交換值。
Broker通過offer協調值的交換——它是通信的通道:
~~~
trait Broker[T] {
def send(msg: T): Offer[Unit]
val recv: Offer[T]
}
~~~
所以,當創建兩個offer
~~~
val b: Broker[Int]
val sendOf = b.send(1)
val recvOf = b.recv
~~~
sendOf和recvOf都同步
~~~
// In process 1:
sendOf.sync()
// In process 2:
recvOf.sync()
~~~
兩個offer都獲取并且值1被交換。
通過將多個offer和Offer.choose綁定來執行可選擇通信。
~~~
def choose[T](ofs: Offer[T]*): Offer[T]
~~~
上面的代碼生成一個新的offer,當同步時獲取一個特定的ofs——第一個可用的。當多個都立即可用時,隨機獲取一個。
Offer對象有些一次性的Offers用于與來自Broker的Offer構建。
~~~
Offer.timeout(duration): Offer[Unit]
~~~
offer在給定時間后激活。Offer.never將用于不會有效,Offer.const(value)在給定值后立即有效。這些操作由選擇性通信來組合是非常有用的。例如,在一個send操作中使用超時:
~~~
Offer.choose(
Offer.timeout(10.seconds),
broker.send("my value")
).sync()
~~~
人們可能會比較 Offer/Broker 與[SynchronousQueue](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html),他們有細微但非常重要的區別。Offer可以被組合,而queue不能。例如,考慮一組queues,描述為 Brokers:
~~~
val q0 = new Broker[Int]
val q1 = new Broker[Int]
val q2 = new Broker[Int]
~~~
現在讓我們為讀取創建一個合并的queue
~~~
val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv)
~~~
anyq是一個將從第一個可用的queue中讀取的offer。注意 anyq 仍是同步的——我們仍然擁有底層隊列的語義。這類組合是不可能用queue實現的。
#### 例子:一個簡單的連接池
連接池在網絡應用中很常見,并且它們的實現常常需要技巧——例如,在從池中獲取一個連接的時候,通常需要超時機制,因為不同的客戶端有不同的延遲需求。池的簡單原則:維護一個連接隊列,滿足那些進入的等待者。使用傳統的同步原語,這通常需要兩個隊列(queues):一個用于等待者(當沒有連接可用時),一個用于連接(當沒有等待者時)。
使用 Offer/Brokers ,可以表達得非常自然:
~~~
class Pool(conns: Seq[Conn]) {
private[this] val waiters = new Broker[Conn]
private[this] val returnConn = new Broker[Conn]
val get: Offer[Conn] = waiters.recv
def put(c: Conn) { returnConn ! c }
private[this] def loop(connq: Queue[Conn]) {
Offer.choose(
if (connq.isEmpty) Offer.never else {
val (head, rest) = connq.dequeue
waiters.send(head) { _ => loop(rest) }
},
returnConn.recv { c => loop(connq enqueue c) }
).sync()
}
loop(Queue.empty ++ conns)
}
~~~
loop總是提供一個歸還的連接,但只有queue非空的時候才會send。 使用持久化隊列(persistent queue)更進一步簡化邏輯。與連接池的接口也是通過Offer實現,所以調用者如果愿意設置timeout,他們可以通過利用組合子(combinators)來做:
~~~
val conn: Future[Option[Conn]] = Offer.choose(
pool.get { conn => Some(conn) },
Offer.timeout(1.second) { _ => None }
).sync()
~~~
實現timeout不需要額外的記賬(bookkeeping);這是因為Offer的語義:如果Offer.timeout被選擇,不會再有offer從池中獲得——連接池和它的調用者在各自waiter的broker上不必同時同意接受和發送。
#### 埃拉托色尼篩子(Sieve of Eratosthenes 譯注:一種用于篩選素數的算法)
把并發程序構造為一組順序的同步通信進程,通常很有用——有時程序被大大地簡化了。Offer和Broker提供了一組工具來讓它簡單并一致。確實,它們的應用超越了我們可能認為是經典并發性問題——并發編程(有Offer/Broker的輔助)是一種有用的構建工具,正如子例程(subroutines),類,和模塊都是——來自CSP(譯注:Communicating sequential processes的縮寫,即通信順序進程)的重要思想。
這里有一個[埃拉托色尼篩子](http://ja.wikipedia.org/wiki/%E3%82%A8%E3%83%A9%E3%83%88%E3%82%B9%E3%83%86%E3%83%8D%E3%82%B9%E3%81%AE%E7%AF%A9)可以構造為一個針對一個整數流(stream of integers)的連續的應用過濾器 。首先,我們需要一個整數的源(source of integers):
~~~
def integers(from: Int): Offer[Int] = {
val b = new Broker[Int]
def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1)
gen(from)
b.recv
}
~~~
integers(n) 方法簡單地提供了從n開始的所有連續的整數。然后我們需要一個過濾器:
~~~
def filter(in: Offer[Int], prime: Int): Offer[Int] = {
val b = new Broker[Int]
def loop() {
in.sync() onSuccess { i =>
if (i % prime != 0)
b.send(i).sync() ensure loop()
else
loop()
}
}
loop()
b.recv
}
~~~
filter(in, p) 方法返回的offer刪除了in中的所有質數(prime)的倍數。最終我們定義了我們的篩子(sieve):
~~~
def sieve = {
val b = new Broker[Int]
def loop(of: Offer[Int]) {
for (prime <- of.sync(); _ <- b.send(prime).sync())
loop(filter(of, prime))
}
loop(integers(2))
b.recv
}
~~~
loop() 工作很簡單:從of中讀取下一個質數,然后對of應用過濾器排除這個質數。loop不斷的遞歸,持續的質數被過濾,于是我們得到了篩選結果。我們現在打印前10000個質數:
~~~
val primes = sieve
0 until 10000 foreach { _ =>
println(primes.sync()())
}
~~~
除了構造簡單,組件正交,這種做法也給你一種流式篩子(streaming sieve):你不需要事先計算出你感興趣的質數集合,從而進一步提高了模塊化。