[TOC]
# MAPREDUCE原理
1) Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基于hadoop的數據分析應用”的核心框架;
2) Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個hadoop集群上;
## 1 為什么要MAPREDUCE
1) 海量數據在單機上處理因為硬件資源限制,無法勝任
2) 而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度
3) 引入mapreduce框架后,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜性交由框架來處理
> 設想一個海量數據場景下的wordcount需求:
> 單機版:內存受限,磁盤受限,運算能力受限
> 分布式:
1. 文件分布式存儲(HDFS)
2. 運算邏輯需要至少分成2個階段(一個階段獨立并發,一個階段匯聚)
3. 運算程序如何分發
4. 程序如何分配運算任務(切片)
5. 兩階段的程序如何啟動?如何協調?
6. 整個程序運行過程中的監控?容錯?重試?
> 可見在程序由單機版擴成分布式時,會引入大量的復雜工作。為了提高開發效率,可以將分布式程序中的公共功能封裝成框架,讓開發人員可以將精力集中于業務邏輯。
> 而mapreduce就是這樣一個分布式程序的通用框架,其應對以上問題的整體結構如下:
1. MRAppMaster(mapreduce application master)
2. MapTask
3. ReduceTask
## 2 MAPREDUCE框架結構及核心運行機制
### 2.1 結構
> 一個完整的mapreduce程序在分布式運行時有三類實例進程:
1) MRAppMaster:負責整個程序的過程調度及狀態協調
2) mapTask:負責map階段的整個數據處理流程
3) ReduceTask:負責reduce階段的整個數據處理流程
### 2.2 MR程序運行流程
#### 2.2.1 流程示意圖

#### 2.2.2 流程解析
1) 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據本次job的描述信息,計算出需要的maptask實例數量,然后向集群申請機器啟動相應數量的maptask進程
2) maptask進程啟動之后,根據給定的數據切片范圍進行數據處理,主體流程為:
a) 利用客戶指定的inputformat來獲取RecordReader讀取數據,形成輸入KV對
b) 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,并將map()方法輸出的KV對收集到緩存
c) 將緩存中的KV對按照K分區排序后不斷溢寫到磁盤文件
3) MRAppMaster監控到所有maptask進程任務完成之后,會根據客戶指定的參數啟動相應數量的reducetask進程,并告知reducetask進程要處理的數據范圍(數據分區)
4) Reducetask進程啟動之后,根據MRAppMaster告知的待處理數據所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結果文件,并在本地進行重新歸并排序,然后按照相同key的KV為一個組,調用客戶定義的reduce()方法進行邏輯運算,并收集運算輸出的結果KV,然后調用客戶指定的outputformat將結果數據輸出到外部存儲
## 3 MapTask并行度決定機制
> maptask的并行度決定map階段的任務處理并發度,進而影響到整個job的處理速度
> 那么,mapTask并行實例是否越多越好呢?其并行度又是如何決定呢?
### 3.1 mapTask并行度的決定機制
> 一個job的map階段并行度由客戶端在提交job時決定
> 而客戶端對map階段并行度的規劃的基本邏輯為:
> 將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個split),然后每一個split分配一個mapTask并行實例處理
> 這段邏輯及形成的切片規劃描述文件,由FileInputFormat實現類的getSplits()方法完成,其過程如下圖:

