作為一個對 Scala 充滿熱情的開發者,你應該已經聽說過 Scala 處理并發的能力,或許你就是被這個吸引來的。相較于大多數編程語言低級的并發 API,Scala 提供的方法可以讓人們更好的理解并發以及編寫良構的并發程序。
本章的主題- Future 就是這種方法的兩大基石之一。(另一個是 actor)我會解釋 Future 的優點,以及它的函數式特征。
如果你想動手試試接下來的例子,請確保 Scala 版本不低于 2.9.3,Future 在 2.10.0 版本中引入,并向后兼容到 2.9.3,最初,它是 Akka 庫的一部分(API略有不同)。
### 順序代碼為什么會變壞
假設你想準備一杯卡布奇諾,你可以一個接一個的執行以下步驟:
1. 研磨所需的咖啡豆
1. 加熱一些水
1. 用研磨好的咖啡豆和熱水制做一杯咖啡
1. 打奶泡
1. 結合咖啡和奶泡做成卡布奇諾
轉換成 Scala 代碼,可能會是這樣:
~~~
import scala.util.Try
// Some type aliases, just for getting more meaningful method signatures:
type CoffeeBeans = String
type GroundCoffee = String
case class Water(temperature: Int)
type Milk = String
type FrothedMilk = String
type Espresso = String
type Cappuccino = String
// dummy implementations of the individual steps:
def grind(beans: CoffeeBeans): GroundCoffee = s"ground coffee of $beans"
def heatWater(water: Water): Water ` water.copy(temperature ` 85)
def frothMilk(milk: Milk): FrothedMilk = s"frothed $milk"
def brew(coffee: GroundCoffee, heatedWater: Water): Espresso = "espresso"
def combine(espresso: Espresso, frothedMilk: FrothedMilk): Cappuccino = "cappuccino"
// some exceptions for things that might go wrong in the individual steps
// (we'll need some of them later, use the others when experimenting with the code):
case class GrindingException(msg: String) extends Exception(msg)
case class FrothingException(msg: String) extends Exception(msg)
case class WaterBoilingException(msg: String) extends Exception(msg)
case class BrewingException(msg: String) extends Exception(msg)
// going through these steps sequentially:
def prepareCappuccino(): Try[Cappuccino] = for {
ground <- Try(grind("arabica beans"))
water <- Try(heatWater(Water(25)))
espresso <- Try(brew(ground, water))
foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)
~~~
這樣做有幾個優點:可以很輕易的弄清楚事情的步驟,一目了然,而且不會混淆。(畢竟沒有上下文切換)不好的一面是,大部分時間,你的大腦和身體都處于等待的狀態:在等待研磨咖啡豆時,你完全不能做任何事情,只有當這一步完成后,你才能開始燒水。這顯然是在浪費時間,所以你可能想一次開始多個步驟,讓它們同時執行,一旦水燒開,咖啡豆也磨好了,你可以制做咖啡了,這期間,打奶泡也可以開始了。
這和編寫軟件沒什么不同。一個 Web 服務器可以用來處理和響應請求的線程只有那么多,不能因為要等待數據庫查詢或其他 HTTP 服務調用的結果而阻塞了這些可貴的線程。相反,一個異步編程模型和非阻塞 IO 會更合適,這樣的話,當一個請求處理在等待數據庫查詢結果時,處理這個請求的線程也能夠為其他請求服務。
> "I heard you like callbacks, so I put a callback in your callback!"
在并發家族里,你應該已經知道 nodejs 這個很酷的家伙,nodejs 完全通過回調來通信,不幸的是,這很容易導致回調中包含回調的回調,這簡直是一團糟,代碼難以閱讀和調試。
Scala 的 Future 也允許回調,但它提供了更好的選擇,所以你不怎么需要它。
> "I know Futures, and they are completely useless!"
也許你知道些其他的 Future 實現,最引人注目的是 Java 提供的那個。但是對于 Java 的 Future,你只能去查看它是否已經完成,或者阻塞線程直到其結束。簡而言之,Java 的 Future 幾乎沒有用,而且用起來絕對不會讓人開心。
如果你認為 Scala 的 Future 也是這樣,那大錯特錯了!
### Future 語義
_scala.concurrent_ 包里的 `Future[T]` 是一個容器類型,代表一種返回值類型為 `T` 的計算。計算可能會出錯,也可能會超時;從而,當一個 future 完成時,它可能會包含異常,而不是你期望的那個值。
Future 只能寫一次: 當一個 future 完成后,它就不能再被改變了。同時,Future 只提供了讀取計算值的接口,寫入計算值的任務交給了 Promise,這樣,API 層面上會有一個清晰的界限。這篇文章里,我們主要關注前者,下一章會介紹 Promise 的使用。
### 使用 Future
Future 有多種使用方式,我將通過重寫 “卡布奇諾” 這個例子來說明。
首先,所有可以并行執行的函數,應該返回一個 Future:
~~~
import scala.concurrent.future
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
println("start grinding...")
Thread.sleep(Random.nextInt(2000))
if (beans == "baked beans") throw GrindingException("are you joking?")
println("finished grinding...")
s"ground coffee of $beans"
}
def heatWater(water: Water): Future[Water] = Future {
println("heating the water now")
Thread.sleep(Random.nextInt(2000))
println("hot, it's hot!")
water.copy(temperature = 85)
}
def frothMilk(milk: Milk): Future[FrothedMilk] = Future {
println("milk frothing system engaged!")
Thread.sleep(Random.nextInt(2000))
println("shutting down milk frothing system")
s"frothed $milk"
}
def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = Future {
println("happy brewing :)")
Thread.sleep(Random.nextInt(2000))
println("it's brewed!")
"espresso"
}
~~~
上面的代碼有幾處需要解釋。
首先是 Future 伴生對象里的 `apply` 方法需要兩個參數:
~~~
object Future {
def apply[T](body: => T)(implicit execctx: ExecutionContext): Future[T]
}
~~~
要異步執行的計算通過傳名參數 `body` 傳入。第二個參數是一個隱式參數,隱式參數是說,函數調用時,如果作用域中存在一個匹配的隱式值,就無需顯示指定這個參數。`ExecutionContext` 可以執行一個 Future,可以把它看作是一個線程池,是絕大部分 Future API 的隱式參數。
`import scala.concurrent.ExecutionContext.Implicits.global` 語句引入了一個全局的執行上下文,確保了隱式值的存在。這時候,只需要一個單元素列表,可以用大括號來代替小括號。調用 `future` 方法時,經常使用這種形式,使得它看起來像是一種語言特性,而不是一個普通方法的調用。
這個例子沒有大量計算,所以用隨機休眠來模擬以說明問題,而且,為了更清晰的說明并發代碼的執行順序,還在“計算”之前和之后打印了些東西。
計算會在 Future 創建后的某個不確定時間點上由 `ExecutionContext` 給其分配的某個線程中執行。
#### 回調
對于一些簡單的問題,使用回調就能很好解決。Future 的回調是偏函數,你可以把回調傳遞給 Future 的 `onSuccess` 方法,如果這個 Future 成功完成,這個回調就會執行,并把 Future 的返回值作為參數輸入:
~~~
grind("arabica beans").onSuccess { case ground =>
println("okay, got my ground coffee")
}
~~~
類似的,也可以在 `onFailure` 上注冊回調,只不過它是在 Future 失敗時調用,其輸入是一個 `Throwable`。
通常的做法是將兩個回調結合在一起以更好的處理 Future:在 `onComplete` 方法上注冊回調,回調的輸入是一個 Try。
~~~
import scala.util.{Success, Failure}
grind("baked beans").onComplete {
case Success(ground) => println(s"got my $ground")
case Failure(ex) => println("This grinder needs a replacement, seriously!")
}
~~~
傳遞給 `grind` 的是 “baked beans”,因此 `grind` 方法會產生異常,進而導致 Future 中的計算失敗。
#### Future 組合
當嵌套使用 Future 時,回調就變得比較煩人。不過,你也沒必要這么做,因為 Future 是可組合的,這是它真正發揮威力的時候!
你一定已經注意到,之前討論過的所有容器類型都可以進行 `map` 、 `flatMap` 操作,也可以用在 for 語句中。作為一種容器類型,Future 支持這些操作也不足為奇!
真正的問題是,在還沒有完成的計算上執行這些操作意味這什么,如何去理解它們?
#### Map 操作
Scala 讓 “時間旅行” 成為可能!假設想在水加熱后就去檢查它的溫度,可以通過將 `Future[Water]` 映射到 `Future[Boolean]` 來完成這件事情:
~~~
val tempreatureOkay: Future[Boolean] = heatWater(Water(25)) map { water =>
println("we're in the future!")
(80 to 85) contains (water.temperature)
}
~~~
`tempreatureOkay` 最終會包含水溫的結果。你可以去改變 `heatWater` 的實現來讓它拋出異常(比如說,加熱器爆炸了),然后等待 “we're in the future!” 出現在顯示屏上,不過你永遠等不到。
寫傳遞給 `map` 的函數時,你就處在未來(或者說可能的未來)。一旦 `Future[Water]` 實例成功完成,這個函數就會執行,只不過,該函數所在的時間線可能不是你現在所處的這個。如果 `Future[Water` 失敗,傳遞給 `map` 的函數中的事情永遠不會發生,調用 `map` 的結果將是一個失敗的 `Future[Boolean]`。
#### FlatMap 操作
如果一個 Future 的計算依賴于另一個 Future 的結果,那需要求救于 `flatMap` 以避免 Future 的嵌套。
假設,測量水溫的線程需要一些時間,那你可能想異步的去檢查水溫是否 OK。比如,有一個函數,接受一個 `Water` ,并返回 `Future[Boolean]` :
~~~
def temperatureOkay(water: Water): Future[Boolean] = future {
(80 to 85) contains (water.temperature)
}
~~~
使用 `flatMap`(而不是 `map`)得到一個 `Future[Boolean]`,而不是 `Future[Future[Boolean]]`:
~~~
val nestedFuture: Future[Future[Boolean]] = heatWater(Water(25)) map {
water => temperatureOkay(water)
}
val flatFuture: Future[Boolean] = heatWater(Water(25)) flatMap {
water => temperatureOkay(water)
}
~~~
同樣,映射只會發生在 `Future[Water]` 成功完成情況下。
#### for 語句
除了調用 `flatMap` ,也可以寫成 for 語句。上面的例子可以重寫成:
~~~
val acceptable: Future[Boolean] = for {
heatedWater <- heatWater(Water(25))
okay <- temperatureOkay(heatedWater)
} yield okay
~~~
如果有多個可以并行執行的計算,則需要特別注意,要先在 for 語句外面創建好對應的 Futures。
~~~
def prepareCappuccinoSequentially(): Future[Cappuccino] =
for {
ground <- grind("arabica beans")
water <- heatWater(Water(25))
foam <- frothMilk("milk")
espresso <- brew(ground, water)
} yield combine(espresso, foam)
~~~
這看起來很漂亮,但要知道,for 語句只不過是 `flatMap` 嵌套調用的語法糖。這意味著,只有當 `Future[GroundCoffee]` 成功完成后, `heatWater` 才會創建 `Future[Water]`。你可以查看函數運行時打印出來的東西來驗證這個說法。
因此,要確保在 for 語句之前實例化所有相互獨立的 Futures:
~~~
def prepareCappuccino(): Future[Cappuccino] = {
val groundCoffee = grind("arabica beans")
val heatedWater = heatWater(Water(20))
val frothedMilk = frothMilk("milk")
for {
ground <- groundCoffee
water <- heatedWater
foam <- frothedMilk
espresso <- brew(ground, water)
} yield combine(espresso, foam)
}
~~~
在 for 語句之前,三個 Future 在創建之后就開始各自獨立的運行,顯示屏的輸出是不確定的。唯一能確定的是 “happy brewing” 總是出現在后面,因為該輸出所在的函數 `brew` 是在其他兩個函數執行完畢后才開始執行的。也因為此,可以在 for 語句里面直接調用它,當然,前提是前面的 Future 都成功完成。
#### 失敗偏向的 Future
你可能會發現 `Future[T]` 是成功偏向的,允許你使用 `map`、`flatMap`、`filter` 等。
但是,有時候可能處理事情出錯的情況。調用 `Future[T]` 上的 `failed` 方法,會得到一個失敗偏向的 Future,類型是 `Future[Throwable]`。之后就可以映射這個 `Future[Throwable]`,在失敗的情況下執行 mapping 函數。
### 總結
你已經見過 Future 了,而且它的前途看起來很光明!因為它是一個可組合、可函數式使用的容器類型,這讓我們的工作變得異常舒服。
調用 `future` 方法可以輕易將阻塞執行的代碼變成并發執行,但是,代碼最好原本就是非阻塞的。為了實現它,我們還需要 `Promise` 來完成 `Future`,這就是下一章的主題。