顧名思義,broadcast 就是將數據從一個節點發送到其他各個節點上去。這樣的場景很多,比如 driver 上有一張表,其他節點上運行的 task 需要 lookup 這張表,那么 driver 可以先把這張表 copy 到這些節點,這樣 task 就可以在本地查表了。如何實現一個可靠高效的 broadcast 機制是一個有挑戰性的問題。先看看 Spark 官網上的一段話:
> Broadcast variables allow the programmer to keep a?**read-only**?variable cached on each?**machine**rather than shipping a copy of it with?**tasks**. They can be used, for example, to give every node a copy of a?**large input dataset**?in an efficient manner. Spark also attempts to distribute broadcast variables using?**efficient**?broadcast algorithms to reduce communication cost.
## 問題:為什么只能 broadcast 只讀的變量?
這就涉及一致性的問題,如果變量可以被更新,那么一旦變量被某個節點更新,其他節點要不要一塊更新?如果多個節點同時在更新,更新順序是什么?怎么做同步?還會涉及 fault-tolerance 的問題。為了避免維護數據一致性問題,Spark 目前只支持 broadcast 只讀變量。
## 問題:broadcast 到節點而不是 broadcast 到每個 task?
因為每個 task 是一個線程,而且同在一個進程運行 tasks 都屬于同一個 application。因此每個節點(executor)上放一份就可以被所有 task 共享。
## 問題: 具體怎么用 broadcast?
driver program 例子:
~~~
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
~~~
driver 使用?`sc.broadcast()`?聲明要 broadcast 的 data,bdata 的類型是 Broadcast。
當?`rdd.transformation(func)`?需要用 bdata 時,直接在 func 中調用,比如上面的例子中的 map() 就使用了 bdata.value.size。
## 問題:怎么實現 broadcast?
broadcast 的實現機制很有意思:
### 1\. 分發 task 的時候先分發 bdata 的元信息
Driver 先建一個本地文件夾用以存放需要 broadcast 的 data,并啟動一個可以訪問該文件夾的 HttpServer。當調用`val bdata = sc.broadcast(data)`時就把 data 寫入文件夾,同時寫入 driver 自己的 blockManger 中(StorageLevel 為內存+磁盤),獲得一個 blockId,類型為 BroadcastBlockId。當調用`rdd.transformation(func)`時,如果 func 用到了 bdata,那么 driver submitTask() 的時候會將 bdata 一同 func 進行序列化得到 serialized task,**注意序列化的時候不會序列化 bdata 中包含的 data。**上一章講到 serialized task 從 driverActor 傳遞到 executor 時使用 Akka 的傳消息機制,消息不能太大,而實際的 data 可能很大,所以這時候還不能 broadcast data。
> driver 為什么會同時將 data 放到磁盤和 blockManager 里面?放到磁盤是為了讓 HttpServer 訪問到,放到 blockManager 是為了讓 driver program 自身使用 bdata 時方便(其實我覺得不放到 blockManger 里面也行)。
**那么什么時候傳送真正的 data?**在 executor 反序列化 task 的時候,會同時反序列化 task 中的 bdata 對象,這時候會調用 bdata 的 readObject() 方法。該方法先去本地 blockManager 那里詢問 bdata 的 data 在不在 blockManager 里面,如果不在就使用下面的兩種 fetch 方式之一去將 data fetch 過來。得到 data 后,將其存放到 blockManager 里面,這樣后面運行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿來用了。
下面探討 broadcast data 時候的兩種實現方式:
### 2\. HttpBroadcast
顧名思義,HttpBroadcast 就是每個 executor 通過的 http 協議連接 driver 并從 driver 那里 fetch data。
Driver 先準備好要 broadcast 的 data,調用`sc.broadcast(data)`后會調用工廠方法建立一個 HttpBroadcast 對象。該對象做的第一件事就是將 data 存到 driver 的 blockManager 里面,StorageLevel 為內存+磁盤,blockId 類型為 BroadcastBlockId。
同時 driver 也會將 broadcast 的 data 寫到本地磁盤,例如寫入后得到`/var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0`, 這個文件夾作為 HttpServer 的文件目錄。
> Driver 和 executor 啟動的時候,都會生成 broadcastManager 對象,調用 HttpBroadcast.initialize(),driver 會在本地建立一個臨時目錄用來存放 broadcast 的 data,并啟動可以訪問該目錄的 httpServer。
**Fetch data:**在 executor 反序列化 task 的時候,會同時反序列化 task 中的 bdata 對象,這時候會調用 bdata 的 readObject() 方法。該方法先去本地 blockManager 那里詢問 bdata 的 data 在不在 blockManager 里面,**如果不在就使用 http 協議連接 driver 上的 httpServer,將 data fetch 過來。**得到 data 后,將其存放到 blockManager 里面,這樣后面運行的 task 如果需要 bdata 就不需要再去 fetch data 了。如果在,就直接拿來用了。
HttpBroadcast 最大的問題就是?**driver 所在的節點可能會出現網絡擁堵**,因為 worker 上的 executor 都會去 driver 那里 fetch 數據。
### 3\. TorrentBroadcast
為了解決 HttpBroadast 中 driver 單點網絡瓶頸的問題,Spark 又設計了一種 broadcast 的方法稱為 TorrentBroadcast,**這個類似于大家常用的 BitTorrent 技術。**基本思想就是將 data 分塊成 data blocks,然后假設有 executor fetch 到了一些 data blocks,那么這個 executor 就可以被當作 data server 了,隨著 fetch 的 executor 越來越多,有更多的 data server 加入,data 就很快能傳播到全部的 executor 那里去了。
HttpBroadcast 是通過傳統的 http 協議和 httpServer 去傳 data,在 TorrentBroadcast 里面使用在上一章介紹的 blockManager.getRemote() => NIO ConnectionManager 傳數據的方法來傳遞,讀取數據的過程與讀取 cached rdd 的方式類似,可以參閱?[CacheAndCheckpoint](https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md)?中的最后一張圖。
下面討論 TorrentBroadcast 的一些細節:

