對Map的結果進行排序并傳輸到Reduce進行處理 Map的結果并不#x662F;直接存放到硬盤,而是利用緩存做一些預排序處理 Map會調用Combiner,壓縮,按key進行分區、排序等,盡量減少結果的大小 每個Map完成后都會通知Task,然后Reduce就可以進行處理

## Map端
當Map程序開始產生結果的時候,并不是直接寫到文件的,而是利用緩存做一些排序方面的預處理操作
每個Map任務都有一個循環內存緩沖區(默認100MB),當緩存的內容達到80%時,后臺線程開始將內容寫到文件,此時Map任務可以&#x#x7EE7;續輸出結果,但如果緩沖區滿了,Map任務則需要等待
寫文件使用round-robin方式。在寫入文件之前,先將數據按照Reduce進行分區。對于每一個分區,都會在內存中根據key進行排序,如果配置了Combiner,則排序后執行Combiner(Combine之后可以減少寫入文件和傳輸的數據)
每次結果達到緩沖區的閥值時,都會創建一個文件,在Map結束時,可能會產生大量的文件。在Map完成前,會將這些文件進行合并和排序。如果文件的數量超過3個,則&##x5408;并后會再次運行Combiner(1、2個文件就沒有必要了)
如果配置了壓縮,則最終寫入的文件會先進行壓縮,這樣可以減少寫入和傳輸的數據
一旦Map完成,則通知任務管理器,此時Reduce就可以開始復制結果數據
## Reduce端
Map的結果文件都存放到運行Map任務的機器的本地硬盤中
如果Map的結果很少,則直接放到內存,否則寫入文件中
同時后臺線程將這些文件進行合并和排序到一個更大的文件中(如果文件是壓縮的?#xFF0C;則需要先解壓)
當所有的Map結果都被復制和合并后,就會調用Reduce方法
Reduce結果會寫入到HDFS中
## 調優
一般的原則是給shuffle分配盡可能多的內存,但前提是要保證Map、Reduce任務有足夠的內存
對于Map,主要就是避免把文件寫入磁盤,例如使用Combiner,增大io.sort.mb的值
對于Reduce,主要是把Map的結果盡可能地保存到內存中,同樣也是要避免把中間結果寫入磁盤。默認情況下,所有的內存都是分配給Reduce方法的,如果Reduce方法不怎&##x4E48;消耗內存,可以mapred.inmem.merge.threshold設成0,mapred.job.reduce.input.buffer.percent設成1.0
在任務監控中可通過Spilled records counter來監控寫入磁盤的數,但這個值是包括map和reduce的
對于IO方面,可以Map的結果可以使用壓縮,同時增大buffer size(io.file.buffer.size,默認4kb)
## 配置
| 屬性 | 默認值 | 描述 |
| --- | --- | --- |
| io.sort.mb | 100 | The size of the memory buffer to use while sorting map output. |
| io.sort.record.percent | 0.05 | The proportion of io.sort.mb reserved for storing record boundaries of the map outputs. The remaining space is used for the map output records themselves. This property was removed in releases after 1.x, as the shuffle code was improved to do a better job of using all the available memory for map output and accounting information. |
| io.sort.spill.percent | 0.80 | The threshold usage proportion for both the map output memory buffer and the record boundaries index to start the process of spilling to disk. |
| io.sort.factor | 10 | The maximum number of streams to merge at once when sorting files. This property is also used in the reduce. It’s fairly common to increase this to 100. |
| min.num.spills.for.combine | 3 | The minimum number of spill files needed for the combiner to run (if a combiner is specified). |
| mapred.compress.map.output | false | Compress map outputs. |
| mapred.map.output.compression.codec | DefaultCodec | The compression codec to use for map outputs. |
| mapred.reduce.parallel.copies | 5 | The number of threads used to copy map outputs to the reducer. |
| mapred.reduce.copy.backoff | 300 | The maximum amount of time, in seconds, to spend retrieving one map output for a reducer before declaring it as failed. The reducer may repeatedly reattempt a transfer within this time if it fails (using exponential backoff). |
| io.sort.factor | 10 | The maximum number of streams to merge at once when sorting files. This property is also used in the map. |
| mapred.job.shuffle.input.buffer.percent | 0.70 | The proportion of total heap size to be allocated to the map outputs buffer during the copy phase of the shuffle. |
| mapred.job.shuffle.merge.percent | 0.66 | The threshold usage proportion for the map outputs buffer (defined by mapred.job.shuf fle.input.buffer.percent) for starting the process of merging the outputs and spilling to disk. |
| mapred.inmem.merge.threshold | 1000 | The threshold number of map outputs for starting the process of merging the outputs and spilling to disk. A value of 0 or less means there is no threshold, and the spill behavior is governed solely by mapred.job.shuffle.merge.percent. |
| mapred.job.reduce.input.buffer.percent | 0.0 | The proportion of total heap size to be used for retaining map outputs in memory during the reduce. For the reduce phase to begin, the size of map outputs in memory must be no more than this size. By default, all map outputs are merged to disk before the reduce begins, to give the reducers as much memory as possible. However, if your reducers require less memory, this value may be increased to minimize the number of trips to disk. |