<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC=2,2] ## Runnable/Callable Runnable接口只有一個沒有返回值的方法。 ~~~ trait Runnable { def run(): Unit } ~~~ Callable與之類似,除了它有一個返回值 ~~~ trait Callable[V] { def call(): V } ~~~ ## 線程 Scala并發是建立在Java并發模型基礎上的。 在Sun JVM上,對IO密集的任務,我們可以在一臺機器運行成千上萬個線程。 一個線程需要一個Runnable。你必須調用線程的?`start`?方法來運行Runnable。 ~~~ scala> val hello = new Thread(new Runnable { def run() { println("hello world") } }) hello: java.lang.Thread = Thread[Thread-3,5,main] scala> hello.start hello world ~~~ 當你看到一個類實現了Runnable接口,你就知道它的目的是運行在一個線程中。 ### 單線程代碼 這里有一個可以工作但有問題的代碼片斷。 ~~~ import java.net.{Socket, ServerSocket} import java.util.concurrent.{Executors, ExecutorService} import java.util.Date class NetworkService(port: Int, poolSize: Int) extends Runnable { val serverSocket = new ServerSocket(port) def run() { while (true) { // This will block until a connection comes in. val socket = serverSocket.accept() (new Handler(socket)).run() } } } class Handler(socket: Socket) extends Runnable { def message = (Thread.currentThread.getName() + "\n").getBytes def run() { socket.getOutputStream.write(message) socket.getOutputStream.close() } } (new NetworkService(2020, 2)).run ~~~ 每個請求都會回應當前線程的名稱,所以結果始終是?`main`?。 這段代碼的主要缺點是在同一時間,只有一個請求可以被相應! 你可以把每個請求放入一個線程中處理。只要簡單改變 ~~~ (new Handler(socket)).run() ~~~ 為 ~~~ (new Thread(new Handler(socket))).start() ~~~ 但如果你想重用線程或者對線程的行為有其他策略呢? ## Executors 隨著Java 5的發布,它決定提供一個針對線程的更抽象的接口。 你可以通過?`Executors`?對象的靜態方法得到一個?`ExecutorService`?對象。這些方法為你提供了可以通過各種政策配置的?`ExecutorService`?,如線程池。 下面改寫我們之前的阻塞式網絡服務器來允許并發請求。 ~~~ import java.net.{Socket, ServerSocket} import java.util.concurrent.{Executors, ExecutorService} import java.util.Date class NetworkService(port: Int, poolSize: Int) extends Runnable { val serverSocket = new ServerSocket(port) val pool: ExecutorService = Executors.newFixedThreadPool(poolSize) def run() { try { while (true) { // This will block until a connection comes in. val socket = serverSocket.accept() pool.execute(new Handler(socket)) } } finally { pool.shutdown() } } } class Handler(socket: Socket) extends Runnable { def message = (Thread.currentThread.getName() + "\n").getBytes def run() { socket.getOutputStream.write(message) socket.getOutputStream.close() } } (new NetworkService(2020, 2)).run ~~~ 這里有一個連接腳本展示了內部線程是如何重用的。 ~~~ $ nc localhost 2020 pool-1-thread-1 $ nc localhost 2020 pool-1-thread-2 $ nc localhost 2020 pool-1-thread-1 $ nc localhost 2020 pool-1-thread-2 ~~~ ## Futures `Future`?代表異步計算。你可以把你的計算包裝在Future中,當你需要計算結果的時候,你只需調用一個阻塞的?`get()`?方法就可以了。一個?`Executor`?返回一個?`Future`?。如果使用Finagle RPC系統,你可以使用?`Future`?實例持有可能尚未到達的結果。 一個?`FutureTask`?是一個Runnable實現,就是被設計為由?`Executor`?運行的 ~~~ val future = new FutureTask[String](new Callable[String]() { def call(): String = { searcher.search(target); }}) executor.execute(future) ~~~ 現在我需要結果,所以阻塞直到其完成。 ~~~ val blockingResult = future.get() ~~~ **參考**?[Scala School的Finagle介紹](http://twitter.github.io/scala_school/zh_cn/finagle.html)中大量使用了`Future`,包括一些把它們結合起來的不錯的方法。以及 Effective Scala 對[Futures](http://twitter.github.com/effectivescala/#Twitter's standard libraries-Futures)的意見。 ## 線程安全問題 ~~~ class Person(var name: String) { def set(changedName: String) { name = changedName } } ~~~ 這個程序在多線程環境中是不安全的。如果有兩個線程有引用到同一個Person實例,并調用?`set`?,你不能預測兩個調用結束后?`name`?的結果。 在Java內存模型中,允許每個處理器把值緩存在L1或L2緩存中,所以在不同處理器上運行的兩個線程都可以有自己的數據視圖。 讓我們來討論一些工具,來使線程保持一致的數據視圖。 ### 三種工具 #### 同步 互斥鎖(Mutex)提供所有權語義。當你進入一個互斥體,你擁有它。同步是JVM中使用互斥鎖最常見的方式。在這個例子中,我們會同步Person。 在JVM中,你可以同步任何不為null的實例。 ~~~ class Person(var name: String) { def set(changedName: String) { this.synchronized { name = changedName } } } ~~~ #### volatile 隨著Java 5內存模型的變化,volatile和synchronized基本上是相同的,除了volatile允許空值。 `synchronized`?允許更細粒度的鎖。 而?`volatile`?則對每次訪問同步。 ~~~ class Person(@volatile var name: String) { def set(changedName: String) { name = changedName } } ~~~ #### AtomicReference 此外,在Java 5中還添加了一系列低級別的并發原語。?`AtomicReference`?類是其中之一 ~~~ import java.util.concurrent.atomic.AtomicReference class Person(val name: AtomicReference[String]) { def set(changedName: String) { name.set(changedName) } } ~~~ #### 這個成本是什么? `AtomicReference`?是這兩種選擇中最昂貴的,因為你必須去通過方法調度(method dispatch)來訪問值。 `volatile`?和?`synchronized`?是建立在Java的內置監視器基礎上的。如果沒有資源爭用,監視器的成本很小。由于?`synchronized`?允許你進行更細粒度的控制權,從而會有更少的爭奪,所以?`synchronized`?往往是最好的選擇。 當你進入同步點,訪問volatile引用,或去掉AtomicReferences引用時, Java會強制處理器刷新其緩存線從而提供了一致的數據視圖。 如果我錯了,請大家指正。這是一個復雜的課題,我敢肯定要弄清楚這一點需要一個漫長的課堂討論。 ### Java5的其他靈巧的工具 正如前面提到的?`AtomicReference`?,Java5帶來了許多很棒的工具。 #### CountDownLatch `CountDownLatch`?是一個簡單的多線程互相通信的機制。 ~~~ val doneSignal = new CountDownLatch(2) doAsyncWork(1) doAsyncWork(2) doneSignal.await() println("both workers finished!") ~~~ 先不說別的,這是一個優秀的單元測試。比方說,你正在做一些異步工作,并要確保功能完成。你的函數只需要?`倒數計數(countDown)`?并在測試中?`等待(await)`?就可以了。 #### AtomicInteger/Long 由于對Int和Long遞增是一個經常用到的任務,所以增加了?`AtomicInteger`?和?`AtomicLong`?。 #### AtomicBoolean 我可能不需要解釋這是什么。 #### ReadWriteLocks `讀寫鎖(ReadWriteLock)`?使你擁有了讀線程和寫線程的鎖控制。當寫線程獲取鎖的時候讀線程只能等待。 ## 讓我們構建一個不安全的搜索引擎 下面是一個簡單的倒排索引,它不是線程安全的。我們的倒排索引按名字映射到一個給定的用戶。 這里的代碼天真地假設只有單個線程來訪問。 注意使用了?`mutable.HashMap`?替代了默認的構造函數?`this()` ~~~ import scala.collection.mutable case class User(name: String, id: Int) class InvertedIndex(val userMap: mutable.Map[String, User]) { def this() = this(new mutable.HashMap[String, User]) def tokenizeName(name: String): Seq[String] = { name.split(" ").map(_.toLowerCase) } def add(term: String, user: User) { userMap += term -> user } def add(user: User) { tokenizeName(user.name).foreach { term => add(term, user) } } } ~~~ 這里沒有寫如何從索引中獲取用戶。稍后我們會補充。 ## 讓我們把它變為線程安全 在上面的倒排索引例子中,userMap不能保證是線程安全的。多個客戶端可以同時嘗試添加項目,并有可能出現前面?`Person`?例子中的視圖錯誤。 由于userMap不是線程安全的,那我們怎樣保持在同一個時間只有一個線程能改變它呢? 你可能會考慮在做添加操作時鎖定userMap。 ~~~ def add(user: User) { userMap.synchronized { tokenizeName(user.name).foreach { term => add(term, user) } } } ~~~ 不幸的是,這個粒度太粗了。一定要試圖在互斥鎖以外做盡可能多的耗時的工作。還記得我說過如果不存在資源爭奪,鎖開銷就會很小嗎。如果在鎖代碼塊里面做的工作越少,爭奪就會越少。 ~~~ def add(user: User) { // tokenizeName was measured to be the most expensive operation. val tokens = tokenizeName(user.name) tokens.foreach { term => userMap.synchronized { add(term, user) } } } ~~~ ## SynchronizedMap 我們可以通過SynchronizedMap特質將同步混入一個可變的HashMap。 我們可以擴展現有的InvertedIndex,提供給用戶一個簡單的方式來構建同步索引。 ~~~ import scala.collection.mutable.SynchronizedMap class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) { def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User]) } ~~~ 如果你看一下其實現,你就會意識到,它只是在每個方法上加同步鎖來保證其安全性,所以它很可能沒有你希望的性能。 ## Java ConcurrentHashMap Java有一個很好的線程安全的ConcurrentHashMap。值得慶幸的是,我們可以通過JavaConverters獲得不錯的Scala語義。 事實上,我們可以通過擴展老的不安全的代碼,來無縫地接入新的線程安全InvertedIndex。 ~~~ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User]) extends InvertedIndex(userMap) { def this() = this(new ConcurrentHashMap[String, User] asScala) } ~~~ ## 讓我們加載InvertedIndex ### 原始方式 ~~~ trait UserMaker { def makeUser(line: String) = line.split(",") match { case Array(name, userid) => User(name, userid.trim().toInt) } } class FileRecordProducer(path: String) extends UserMaker { def run() { Source.fromFile(path, "utf-8").getLines.foreach { line => index.add(makeUser(line)) } } } ~~~ 對于文件中的每一行,我們可以調用?`makeUser`?然后?`add`?到 InvertedIndex中。如果我們使用并發InvertedIndex,我們可以并行調用add因為makeUser沒有副作用,所以我們的代碼已經是線程安全的了。 我們不能并行讀取文件,但我們?*可以*?并行構造用戶并且把它添加到索引中。 ### 一個解決方案:生產者/消費者 異步計算的一個常見模式是把消費者和生產者分開,讓他們只能通過?`隊列(Queue)`?溝通。讓我們看看如何將這個模式應用在我們的搜索引擎索引中。 ~~~ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} // Concrete producer class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable { def run() { Source.fromFile(path, "utf-8").getLines.foreach { line => queue.put(line) } } } // Abstract consumer abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable { def run() { while (true) { val item = queue.take() consume(item) } } def consume(x: T) } val queue = new LinkedBlockingQueue[String]() // One thread for the producer val producer = new Producer[String]("users.txt", q) new Thread(producer).start() trait UserMaker { def makeUser(line: String) = line.split(",") match { case Array(name, userid) => User(name, userid.trim().toInt) } } class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker { def consume(t: String) = index.add(makeUser(t)) } // Let's pretend we have 8 cores on this machine. val cores = 8 val pool = Executors.newFixedThreadPool(cores) // Submit one consumer per core. for (i <- i to cores) { pool.submit(new IndexerConsumer[String](index, q)) } ~~~ Built at?[@twitter](http://twitter.com/twitter)?by?[@stevej](http://twitter.com/stevej),?[@marius](http://twitter.com/marius), and?[@lahosken](http://twitter.com/lahosken)?with much help from?[@evanm](http://twitter.com/evanm),?[@sprsquish](http://twitter.com/sprsquish),?[@kevino](http://twitter.com/kevino),?[@zuercher](http://twitter.com/zuercher),?[@timtrueman](http://twitter.com/timtrueman),?[@wickman](http://twitter.com/wickman), and[@mccv](http://twitter.com/mccv); Russian translation by?[appigram](https://github.com/appigram); Chinese simple translation by?[jasonqu](https://github.com/jasonqu); Korean translation by?[enshahar](https://github.com/enshahar); Licensed under the?[Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0).
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看