# Spark 調優
由于大多數Spark計算都在內存中,所以集群中的任何資源(CPU、網絡帶寬或內存)都可能成為Spark程序的瓶頸。
大多數情況下,如果數據適合存儲在內存中,那么瓶頸就是網絡帶寬,但是有時,你還需要進行一些調優,例如[以序列化的形式存儲RDD](http://spark.apache.org/docs/latest/rdd-programming-guide.html),以減少內存使用。
本指南將涵蓋兩個主要主題: 數據序列化和內存調優。數據序列化對于良好的網絡性能至關重要,它還可以減少內存使用。我們還概述了幾個較小的主題。
# 數據序列化
序列化在任何分布式應用程序的性能中都扮演著重要的角色。將對象序列化成或消耗大量字節的速度較慢的格式將極大地降低計算速度。通常,這是優化Spark應用程序的第一件事。Spark的目標是在便利性(允許你在操作中使用任何Java類型)和性能之間取得平衡。它提供了兩個序列化庫:
* [Java序列化](https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html): 默認情況下,使用Java的 Spark 序列化對象 `ObjectOutputStream` 框架,你可以使用任何實現了 [java.io.Serializable](https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html) 的類與它一起使用。你還可以通過繼承 [java.io.Externalizable](https://docs.oracle.com/javase/8/docs/api/java/io/Externalizable.html) 來更深入地控制序列化的性能。Java序列化有著靈活,但通常非常緩慢的特性,會導致許多類的大型序列化格式。
* [Kryo serialization](https://github.com/EsotericSoftware/kryo): Spark還可以使用Kryo庫(`version 4`)更快地序列化對象。Kryo比Java序列化要快得多(通常多達10倍),也更緊湊,但它不支持所有的 `Serializable` 類型,并且要求你預先 *注冊* 將在程序中使用的類,以獲得最佳性能。
你可以通過使用 [SparkConf](20.md#spark屬性) 初始化作業并調用 `conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer")`,切換使用 `Kryo` 來進行序列化。此設置配置的序列化程序不僅用于在工作節點之間數據 `shuffle`,而且還用于將 `RDD` 序列化到磁盤。Kryo不是默認的 `serializer` 的唯一原因是自定義的注冊要求,但是我們建議在任何網絡密集型應用程序中嘗試它。自從 Spark 2.0.0 以來,我們在使用簡單類型、簡單類型數組或字符串類型 RDD 之間 `shuffle` 時,在內部使用 `Kryo serializer`。
Spark 默認包含 `Kryo` 序列化器,用于 [Twitter chill](https://github.com/twitter/chill) 庫中 `AllScalaRegistrar` 涉及的許多常用的Scala核心類。
要向Kryo注冊你自己的自定義類,請使用 `registerKryoClasses` 方法。
```scala
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
```
[Kryo文檔](https://github.com/EsotericSoftware/kryo) 描述了更高級的注冊選項,如添加自定義序列化代碼。
##
如果你的對象很大,你可能還需要增加 `spark.kryoserializer.buffer` [config](20.md#壓縮和序列化)。這個值需要足夠大,以容納你要序列化的*最大*對象。
最后,如果你不注冊自定義類,Kryo仍然可以工作,但是它必須將完整的類名存儲在每個對象中,這是很浪費資源的。
# 內存調優
在內存使用時進行調優,有三個考慮因素:
- `內存大小`: 對象使用的內存 *大小* (你可能希望整個 `dataset` 都能在內存中)
- `成本`: 訪問對象的*成本*
- `開銷`: `gc` 開銷(如果對象的周轉率很高)。
默認情況下,Java對象的訪問速度很快,但是很容易比字段中的 “原始(raw)” 數據多消耗2-5倍的內存空間。這是由于以下幾個原因造成的:
- 每個不同的 Java 對象都有一個"對象頭",它大約有16個字節,包含指向其類的指針等信息。對于一個只有很少數據的對象(比如一個 `Int` 字段),它可比數據大。
* Java `String` 在原始字符串數據上大約有40個字節的開銷(因為他們將其存儲在 `Char` 數組中,并保留額外的數據,如長度),由于 `String` 內部使用 UTF-16 編碼,因此將每個字符存儲為 *`2字節`*。因此,一個10個字符的字符串可以輕松地消耗60字節空間。
* 常見的集合類,如 `HashMap` 和 `LinkedList`,使用鏈接數據結構,其中每個元素都有一個 "wrapper" 對象(例如“Map.Entry”)。這個對象不僅有一個頭,而且還有指向列表中下一個對象的地址指針(通常每個地址指針有 `8` 個字節)。
* 原始類型的集合通常將它們存儲為 `boxed `(裝箱) 對象,如 `java.lang.Integer`
本節將首先概述Spark中的內存管理,然后討論用戶可以采取的具體策略,以便在他/她的應用程序中更有效地使用內存。特別地,我們將描述如何確定對象的內存使用情況,以及如何改進它——通過更改數據結構或以序列化格式存儲數據。然后,我們將討論調整Spark的 `cache size` 和` Java gc`。
## 內存管理概論
Spark中的內存使用主要分為兩類: 執行和存儲。
- 執行內存: 是指用于 `shuffle` 、`join`、`sort` 和 `aggregation` 的計算
- 存儲內存: 是指用于在集群中 `cache` 和傳播內部數據的內存。
在Spark中,執行和存儲共享一個統一的區域(`M`),當沒有執行內存時,存儲可以獲取所有可用的內存,反之亦然。如果有必要,執行可能會驅逐存儲,但僅在總存儲內存使用量低于某個閾值(`R`)時才會這樣做。換句話說,`R` 描述了 `M` 中的一個子區域,其中緩存的塊永遠不會被驅逐。存儲可能不會因為執行的復雜性而驅逐執行。
這種設計確保了幾個理想的性能。首先,不使用緩存的應用程序可以將整個空間用于執行,從而避免不必要的磁盤溢出。其次,使用緩存的應用程序可以保留最小的存儲空間(R),在這里它們的數據塊不會被清除。最后,這種方法為各種工作負載提供了合理的開箱即用性能,而不需要用戶了解如何在內部劃分內存。
雖然有兩種相關的配置,但是典型的用戶不應該需要調整它們,因為默認值適用于大多數工作負載:
- `spark.memory.fraction` 表示 `M` 的大小是(JVM `heap space`- 300MB)的一部分(默認值0.6)。剩下的空間(40%)用于用戶數據結構、Spark中的內部元數據,以及在稀疏和異常大的記錄情況下防止 `OOM` 錯誤。
- `spark.memory.storageFraction` 將 `R` 的大小表示為 `M` 的一部分(默認值為0.5)。`R` 是 `M` 中的存儲空間,緩存的塊不會被執行清除。
* 應該設置 `spark.memory.fraction` 的值,以便在JVM的 `老年代(old)`或 `永久代(tenured)`中合適地容納這部分堆空間。有關詳細信息,請參閱下面關于 高級 GC調優的討論。
## 確定內存消耗
確定一個 `dataset` 所需的內存消耗的最好方法是創建一個RDD,將它放到緩存中,然后在web UI中查看 “Storage” 頁面。該頁將告訴你RDD占用了多少內存。
要估算特定對象的內存消耗,請使用 `SizeEstimator` 的 `estimate` 方法。這對于嘗試使用不同的數據布局來調整內存使用,以及確定廣播變量在每個執行器堆上占用的空間量非常有用。
## 優化數據結構
減少內存消耗的第一種方法是避免使用增加開銷的Java特性,比如 `pointer-based` 的數據結構和 `wrapper` 對象。
有幾種方法可以做到這一點:
1. 設計你自己的數據結構以選擇對象數組和基本類型,而不是標準的 `Java` 或 `Scala` 集合類(e.g. `HashMap`)。[fastutil](http://fastutil.di.unimi.it/) 庫為與 `Java`標準 library 兼容的基本類型提供了方便的集合類。
2. 盡可能避免嵌套有大量小對象和指針的結構。
3. 考慮使用數字id或枚舉對象代替 `key` 的字符串。
4. 如果你的內存不足32 GB,那么可以設置 JVM參數 `-XX:+UseCompressedOops`,使地址指針變成4個字節,而不是8個字節。你可以在 [`spark-env.sh`](20.md#環境變量) 中添加這些選項。
## 序列化 RDD 存儲
當對象仍過于龐大,盡管進行了上述調優,仍無法有效地進行存儲。 減少內存使用一個更簡單的方法是將它們用 *`serialized`* 形式進行存儲。 在 [RDD 持久 API](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd持久化) 使用序列化 StorageLevels, 如 `MEMORY_ONLY_SER`。Spark 然后將每個RDD分區存儲為一個大字節數組。以序列化形式存儲數據的惟一缺點是訪問時間較慢,因為必須動態地反序列化每個對象。如果你希望以序列化的形式緩存數據,我們強烈建議[使用Kryo](#數據序列化),因為它比Java序列化(當然也比原始Java對象)的大小小得多。
## GC優化
當你的程序存儲了大量的 `RDD` 數據時, `JVM GC` 可能會成為一個問題。(對于那些只讀取一次RDD,然后在上面運行許多操作的程序來說,這通常不是問題。)當Java需要清除舊對象來為新對象騰出空間時,它將需要跟蹤所有Java對象并找到未使用的對象。這里要記住的要點是,GC成本與Java對象的數量*成比例,因此使用對象較少的數據結構(例如,一個 `Int` 數組而不是 `LinkedList`數組)可以大大降低這一成本。一個更好的方法是以序列化的形式持久化對象,如上所述:現在每個RDD分區將只有 *一個* 對象(一個字節數組)。在嘗試其他技術之前,如果GC有問題,首先要嘗試的是使用[序列化緩存](#序列化 RDD 存儲)。
GC也可能是一個問題,因為任務的工作內存(運行任務所需的空間量)和節點上緩存的 `RDD` 之間存在干擾。我們將討論如何控制分配給RDD緩存的空間來緩解這種情況。
**測量GC的影響**
GC調優的第一步是收集關于 GC發生的頻率和GC花費的時間的統計信息。這可以通過在Java 參數中添加 `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` 來實現。(請參閱[配置指南](20.md#動態加載Spark屬性)以獲取關于將Java參數傳遞給Spark作業的信息。)下一次運行Spark作業時,你將看到每次發生垃圾收集時在 `Worker` 日志中打印的消息。注意,這些日志將在你的集群`Worker 節點` 上(在它們的工作目錄中的 `stdout` 文件中),而不是在你的 Driver 進程上。
**高級GC調優**
為了進一步調優 GC,我們首先需要了解JVM中關于內存管理的一些基本信息:
- Java堆空間分為兩個區域,年輕代(`Young`) 和老年代(`Old`) 。年輕代的目的是持有生命周期較短(short-object)的對象,而老年代的目的是持有具有更長的生存期的對象。
- 年輕一代被進一步劃分為三個區域[`Eden`, `Survivor1`, `Survivor2`]。
- GC 過程的簡化描述:
- 當Eden已滿時,在Eden上運行一個小型GC,從Eden和Survivor1存活的對象被復制到Survivor2。交換`Survivor` 區域。
- 如果一個對象足夠 Old(在多次GC后還存活) 或Survivor2已滿,則將其移動到老年代中。
- 最后,當老年代接近full時,將調用full GC。
在Spark中進行GC調優的目標是確保在老年代中只存儲長期存在的 `RDD`,而年輕代的大小足以存儲短期存在的對象。這將有助于避免在任務執行期間發生 `full gc` 來收集創建的臨時對象。一些可能有用的步驟是:
- 通過收集GC統計信息來檢查是否有太多的 `GC` 發生。如果在任務完成之前多次調用 `full GC`,這意味著沒有足夠的可用內存來執行任務。
- 如果 `Minor GC` 太多而 `Major GC` 太少,那么為Eden分配更多的內存會有所幫助。你可以將Eden的大小設置為高估 (over-estimate) 每個 `task` 所需的內存大小。如果 `Eden` 的大小被確定為 `E`,那么你可以使用選項 `-Xmn=4/3*E` 來設置年輕代的大小。(4/3的比例也考慮到了 `Survivor` 區域使用的空間, 一個 `Survivor` 占年輕代 1/8 空間)
- 在打印的GC統計中,如果老年代空間接近滿,通過降低 `spark.memory.fraction` 來減少用于緩存的內存數量; 緩存更少的對象比降低任務執行速度要好。或者,考慮減少年輕代的規模。這意味著降低 `-Xmn`,如果你已經設置為上述。如果沒有,請嘗試更改JVM的 `NewRatio` 參數的值。許多 JVM 將其默認為2,這意味著老一代占用了2/3的堆。它應該足夠大,使這個 `fraction` 超過 `spark.memory.fraction`。
- 嘗試使用JVM參數 `-XX:+UseG1GC` 使用的 `G1GC` 垃圾收集器。在垃圾收集成為瓶頸的某些情況下,它可以提高性能。注意, 對于 `Executor` 堆大小容量大的集群, 用 `-XX:G1HeapRegionSize` 參數增加 [G1 區域尺寸](http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html) 會很重要。
- 例如,如果你的任務正在從HDFS讀取數據,則可以使用從HDFS讀取的數據塊的大小來估計任務所使用的內存量。 注意,解壓縮塊的大小通常是塊大小的2到3倍。因此,如果我們希望有3或4個任務的工作空間,并且 `HDFS` 塊大小為128MB,我們可以估計Eden的大小為 `4*3*128MB`。
- 監視隨新設置的變化, GC 所花費的頻率和時間。
我們的經驗表明,GC調優的效果取決于你的應用程序和可用的內存量。官網描述了[更多的調優選項](https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/index.html),但是在高層次上,管理 `full GC` 發生的頻率有助于減少開銷。
可以通過在 Job 配置中,設置 `spark.executor.extraJavaOptions` 來指定 `Executor` 的GC調優等級。
# 其它考慮
## 并行級別
除非將每個 `operation` 的并行度設置得足夠高,否則集群資源不會得到充分利用。Spark根據每個文件的大小自動設置要在每個文件上運行的“map” 任務的數量(盡管你可以通過 `SparkContext.textFile` 的可選參數來控制它)。對于分布式的“reduce”操作,例如 `groupByKey` 和 `reduceByKey`,它使用最大的父RDD的分區數。你可以將并行度級別作為第二個參數傳遞(參見 [spark.PairRDDFunction](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) 文件),或者設置配置屬性`spark.default.parallelism` 來更改默認值。通常,我們建議在你的集群中每個CPU內核執行2-3個任務。
## Reduce任務內存使用
有時,你會得到一個OutOfMemoryError錯誤,不是因為你的RDD 內存不合適,而是因為你的一個任務的工作集,例如 `groupByKey` 中的一個 `reduce` 任務,占用內存太多。Spark的 `shuffle` 操作( `sortByKey`、`groupByKey`、`reduceByKey`、`join`等)在每個任務中構建一個 `hash table` 來執行分組,分組常常很大。這里最簡單的修復方法是增加并行度,這樣每個任務的輸入集就會更小。Spark可以有效地支持短至200 ms的任務,因為它跨多個任務重用一個 `executor JVM`,而且它的任務啟動成本很低,所以可以安全地將并行度提高到比集群中的內核數量更多的水平。
## 廣播大變量
使用SparkContext中可用的[廣播功能](http://spark.apache.org/docs/latest/rdd-programming-guide.html#廣播變量)可以極大地減少每個序列化任務的大小,以及通過集群啟動 job 的成本。如果你的任務使用了 `Driver` 進程中的任何大對象(例如,一個 static 查找表),請考慮將其轉換為廣播變量。Spark在 `Master` 上記錄了每個任務的序列化大小,因此可以查看它來決定你的任務是否太大;一般情況下,大于20kb的任務可能值得優化。
## 數據局部性
數據局部性對Spark作業的性能有很大的影響。如果數據和對其進行操作的代碼在一起,那么計算往往會很快。但是,如果代碼和數據是分開的,那么其中一個必須轉移到另一個。通常,將序列化的代碼從一個地方傳送到另一個地方要比傳送數據塊快得多,因為代碼的大小比數據小得多。Spark根據數據局部性的一般原則構建其調度。
數據局部性是指數據與處理數據的代碼之間的距離。基于數據的當前位置有幾個級別的局部性。按從最近到最遠的順序排列:
- `PROCESS_LOCAL` : 數據與運行代碼位于相同的`JVM`中。這是最好的位置特性。
- `NODE_LOCAL`: 數據在同一個節點上。數據可能在同一節點上的 `HDFS` 中,也可能在同一節點上的另一個 `Executor` 中。這比 `PROCESS_LOCAL` 稍微慢一些,因為數據必須在進程之間傳遞
- `NO_PREF` : 數據從任何地方訪問, 速度都一樣快,并且沒有區域性優先特性
- `RACK_LOCAL`: 數據位于相同的服務器機架上。數據位于同一機架上的不同服務器上,因此需要通過網絡發送,通常是通過單個交換機
- `Any`: 數據都在網絡的其他地方,不在同一個機架上
Spark傾向于將所有任務安排在最佳位置級別,但這并不總是可能的。在任何空閑執行器上都沒有未處理的數據的情況下,Spark切換到較低的位置級別。有兩種選擇:
a)等待繁忙的CPU釋放出來,在同一服務器上的數據上啟動一個任務,或者
b)立即在需要移動數據的較遠的地方啟動一個新任務。
Spark通常做的是稍作等待,希望繁忙的CPU可以釋放。一旦超時過期,它就開始將數據從遠處移動到空閑的CPU。每個級別之間回退的等待超時可以單獨配置,也可以全部配置在一個參數中; 有關詳細信息,見 [配置頁面](20.md#計劃) `spark.locality`參數 。如果你的任務很長,并且位置不好,你應該增加這些設置,但是默認設置通常工作得很好。
## 總結
這是一個簡短的指南,指出了你在調優Spark應用程序時應該了解的主要問題——最重要的是數據序列化和內存調優。對于大多數程序,切換到Kryo序列化并以序列化的形式持久化數據將解決最常見的性能問題。你可以在[Spark郵件列表中](https://spark.apache.org/community.html) 詢問其他調優最佳實踐。
- Spark 概述
- 編程指南
- 快速入門
- Spark 編程指南
- 構建在 Spark 之上的模塊
- Spark Streaming 編程指南
- Spark SQL, DataFrames and Datasets Guide
- MLlib
- GraphX Programming Guide
- API 文檔
- 部署指南
- 集群模式概述
- Submitting Applications
- 部署模式
- Spark Standalone Mode
- 在 Mesos 上運行 Spark
- Running Spark on YARN
- 其它
- 更多
- Spark 配置
- Monitoring and Instrumentation
- Tuning Spark
- 作業調度
- Spark 安全
- 硬件配置
- Accessing OpenStack Swift from Spark
- 構建 Spark
- 其它
- 外部資源
- Spark RDD(Resilient Distributed Datasets)論文
- 翻譯進度