[TOC]
# 結構
一個完整的mapreduce程序在分布式運行時有三類實例進程:
1. MRAppMaster:負責整個程序的過程調度及狀態協調
2. mapTask:負責map階段的整個數據處理流程
3. ReduceTask:負責reduce階段的整個數據處理流程
# MR程序運行流程
流程示意圖

流程解析
1. 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程
2. maptask進程啟動之后,根據給定的數據切片范圍進行數據處理,主體流程為:
* 利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對
* 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,并將map()方法輸出的KV對收集到緩存
* 將緩存中的KV對按照K分區排序后不斷溢寫到磁盤文件
3. MRAppMaster監控到所有maptask進程任務完成之后,會根據客戶指定的參數啟動相應數量的reducetask進程,并告知reducetask進程要處理的數據范圍(數據分區)
4. Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,并在本地進行重新歸并排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,并收集運算輸出的結果KV,然后調用客戶指定的outputformat將結果數據輸出到外部存儲
# MapTask并行度決定機制
maptask的并行度決定map階段的任務處理并發度,進而影響到整個job的處理速度
那么,mapTask并行實例是否越多越好呢?其并行度又是如何決定呢?
## mapTask并行度的決定機制
一個job的map階段并行度由客戶端在提交job時決定
而客戶端對map階段并行度的規劃的基本邏輯為:
將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個split),然后每一個split分配一個mapTask并行實例處理
這段邏輯及形成的切片規劃描述文件,由FileInputFormat實現類的getSplits()方法完成,其過程如下圖:

## FileInputFormat切片機制
1. 切片定義在InputFormat類中的getSplit()方法
2. FileInputFormat中默認的切片機制:
* 簡單地按照文件的內容長度進行切片
* 切片大小,默認等于block大小
* 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
比如待處理數據有兩個文件:
~~~
file1.txt 320M
file2.txt 10M
~~~
經過FileInputFormat的切片機制運算后,形成的切片信息如下:
~~~
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
~~~
3. FileInputFormat中切片的大小的參數配置
通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:
`Math.max(minSize, Math.min(maxSize, blockSize));` 切片主要由這幾個值來運算決定
~~~
minsize:默認值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:默認值:Long.MAXValue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize
~~~
因此,默認情況下,切片大小=blocksize
maxsize(切片最大值):
參數如果調得比blocksize小,則會讓切片變小,而且就等于配置的這個參數的值
minsize (切片最小值):
參數調的比blockSize大,則可以讓切片變得比blocksize還大
**但是,不論怎么調參數,都不能讓多個小文件“劃入”一個split**
選擇并發數的影響因素:
運算節點的硬件配置
運算任務的類型:CPU密集型還是IO密集型
運算任務的數據量
## map并行度的經驗之談
如果硬件配置為`2*12core + 64G`,恰當的map并行度是大約每個節點20-100個map,最好每個map的執行時間至少一分鐘。
* 如果job的每個map或者 reduce task的運行時間都只有30-40秒鐘,那么就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
配置task的JVM重用可以改善該問題:
(`mapred.job.reuse.jvm.num.tasks`,默認是1,表示一個JVM上最多可以順序執行的task
數目(屬于同一個Job)是1。也就是說一個task啟一個JVM)
* 如果input的文件非常的大,比如1TB,可以考慮將hdfs上的每個block size設大,比如設成256MB或者512MB
# ReduceTask并行度的決定
reducetask的并行度同樣影響整個job的執行并發度和執行效率,但與maptask的并發數由切片數決定不同,Reducetask數量的決定是可以直接手動設置:
~~~
//默認值是1,手動設置為4
job.setNumReduceTasks(4);
~~~
如果數據分布不均勻,就有可能在reduce階段產生數據傾斜
注意: reducetask數量并不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個reducetask
盡量不要運行太多的reduce task。對大多數job來說,最好rduce的個數最多和集群中的reduce持平,或者比集群的 reduce slots小。這個對于小集群而言,尤其重要。
# mapreduce實踐
## mapreduce編程規范
編程規范
1. 用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
2. Mapper的輸入數據是KV對的形式(KV的類型可自定義)
3. Mapper的輸出數據是KV對的形式(KV的類型可自定義)
4. Mapper中的業務邏輯寫在map()方法中
5. map()方法(maptask進程)對每一個<K,V>調用一次
6. Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
7. Reducer的業務邏輯寫在reduce()方法中
8. Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
9. 用戶自定義的Mapper和Reducer都要繼承各自的父類
10. 整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
## mapreduce程序運行模式
### 本地運行模式
1. mapreduce程序是被提交給LocalJobRunner在本地以單進程的形式運行
2. 而處理的數據及輸出結果可以在本地文件系統,也可以在hdfs上
3. 怎樣實現本地運行?寫一個程序,不要帶集群的配置文件(本質是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname參數)
4. 本地模式非常便于進行業務邏輯的debug,只要在eclipse中打斷點即可
如果在windows下想運行本地模式來測試程序邏輯,需要在windows中配置環境變量:
~~~
%HADOOP_HOME% = d:/hadoop-2.6.1
%PATH% = %HADOOP_HOME%\bin
~~~
并且要將`d:/hadoop-2.6.1`的lib和bin目錄替換成windows平臺編譯的版本
### 集群運行模式
1. 將mapreduce程序提交給yarn集群resourcemanager,分發到很多的節點上并發執行
2. 處理的數據和輸出結果應該位于hdfs文件系統
3. 提交集群的實現步驟:
A. 將程序打成JAR包,然后在集群的任意一個節點上用hadoop命令啟動
~~~
$ hadoop jar wordcount.jar cn.bigdata.mrsimple.WordCountDriver inputpath outputpath
~~~
B. 直接在linux的eclipse中運行main方法
(項目中要帶參數:`mapreduce.framework.name=yarn`以及yarn的兩個基本配置)
C. 如果要在windows的eclipse中提交job給集群,則要修改YarnRunner類
mapreduce程序在集群中運行時的大體流程:

