<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                Twitter最重要的標準庫是?[Util](http://github.com/twitter/util)?和?[Finagle](https://github.com/twitter/finagle)。Util 可以理解為Scala和Java的標準庫擴展,提供了標準庫中沒有的功能或已有功能的更合適的實現。Finagle 是我們的RPC系統,核心分布式系統組件。 ### Future Futures已經在并發一節中簡單討論過。它是調異步處理的中心機制,滲透在我們代碼庫中,也是Finagle的核心。Futures允許組合并發事件,簡化了高并發操作。也是JVM上異步并發的一種高效的實現。 Twitter的future是*異步*的,所以基本上任何操作(阻塞操作)——基本上任何可以suspend它的線程的執行;網絡IO和磁盤IO是就是例子——必須由系統處理,它為結果提供future。Finagle為網絡IO提供了這樣一種系統。 Futures清晰簡單:它們持有一個尚未完成運算結果的 promise 。它們是一個簡單的容器——一個占位符。一次計算當然可能會失敗,這種狀況必須被編碼:一個Future可以是三種狀態之一: pending, failed, completed。 ### 閑話:?*組合(composition)* 讓我們重新審視我們所說的組合:將簡單的組件合成一個更復雜的。函數組合的一個權威的例子:給定函數 f 和 g,組合函數 (g°f)(x) = g(f(x)) ——結果先對 x使用f函數,然后在使用g函數——用Scala來寫: ~~~ val f = (i: Int) => i.toString val g = (s: String) => s+s+s val h = g compose f // : Int => String scala> h(123) res0: java.lang.String = 123123123 ~~~ 復合函數h,是個新的函數,由之前定義的f和g函數合成。 Futures是一種集合類型——它是個包含0或1個元素的容器——你可以發現他們有標準的集合方法(eg:map, filter, foreach)。因為Future的值是延遲的,結果應用這些方法中的任何一種必然也延遲;在 ~~~ val result: Future[Int] val resultStr: Future[String] = result map { i => i.toString } ~~~ 函數 { i => i.toString } 不會被調用,直到int值可用;轉換集合的resultStr在可用之前也一直是待定狀態。 List可以被扁平化(flattened): ~~~ val listOfList: List[List[Int]] = .. val list: List[Int] = listOfList.flatten ~~~ 這對future也是有意義的: ~~~ val futureOfFuture: Future[Future[Int]] = .. val future: Future[Int] = futureOfFuture.flatten ~~~ 因為future是延遲的,flatten的實現——立即返回——不得不返回一個等待外部future (`**Future[**Future[Int]**]**`) 完成的future (`Future[**Future[Int]**]`).如果外部future失敗,內部flattened future也將失敗。 Future (類似List) 也定義了flatMap;Future[A] 定義方法flatMap的簽名 ~~~ flatMap[B](f: A => Future[B]): Future[B] ~~~ 如同組合 map 和 flatten,我們可以這樣實現: ~~~ def flatMap[B](f: A => Future[B]): Future[B] = { val mapped: Future[Future[B]] = this map f val flattened: Future[B] = mapped.flatten flattened } ~~~ 這是一種有威力的組合!使用flatMap我們可以定義一個 Future 作為兩個Future序列的結果。第二個future 的計算基于第一個的結果。想象我們需要2次RPC調用來驗證一個用戶身份,我們可以用下面的方式組合操作: ~~~ def getUser(id: Int): Future[User] def authenticate(user: User): Future[Boolean] def isIdAuthed(id: Int): Future[Boolean] = getUser(id) flatMap { user => authenticate(user) } ~~~ 這種組合類型的一個額外的好處是錯誤處理是內置的:如果getUser(..)或authenticate(..)失敗,future 從 isAuthred(..)返回時將會失敗。這里我們沒有額外的錯誤處理的代碼。 #### 風格 Future回調方法(respond, onSuccess, onFailure, ensure) 返回一個新的Future,并鏈接到調用者。這個Future被保證只有在它調用者完成后才完成,使用模式如下: ~~~ acquireResource() future onSuccess { value => computeSomething(value) } ensure { freeResource() } ~~~ freeResource() 被保證只有在 computeSomething之后才執行,這樣就模擬了try-finally 模式。 使用 onSuccess替代 foreach —— 它與 onFailure 方法對稱,命名的意圖更明確,并且也允許 chaining。 永遠避免直接創建Promise實例: 幾乎每一個任務都可以通過使用預定義的組合子完成。這些組合子確保錯誤和取消是可傳播的, 通常鼓勵的數據流風格的編程,不再需要同步和volatility聲明。 用尾遞歸風格編寫的代碼不再導致堆棧空間泄漏,并使得以數據流風格高效的實現循環成為可能: ~~~ case class Node(parent: Option[Node], ...) def getNode(id: Int): Future[Node] = ... def getHierarchy(id: Int, nodes: List[Node] = Nil): Future[Node] = getNode(id) flatMap { case n@Node(Some(parent), ..) => getHierarchy(parent, n :: nodes) case n => Future.value((n :: nodes).reverse) } ~~~ Future定義很多有用的方法: 使用 Future.value() 和 Future.exception() 來創建未滿意(pre-satisfied) 的future。Future.collect(), Future.join() 和 Future.select() 提供了組合子將多個future合成一個(例如:scatter-gather操作的gather部分)。 #### Cancellation Future實現了一種弱形式的取消。調用Future#cancel 不會直接終止運算,而是發送某個級別的可被任何處理查詢的觸發信號,最終滿足這個future。Cancellation信號流向相反的方向:一個由消費者設置的cancellation信號,會傳播到它的生產者。生產者使用 Promise的onCancellation來監聽信號并執行相應的動作。 這意味這cancellation語意上依賴生產者,沒有默認的實現。cancellation只是一個提示。 #### Local Util的[Local](https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Local.scala#L40)提供了一個位于特定的future派發樹(dispatch tree)的引用單元(cell)。設定一個local的值,使這個值可以用于被同一個線程的Future 延遲的任何計算。有一些類似于thread locals(注:Java中的線程機制),不同的是它們的范圍不是一個Java線程,而是一個 future 線程樹。在 ~~~ trait User { def name: String def incrCost(points: Int) } val user = new Local[User] ... user() = currentUser rpc() ensure { user().incrCost(10) } ~~~ 在 ensure塊中的 user() 將在回調被添加的時候引用 user local的值。 就thread locals來說,我們的Locals非常的方便,但要盡量避免使用:除非確信通過顯式傳遞數據時問題不能被充分的解決,哪怕解決起來有些繁重。 Locals有效的被核心庫使用在非常常見的問題上——線程通過RPC跟蹤,傳播監視器,為future的回調創建stack traces——任何其他解決方法都使得用戶負擔過度。Locals在幾乎任何其他情況下都不適合。 ### Offer/Broker 并發系統由于需要協調訪問數據和資源而變得復雜。[Actor](http://www.scala-lang.org/api/current/scala/actors/Actor.html)提出一種簡化的策略:每一個actor是一個順序的進程(process),保持自己的狀態和資源,數據通過消息的方式與其它actor共享。 共享數據需要actor之間通信。 Offer/Broker 建立于Actor之上,以這三種重要的方式表現:1,通信通道(Brokers)是first class——即發送消息需要通過Brokers,而非直接到actor。2, Offer/Broker 是一種同步機制:通信會話是同步的。 這意味我們可以用 Broker做為協調機制:當進程a發送一條信息給進程b;a和b都要對系統狀態達成一致。3, 最后,通信可以選擇性地執行:一個進程可以提出幾個不同的通信,其中的一個將被獲取。 為了以一種通用的方式支持選擇性通信(以及其他組合),我們需要將通信的描述和執行解耦。這正是Offer做的——它是一個持久數據用于描述一次通信;為了執行這個通信(offer執行),我們通過它的sync()方法同步 ~~~ trait Offer[T] { def sync(): Future[T] } ~~~ 返回 Future[T] 當通信被獲取的時候生成交換值。 Broker通過offer協調值的交換——它是通信的通道: ~~~ trait Broker[T] { def send(msg: T): Offer[Unit] val recv: Offer[T] } ~~~ 所以,當創建兩個offer ~~~ val b: Broker[Int] val sendOf = b.send(1) val recvOf = b.recv ~~~ sendOf和recvOf都同步 ~~~ // In process 1: sendOf.sync() // In process 2: recvOf.sync() ~~~ 兩個offer都獲取并且值1被交換。 通過將多個offer和Offer.choose綁定來執行可選擇通信。 ~~~ def choose[T](ofs: Offer[T]*): Offer[T] ~~~ 上面的代碼生成一個新的offer,當同步時獲取一個特定的ofs——第一個可用的。當多個都立即可用時,隨機獲取一個。 Offer對象有些一次性的Offers用于與來自Broker的Offer構建。 ~~~ Offer.timeout(duration): Offer[Unit] ~~~ offer在給定時間后激活。Offer.never將用于不會有效,Offer.const(value)在給定值后立即有效。這些操作由選擇性通信來組合是非常有用的。例如,在一個send操作中使用超時: ~~~ Offer.choose( Offer.timeout(10.seconds), broker.send("my value") ).sync() ~~~ 人們可能會比較 Offer/Broker 與[SynchronousQueue](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html),他們有細微但非常重要的區別。Offer可以被組合,而queue不能。例如,考慮一組queues,描述為 Brokers: ~~~ val q0 = new Broker[Int] val q1 = new Broker[Int] val q2 = new Broker[Int] ~~~ 現在讓我們為讀取創建一個合并的queue ~~~ val anyq: Offer[Int] = Offer.choose(q0.recv, q1.recv, q2.recv) ~~~ anyq是一個將從第一個可用的queue中讀取的offer。注意 anyq 仍是同步的——我們仍然擁有底層隊列的語義。這類組合是不可能用queue實現的。 #### 例子:一個簡單的連接池 連接池在網絡應用中很常見,并且它們的實現常常需要技巧——例如,在從池中獲取一個連接的時候,通常需要超時機制,因為不同的客戶端有不同的延遲需求。池的簡單原則:維護一個連接隊列,滿足那些進入的等待者。使用傳統的同步原語,這通常需要兩個隊列(queues):一個用于等待者(當沒有連接可用時),一個用于連接(當沒有等待者時)。 使用 Offer/Brokers ,可以表達得非常自然: ~~~ class Pool(conns: Seq[Conn]) { private[this] val waiters = new Broker[Conn] private[this] val returnConn = new Broker[Conn] val get: Offer[Conn] = waiters.recv def put(c: Conn) { returnConn ! c } private[this] def loop(connq: Queue[Conn]) { Offer.choose( if (connq.isEmpty) Offer.never else { val (head, rest) = connq.dequeue waiters.send(head) { _ => loop(rest) } }, returnConn.recv { c => loop(connq enqueue c) } ).sync() } loop(Queue.empty ++ conns) } ~~~ loop總是提供一個歸還的連接,但只有queue非空的時候才會send。 使用持久化隊列(persistent queue)更進一步簡化邏輯。與連接池的接口也是通過Offer實現,所以調用者如果愿意設置timeout,他們可以通過利用組合子(combinators)來做: ~~~ val conn: Future[Option[Conn]] = Offer.choose( pool.get { conn => Some(conn) }, Offer.timeout(1.second) { _ => None } ).sync() ~~~ 實現timeout不需要額外的記賬(bookkeeping);這是因為Offer的語義:如果Offer.timeout被選擇,不會再有offer從池中獲得——連接池和它的調用者在各自waiter的broker上不必同時同意接受和發送。 #### 埃拉托色尼篩子(Sieve of Eratosthenes 譯注:一種用于篩選素數的算法) 把并發程序構造為一組順序的同步通信進程,通常很有用——有時程序被大大地簡化了。Offer和Broker提供了一組工具來讓它簡單并一致。確實,它們的應用超越了我們可能認為是經典并發性問題——并發編程(有Offer/Broker的輔助)是一種有用的構建工具,正如子例程(subroutines),類,和模塊都是——來自CSP(譯注:Communicating sequential processes的縮寫,即通信順序進程)的重要思想。 這里有一個[埃拉托色尼篩子](http://ja.wikipedia.org/wiki/%E3%82%A8%E3%83%A9%E3%83%88%E3%82%B9%E3%83%86%E3%83%8D%E3%82%B9%E3%81%AE%E7%AF%A9)可以構造為一個針對一個整數流(stream of integers)的連續的應用過濾器 。首先,我們需要一個整數的源(source of integers): ~~~ def integers(from: Int): Offer[Int] = { val b = new Broker[Int] def gen(n: Int): Unit = b.send(n).sync() ensure gen(n + 1) gen(from) b.recv } ~~~ integers(n) 方法簡單地提供了從n開始的所有連續的整數。然后我們需要一個過濾器: ~~~ def filter(in: Offer[Int], prime: Int): Offer[Int] = { val b = new Broker[Int] def loop() { in.sync() onSuccess { i => if (i % prime != 0) b.send(i).sync() ensure loop() else loop() } } loop() b.recv } ~~~ filter(in, p) 方法返回的offer刪除了in中的所有質數(prime)的倍數。最終我們定義了我們的篩子(sieve): ~~~ def sieve = { val b = new Broker[Int] def loop(of: Offer[Int]) { for (prime <- of.sync(); _ <- b.send(prime).sync()) loop(filter(of, prime)) } loop(integers(2)) b.recv } ~~~ loop() 工作很簡單:從of中讀取下一個質數,然后對of應用過濾器排除這個質數。loop不斷的遞歸,持續的質數被過濾,于是我們得到了篩選結果。我們現在打印前10000個質數: ~~~ val primes = sieve 0 until 10000 foreach { _ => println(primes.sync()()) } ~~~ 除了構造簡單,組件正交,這種做法也給你一種流式篩子(streaming sieve):你不需要事先計算出你感興趣的質數集合,從而進一步提高了模塊化。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看