上一章介紹了 Future 類型,以及如何用它來編寫高可讀性、高組合性的異步執行代碼。
Future 只是整個謎團的一部分:它是一個只讀類型,允許你使用它計算得到的值,或者處理計算中出現的錯誤。但是在這之前,必須得有一種方法把這個值放進去。這一章里,你將會看到如何通過 Promise 類型來達到這個目的。
### 類型 Promise
之前,我們把一段順序執行的代碼塊傳遞給了 `scala.concurrent` 里的 `future` 方法,并且在作用域中給出了一個 `ExecutionContext`,它神奇地異步調用代碼塊,返回一個 Future 類型的結果。
雖然這種獲得 Future 的方式很簡單,但還有其他的方法來創建 Future 實例,并填充它,這就是 Promise。Promise 允許你在 Future 里放入一個值,不過只能做一次,Future 一旦完成,就不能更改了。
一個 Future 實例總是和一個(也只能是一個)Promise 實例關聯在一起。如果你在 REPL 里調用 `future` 方法,你會發現返回的也是一個 Promise:
~~~
import concurrent.Future
import concurrent.Future
scala> import concurrent.future
import concurrent.future
scala> import concurrent.ExecutionContext.Implicits.global
import concurrent.ExecutionContext.Implicits.global
scala> val f: Future[String] = future { "Hello World!" }
f: scala.concurrent.Future[String] = scala.concurrent.impl.Promise$DefaultPromise@2b509249
~~~
你得到的對象是一個 `DefaultPromise` ,它實現了 `Future` 和 `Promise` 接口,不過這就是具體的實現細節了(譯注,有興趣的讀者可翻閱其實現的源碼),使用者只需要知道代碼實現把 Future 和對應的 Promise 之間的聯系分的很清晰。
這個小例子說明了:除了通過 Promise,沒有其他方法可以完成一個 Future,`future` 方法也只是一個輔助函數,隱藏了具體的實現機制。
現在,讓我們動動手,看看怎樣直接使用 Promise 類型。
#### 給出承諾
當我們談論起承諾能否被兌現時,一個很熟知的例子是那些政客的競選諾言。
假設被推選的政客給他的投票者一個減稅的承諾。這可以用 `Promise[TaxCut]` 表示:
~~~
import concurrent.Promise
case class TaxCut(reduction: Int)
// either give the type as a type parameter to the factory method:
val taxcut = Promise[TaxCut]()
// or give the compiler a hint by specifying the type of your val:
val taxcut2: Promise[TaxCut] = Promise()
// taxcut: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
// taxcut2: scala.concurrent.Promise[TaxCut] = scala.concurrent.impl.Promise$DefaultPromise@346974c6
~~~
一旦創建了這個 Promise,就可以在它上面調用 `future` 方法來獲取承諾的未來:
~~~
val taxCutF: Future[TaxCut] = taxcut.future
// `> scala.concurrent.Future[TaxCut] ` scala.concurrent.impl.Promise$DefaultPromise@66ae2a84
~~~
返回的 Future 可能并不和 Promise 一樣,但在同一個 Promise 上調用 `future` 方法總是返回同一個對象,以確保 Promise 和 Future 之間一對一的關系。
#### 結束承諾
一旦給出了承諾,并告訴全世界會在不遠的將來兌現它,那最好盡力去實現。在 Scala 中,可以結束一個 Promise,無論成功還是失敗。
##### 兌現承諾
為了成功結束一個 Promise,你可以調用它的 `success` 方法,并傳遞一個大家期許的結果:
~~~
taxcut.success(TaxCut(20))
~~~
這樣做之后,Promise 就無法再寫入其他值了,如果偏要再寫,會產生異常。
此時,和 Promise 關聯的 Future 也成功完成,注冊的回調會開始執行,或者說對這個 Future 進行了映射,那這個時候,映射函數也該執行了。
一般來說,Promise 的完成和對返回的 Future 的處理發生在不同的線程。很可能你創建了 Promise,并立即返回和它關聯的 Future 給調用者,而實際上,另外一個線程還在計算它。
為了說明這一點,我們拿減稅來舉個例子:
~~~
object Government {
def redeemCampaignPledge(): Future[TaxCut] = {
val p = Promise[TaxCut]()
Future {
println("Starting the new legislative period.")
Thread.sleep(2000)
p.success(TaxCut(20))
println("We reduced the taxes! You must reelect us!!!!1111")
}
p.future
}
}
~~~
這個例子中使用了 Future 伴生對象,不過不要被它搞混淆了,這個例子的重點是:Promise 并不是在調用者的線程里完成的。
現在我們來兌現當初的競選宣言,在 Future 上添加一個 `onComplete` 回調:
~~~
import scala.util.{Success, Failure}
val taxCutF: Future[TaxCut] = Government.redeemCampaignPledge()
println("Now that they're elected, let's see if they remember their promises...")
taxCutF.onComplete {
case Success(TaxCut(reduction)) =>
println(s"A miracle! They really cut our taxes by $reduction percentage points!")
case Failure(ex) =>
println(s"They broke their promises! Again! Because of a ${ex.getMessage}")
}
~~~
多次運行這個例子,會發現顯示屏輸出的結果順序是不確定的,而且,最終回調函數會執行,進入成功的那個 case 。
##### 違背諾言
政客習慣違背諾言,Scala 程序員有時候也只能這樣做。調用 `failure` 方法,傳遞一個異常,結束 Promise:
~~~
case class LameExcuse(msg: String) extends Exception(msg)
object Government {
def redeemCampaignPledge(): Future[TaxCut] = {
val p = Promise[TaxCut]()
Future {
println("Starting the new legislative period.")
Thread.sleep(2000)
p.failure(LameExcuse("global economy crisis"))
println("We didn't fulfill our promises, but surely they'll understand.")
}
p.future
}
}
~~~
這個 `redeemCampaignPledge` 實現最終會違背承諾。一旦用 `failure` 結束這個 Promise,也無法再次寫入了,正如 `success` 方法一樣。相關聯的 Future 也會以 `Failure` 收場。
如果已經有了一個 Try,那可以直接把它傳遞給 Promise 的 `complete` 方法,以此來結束這個它。如果這個 Try 是一個 Success,關聯的 Future 會成功完成,否則,就失敗。
### 基于 Future 的編程實踐
如果想使用基于 Future 的編程范式以增加應用的擴展性,那應用從下到上都必須被設計成非阻塞模式。這意味著,基本上應用層所有的函數都應該是異步的,并且返回 Future。
當下,一個可能的使用場景是開發 Web 應用。流行的 Scala Web 框架,允許你將響應作為 `Future[Response]` 返回,而不是等到你完成響應再返回。這個非常重要,因為它允許 Web 服務器用少量的線程處理更多的連接。通過賦予服務器 `Future[Response]` 的能力,你可以最大化服務器線程池的利用率。
而且,應用的服務可能需要多次調用數據庫層以及(或者)某些外部服務,這時候可以獲取多個 Future,用 for 語句將它們組合成新的 Future,簡單可讀!最終,Web 層再將這樣的一個 Future 變成 `Future[Response]`。
但是該怎樣在實踐中實現這些呢?需要考慮三種不同的場景:
#### 非阻塞IO
應用很可能涉及到大量的 IO 操作。比如,可能需要和數據庫交互,還可能作為客戶端去調用其他的 Web 服務。
如果是這樣,可以使用一些基于 Java 非阻塞 IO 實現的庫,也可以直接或通過 Netty 這樣的庫來使用 Java 的 NIO API。這樣的庫可以用定量的線程池處理大量的連接。
但如果是想開發這樣的一個庫,直接和 Promise 打交道更為合適。
#### 阻塞 IO
有時候,并沒有基于 NIO 的庫可用。比如,Java 世界里大多數的數據庫驅動都是使用阻塞 IO。在 Web 應用中,如果用這樣的驅動發起大量訪問數據庫的調用,要記得這些調用是發生在服務器線程里的。為了避免這個問題,可以將所有需要和數據庫交互的代碼都放入 `future` 代碼塊里,就像這樣:
~~~
// get back a Future[ResultSet] or something similar:
Future {
queryDB(query)
}
~~~
到現在為止,我們都是使用隱式可用的全局 `ExecutionContext` 來執行這些代碼塊。通常,更好的方式是創建一個專用的 `ExecutionContext` 放在數據庫層里。可以從 Java的 `ExecutorService` 來它,這也意味著,可以異步的調整線程池來執行數據庫調用,應用的其他部分不受影響。
~~~
import java.util.concurrent.Executors
import concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)
~~~
#### 長時間運行的計算
取決于應用的本質特點,一個應用偶爾還會調用一些長時間運行的任務,它們完全不涉及 IO(CPU 密集的任務)。這些任務也不應該在服務器線程中執行,因此需要將它們變成 Future:
~~~
Future {
longRunningComputation(data, moreData)
}
~~~
同樣,最好有一些專屬的 `ExecutionContext` 來處理這些 CPU 密集的計算。怎樣調整這些線程池大小取決于應用的特征,這些已經超過了本文的范圍。
### 總結
這一章里,我們學習了 Promise - 基于 Future 的并發范式的可寫組件,以及怎樣用它來完成一個 Future;同時,還給出了一些在實踐中使用它們的建議。
下一章會討論 Scala 函數式編程是如何增加代碼可用性(一個長久以來和面向對象編程相關聯的概念)的。