<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                [TOC] # Scala實戰 ## 1、課程目標 ### 1.1、目標:熟練使用Scala編寫程序 ![](https://box.kancloud.cn/1ca496183d367bf9760e22f01939fb4e_696x374.png) ## 2、項目概述 ### 2.1、需求 > 目前大多數的分布式架構底層通信都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop項目的RPC通信框架,但是Hadoop在設計之初就是為了運行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。 > Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基于Actor并發模型實現,Akka具有高可靠、高性能、可擴展等特點,使用Akka可以輕松實現分布式RPC功能。 ### 2.2、Akka簡介 > Akka基于Actor模型,提供了一個用于構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。 > Actor模型:在計算機科學領域,Actor模型是一個并行計算(Concurrent Computation)模型,它把actor作為并行計算的基本元素來對待:為響應一個接收到的消息,一個actor能夠自己做出一些決策,如創建更多的actor,或發送更多的消息,或者確定如何去響應接收到的下一個消息。 ![](https://box.kancloud.cn/04363a1ee86ee1e92901f1cf550e3cd7_694x591.png) > Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正確地并發程序和并行系統,Actor具有如下特性: 1. 提供了一種高級抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的編程開發 2. 提供了異步非阻塞的、高性能的事件驅動編程模型 3. 超級輕量級事件處理(每GB堆內存幾百萬Actor) ## 3、項目實現 ### 3.1、架構圖 ![](https://box.kancloud.cn/054164c5106633c8deb689c8029f24ca_692x222.png) ### 3.2、重要類介紹 #### 3.2.1、ActorSystem > 在Akka中,ActorSystem是一個重量級的結構,他需要分配多個線程,所以在實際應用中,ActorSystem通常是一個單例對象,我們可以使用這個ActorSystem創建很多Actor。 #### 3.2.2、Actor > 在Akka中,Actor負責通信,在Actor中有一些重要的生命周期方法。 1. preStart()方法:該方法在Actor對象構造方法執行后執行,整個Actor生命周期中僅執行一次。 2. receive()方法:該方法在Actor的preStart方法執行完成后執行,用于接收消息,會被反復執行。 ### 3.3、Master類 ~~~ package cn.itcast.spark import scala.concurrent.duration._ import akka.actor.{Props, ActorSystem, Actor} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import scala.collection.mutable /** * Master為整個集群中的主節點 * Master繼承了Actor */ class Master extends Actor{ //保存WorkerID和Work信息的map val idToWorker = new mutable.HashMap[String, WorkerInfo] //保存所有Worker信息的Set val workers = new mutable.HashSet[WorkerInfo] //Worker超時時間 val WORKER_TIMEOUT = 10 * 1000 //重新receive方法 //導入隱式轉換,用于啟動定時器 import context.dispatcher //構造方法執行完執行一次 override def preStart(): Unit = { //啟動定時器,定時執行 context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker) } //該方法會被反復執行,用于接收消息,通過case class模式匹配接收消息 override def receive: Receive = { //Worker向Master發送的注冊消息 case RegisterWorker(id, workerHost, memory, cores) => { if(!idToWorker.contains(id)) { val worker = new WorkerInfo(id, workerHost, memory, cores) workers.add(worker) idToWorker(id) = worker sender ! RegisteredWorker("192.168.10.1") } } //Worker向Master發送的心跳消息 case HeartBeat(workerId) => { val workerInfo = idToWorker(workerId) workerInfo.lastHeartbeat = System.currentTimeMillis() } //Master自己向自己發送的定期檢查超時Worker的消息 case CheckOfTimeOutWorker => { val currentTime = System.currentTimeMillis() val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray for(worker <- toRemove){ workers -= worker idToWorker.remove(worker.id) } println("worker size: " + workers.size) } } } object Master { //程序執行入口 def main(args: Array[String]) { val host = "192.168.10.1" val port = 8888 //創建ActorSystem的必要參數 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem是單例的,用來創建Actor val actorSystem = ActorSystem.create("MasterActorSystem", config) //啟動Actor,Master會被實例化,生命周期方法會被調用 actorSystem.actorOf(Props[Master], "Master") } } ~~~ ### 3.4、Worker類 ~~~ package cn.itcast.spark import java.util.UUID import scala.concurrent.duration._ import akka.actor.{ActorSelection, Props, ActorSystem, Actor} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory /** * Worker為整個集群的從節點 * Worker繼承了Actor */ class Worker extends Actor{ //Worker端持有Master端的引用(代理對象) var master: ActorSelection = null //生成一個UUID,作為Worker的標識 val id = UUID.randomUUID().toString //構造方法執行完執行一次 override def preStart(): Unit = { //Worker向MasterActorSystem發送建立連接請求 master = context.system.actorSelection("akka.tcp://MasterActorSystem@192.168.10.1:8888/user/Master") //Worker向Master發送注冊消息 master ! RegisterWorker(id, "192.168.10.1", 10240, 8) } //該方法會被反復執行,用于接收消息,通過case class模式匹配接收消息 override def receive: Receive = { //Master向Worker的反饋信息 case RegisteredWorker(masterUrl) => { import context.dispatcher //啟動定時任務,向Master發送心跳 context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat) } case SendHeartBeat => { println("worker send heartbeat") master ! HeartBeat(id) } } } object Worker { def main(args: Array[String]) { val clientPort = 2552 //創建WorkerActorSystem的必要參數 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.port = $clientPort """.stripMargin val config = ConfigFactory.parseString(configStr) val actorSystem = ActorSystem("WorkerActorSystem", config) //啟動Actor,Master會被實例化,生命周期方法會被調用 actorSystem.actorOf(Props[Worker], "Worker") } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看