### driver 端:
Driver 先把 data 序列化到 byteArray,然后切割成 BLOCK_SIZE(由?`spark.broadcast.blockSize = 4MB`?設置)大小的 data block,每個 data block 被 TorrentBlock 對象持有。切割完 byteArray 后,會將其回收,因此內存消耗雖然可以達到 2 * Size(data),但這是暫時的。
完成分塊切割后,就將分塊信息(稱為 meta 信息)存放到 driver 自己的 blockManager 里面,StorageLevel 為內存+磁盤,同時會通知 driver 自己的 blockManagerMaster 說 meta 信息已經存放好。**通知 blockManagerMaster 這一步很重要,因為 blockManagerMaster 可以被 driver 和所有 executor 訪問到,信息被存放到 blockManagerMaster 就變成了全局信息。**
之后將每個分塊 data block 存放到 driver 的 blockManager 里面,StorageLevel 為內存+磁盤。存放后仍然通知 blockManagerMaster 說 blocks 已經存放好。到這一步,driver 的任務已經完成。
### Executor 端:
executor 收到 serialized task 后,先反序列化 task,這時候會反序列化 serialized task 中包含的 bdata 類型是 TorrentBroadcast,也就是去調用 TorrentBroadcast.readObject()。這個方法首先得到 bdata 對象,**然后發現 bdata 里面沒有包含實際的 data。怎么辦?**先詢問所在的 executor 里的 blockManager 是會否包含 data(通過查詢 data 的 broadcastId),包含就直接從本地 blockManager 讀取 data。否則,就通過本地 blockManager 去連接 driver 的 blockManagerMaster 獲取 data 分塊的 meta 信息,獲取信息后,就開始了 BT 過程。
**BT 過程:**task 先在本地開一個數組用于存放將要 fetch 過來的 data blocks?`arrayOfBlocks = new Array[TorrentBlock](totalBlocks)`,TorrentBlock 是對 data block 的包裝。然后打亂要 fetch 的 data blocks 的順序,比如如果 data block 共有 5 個,那么打亂后的 fetch 順序可能是 3-1-2-4-5。然后按照打亂后的順序去 fetch 一個個 data block。fetch 的過程就是通過 “本地 blockManager -本地 connectionManager-driver/executor 的 connectionManager-driver/executor 的 blockManager-data” 得到 data,這個過程與 fetch cached rdd 類似。**每 fetch 到一個 block 就將其存放到 executor 的 blockManager 里面,同時通知 driver 上的 blockManagerMaster 說該 data block 多了一個存儲地址。**這一步通知非常重要,意味著 blockManagerMaster 知道 data block 現在在 cluster 中有多份,下一個不同節點上的 task 再去 fetch 這個 data block 的時候,可以有兩個選擇了,而且會隨機選擇一個去 fetch。這個過程持續下去就是 BT 協議,隨著下載的客戶端越來越多,data block 服務器也越來越多,就變成 p2p下載了。關于 BT 協議,Wikipedia 上有一個[動畫](http://zh.wikipedia.org/wiki/BitTorrent_(%E5%8D%8F%E8%AE%AE))。
整個 fetch 過程結束后,task 會開一個大 Array[Byte],大小為 data 的總大小,然后將 data block 都 copy 到這個 Array,然后對 Array 中 bytes 進行反序列化得到原始的 data,這個過程就是 driver 序列化 data 的反過程。
最后將 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 為內存+磁盤。顯然,這時候 data 在 blockManager 里存了兩份,不過等全部 executor 都 fetch 結束,存儲 data blocks 那份可以刪掉了。
## 問題:broadcast RDD 會怎樣?
[@Andrew-Xia](http://weibo.com/u/1410938285)?回答道:不會怎樣,就是這個rdd在每個executor中實例化一份。
## Discussion
公共數據的 broadcast 是很實用的功能,在 Hadoop 中使用 DistributedCache,比如常用的`-libjars`就是使用 DistributedCache 來將 task 依賴的 jars 分發到每個 task 的工作目錄。不過分發前 DistributedCache 要先將文件上傳到 HDFS。這種方式的主要問題是**資源浪費**,如果某個節點上要運行來自同一 job 的 4 個 mapper,那么公共數據會在該節點上存在 4 份(每個 task 的工作目錄會有一份)。但是通過 HDFS 進行 broadcast 的好處在于**單點瓶頸不明顯**,因為公共 data 首先被分成多個 block,然后不同的 block 存放在不同的節點。這樣,只要所有的 task 不是同時去同一個節點 fetch 同一個 block,網絡擁塞不會很嚴重。
對于 Spark 來講,broadcast 時考慮的不僅是如何將公共 data 分發下去的問題,還要考慮如何讓同一節點上的 task 共享 data。
對于第一個問題,Spark 設計了兩種 broadcast 的方式,傳統存在單點瓶頸問題的 HttpBroadcast,和類似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用傳統的 client-server 形式的 HttpServer 來傳遞真正的 data,而 TorrentBroadcast 使用 blockManager 自帶的 NIO 通信方式來傳遞 data。TorrentBroadcast 存在的問題是**慢啟動**和**占內存**,慢啟動指的是剛開始 data 只在 driver 上有,要等 executors fetch 很多輪 data block 后,data server 才會變得可觀,后面的 fetch 速度才會變快。executor 所占內存的在 fetch 完 data blocks 后進行反序列化時需要將近兩倍 data size 的內存消耗。不管哪一種方式,driver 在分塊時會有兩倍 data size 的內存消耗。
對于第二個問題,每個 executor 都包含一個 blockManager 用來管理存放在 executor 里的數據,將公共數據存放在 blockManager 中(StorageLevel 為內存+磁盤),可以保證在 executor 執行的 tasks 能夠共享 data。
其實 Spark 之前還嘗試了一種稱為 TreeBroadcast 的機制,詳情可以見技術報告?[Performance and Scalability of Broadcast in Spark](http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf)。
更深入點,broadcast 可以用多播協議來做,不過多播使用 UDP,不是可靠的,仍然需要應用層的設計一些可靠性保障機制。