[TOC]
# MapReduce重要配置參數
## 資源相關參數
**以下參數是在用戶自己的mr應用程序中配置就可以生效**
1. **mapreduce.map.memory.mb**: 一個Map Task可使用的資源上限(單位:MB),默認為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。
2. **mapreduce.reduce.memory.mb**: 一個Reduce Task可使用的資源上限(單位:MB),默認為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。
3. **mapreduce.map.cpu.vcores**: 每個Map task可使用的最多cpu core數目, 默認值: 1
4. **mapreduce.reduce.cpu.vcores**: 每個Reduce task可使用的最多cpu core數目, 默認值: 1
5. `mapreduce.map.java.opts`: Map Task的JVM參數,你可以在此配置默認的java heap size等參數, e.g.
`“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” `(@taskid@會被Hadoop框架自動換為相應的taskid), 默認值: “”
6. `mapreduce.reduce.java.opts`: Reduce Task的JVM參數,你可以在此配置默認的java heap size等參數, e.g.
`“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”`, 默認值: “”
**應該在yarn啟動之前就配置在服務器的配置文件中才能生效**
7. `yarn.scheduler.minimum-allocation-mb` 1024 給應用程序container分配的最小內存
8. `yarn.scheduler.maximum-allocation-mb` 8192 給應用程序container分配的最大內存
9. `yarn.scheduler.minimum-allocation-vcores` 1
10. `yarn.scheduler.maximum-allocation-vcores` 32
11. `yarn.nodemanager.resource.memory-mb` 8192
**shuffle性能優化的關鍵參數,應在yarn啟動之前就配置好**
12. mapreduce.task.io.sort.mb 100 ` //shuffle的環形緩沖區大小,默認100m`
14. mapreduce.map.sort.spill.percent 0.8 `//環形緩沖區溢出的閾值,默認80%`
## 容錯相關參數
1. `mapreduce.map.maxattempts`: 每個Map Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。
2. `mapreduce.reduce.maxattempts`: 每個Reduce Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。
3. `mapreduce.map.failures.maxpercent`: 當失敗的Map Task失敗比例超過該值為,整個作業則失敗,默認值為0. 如果你的應用程序允許丟棄部分輸入數據,則該該值設為一個大于0的值,比如5,表示如果有低于5%的Map Task失敗(如果一個Map Task重試次數超過mapreduce.map.maxattempts,則認為這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個作業扔認為成功。
4. `mapreduce.reduce.failures.maxpercent`: 當失敗的Reduce Task失敗比例超過該值為,整個作業則失敗,默認值為0.
5. `mapreduce.task.timeout`: Task超時時間,經常需要設置的一個參數,該參數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認為該task處于block狀態,可能是卡住了,也許永遠會卡主,為了防止因為用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫,通過網絡拉取數據等),建議將該參數調大,該參數過小常出現的錯誤提示是`“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”`
## 本地運行mapreduce 作業
設置以下幾個參數:
~~~
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local
~~~
## 效率和穩定性相關參數
1. mapreduce.map.speculative: 是否為Map Task打開推測執行機制,默認為false
2. mapreduce.reduce.speculative: 是否為Reduce Task打開推測執行機制,默認為false
3. mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:當同一個class同時出現在用戶jar包和hadoop jar中時,優先使用哪個jar包中的class,默認為false,表示優先使用hadoop jar中的class。
4. mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片時的最小切片大小,
5. mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片時的最大切片大小
(切片的默認大小就等于blocksize,即 134217728)
# 全局計數器
在實際生產代碼中,常常需要將數據處理過程中遇到的不合規數據行進行全局計數,類似這種需求可以借助mapreduce框架中提供的全局計數器來實現

這個統計是全局的
# 多job串聯
一個稍復雜點的處理邏輯往往需要多個mapreduce程序串聯處理,多job的串聯可以借助mapreduce框架的JobControl實現
1. 我們可以用shell腳本,根據狀態返回,來決定下一步的shell執行還是不執行
2. 可以設置多個job他們的依賴關系
~~~
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);
// 設置作業依賴關系,job2執行依賴job1,job3依賴job2
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);
//設置JobControl,里面放一個組名
JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);
// 新建一個線程來運行已加入JobControl中的作業,開始進程并等待結束
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
//判斷是不是已經finish了,沒有finish就繼續執行
while (!jobControl.allFinished()) {
Thread.sleep(500);
}
jobControl.stop();
return 0;
~~~
# 數據壓縮
## 概述
這是mapreduce的一種優化策略:通過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減少磁盤IO,提高MR程序運行速度(但相應增加了cpu運算負擔)
1. Mapreduce支持將map輸出的結果或者reduce輸出的結果進行壓縮,以減少網絡IO或最終輸出數據的體積
2. 壓縮特性運用得當能提高性能,但運用不當也可能降低性能
3. 基本原則:
運算密集型的job,少用壓縮
IO密集型的job,多用壓縮
## MR支持的壓縮編碼

## Reducer輸出壓縮
在配置參數或在代碼中都可以設置reduce的輸出壓縮
1. 在配置參數中設置
~~~
mapreduce.output.fileoutputformat.compress=false
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
mapreduce.output.fileoutputformat.compress.type=RECORD
~~~
2. 在代碼中設置
~~~
Job job = Job.getInstance(conf);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, (Class<? extends CompressionCodec>) Class.forName(""));
~~~
## Mapper輸出壓縮
在配置參數或在代碼中都可以設置reduce的輸出壓縮
1. 在配置參數中設置
~~~
mapreduce.map.output.compress=false
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
~~~
2. 在代碼中設置:
~~~
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
~~~
## 壓縮文件的讀取(源碼)
Hadoop自帶的InputFormat類內置支持壓縮文件的讀取,比如TextInputformat類,在其initialize方法中:
~~~
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
//根據文件后綴名創建相應壓縮編碼的codec
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
//判斷是否屬于可切片壓縮編碼類型
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
//如果是可切片壓縮編碼,則創建一個CompressedSplitLineReader讀取壓縮數據
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
//如果是不可切片壓縮編碼,則創建一個SplitLineReader讀取壓縮數據,并將文件輸入流轉換成解壓數據流傳遞給普通SplitLineReader讀取
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
//如果不是壓縮文件,則創建普通SplitLineReader讀取數據
in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);
filePosition = fileIn;
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介