[TOC]
# Scala實戰
## 1、課程目標
### 1.1、目標:熟練使用Scala編寫程序

## 2、項目概述
### 2.1、需求
> 目前大多數的分布式架構底層通信都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop項目的RPC通信框架,但是Hadoop在設計之初就是為了運行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所有Hadoop的RPC顯得有些笨重。
> Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基于Actor并發模型實現,Akka具有高可靠、高性能、可擴展等特點,使用Akka可以輕松實現分布式RPC功能。
### 2.2、Akka簡介
> Akka基于Actor模型,提供了一個用于構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。
> Actor模型:在計算機科學領域,Actor模型是一個并行計算(Concurrent Computation)模型,它把actor作為并行計算的基本元素來對待:為響應一個接收到的消息,一個actor能夠自己做出一些決策,如創建更多的actor,或發送更多的消息,或者確定如何去響應接收到的下一個消息。

> Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正確地并發程序和并行系統,Actor具有如下特性:
1. 提供了一種高級抽象,能夠簡化在并發(Concurrency)/并行(Parallelism)應用場景下的編程開發
2. 提供了異步非阻塞的、高性能的事件驅動編程模型
3. 超級輕量級事件處理(每GB堆內存幾百萬Actor)
## 3、項目實現
### 3.1、架構圖

### 3.2、重要類介紹
#### 3.2.1、ActorSystem
> 在Akka中,ActorSystem是一個重量級的結構,他需要分配多個線程,所以在實際應用中,ActorSystem通常是一個單例對象,我們可以使用這個ActorSystem創建很多Actor。
#### 3.2.2、Actor
> 在Akka中,Actor負責通信,在Actor中有一些重要的生命周期方法。
1. preStart()方法:該方法在Actor對象構造方法執行后執行,整個Actor生命周期中僅執行一次。
2. receive()方法:該方法在Actor的preStart方法執行完成后執行,用于接收消息,會被反復執行。
### 3.3、Master類
~~~
package cn.itcast.spark
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
/**
* Master為整個集群中的主節點
* Master繼承了Actor
*/
class Master extends Actor{
//保存WorkerID和Work信息的map
val idToWorker = new mutable.HashMap[String, WorkerInfo]
//保存所有Worker信息的Set
val workers = new mutable.HashSet[WorkerInfo]
//Worker超時時間
val WORKER_TIMEOUT = 10 * 1000
//重新receive方法
//導入隱式轉換,用于啟動定時器
import context.dispatcher
//構造方法執行完執行一次
override def preStart(): Unit = {
//啟動定時器,定時執行
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
}
//該方法會被反復執行,用于接收消息,通過case class模式匹配接收消息
override def receive: Receive = {
//Worker向Master發送的注冊消息
case RegisterWorker(id, workerHost, memory, cores) => {
if(!idToWorker.contains(id)) {
val worker = new WorkerInfo(id, workerHost, memory, cores)
workers.add(worker)
idToWorker(id) = worker
sender ! RegisteredWorker("192.168.10.1")
}
}
//Worker向Master發送的心跳消息
case HeartBeat(workerId) => {
val workerInfo = idToWorker(workerId)
workerInfo.lastHeartbeat = System.currentTimeMillis()
}
//Master自己向自己發送的定期檢查超時Worker的消息
case CheckOfTimeOutWorker => {
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
for(worker <- toRemove){
workers -= worker
idToWorker.remove(worker.id)
}
println("worker size: " + workers.size)
}
}
}
object Master {
//程序執行入口
def main(args: Array[String]) {
val host = "192.168.10.1"
val port = 8888
//創建ActorSystem的必要參數
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = "$port"
""".stripMargin
val config = ConfigFactory.parseString(configStr)
//ActorSystem是單例的,用來創建Actor
val actorSystem = ActorSystem.create("MasterActorSystem", config)
//啟動Actor,Master會被實例化,生命周期方法會被調用
actorSystem.actorOf(Props[Master], "Master")
}
}
~~~
### 3.4、Worker類
~~~
package cn.itcast.spark
import java.util.UUID
import scala.concurrent.duration._
import akka.actor.{ActorSelection, Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
/**
* Worker為整個集群的從節點
* Worker繼承了Actor
*/
class Worker extends Actor{
//Worker端持有Master端的引用(代理對象)
var master: ActorSelection = null
//生成一個UUID,作為Worker的標識
val id = UUID.randomUUID().toString
//構造方法執行完執行一次
override def preStart(): Unit = {
//Worker向MasterActorSystem發送建立連接請求
master = context.system.actorSelection("akka.tcp://MasterActorSystem@192.168.10.1:8888/user/Master")
//Worker向Master發送注冊消息
master ! RegisterWorker(id, "192.168.10.1", 10240, 8)
}
//該方法會被反復執行,用于接收消息,通過case class模式匹配接收消息
override def receive: Receive = {
//Master向Worker的反饋信息
case RegisteredWorker(masterUrl) => {
import context.dispatcher
//啟動定時任務,向Master發送心跳
context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
}
case SendHeartBeat => {
println("worker send heartbeat")
master ! HeartBeat(id)
}
}
}
object Worker {
def main(args: Array[String]) {
val clientPort = 2552
//創建WorkerActorSystem的必要參數
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.port = $clientPort
""".stripMargin
val config = ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("WorkerActorSystem", config)
//啟動Actor,Master會被實例化,生命周期方法會被調用
actorSystem.actorOf(Props[Worker], "Worker")
}
}
~~~
- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