[TOC]
## 共享的可變狀態與并發
協程可用多線程調度器(比如默認的 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html))并發執行。這樣就可以提出所有常見的并發問題。主要的問題是同步訪問**共享的可變狀態**。協程領域對這個問題的一些解決方案類似于多線程領域中的解決方案,但其它解決方案則是獨一無二的。
### 問題
我們啟動一百個協程,它們都做一千次相同的操作。我們同時會測量它們的完成時間以便進一步的比較:
```kotlin
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
```
我們從一個非常簡單的動作開始:使用多線程的 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html) 來遞增一個共享的可變變量。
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-01.kt)獲取完整代碼。
這段代碼最后打印出什么結果?它不太可能打印出“Counter = 100000”,因為一百個協程在多個線程中同時遞增計數器但沒有做并發處理。
### volatile 無濟于事
有一種常見的誤解:volatile 可以解決并發問題。讓我們嘗試一下:
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
@Volatile // 在 Kotlin 中 `volatile` 是一個注解
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-02.kt)獲取完整代碼。
這段代碼運行速度更慢了,但我們最后仍然沒有得到“Counter = 100000”這個結果,因為 volatile 變量保證可線性化(這是“原子”的技術術語)讀取和寫入變量,但在大量動作(在我們的示例中即“遞增”操作)發生時并不提供原子性。
### 線程安全的數據結構
一種對線程、協程都有效的常規解決方法,就是使用線程安全(也稱為同步的、可線性化、原子)的數據結構,它為需要在共享狀態上執行的相應操作提供所有必需的同步處理。在簡單的計數器場景中,我們可以使用具有`incrementAndGet` 原子操作的 `AtomicInteger` 類:
```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-03.kt)獲取完整代碼。
這是針對此類特定問題的最快解決方案。它適用于普通計數器、集合、隊列和其他標準數據結構以及它們的基本操作。然而,它并不容易被擴展來應對復雜狀態、或一些沒有現成的線程安全實現的復雜操作。
### 以細粒度限制線程
_限制線程_ 是解決共享可變狀態問題的一種方案:對特定共享狀態的所有訪問權都限制在單個線程中。它通常應用于 UI 程序中:所有 UI 狀態都局限于單個事件分發線程或應用主線程中。這在協程中很容易實現,通過使用一個單線程上下文:
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 將每次自增限制在單線程上下文中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-04.kt)獲取完整代碼。
這段代碼運行非常緩慢,因為它進行了 _細粒度_ 的線程限制。每個增量操作都得使用[withContext(counterContext)] 塊從多線程 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html) 上下文切換到單線程上下文。
### 以粗粒度限制線程
在實踐中,線程限制是在大段代碼中執行的,例如:狀態更新類業務邏輯中大部分都是限于單線程中。下面的示例演示了這種情況,在單線程上下文中運行每個協程。
```kotlin
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 將一切都限制在單線程上下文中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-05.kt)獲取完整代碼。
```
Completed 100000 actions in xxx ms
Counter = 100000
```
這段代碼運行更快而且打印出了正確的結果。
### 互斥
該問題的互斥解決方案:使用永遠不會同時執行的 _關鍵代碼塊_來保護共享狀態的所有修改。在阻塞的世界中,你通常會為此目的使用 `synchronized` 或者 `ReentrantLock`。在協程中的替代品叫做 [Mutex](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html) 。它具有 [lock](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html)和 [unlock](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html)方法,可以隔離關鍵的部分。關鍵的區別在于 `Mutex.lock()` 是一個掛起函數,它不會阻塞線程。
還有 [withLock](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html) 擴展函數,可以方便的替代常用的`mutex.lock(); try { …… } finally { mutex.unlock() }` 模式:
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同一動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 用鎖保護每次自增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-06.kt)獲取完整代碼。
```
Completed 100000 actions in xxx ms
Counter = 100000
```
此示例中鎖是細粒度的,因此會付出一些代價。但是對于某些必須定期修改共享狀態的場景,它是一個不錯的選擇,但是沒有自然線程可以限制此狀態。
### Actors
一個 [actor](https://en.wikipedia.org/wiki/Actor_model) 是由協程、被限制并封裝到該協程中的狀態以及一個與其它協程通信的 _通道_ 組合而成的一個實體。一個簡單的 actor 可以簡單的寫成一個函數,但是一個擁有復雜狀態的 actor 更適合由類來表示。
有一個 [actor](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) 協程構建器,它可以方便地將 actor 的郵箱通道組合到其作用域中(用來接收消息)、組合發送 channel 與結果集對象,這樣對 actor 的單個引用就可以作為其句柄持有。
使用 actor 的第一步是定義一個 actor 要處理的消息類。Kotlin 的[密封類](https://kotlinlang.org/docs/reference/sealed-classes.html)很適合這種場景。我們使用 `IncCounter` 消息(用來遞增計數器)和 `GetCounter` 消息(用來獲取值)來定義 `CounterMsg` 密封類。后者需要發送回復。[CompletableDeferred](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html) 通信原語表示未來可知(可傳達)的單個值,這里被用于此目的。
```kotlin
// 計數器 Actor 的各種類型
sealed class CounterMsg
object IncCounter : CounterMsg() // 遞增計數器的單向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 攜帶回復的請求
```
接下來我們定義一個函數,使用 [actor](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) 協程構建器來啟動一個 actor:
```kotlin
// 這個函數啟動一個新的計數器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 狀態
for (msg in channel) { // 即將到來消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
```
main 函數代碼很簡單:
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 啟動的協程數量
val k = 1000 // 每個協程重復執行同個動作的次數
val time = measureTimeMillis {
coroutineScope { // 協程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
// 計數器 Actor 的各種類型
sealed class CounterMsg
object IncCounter : CounterMsg() // 遞增計數器的單向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 攜帶回復的請求
// 這個函數啟動一個新的計數器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 狀態
for (msg in channel) { // 即將到來消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
//sampleStart
fun main() = runBlocking<Unit> {
val counter = counterActor() // 創建該 actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// 發送一條消息以用來從一個 actor 中獲取計數值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 關閉該actor
}
//sampleEnd
```
> 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-sync-07.kt)獲取完整代碼。
```
Completed 100000 actions in xxx ms
Counter = 100000
```
actor 本身執行時所處上下文(就正確性而言)無關緊要。一個 actor 是一個協程,而一個協程是按順序執行的,因此將狀態限制到特定協程可以解決共享可變狀態的問題。實際上,actor 可以修改自己的私有狀態,但只能通過消息互相影響(避免任何鎖定)。
actor 在高負載下比鎖更有效,因為在這種情況下它總是有工作要做,而且根本不需要切換到不同的上下文。
> 注意,[actor](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html) 協程構建器是一個雙重的 [produce](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html) 協程構建器。一個 actor 與它接收消息的通道相關聯,而一個 producer 與它發送元素的通道相關聯。
[Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html
[withContext]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/with-context.html
[CompletableDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-completable-deferred/index.html
[Mutex]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/index.html
[Mutex.lock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/lock.html
[Mutex.unlock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/-mutex/unlock.html
[withLock]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.sync/with-lock.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/actor.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html
- 前言
- Kotlin簡介
- IntelliJ IDEA技巧總結
- idea設置類注釋和方法注釋模板
- 像Android Studion一樣創建工程
- Gradle
- Gradle入門
- Gradle進階
- 使用Gradle創建一個Kotlin工程
- 環境搭建
- Androidstudio平臺搭建
- Eclipse的Kotlin環境配置
- 使用IntelliJ IDEA
- Kotlin學習路線
- Kotlin官方中文版文檔教程
- 概述
- kotlin用于服務器端開發
- kotlin用于Android開發
- kotlin用于JavaScript開發
- kotlin用于原生開發
- Kotlin 用于數據科學
- 協程
- 多平臺
- 新特性
- 1.1的新特性
- 1.2的新特性
- 1.3的新特性
- 開始
- 基本語法
- 習慣用法
- 編碼規范
- 基礎
- 基本類型
- 包與導入
- 控制流
- 返回與跳轉
- 類與對象
- 類與繼承
- 屬性與字段
- 接口
- 可見性修飾符
- 擴展
- 數據類
- 密封類
- 泛型
- 嵌套類
- 枚舉類
- 對象
- 類型別名
- 內嵌類
- 委托
- 委托屬性
- 函數與Lambda表達式
- 函數
- Lambda表達式
- 內聯函數
- 集合
- 集合概述
- 構造集合
- 迭代器
- 區間與數列
- 序列
- 操作概述
- 轉換
- 過濾
- 加減操作符
- 分組
- 取集合的一部分
- 取單個元素
- 排序
- 聚合操作
- 集合寫操作
- List相關操作
- Set相關操作
- Map相關操作
- 多平臺程序設計
- 平臺相關聲明
- 以Gradle創建
- 更多語言結構
- 解構聲明
- 類型檢測與轉換
- This表達式
- 相等性
- 操作符重載
- 空安全
- 異常
- 注解
- 反射
- 作用域函數
- 類型安全的構造器
- Opt-in Requirements
- 核心庫
- 標準庫
- kotlin.test
- 參考
- 關鍵字與操作符
- 語法
- 編碼風格約定
- Java互操作
- Kotlin中調用Java
- Java中調用Kotlin
- JavaScript
- 動態類型
- kotlin中調用JavaScript
- JavaScript中調用kotlin
- JavaScript模塊
- JavaScript反射
- JavaScript DCE
- 原生
- 并發
- 不可變性
- kotlin庫
- 平臺庫
- 與C語言互操作
- 與Object-C及Swift互操作
- CocoaPods集成
- Gradle插件
- 調試
- FAQ
- 協程
- 協程指南
- 基礎
- 取消與超時
- 組合掛起函數
- 協程上下文與調度器
- 異步流
- 通道
- 異常處理與監督
- 共享的可變狀態與并發
- Select表達式(實驗性)
- 工具
- 編寫kotlin代碼文檔
- 使用Kapt
- 使用Gradle
- 使用Maven
- 使用Ant
- Kotlin與OSGI
- 編譯器插件
- 編碼規范
- 演進
- kotlin語言演進
- 不同組件的穩定性
- kotlin1.3的兼容性指南
- 常見問題
- FAQ
- 與Java比較
- 與Scala比較(官方已刪除)
- Google開發者官網簡介
- Kotlin and Android
- Get Started with Kotlin on Android
- Kotlin on Android FAQ
- Android KTX
- Resources to Learn Kotlin
- Kotlin樣品
- Kotlin零基礎到進階
- 第一階段興趣入門
- kotlin簡介和學習方法
- 數據類型和類型系統
- 入門
- 分類
- val和var
- 二進制基礎
- 基礎
- 基本語法
- 包
- 示例
- 編碼規范
- 代碼注釋
- 異常
- 根類型“Any”
- Any? 可空類型
- 可空性的實現原理
- kotlin.Unit類型
- kotlin.Nothing類型
- 基本數據類型
- 數值類型
- 布爾類型
- 字符型
- 位運算符
- 變量和常量
- 語法和運算符
- 關鍵字
- 硬關鍵字
- 軟關鍵字
- 修飾符關鍵字
- 特殊標識符
- 操作符和特殊符號
- 算術運算符
- 賦值運算符
- 比較運算符
- 邏輯運算符
- this關鍵字
- super關鍵字
- 操作符重載
- 一元操作符
- 二元操作符
- 字符串
- 字符串介紹和屬性
- 字符串常見方法操作
- 字符串模板
- 數組
- 數組介紹創建及遍歷
- 數組常見方法和屬性
- 數組變化以及下標越界問題
- 原生數組類型
- 區間
- 正向區間
- 逆向區間
- 步長
- 類型檢測與類型轉換
- is、!is、as、as-運算符
- 空安全
- 可空類型變量
- 安全調用符
- 非空斷言
- Elvis操作符
- 可空性深入
- 可空性和Java
- 函數
- 函數式編程概述
- OOP和FOP
- 函數式編程基本特性
- 組合與范疇
- 在Kotlin中使用函數式編程
- 函數入門
- 函數作用域
- 函數加強
- 命名參數
- 默認參數
- 可變參數
- 表達式函數體
- 頂層、嵌套、中綴函數
- 尾遞歸函數優化
- 函數重載
- 控制流
- if表達式
- when表達式
- for循環
- while循環
- 循環中的 Break 與 continue
- return返回
- 標簽處返回
- 集合
- list集合
- list集合介紹和操作
- list常見方法和屬性
- list集合變化和下標越界
- set集合
- set集合介紹和常見操作
- set集合常見方法和屬性
- set集合變換和下標越界
- map集合
- map集合介紹和常見操作
- map集合常見方法和屬性
- map集合變換
- 集合的函數式API
- map函數
- filter函數
- “ all ”“ any ”“ count ”和“ find ”:對集合應用判斷式
- 別樣的求和方式:sumBy、sum、fold、reduce
- 根據人的性別進行分組:groupBy
- 扁平化——處理嵌套集合:flatMap、flatten
- 惰性集合操作:序列
- 區間、數組、集合之間轉換
- 面向對象
- 面向對象-封裝
- 類的創建及屬性方法訪問
- 類屬性和字段
- 構造器
- 嵌套類(內部類)
- 枚舉類
- 枚舉類遍歷&枚舉常量常用屬性
- 數據類
- 密封類
- 印章類(密封類)
- 面向對象-繼承
- 類的繼承
- 面向對象-多態
- 抽象類
- 接口
- 接口和抽象類的區別
- 面向對象-深入
- 擴展
- 擴展:為別的類添加方法、屬性
- Android中的擴展應用
- 優化Snackbar
- 用擴展函數封裝Utils
- 解決煩人的findViewById
- 擴展不是萬能的
- 調度方式對擴展函數的影響
- 被濫用的擴展函數
- 委托
- 委托類
- 委托屬性
- Kotlin5大內置委托
- Kotlin-Object關鍵字
- 單例模式
- 匿名類對象
- 伴生對象
- 作用域函數
- let函數
- run函數
- with函數
- apply函數
- also函數
- 標準庫函數
- takeIf 與 takeUnless
- 第二階段重點深入
- Lambda編程
- Lambda成員引用高階函數
- 高階函數
- 內聯函數
- 泛型
- 泛型的分類
- 泛型約束
- 子類和子類型
- 協變與逆變
- 泛型擦除與實化類型
- 泛型類型參數
- 泛型的背后:類型擦除
- Java為什么無法聲明一個泛型數組
- 向后兼容的罪
- 類型擦除的矛盾
- 使用內聯函數獲取泛型
- 打破泛型不變
- 一個支持協變的List
- 一個支持逆變的Comparator
- 協變和逆變
- 第三階段難點突破
- 注解和反射
- 聲明并應用注解
- DSL
- 協程
- 協程簡介
- 協程的基本操作
- 協程取消
- 管道
- 慕課霍丙乾協程筆記
- Kotlin與Java互操作
- 在Kotlin中調用Java
- 在Java中調用Kotlin
- Kotlin與Java中的操作對比
- 第四階段專題練習
- 朱凱Kotlin知識點總結
- Kotlin 基礎
- Kotlin 的變量、函數和類型
- Kotlin 里那些「不是那么寫的」
- Kotlin 里那些「更方便的」
- Kotlin 進階
- Kotlin 的泛型
- Kotlin 的高階函數、匿名函數和 Lambda 表達式
- Kotlin協程
- 初識
- 進階
- 深入
- Kotlin 擴展
- 會寫「18.dp」只是個入門——Kotlin 的擴展函數和擴展屬性(Extension Functions / Properties)
- Kotlin實戰-開發Android