附:在windows平臺上訪問hadoop時改變自身身份標識的方法之二:
在代碼中設置配置或者

### MAPREDUCE中的Combiner
Combiner的使用要非常謹慎
因為combiner在mapreduce過程中可能調用也肯能不調用,可能調一次也可能調多次
所以:combiner使用的原則是:有或沒有都不能影響業務邏輯
1. combiner是MR程序中Mapper和Reducer之外的一種組件
2. combiner組件的父類就是Reducer
3. combiner和reducer的區別在于運行的位置:
Combiner是在每一個maptask所在的節點運行
Reducer是接收全局所有Mapper的輸出結果;
4. combiner的意義就是對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量
具體實現步驟:
1. 自定義一個combiner繼承Reducer,重寫reduce方法
2. 在job中設置: job.setCombinerClass(CustomCombiner.class)
5. combiner能夠應用的前提是不能影響最終的業務邏輯,而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
# MAPREDUCE原理篇
## mapreduce的shuffle機制
概述:
* mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle;
* shuffle: 洗牌、發牌——(核心機制:數據分區,排序,緩存);
* 具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,并在分發的過程中,對數據按key進行了分區和排序;
## 主要流程:
Shuffle緩存流程:

shuffle是MR處理流程中的一個過程,它的每一個處理步驟是分散在各個map task和reduce task節點上完成的,整體來看,分為3個操作:
1. 分區partition
2. Sort根據key排序
3. Combiner進行局部value的合并
## 詳細流程
1. maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2. 從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3. 多個溢出文件會被合并成大的溢出文件
4. 在溢出過程中,及合并的過程中,都要調用partitoner進行分組和針對key進行排序
5. reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6. reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)
7. 合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快
緩沖區的大小可以通過參數調整, 參數:io.sort.mb 默認100M
## 詳細流程示意圖


# MAPREDUCE中的序列化
## 概述
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系。。。。),不便于在網絡中高效傳輸;
所以,hadoop自己開發了一套序列化機制(Writable),精簡,高效
## Jdk序列化和MR序列化之間的比較
簡單代碼驗證兩種序列化機制的差別:
~~~
public class TestSeri {
public static void main(String[] args) throws Exception {
//定義兩個ByteArrayOutputStream,用來接收不同序列化機制的序列化結果
ByteArrayOutputStream ba = new ByteArrayOutputStream();
ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
//定義兩個DataOutputStream,用于將普通對象進行jdk標準序列化
DataOutputStream dout = new DataOutputStream(ba);
DataOutputStream dout2 = new DataOutputStream(ba2);
ObjectOutputStream obout = new ObjectOutputStream(dout2);
//定義兩個bean,作為序列化的源對象
ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f);
ItemBean itemBean = new ItemBean(1000L, 89.9f);
//用于比較String類型和Text類型的序列化差別
Text atext = new Text("a");
// atext.write(dout);
itemBean.write(dout);
byte[] byteArray = ba.toByteArray();
//比較序列化結果
System.out.println(byteArray.length);
for (byte b : byteArray) {
System.out.print(b);
System.out.print(":");
}
System.out.println("-----------------------");
String astr = "a";
// dout2.writeUTF(astr);
obout.writeObject(itemBeanSer);
byte[] byteArray2 = ba2.toByteArray();
System.out.println(byteArray2.length);
for (byte b : byteArray2) {
System.out.print(b);
System.out.print(":");
}
}
}
~~~
## 自定義對象實現MR中的序列化接口
如果需要將自定義的bean放在key中傳輸,則還需要實現comparable接口,因為mapreduce框中的shuffle過程一定會對key進行排序,此時,自定義的bean實現的接口應該是:
`public class FlowBean implements WritableComparable<FlowBean> `
需要自己實現的方法是:
~~~
/**
* 反序列化的方法,反序列化時,從流中讀取到的各個字段的順序應該與序列化時寫出去的順序保持一致
*/
@Override
public void readFields(DataInput in) throws IOException {
upflow = in.readLong();
dflow = in.readLong();
sumflow = in.readLong();
}
/**
* 序列化的方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(dflow);
//可以考慮不序列化總流量,因為總流量是可以通過上行流量和下行流量計算出來的
out.writeLong(sumflow);
}
@Override
public int compareTo(FlowBean o) {
//實現按照sumflow的大小倒序排序
return sumflow>o.getSumflow()?-1:1;
}
~~~
# MapReduce與YARN
## YARN概述
Yarn是一個資源調度平臺,負責為運算程序提供服務器運算資源,相當于一個分布式的操作系統平臺,而mapreduce等運算程序則相當于運行于操作系統之上的應用程序
## YARN的重要概念
1. yarn并不清楚用戶提交的程序的運行機制
2. yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
3. yarn中的主管角色叫ResourceManager
4. yarn中具體提供運算資源的角色叫NodeManager
5. 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味著yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
6. 所以,spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規范的資源請求機制即可
7. Yarn就成為一個通用的資源調度平臺,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享
## Yarn中運行運算程序的示例
mapreduce程序的調度過程,如下圖

- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介