<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] ## 通道 延期的值提供了一種便捷的方法使單個值在多個協程之間進行相互傳輸。通道提供了一種在流中傳輸值的方法。 ### 通道基礎 一個 [Channel](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html) 是一個和 `BlockingQueue` 非常相似的概念。其中一個不同是它代替了阻塞的 `put` 操作并提供了掛起的 [send](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html),還替代了阻塞的 `take` 操作并提供了掛起的 [receive](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html)。 ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { //sampleStart val channel = Channel<Int>() launch { // 這里可能是消耗大量 CPU 運算的異步邏輯,我們將僅僅做 5 次整數的平方并發送 for (x in 1..5) channel.send(x * x) } // 這里我們打印了 5 次被接收的整數: repeat(5) { println(channel.receive()) } println("Done!") //sampleEnd } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt)獲取完整代碼。 這段代碼的輸出如下: ```text 1 4 9 16 25 Done! ``` ### 關閉與迭代通道 和隊列不同,一個通道可以通過被關閉來表明沒有更多的元素將會進入通道。在接收者中可以定期的使用 `for` 循環來從通道中接收元素。 從概念上來說,一個 [close](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html)操作就像向通道發送了一個特殊的關閉指令。這個迭代停止就說明關閉指令已經被接收了。所以這里保證所有先前發送出去的元素都在通道關閉前被接收到。 ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { //sampleStart val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() // 我們結束發送 } // 這里我們使用 `for` 循環來打印所有被接收到的元素(直到通道被關閉) for (y in channel) println(y) println("Done!") //sampleEnd } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-02.kt)獲取完整代碼。 ``` 1 4 9 16 25 ``` ### 構建通道生產者 協程生成一系列元素的模式很常見。這是 _生產者——消費者_ 模式的一部分,并且經常能在并發的代碼中看到它。 你可以將生產者抽象成一個函數,并且使通道作為它的參數,但這與必須從函數中返回結果的常識相違悖。 這里有一個名為 [produce](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html) 的便捷的協程構建器,可以很容易的在生產者端正確工作,并且我們使用擴展函數 [consumeEach](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html) 在消費者端替代 `for` 循環: ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { for (x in 1..5) send(x * x) } fun main() = runBlocking { //sampleStart val squares = produceSquares() squares.consumeEach { println(it) } println("Done!") //sampleEnd } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-03.kt)獲取完整代碼。 ``` 1 4 9 16 25 Done! ``` ### 管道 管道是一種一個協程在流中開始生產可能無窮多個元素的模式: ```kotlin fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) // 在流中開始從 1 生產無窮多個整數 } ``` 并且另一個或多個協程開始消費這些流,做一些操作,并生產了一些額外的結果。在下面的例子中,對這些數字僅僅做了平方操作: ```kotlin fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (x in numbers) send(x * x) } ``` 主要的代碼啟動并連接了整個管道: ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { //sampleStart val numbers = produceNumbers() // 從 1 開始生產整數 val squares = square(numbers) // 對整數做平方 for (i in 1..5) println(squares.receive()) // 打印前 5 個數字 println("Done!") // 我們的操作已經結束了 coroutineContext.cancelChildren() // 取消子協程 //sampleEnd } fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) // 從 1 開始的無限的整數流 } fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (x in numbers) send(x * x) } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-04.kt)獲取完整代碼。 ``` 1 4 9 16 25 Done! ``` > 所有創建了協程的函數被定義在了 [CoroutineScope] 的擴展上,所以我們可以依靠[結構化并發](https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async)來確保沒有常駐在我們的應用程序中的全局協程。 ### 使用管道的素數 讓我們來展示一個極端的例子——在協程中使用一個管道來生成素數。我們開啟了一個數字的無限序列。 ```kotlin fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { var x = start while (true) send(x++) // 開啟了一個無限的整數流 } ``` 在下面的管道階段中過濾了來源于流中的數字,刪除了所有可以被給定素數整除的數字。 ```kotlin fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { for (x in numbers) if (x % prime != 0) send(x) } ``` 現在我們開啟了一個從 2 開始的數字流管道,從當前的通道中取一個素數,并為每一個我們發現的素數啟動一個流水線階段: ``` numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) …… ``` 下面的例子打印了前十個素數,在主線程的上下文中運行整個管道。直到所有的協程在該主協程 [runBlocking](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html) 的作用域中被啟動完成。我們不必使用一個顯式的列表來保存所有被我們已經啟動的協程。 我們使用 [cancelChildren](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html)擴展函數在我們打印了前十個素數以后來取消所有的子協程。 ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { //sampleStart var cur = numbersFrom(2) for (i in 1..10) { val prime = cur.receive() println(prime) cur = filter(cur, prime) } coroutineContext.cancelChildren() // 取消所有的子協程來讓主協程結束 //sampleEnd } fun CoroutineScope.numbersFrom(start: Int) = produce<Int> { var x = start while (true) send(x++) // 從 start 開始過濾整數流 } fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> { for (x in numbers) if (x % prime != 0) send(x) } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-05.kt)獲取完整代碼。 這段代碼的輸出如下: ```text 2 3 5 7 11 13 17 19 23 29 ``` 注意,你可以在標準庫中使用[`iterator`](https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/iterator.html) 協程構建器來構建一個相似的管道。使用 `iterator` 替換 `produce`、`yield` 替換 `send`、`next` 替換 `receive`、 `Iterator` 替換 `ReceiveChannel` 來擺脫協程作用域,你將不再需要 `runBlocking`。然而,如上所示,如果你在 [Dispatchers.Default](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html) 上下文中運行它,使用通道的管道的好處在于它可以充分利用多核心 CPU。 不過,這是一種非常不切實際的尋找素數的方法。在實踐中,管道調用了另外的一些掛起中的調用(就像異步調用遠程服務)并且這些管道不能內置使用 `sequence`/`iterator`,因為它們不被允許隨意的掛起,不像`produce` 是完全異步的。 ### 扇出 多個協程也許會接收相同的管道,在它們之間進行分布式工作。讓我們啟動一個定期產生整數的生產者協程 (每秒十個數字): ```kotlin fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 // 從 1 開始 while (true) { send(x++) // 產生下一個數字 delay(100) // 等待 0.1 秒 } } ``` 接下來我們可以得到幾個生產者協程。在這個示例中,它們只是打印它們的 id 和接收到的數字: ```kotlin fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { println("Processor #$id received $msg") } } ``` 現在讓我們啟動五個生產者協程并讓它們工作將近一秒。看看發生了什么: ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking<Unit> { //sampleStart val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() // 取消協程生產者從而將它們全部殺死 //sampleEnd } fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 // start from 1 while (true) { send(x++) // 產生下一個數字 delay(100) // 等待 0.1 秒 } } fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { println("Processor #$id received $msg") } } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-06.kt)獲取完整代碼。 該輸出將類似于如下所示,盡管接收的是生產者的 id但每個整數也許會不同: ``` Processor #2 received 1 Processor #4 received 2 Processor #0 received 3 Processor #1 received 4 Processor #3 received 5 Processor #2 received 6 Processor #4 received 7 Processor #0 received 8 Processor #1 received 9 Processor #3 received 10 ``` 注意,取消生產者協程并關閉它的通道,因此通過正在執行的生產者協程通道來終止迭代。 還有,注意我們如何使用 `for` 循環顯式迭代通道以在 `launchProcessor` 代碼中執行扇出。與 `consumeEach` 不同,這個 `for` 循環是安全完美地使用多個協程的。如果其中一個生產者協程執行失敗,其它的生產者協程仍然會繼續處理通道,而通過 `consumeEach`編寫的生產者始終在正常或非正常完成時消耗(取消)底層通道。 ### 扇入 多個協程可以發送到同一個通道。比如說,讓我們創建一個字符串的通道,和一個在這個通道中以指定的延遲反復發送一個指定字符串的掛起函數: ```kotlin suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } } ``` 現在,我們啟動了幾個發送字符串的協程,讓我們看看會發生什么(在示例中,我們在主線程的上下文中作為主協程的子協程來啟動它們): ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { //sampleStart val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { // 接收前六個 println(channel.receive()) } coroutineContext.cancelChildren() // 取消所有子協程來讓主協程結束 //sampleEnd } suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-07.kt)獲取完整代碼。 輸出如下: ```text foo foo BAR! foo foo BAR! ``` ### 帶緩沖的通道 到目前為止展示的通道都是沒有緩沖區的。無緩沖的通道在發送者和接收者相遇時傳輸元素(aka rendezvous(這句話應該是個俚語,意思好像是“又是約會”的意思,不知道怎么翻))。如果發送先被調用,則它將被掛起直到接收被調用,如果接收先被調用,它將被掛起直到發送被調用。 [Channel()](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html) 工廠函數與 [produce](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html) 建造器通過一個可選的參數 `capacity`來指定 _緩沖區大小_ 。緩沖允許發送者在被掛起前發送多個元素,就像 `BlockingQueue` 有指定的容量一樣,當緩沖區被占滿的時候將會引起阻塞。 看看如下代碼的表現: ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking<Unit> { //sampleStart val channel = Channel<Int>(4) // 啟動帶緩沖的通道 val sender = launch { // 啟動發送者協程 repeat(10) { println("Sending $it") // 在每一個元素發送前打印它們 channel.send(it) // 將在緩沖區被占滿時掛起 } } // 沒有接收到東西……只是等待…… delay(1000) sender.cancel() // 取消發送者協程 //sampleEnd } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-08.kt)獲取完整代碼。 使用緩沖通道并給 capacity 參數傳入 _四_ 它將打印“sending” _五_ 次: ```text Sending 0 Sending 1 Sending 2 Sending 3 Sending 4 ``` 前四個元素被加入到了緩沖區并且發送者在試圖發送第五個元素的時候被掛起。 ### 通道是公平的 發送和接收操作是 _公平的_ 并且尊重調用它們的多個協程。它們遵守先進先出原則,可以看到第一個協程調用 `receive`并得到了元素。在下面的例子中兩個協程“乒”和“乓”都從共享的“桌子”通道接收到這個“球”元素。 ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* //sampleStart data class Ball(var hits: Int) fun main() = runBlocking { val table = Channel<Ball>() // 一個共享的 table(桌子) launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // 乒乓球 delay(1000) // 延遲 1 秒鐘 coroutineContext.cancelChildren() // 游戲結束,取消它們 } suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // 在循環中接收球 ball.hits++ println("$name $ball") delay(300) // 等待一段時間 table.send(ball) // 將球發送回去 } } //sampleEnd ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-09.kt)得到完整代碼 “乒”協程首先被啟動,所以它首先接收到了球。甚至雖然“乒”協程在將球發送會桌子以后立即開始接收,但是球還是被“乓”協程接收了,因為它一直在等待著接收球: ```text ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4) ``` 注意,有時候通道執行時由于執行者的性質而看起來不那么公平。點擊[這個提案](https://github.com/Kotlin/kotlinx.coroutines/issues/111)來查看更多細節。 ### 計時器通道 計時器通道是一種特別的會合通道,每次經過特定的延遲都會從該通道進行消費并產生 `Unit`。雖然它看起來似乎沒用,它被用來構建分段來創建復雜的基于時間的 [produce](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html)管道和進行窗口化操作以及其它時間相關的處理。可以在 [select](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html) 中使用計時器通道來進行“打勾”操作。 使用工廠方法 [ticker](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html) 來創建這些通道。為了表明不需要其它元素,請使用 [ReceiveChannel.cancel](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html) 方法。 現在讓我們看看它是如何在實踐中工作的: ```kotlin import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking<Unit> { val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //創建計時器通道 var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // 初始尚未經過的延遲 nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // 所有隨后到來的元素都經過了 100 毫秒的延遲 println("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") // 模擬大量消費延遲 println("Consumer pauses for 150ms") delay(150) // 下一個元素立即可用 nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // 請注意,`receive` 調用之間的暫停被考慮在內,下一個元素的到達速度更快 nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // 表明不再需要更多的元素 } ``` > 可以在[這里](https://github.com/hltj/kotlinx.coroutines-cn/blob/master/kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt)獲取完整代碼。 它的打印如下: ```text Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 150ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit ``` 請注意,[ticker](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html) 知道可能的消費者暫停,并且默認情況下會調整下一個生成的元素如果發生暫停則延遲,試圖保持固定的生成元素率。 給可選的 `mode` 參數傳入 [TickerMode.FIXED_DELAY](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html) 可以保持固定元素之間的延遲。 [CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-scope/index.html [runBlocking]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/run-blocking.html [kotlin.coroutines.CoroutineContext.cancelChildren]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/kotlin.coroutines.-coroutine-context/cancel-children.html [Dispatchers.Default]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-default.html <!--- INDEX kotlinx.coroutines.channels --> [Channel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html [SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/close.html [produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/produce.html [consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html [Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel.html [ticker]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/ticker.html [ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html [TickerMode.FIXED_DELAY]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-ticker-mode/-f-i-x-e-d_-d-e-l-a-y.html [select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看