### 3.2 FileInputFormat切片機制
1) 切片定義在InputFormat類中的getSplit()方法
2) FileInputFormat中默認的切片機制:
a) 簡單地按照文件的內容長度進行切片
b) 切片大小,默認等于block大小
c) 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片
> 比如待處理數據有兩個文件:
~~~
file1.txt 320M
file2.txt 10M
~~~
> 經過FileInputFormat的切片機制運算后,形成的切片信息如下:
~~~
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~320
file2.txt.split1-- 0~10M
~~~
3) FileInputFormat中切片的大小的參數配置
> 通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:
~~~
Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定
minsize:默認值:1
配置參數: mapreduce.input.fileinputformat.split.minsize
maxsize:默認值:Long.MAXValue
配置參數:mapreduce.input.fileinputformat.split.maxsize
blocksize
因此,默認情況下,切片大小=blocksize
maxsize(切片最大值):
參數如果調得比blocksize小,則會讓切片變小,而且就等于配置的這個參數的值
minsize (切片最小值):
參數調的比blockSize大,則可以讓切片變得比blocksize還大
但是,不論怎么調參數,都不能讓多個小文件“劃入”一個split
~~~
> 選擇并發數的影響因素:
1) 運算節點的硬件配置
2) 運算任務的類型:CPU密集型還是IO密集型
3) 運算任務的數據量
## 4 map并行度的經驗之談
> 如果硬件配置為2*12core + 64G,恰當的map并行度是大約每個節點20-100個map,最好每個map的執行時間至少一分鐘。
1) 如果job的每個map或者 reduce task的運行時間都只有30-40秒鐘,那么就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。
> 配置task的JVM重用可以改善該問題:
> (mapred.job.reuse.jvm.num.tasks,默認是1,表示一個JVM上最多可以順序執行的task
> 數目(屬于同一個Job)是1。也就是說一個task啟一個JVM)
2) 如果input的文件非常的大,比如1TB,可以考慮將hdfs上的每個block size設大,比如設成256MB或者512MB
## 5 ReduceTask并行度的決定
> reducetask的并行度同樣影響整個job的執行并發度和執行效率,但與maptask的并發數由切片數決定不同,Reducetask數量的決定是可以直接手動設置:
~~~
//默認值是1,手動設置為4
job.setNumReduceTasks(4);
~~~
> 如果數據分布不均勻,就有可能在reduce階段產生數據傾斜
> 注意: reducetask數量并不是任意設置,還要考慮業務邏輯需求,有些情況下,需要計算全局匯總結果,就只能有1個reducetask
> 盡量不要運行太多的reduce task。對大多數job來說,最好rduce的個數最多和集群中的reduce持平,或者比集群的 reduce slots小。這個對于小集群而言,尤其重要。
## 6 MAPREDUCE程序運行演示
> Hadoop的發布包中內置了一個hadoop-mapreduce-example-2.4.1.jar,這個jar包中有各種MR示例程序,可以通過以下步驟運行:
~~~
啟動hdfs,yarn
~~~
> 然后在集群中的任意一臺服務器上啟動執行程序(比如運行wordcount):
~~~
hadoop jar hadoop-mapreduce-example-2.4.1.jar wordcount /wordcount/data /wordcount/out
~~~
## 7 MAPREDUCE中的Combiner
1. combiner是MR程序中Mapper和Reducer之外的一種組件
2. combiner組件的父類就是Reducer
3. combiner和reducer的區別在于運行的位置:
Combiner是在每一個maptask所在的節點運行
Reducer是接收全局所有Mapper的輸出結果;
4) combiner的意義就是對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量
具體實現步驟:
1) 自定義一個combiner繼承Reducer,重寫reduce方法
2) 在job中設置: job.setCombinerClass(CustomCombiner.class)
5) combiner能夠應用的前提是不能影響最終的業務邏輯而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來
## 8 mapreduce的shuffle機制
### 8.1 概述:
* mapreduce中,map階段處理的數據如何傳遞給reduce階段,是mapreduce框架中最關鍵的一個流程,這個流程就叫shuffle;
* shuffle: 洗牌、發牌——(核心機制:數據分區,排序,緩存);
* 具體來說:就是將maptask輸出的處理結果數據,分發給reducetask,并在分發的過程中,對數據按key進行了分區和排序;
### 8.2 主要流程:
> Shuffle緩存流程:

> shuffle是MR處理流程中的一個過程,它的每一個處理步驟是分散在各個map task和reduce task節點上完成的,整體來看,分為3個操作:
1) 分區partition
2) Sort根據key排序
3) Combiner進行局部value的合并
### 8.3 詳細流程
1) maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2) 從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3) 多個溢出文件會被合并成大的溢出文件
4) 在溢出過程中,及合并的過程中,都要調用partitoner進行分組和針對key進行排序
5) reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6) reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)
7) 合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
> Shuffle中的緩沖區大小會影響到mapreduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快
> 緩沖區的大小可以通過參數調整, 參數:io.sort.mb 默認100M
### 8.4 詳細流程示意圖

