在默認情況下,當 Spark 在集群的多個不同節點的多個任務上并行運行一個函數時,它會把函數中涉及到的每個變量,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變量,或者在任務 (Task) 和任務控制節點 (Driver Program) 之間共享變量。
為了滿足這種需求,Spark 提供了兩種類型的變量:
* **累加器 (accumulators)**:累加器支持在所有不同節點之間進行累加計算 (比如計數或者求和)。
* **廣播變量 (broadcast variables)**:廣播變量用來把變量在所有節點的內存之間進行共享,在每個機器上緩存一個只讀的變量,而不是為機器上的每個任務都生成一個副本。
## 廣播變量
廣播變量允許程序員緩存一個只讀的變量在每臺機器上面,而不是每個任務保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。(Broadcast variables allow the
programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example,
to give every node a copy of a large input dataset in an efficient manner.)Spark也嘗試著利用有效的廣播算法去分配廣播變量,以減少通信的成本。
一個廣播變量可以通過調用`SparkContext.broadcast(v)`方法從一個初始變量v中創建。廣播變量是v的一個包裝變量,它的值可以通過`value`方法訪問,下面的代碼說明了這個過程:
```scala
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
```
廣播變量創建以后,我們就能夠在集群的任何函數中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個節點上。另外,為了保證所有的節點得到廣播變量具有相同的值,對象v不能在廣播之后被修改。
**一個相對完整的例子**
```
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariablesTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//不使用廣播變量
val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
//根據水果編號取水果名稱
val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
fruitNames.foreach(println)
//注意:以上代碼看似一點問題沒有,但是考慮到數據量如果較大,且Task數較多,
//那么會導致,被各個Task共用到的fruitMap會被多次傳輸
//應該要減少fruitMap的傳輸,一臺機器上一個,被該臺機器中的Task共用即可
//如何做到?---使用廣播變量
//注意:廣播變量的值不能被修改,如需修改可以將數據存到外部數據源,如MySQL、Redis
println("=====================")
val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
fruitNames2.foreach(println)
}
}
```
## 累加器
顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效的應用于并行操作中。它們能夠用來實現`counters`和`sums`。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型。
如果創建了一個具名的累加器,它可以在spark的UI中顯示。這對于理解運行階段(running stages)的過程有很重要的作用。(注意:這在python中還不被支持)
一個累加器可以通過調用`SparkContext.accumulator(v)`方法從一個初始變量v中創建。運行在集群上的任務可以通過`add`方法或者使用`+=`操作來給它加值。然而,它們無法讀取這個值。只有驅動程序可以使用`value`方法來讀取累加器的值。
如下的代碼,展示了如何利用累加器將一個數組里面的所有元素相加:
```scala
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
```
這個例子利用了內置的整數類型累加器。開發者可以利用子類[AccumulatorParam](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.AccumulatorParam)創建自己的
累加器類型。AccumulatorParam接口有兩個方法:`zero`方法為你的數據類型提供一個“0 值”(zero value);`addInPlace`方法計算兩個值的和。例如,假設我們有一個`Vector`類代表數學上的向量,我們能夠
如下定義累加器:
```scala
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
```
在scala中,Spark支持用更一般的[Accumulable](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.Accumulable)接口來累積數據-結果類型和用于累加的元素類型
不一樣(例如通過收集的元素建立一個列表)。Spark也支持用`SparkContext.accumulableCollection`方法累加一般的scala集合類型。
**一個相對完整的測試的例子**
通常在向 `Spark` 傳遞函數時,比如使用 `map()` 函數或者用`filter()`傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。這時使用累加器就可以實現我們想要的效果:
語法:`val xx: Accumulator[Int] = sc.accumulator(0)`
示例代碼:
```
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//使用scala集合完成累加
var counter1: Int = 0;
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
println(counter1)//6
println("+++++++++++++++++++++++++")
//使用RDD進行累加(RDD是分布式的)
var counter2: Int = 0;
val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
dataRDD.foreach(x => counter2 += x)
println(counter2)//0
//注意:上面的RDD操作運行結果是0
//因為foreach中的函數是傳遞給Worker中的Executor執行,用到了counter2變量
//而counter2變量在Driver端定義的,在傳遞給Executor的時候,各個Executor都有了一份counter2
//最后各個Executor將各自個x加到自己的counter2上面了,和Driver端的counter2沒有關系
//那這個問題得解決啊!不能因為使用了Spark連累加都做不了了啊!
//如果解決?---使用累加器
val counter3: Accumulator[Int] = sc.accumulator(0)
dataRDD.foreach(x => counter3 += x)
println(counter3)//6
}
}
```
- Introduction
- 快速上手
- Spark Shell
- 獨立應用程序
- 開始翻滾吧!
- RDD編程基礎
- 基礎介紹
- 外部數據集
- RDD 操作
- 轉換Transformations
- map與flatMap解析
- 動作Actions
- RDD持久化
- RDD容錯機制
- 傳遞函數到 Spark
- 使用鍵值對
- RDD依賴關系與DAG
- 共享變量
- Spark Streaming
- 一個快速的例子
- 基本概念
- 關聯
- 初始化StreamingContext
- 離散流
- 輸入DStreams
- DStream中的轉換
- DStream的輸出操作
- 緩存或持久化
- Checkpointing
- 部署應用程序
- 監控應用程序
- 性能調優
- 減少批數據的執行時間
- 設置正確的批容量
- 內存調優
- 容錯語義
- Spark SQL
- 概述
- SparkSQLvsHiveSQL
- 數據源
- RDDs
- parquet文件
- JSON數據集
- Hive表
- 數據源例子
- join操作
- 聚合操作
- 性能調優
- 其他
- Spark SQL數據類型
- 其它SQL接口
- 編寫語言集成(Language-Integrated)的相關查詢
- GraphX編程指南
- 開始
- 屬性圖
- 圖操作符
- Pregel API
- 圖構造者
- 部署
- 頂點和邊RDDs
- 圖算法
- 例子
- 更多文檔
- 提交應用程序
- 獨立運行Spark
- 在yarn上運行Spark
- Spark配置
- RDD 持久化