## 9 MAPREDUCE中的序列化
### 9.1 概述
> Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系。。。。),不便于在網絡中高效傳輸;
> 所以,hadoop自己開發了一套序列化機制(Writable),精簡,高效
### 9.2 Jdk序列化和MR序列化之間的比較
> 簡單代碼驗證兩種序列化機制的差別:
~~~
public class TestSeri {
public static void main(String[] args) throws Exception {
//定義兩個ByteArrayOutputStream,用來接收不同序列化機制的序列化結果
ByteArrayOutputStream ba = new ByteArrayOutputStream();
ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
//定義兩個DataOutputStream,用于將普通對象進行jdk標準序列化
DataOutputStream dout = new DataOutputStream(ba);
DataOutputStream dout2 = new DataOutputStream(ba2);
ObjectOutputStream obout = new ObjectOutputStream(dout2);
//定義兩個bean,作為序列化的源對象
ItemBeanSer itemBeanSer = new ItemBeanSer(1000L, 89.9f);
ItemBean itemBean = new ItemBean(1000L, 89.9f);
//用于比較String類型和Text類型的序列化差別
Text atext = new Text("a");
// atext.write(dout);
itemBean.write(dout);
byte[] byteArray = ba.toByteArray();
//比較序列化結果
System.out.println(byteArray.length);
for (byte b : byteArray) {
System.out.print(b);
System.out.print(":");
}
System.out.println("-----------------------");
String astr = "a";
// dout2.writeUTF(astr);
obout.writeObject(itemBeanSer);
byte[] byteArray2 = ba2.toByteArray();
System.out.println(byteArray2.length);
for (byte b : byteArray2) {
System.out.print(b);
System.out.print(":");
}
}
}
~~~
### 9.3 自定義對象實現MR中的序列化接口
> 如果需要將自定義的bean放在key中傳輸,則還需要實現comparable接口,因為mapreduce框中的shuffle過程一定會對key進行排序,此時,自定義的bean實現的接口應該是:
~~~
public class FlowBean implements WritableComparable<FlowBean>
~~~
> 需要自己實現的方法是:
~~~
/**
* 反序列化的方法,反序列化時,從流中讀取到的各個字段的順序應該與序列化時寫出去的順序保持一致
*/
@Override
public void readFields(DataInput in) throws IOException {
upflow = in.readLong();
dflow = in.readLong();
sumflow = in.readLong();
}
/**
* 序列化的方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(dflow);
//可以考慮不序列化總流量,因為總流量是可以通過上行流量和下行流量計算出來的
out.writeLong(sumflow);
}
@Override
public int compareTo(FlowBean o) {
//實現按照sumflow的大小倒序排序
return sumflow>o.getSumflow()?-1:1;
}
~~~
## 10 MapReduce與YARN
### 10.1 YARN概述
> Yarn是一個資源調度平臺,負責為運算程序提供服務器運算資源,相當于一個分布式的操作系統平臺,而mapreduce等運算程序則相當于運行于操作系統之上的應用程序
### 10.2 YARN的重要概念
1) yarn并不清楚用戶提交的程序的運行機制
2) yarn只提供運算資源的調度(用戶程序向yarn申請資源,yarn就負責分配資源)
3) yarn中的主管角色叫ResourceManager
4) yarn中具體提供運算資源的角色叫NodeManager
5) 這樣一來,yarn其實就與運行的用戶程序完全解耦,就意味著yarn上可以運行各種類型的分布式運算程序(mapreduce只是其中的一種),比如mapreduce、storm程序,spark程序,tez ……
6) 所以,spark、storm等運算框架都可以整合在yarn上運行,只要他們各自的框架中有符合yarn規范的資源請求機制即可
7) Yarn就成為一個通用的資源調度平臺,從此,企業中以前存在的各種運算集群都可以整合在一個物理集群上,提高資源利用率,方便數據共享
### 10.3 Yarn中運行運算程序的示例
> mapreduce程序的調度過程,如下圖

# MAPREDUCE實踐篇
## 1 MAPREDUCE 示例編寫及編程規范
### 1.1 編程規范
1. 用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
2. Mapper的輸入數據是KV對的形式(KV的類型可自定義)
3. Mapper的輸出數據是KV對的形式(KV的類型可自定義)
4. Mapper中的業務邏輯寫在map()方法中
5. map()方法(maptask進程)對每一個<K,V>調用一次
6. Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
7. Reducer的業務邏輯寫在reduce()方法中
8. Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
9. 用戶自定義的Mapper和Reducer都要繼承各自的父類
10. 整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
### 1.2 wordcount示例編寫
> 需求:在一堆給定的文本文件中統計輸出每一個單詞出現的總次數
1) 定義一個mapper類
~~~
//首先要定義四個泛型的類型
//keyin: LongWritable valuein: Text
//keyout: Text valueout:IntWritable
~~~
~~~
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//map方法的生命周期: 框架每傳一行數據就被調用一次
//key : 這一行的起始點在文件中的偏移量
//value: 這一行的內容
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行數據轉換為string
String line = value.toString();
//將這一行切分出各個單詞
String[] words = line.split(" ");
//遍歷數組,輸出<單詞,1>
for(String word:words){
context.write(new Text(word), new IntWritable(1));
}
}
}
~~~
2) 定義一個reducer類
~~~
//生命周期:框架每傳遞進來一個kv 組,reduce方法被調用一次
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//定義一個計數器
int count = 0;
//遍歷這一組kv的所有v,累加到count中
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
~~~
3) 定義一個主類,用來描述job并提交job
~~~
public class WordCountRunner {
//把業務邏輯相關的信息(哪個是mapper,哪個是reducer,要處理的數據在哪里,輸出的結果放哪里……)描述成一個job對象
//把這個描述好的job提交給集群去運行
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job wcjob = Job.getInstance(conf);
//指定我這個job所在的jar包
// wcjob.setJar("/home/hadoop/wordcount.jar");
wcjob.setJarByClass(WordCountRunner.class);
wcjob.setMapperClass(WordCountMapper.class);
wcjob.setReducerClass(WordCountReducer.class);
//設置我們的業務邏輯Mapper類的輸出key和value的數據類型
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
//設置我們的業務邏輯Reducer類的輸出key和value的數據類型
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
//指定要處理的數據所在的位置
FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
//指定處理完成之后的結果所保存的位置
FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
//向yarn集群提交這個job
boolean res = wcjob.waitForCompletion(true);
System.exit(res?0:1);
}
~~~
## 2 MAPREDUCE程序運行模式
### 2.1 本地運行模式
1. mapreduce程序是被提交給LocalJobRunner在本地以單進程的形式運行
2. 而處理的數據及輸出結果可以在本地文件系統,也可以在hdfs上
3. 怎樣實現本地運行?寫一個程序,不要帶集群的配置文件(本質是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname參數)
4. 本地模式非常便于進行業務邏輯的debug,只要在eclipse中打斷點即可
> 如果在windows下想運行本地模式來測試程序邏輯,需要在windows中配置環境變量:
~~~
%HADOOP_HOME% = d:/hadoop-2.6.1
%PATH% = %HADOOP_HOME%\bin
并且要將d:/hadoop-2.6.1的lib和bin目錄替換成windows平臺編譯的版本
~~~
### 2.2 集群運行模式
1. 將mapreduce程序提交給yarn集群resourcemanager,分發到很多的節點上并發執行
2. 處理的數據和輸出結果應該位于hdfs文件系統
3. 提交集群的實現步驟:
A) 、將程序打成JAR包,然后在集群的任意一個節點上用hadoop命令啟動
$ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath
B) 、直接在linux的eclipse中運行main方法
(項目中要帶參數:mapreduce.framework.name=yarn以及yarn的兩個基本配置)
C)、 如果要在windows的eclipse中提交job給集群,則要修改YarnRunner類
> mapreduce程序在集群中運行時的大體流程:

> 附:在windows平臺上訪問hadoop時改變自身身份標識的方法之二:

## 3 Mapreduce中的排序初步
### 3.1 需求
> 對日志數據中的上下行流量信息匯總,并輸出按照總流量倒序排序的結果
> 數據如下:
~~~
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
~~~
### 3.2 分析
> 基本思路:實現自定義的bean來封裝流量信息,并將bean作為map輸出的key來傳輸
> MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),排序的依據是map輸出的key,所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:WritableComparable 然后重寫key的compareTo方法
### 3.3 實現
1) 自定義的bean
~~~
public class FlowBean implements WritableComparable<FlowBean>{
long upflow;
long downflow;
long sumflow;
//如果空參構造函數被覆蓋,一定要顯示定義一下,否則在反序列時會拋異常
public FlowBean(){}
public FlowBean(long upflow, long downflow) {
super();
this.upflow = upflow;
this.downflow = downflow;
this.sumflow = upflow + downflow;
}
public long getSumflow() {
return sumflow;
}
public void setSumflow(long sumflow) {
this.sumflow = sumflow;
}
public long getUpflow() {
return upflow;
}
public void setUpflow(long upflow) {
this.upflow = upflow;
}
public long getDownflow() {
return downflow;
}
public void setDownflow(long downflow) {
this.downflow = downflow;
}
//序列化,將對象的字段信息寫入輸出流
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(downflow);
out.writeLong(sumflow);
}
//反序列化,從輸入流中讀取各個字段信息
@Override
public void readFields(DataInput in) throws IOException {
upflow = in.readLong();
downflow = in.readLong();
sumflow = in.readLong();
}
@Override
public String toString() {
return upflow + "\t" + downflow + "\t" + sumflow;
}
@Override
public int compareTo(FlowBean o) {
//自定義倒序比較規則
return sumflow > o.getSumflow() ? -1:1;
}
}
2、 mapper 和 reducer
public class FlowCount {
static class FlowCountMapper extends Mapper<LongWritable, Text, FlowBean,Text > {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
try {
String phonenbr = fields[0];
long upflow = Long.parseLong(fields[1]);
long dflow = Long.parseLong(fields[2]);
FlowBean flowBean = new FlowBean(upflow, dflow);
context.write(flowBean,new Text(phonenbr));
} catch (Exception e) {
e.printStackTrace();
}
}
}
static class FlowCountReducer extends Reducer<FlowBean,Text,Text, FlowBean> {
@Override
protected void reduce(FlowBean bean, Iterable<Text> phonenbr, Context context) throws IOException, InterruptedException {
Text phoneNbr = phonenbr.iterator().next();
context.write(phoneNbr, bean);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowCount.class);
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
~~~
## 4 Mapreduce中的分區Partitioner
### 4.1 需求
根據歸屬地輸出流量統計數據結果到不同文件,以便于在查詢統計結果時可以定位到省級范圍進行
### 4.2 分析
> Mapreduce中會將map輸出的kv對,按照相同key分組,然后分發給不同的reducetask
> 默認的分發規則為:根據key的hashcode%reducetask數來分發
> 所以:如果要按照我們自己的需求進行分組,則需要改寫數據分發(分組)組件Partitioner
> 自定義一個CustomPartitioner繼承抽象類:Partitioner
> 然后在job對象中,設置自定義partitioner: job.setPartitionerClass(CustomPartitioner.class)
### 4.3 實現
~~~
/**
* 定義自己的從map到reduce之間的數據(分組)分發規則 按照手機號所屬的省份來分發(分組)ProvincePartitioner
* 默認的分組組件是HashPartitioner
*
* @author
*
*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
static {
provinceMap.put("135", 0);
provinceMap.put("136", 1);
provinceMap.put("137", 2);
provinceMap.put("138", 3);
provinceMap.put("139", 4);
}
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
Integer code = provinceMap.get(key.toString().substring(0, 3));
return code == null ? 5 : code;
}
}
~~~
## 5 mapreduce數據壓縮
### 5.1 概述
> 這是mapreduce的一種優化策略:通過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減少磁盤IO,提高MR程序運行速度(但相應增加了cpu運算負擔)
1) Mapreduce支持將map輸出的結果或者reduce輸出的結果進行壓縮,以減少網絡IO或最終輸出數據的體積
2) 壓縮特性運用得當能提高性能,但運用不當也可能降低性能
3) 基本原則:
* 運算密集型的job,少用壓縮
* IO密集型的job,多用壓縮
### 5.2 MR支持的壓縮編碼

### 5.3 Reducer輸出壓縮
> 在配置參數或在代碼中都可以設置reduce的輸出壓縮
1) 在配置參數中設置
~~~
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
~~~
2) 在代碼中設置
~~~
Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));
~~~
5.4 Mapper輸出壓縮
> 在配置參數或在代碼中都可以設置reduce的輸出壓縮
1) 在配置參數中設置
~~~
mapreduce.map.output.compress=false
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
~~~
2) 在代碼中設置:
~~~
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
~~~
### 5.5 壓縮文件的讀取
> Hadoop自帶的InputFormat類內置支持壓縮文件的讀取,比如TextInputformat類,在其initialize方法中:
~~~
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
//根據文件后綴名創建相應壓縮編碼的codec
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
//判斷是否屬于可切片壓縮編碼類型
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
//如果是可切片壓縮編碼,則創建一個CompressedSplitLineReader讀取壓縮數據
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
//如果是不可切片壓縮編碼,則創建一個SplitLineReader讀取壓縮數據,并將文件輸入流轉換成解壓數據流傳遞給普通SplitLineReader讀取
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
//如果不是壓縮文件,則創建普通SplitLineReader讀取數據
in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;
}
~~~
## 6 更多MapReduce編程案例
### 6.1 reduce端join算法實現
#### 1 需求:
> 訂單數據表t_order:
| id | date| pid | amount|
| --- | --- | --- | --- |
| 1001| 20150710| P0001| 2|
| 1002| 20150710| P0001| 3|
| 1002| 20150710| P0002| 3|
> 商品信息表t_product
| id | pname | category_id| price |
| --- | --- | --- | --- |
| P0001 | 小米5 | 1000 | 2|
| P0002| 錘子T1 | 1000 | 3|
> 假如數據量巨大,兩表的數據是以文件的形式存儲在HDFS中,需要用mapreduce程序來實現一下SQL查詢運算:
~~~
select a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
~~~
#### 2 實現機制:
> 通過將關聯的條件作為map輸出的key,將兩表滿足join條件的數據并攜帶數據所來源的文件信息,發往同一個reduce task,在reduce中進行數據的串聯
~~~
public class OrderJoin {
static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 拿到一行數據,并且要分辨出這行數據所屬的文件
String line = value.toString();
String[] fields = line.split("\t");
// 拿到itemid
String itemid = fields[0];
// 獲取到這一行所在的文件名(通過inpusplit)
String name = "你拿到的文件名";
// 根據文件名,切分出各字段(如果是a,切分出兩個字段,如果是b,切分出3個字段)
OrderJoinBean bean = new OrderJoinBean();
bean.set(null, null, null, null, null);
context.write(new Text(itemid), bean);
}
}
static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {
//拿到的key是某一個itemid,比如1000
//拿到的beans是來自于兩類文件的bean
// {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name}
//將來自于b文件的bean里面的字段,跟來自于a的所有bean進行字段拼接并輸出
}
}
}
~~~
> 缺點:這種方式中,join的操作是在reduce階段完成,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜
> 解決方案: map端join實現方式
### 6.2 map端join算法實現
#### 1 原理闡述
1) 適用于關聯表中有小表的情形;
2) 可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join并輸出最終結果,可以大大提高join操作的并發度,加快處理速度
#### 2實現示例
> --先在mapper類中預先定義好小表,進行join
> --引入實際場景中的解決方案:一次加載數據庫或者用distributedcache
~~~
public class TestDistributedCache {
static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
FileReader in = null;
BufferedReader reader = null;
HashMap<String,String> b_tab = new HashMap<String, String>();
String localpath =null;
String uirpath = null;
//是在map任務初始化的時候調用一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//通過這幾句代碼可以獲取到cache file的本地絕對路徑,測試驗證用
Path[] files = context.getLocalCacheFiles();
localpath = files[0].toString();
URI[] cacheFiles = context.getCacheFiles();
//緩存文件的用法——直接用本地IO來讀取
//這里讀的數據是map task所在機器本地工作目錄中的一個小文件
in = new FileReader("b.txt");
reader =new BufferedReader(in);
String line =null;
while(null!=(line=reader.readLine())){
String[] fields = line.split(",");
b_tab.put(fields[0],fields[1]);
}
IOUtils.closeStream(reader);
IOUtils.closeStream(in);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//這里讀的是這個map task所負責的那一個切片數據(在hdfs上)
String[] fields = value.toString().split("\t");
String a_itemid = fields[0];
String a_amount = fields[1];
String b_name = b_tab.get(a_itemid);
// 輸出結果 1001 98.9 banan
context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TestDistributedCache.class);
job.setMapperClass(TestDistributedCacheMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//這里是我們正常的需要處理的數據所在路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//不需要reducer
job.setNumReduceTasks(0);
//分發一個文件到task進程的工作目錄
job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
//分發一個歸檔文件到task進程的工作目錄
// job.addArchiveToClassPath(archive);
//分發jar包到task節點的classpath下
// job.addFileToClassPath(jarfile);
job.waitForCompletion(true);
}
}
~~~
### 6.3 web日志預處理
#### 1、需求:
1. 對web訪問日志中的各字段識別切分
2. 去除日志中不合法的記錄
3. 根據KPI統計需求,生成各類訪問請求過濾數據
#### 2、實現代碼:
a)定義一個bean,用來記錄日志數據中的各數據字段
~~~
public class WebLogBean {
private String remote_addr;// 記錄客戶端的ip地址
private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-"
private String time_local;// 記錄訪問時間與時區
private String request;// 記錄請求的url與http協議
private String status;// 記錄請求狀態;成功是200
private String body_bytes_sent;// 記錄發送給客戶端文件主體內容大小
private String http_referer;// 用來記錄從那個頁面鏈接訪問過來的
private String http_user_agent;// 記錄客戶瀏覽器的相關信息
private boolean valid = true;// 判斷數據是否合法
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\001").append(this.remote_addr);
sb.append("\001").append(this.remote_user);
sb.append("\001").append(this.time_local);
sb.append("\001").append(this.request);
sb.append("\001").append(this.status);
sb.append("\001").append(this.body_bytes_sent);
sb.append("\001").append(this.http_referer);
sb.append("\001").append(this.http_user_agent);
return sb.toString();
}
}
~~~
b) 定義一個parser用來解析過濾web訪問日志原始記錄
~~~
public class WebLogParser {
public static WebLogBean parser(String line) {
WebLogBean webLogBean = new WebLogBean();
String[] arr = line.split(" ");
if (arr.length > 11) {
webLogBean.setRemote_addr(arr[0]);
webLogBean.setRemote_user(arr[1]);
webLogBean.setTime_local(arr[3].substring(1));
webLogBean.setRequest(arr[6]);
webLogBean.setStatus(arr[8]);
webLogBean.setBody_bytes_sent(arr[9]);
webLogBean.setHttp_referer(arr[10]);
if (arr.length > 12) {
webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
} else {
webLogBean.setHttp_user_agent(arr[11]);
}
if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP錯誤
webLogBean.setValid(false);
}
} else {
webLogBean.setValid(false);
}
return webLogBean;
}
public static String parserTime(String time) {
time.replace("/", "-");
return time;
}
}
~~~
c) mapreduce程序
~~~
public class WeblogPreProcess {
static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Text k = new Text();
NullWritable v = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
WebLogBean webLogBean = WebLogParser.parser(line);
if (!webLogBean.isValid())
return;
k.set(webLogBean.toString());
context.write(k, v);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
~~~
- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