# HBase and MapReduce
> 貢獻者:[BridgetLai](https://github.com/BridgetLai)
Apache MapReduce 是 [Apache Hadoop](https://hadoop.apache.org/) 提供的軟件框架,用來進行大規模數據分析.MapReduce 已超出本文檔范圍,可通過如下文檔學習[https://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html](https://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html). MapReduce version 2 (MR2)目前是[YARN](https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-site/)的一部分.
本章將討論在 HBase 中使用 MapReduce 處理數據時需要進行的一些特定配置步驟;另外,還將討論 HBase 與 MapReduce jobs 之間的交互以及存在的一些問題;最后將介紹 MapReduce 的[替代 API](http://www.cascading.org/):[Cascading](#cascading).
> `mapred` and `mapreduce`
>
> 與 MapReduce 一樣,在 HBase 中也有 2 種 mapreduce API 包.
>_org.apache.hadoop.hbase.mapred_ and _org.apache.hadoop.hbase.mapreduce_.
> 前者使用舊式風格的 API,后者采用新的模式.相比于前者,后者更加靈活,你可以在舊式 API 中找到等價的.選擇 API 時,請使用 MapReduce 部署時選擇的包.如果不知道如何選擇或者想從頭再來,那就使用 _org.apache.hadoop.hbase.mapreduce_.在接下來的文章中,將使用 _o.a.h.h.mapreduce_ 如果你使用的是 _o.a.h.h.mapred_ 就自行替換.
## 48\. HBase, MapReduce, and the CLASSPATH
默認情況下,部署在 MapReduce 集群中的 MapReduce jobs 沒有權限訪問`$HBASE_CONF_DIR`路徑下的 HBase 配置 或者 HBase classes.
通過以下方式可以為 MapReduce jobs 配置權限.
> 增加 _hbase-site.xml_ 到 _$HADOOP_HOME/conf_
> <br>然后將 HBase jars 添加到 _$HADOOP_HOME/lib_ 目錄下
> <br> 最后需要將這些變更拷貝到 Hadoop 集群中所有服務上.
或者
> 編輯 _$HADOOP_HOME/conf/hadoop-env.sh_ 將 HBase 依賴添加到 `HADOOP_CLASSPATH`.
以上配置均不推薦,因為它會讓 Hadoop 安裝 HBase 的依賴,并且需要重啟 Hadoop 集群才能使用 HBase 中的數據.
推薦的方式是 HBase 使用`HADOOP_CLASSPATH` or `-libjars`添加其依賴的 jar 包.
從 HBase `0.90.x`,HBase 添加依賴 jar 包到任務自身配置中. 依賴項只需要在本地`CLASSPATH`可用,然后被打包部署到 MapReduce 集群的 fat job jar 中.一種取巧的方式是傳遞全量的 HBase classpath(即 hbase,獨立的 jars 還有配置)到 mapreduce job 運行器中令 hbase 工具從全量的 classpath 挑選依賴最終配置到 MapReduce job 的配置中(可以查看源碼實現`TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)`).
下面的例子是在表`usertable`上運行的 HBase 的 MapReduce 任務: 表行數統計任務[RowCounter](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html).設置在 MapReduce 上下文運行需要的 hbase jars 以及配置文件如 hbase-site.xml 到 `HADOOP_CLASSPATH`. 一定要確保使用了與你的系統相對應的 HBase Jar.替換以下命令中的 VERSION 字段為本地 HBASE 版本. 反引號(\`)使 shell 執行子命令,將`hbase classpath`的輸出設置為`HADOOP_CLASSPATH`. 這個例子需要在 Bash-compatible 執行.
```
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar \
org.apache.hadoop.hbase.mapreduce.RowCounter usertable
```
以上命令將啟動一個運行在本地配置指定的 hbase 集群的 mapreduce 作業,用來統計表行數.這個集群也是 Hadoop 配置指定的.
`hbase-mapreduce.jar` 核心是一個驅動,羅列了 HBASE 裝載的一些基礎的 MapReduce 任務.例如,假設你安裝的是`2.0.0-SNAPSHOT`版本:
```
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in HBase table.
WALPlayer: Replay WAL files.
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster.
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table.
verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
```
您可以使用上面列出的 MapReduce 任務的簡寫采用以下命令重新執行表行數統計任務(同樣,假設安裝的 HBASE 是`2.0.0-SNAPSHOT`版本):
```
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar \
rowcounter usertable
```
您可能發現了`hbase mapredcp`工具的輸出; 它列出了在 hbase 運行基礎 mapreduce 作業所需的最小 jar 文件集合(不包括配置,如果希望 MapReduce 作業能準確找到目標集群,則可能需要添加些配置)。 一旦你開始做任何實質性的事情,你還需要添加額外依賴,這些依賴需在運行`hbase mapredcp`時通過傳遞系統屬性`-Dtmpjars`來指定。
對于那些沒有打包依賴的 jobs 或者直接調用`TableMapReduceUtil#addDependencyJars`,則下面的命令格式就非常必要了:
```
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...
```
如果您是在 HBase 的構建地址而不是安裝地址執行以上示例,您會遇到如下錯誤:
```
java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper
```
如果出現了以上問題,請參照以下命令修改,它從構建環境的 _target/_ 目錄下使用 HBASE jars
```
$ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar rowcounter usertable
```
> Notice to MapReduce users of HBase between 0.96.1 and 0.98.4
>一些 HBase MapReduce 任務啟動失敗,會出現以下類似的異常:
> ```
> Exception in thread "main" java.lang.IllegalAccessError: class
> com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass
> com.google.protobuf.LiteralByteString
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at
> org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818)
> at
> org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433)
> at
> org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186)
> at
> org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147)
> at
> org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270)
> at
> org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100)
> ...
> ```
>
> 這是[HBASE-9867](https://issues.apache.org/jira/browse/HBASE-9867)無意間增加了一個類加載器依賴引入的優化.
> 這個影響使用`-libjars` 和 'fat jar '的任務,他們將運行時依賴放在在`lib`路徑下.
為了滿足新類加載器需要,`hbase-protocol.jar`必須包含在 Hadoop 的 環境變量下.可通過[HBase, MapReduce, and the CLASSPATH](#hbase.mapreduce.classpath)查閱解決 一些 classpath 錯誤的推薦解決方法.
The following is included for historical purposes.
>
> 在 Hadoop 的 lib 目錄里通過系統連接或者直接拷貝方式引入`hbase-protocol.jar`,可以系統范圍內解決 classpath 問題.
>
>這也可以在每個作業啟動的基礎上實現,方法是在作業提交時將其(`hbase-protocol.jar`)包含在`HADOOP_CLASSPATH`環境變量中。啟動時打包其依賴項,以下所有三個作業啟動命令都滿足此要求
> ```
> $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
> $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
> $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass
> ```
>
>下面的命令對于那些不打包自己依賴的 Jar 文件很有必要:
> ```
> $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...
> ```
>
> 可以查閱 [HBASE-10304](https://issues.apache.org/jira/browse/HBASE-10304)進行更深入的討論.
## 49\. MapReduce Scan Caching
現在 TableMapReduceUtil 恢復在傳入的 Scan 對象上設置掃描器緩存(在將結果返回到客戶端之前緩存的行數)的選項。由于 HBase 0.95 中的錯誤([HBASE-11558]),此功能丟失 (https://issues.apache.org/jira/browse/HBASE-11558)),修正了 HBase 0.98.5 和 0.96.3。 選擇掃描程序緩存的優先順序如下:
1.在掃描對象上設置的緩存設置。
2.通過配置選項`hbase.client.scanner.caching`指定的緩存設置,可以在 _hbase-site.xml_ 中手動設置,也可以通過輔助方法`TableMapReduceUtil.setScannerCaching()`設置。
3.默認值`HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING`,設置為“100”。
優化緩存設置是平衡客戶端等待結果的時間與客戶端需要接收的結果集數量。 如果緩存設置太大,客戶端可能會等待很長時間,甚至可能超時。 如果設置太小,則需要將結果分多個部分返回。 如果您將掃描看作是鏟子,那么更大的緩存設置相當于更大的鏟子,而更小的緩存設置等價于為了填滿桶而進行更多的鏟動。
上面提到的優先級列表允許您設置一個合理的默認值,也可以根據需要重寫。
有關[Scan](https://hbase.apache.org/apidocs/org/apache/hadoop op/hbase/client/scan.html)的更多細節,請參閱 API 文檔。
## 50\.捆綁的 HBase MapReduce 作業
HBase JAR 還可用作某些捆綁 MapReduce 作業的驅動程序。要了解捆綁的 MapReduce 作業,請運行以下命令。
```
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar
An example program must be given as the first argument.
Valid program names are:
copytable: Export a table from local cluster to peer cluster
completebulkload: Complete a bulk data load.
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
```
每個有效的程序名稱都捆綁了 MapReduce 作業。 要運行其中一個作業,請根據以下示例構建命令.
```
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar rowcounter myTable
```
## 51\. HBase 作為 MapReduce 作業數據源和數據接收器
HBase 可以被用作 MapReduce Job 的數據源 [TableInputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html), 和數據接收器[TableOutputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html) or [MultiTableOutputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.html).編寫對 HBase 讀寫的 MapReduce jbos 時,建議使用子類 [TableMapper](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html) 或者 [TableReducer](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableReducer.html). 有關基本用法請參閱不做任何處理的傳遞類 [IdentityTableMapper](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.html) 和 [IdentityTableReducer](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.html) . 對于更復雜的例子, 請參閱 [RowCounter](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html) 或者查看 `org.apache.hadoop.hbase.mapreduce.TestTableMapReduce` 的單元測試.
如果運行使用 HBase 作為源或接收器的 MapReduce job,則需要在配置中指定源和接收的表和列名稱。
當您從 HBase 讀取時,`TableInputFormat`從 HBase 請求分區列表并生成一個映射,該映射可以是“map-per-region”或“mapreduce.job.maps”映射,以較小者為準。如果您的 job 只有兩個 maps,請將`mapreduce.job.maps`提升到大于分區數的數字。如果您每個節點正在運行 TaskTracer / NodeManager 和 RegionServer,則 Maps 將在相鄰的 TaskTracker / NodeManager 上運行。寫入 HBase 時,避免 Reduce 步驟并從 Map 中寫回 HBase 可能是有意義的。當您的作業不需要 MapReduce 對 map 發出的數據進行排序和整理時,此方法有效。
在插入時,對于 HBase'排序',除非您需要,否則沒有必要對您的 MapReduce 集群進行雙重排序(以及對數據進行移動)。如果您不需要 Reduce,您的 map 可能會在作業結束時輸出為了報告而處理的記錄數,或者將 Reduced 數設置為零并使用 TableOutputFormat。如果在您的情況下運行 Reduce 步驟是有意義的,您通常應該使用多個 reducer,以便負載分布在 HBase 集群中。
新的 HBase 分區程序[HRegionPartitioner](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.html)可以盡可能多的減少運行分區的數量。當您的表很大時,HRegionPartitioner 是合適的,并且您的上傳在完成后不會大大改變現有區域的數量。否則請使用默認分區程序。
## 52\. 批量導入時直接寫 HFiles
如果要導入新表,則可以繞過 HBase API 并將內容直接寫入文件系統,格式化為 HBase 數據文件(HFiles)。您的導入將運行得更快,可能會快一個數量級。有關此機制如何工作的更多信息,請參閱[Bulk Loading](#arch.bulk.load)。
## 53\. 行數統計的例子
包含 [RowCounter](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html) 的 MapReduce 作業使用 `TableInputFormat` 并且 統計了指定表格的行數.請使用以下命令運行:
```
$ ./bin/hadoop jar hbase-X.X.X.jar
```
這將調用 HBase MapReduce 驅動程序類。請從提供的 jobs 中選擇`rowcounter`。這將打印 rowcounter 使用建議到標準輸出。指定表名,要計數的列和輸出目錄。如果您有類路徑錯誤,請參閱 [HBase, MapReduce, and the CLASSPATH](#hbase.mapreduce.classpath).
## 54\. Map 任務 拆分
### 54.1\. HBase 默認的 MapReduce 拆分器
當[TableInputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html)在 MapReduce job 中被用做獲取 HBase 表時,它的拆分器將為表的每個分區指定一個 map 任務.因此如果表中有 100 個分區,無論要掃描多少列,都會為該任務 拆分出 100 個 map 任務.
### 54.2\. 自定義拆分器
如果對自定義拆分器感興趣,請參閱[TableInputFormatBase](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html)中的`getSplits`方法,它是 map 任務拆分邏輯所在.
## 55\. HBase MapReduce 示例
### 55.1\. HBase MapReduce 讀示例
以下是以只讀方式使用 HBase 作為 MapReduce 源的示例。 具體來說,有一個 Mapper 實例但沒有 Reducer,并且 Mapper 沒有發出任何內容。 Job 將定義如下......
```
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
...
TableMapReduceUtil.initTableMapperJob(
tableName, // input HBase table name
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper
null, // mapper output key
null, // mapper output value
job);
job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
```
…?并且 mapper 實例將繼承 [TableMapper](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html)…?
```
public static class MyMapper extends TableMapper<Text, Text> {
public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
// process data for the row from the Result instance.
}
}
```
### 55.2\. HBase MapReduce 讀/寫示例
以下 HBase 既作為源也作為 MapReduce 的接收器的示例。 此示例將簡單地將數據從一個表復制到另一個表。
```
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
null, // mapper output key
null, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
null, // reducer class
job);
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
```
很有必要解釋一下`TableMapReduceUtil`的作用是什么,尤其是 reducer. [TableOutputFormat](https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html) 被用作 outputFormat class ,一些參數已經進行了配置,例如`TableOutputFormat.OUTPUT_TABLE`,同時設置了 reducer 的 output key 為`TableOutputFormat.OUTPUT_TABLE` 并且 value 為`Writable`. 這些配置項可以由開發工程師在 job 和配置文件中進行設置,`TableMapReduceUtil`試圖將這些工作進行簡化.
下面是一個 mapper 的例子,它將創建一個"Put" ,匹配輸入的"Result "并輸出.而這些工作正是 CopyTable 工具的作用.
```
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
// this example is just copying the data from the source table...
context.write(row, resultToPut(row,value));
}
private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
Put put = new Put(key.get());
for (KeyValue kv : result.raw()) {
put.add(kv);
}
return put;
}
}
```
這實際上并不是一個 reducer 過程, 所以由`TableOutputFormat` 負責將'Put'發送到目標表.
這只是一個例子,開發人員可以選擇不使用`TableOutputFormat`并自行鏈接到目標表.
### 55.3\. HBase MapReduce 多表輸出的讀寫示例
TODO: `MultiTableOutputFormat` 樣例.
### 55.4\. HBase MapReduce 匯總到 HBase 示例
以下示例使用 HBase 作為 MapReduce 源并接收匯總信息。此示例將計算表中某個 value 的不同實例的數量,并將這些匯總計數寫入另一個表中。
```
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
MyTableReducer.class, // reducer class
job);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
```
在示例中的 mapper 在一個 String 類型的 value 上進行匯總操作,并將 value作為 mapper 輸出的 key,`IntWritable`表示實例計數器。
```
public static class MyMapper extends TableMapper<Text, IntWritable> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR1 = "attr1".getBytes();
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
String val = new String(value.getValue(CF, ATTR1));
text.set(val); // we can only emit Writables...
context.write(text, ONE);
}
}
```
在 reducer 中,計算“ones”(就像執行此操作的任何其他 MR 示例一樣),然后發出“Put”。
```
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] COUNT = "count".getBytes();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(CF, COUNT, Bytes.toBytes(i));
context.write(null, put);
}
}
```
### 55.5\. HBase MapReduce 文件匯總示例
這與上面的匯總示例很相似,不同之處在于該匯總使用 HBase 作為 MapReduce 的數據源而使用 HDFS 作為接收器.這樣的不同體現在 job 啟動和 reduce 過程,而 mapper 過程沒有區別.
```
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
job.setReducerClass(MyReducer.class); // reducer class
job.setNumReduceTasks(1); // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
```
如上所述,本示例的中的 mappper 與上例無異,至于 Reducer,則采用一個'通用'的而不是繼承自 TableMapper 并且發出 Puts.
```
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
context.write(key, new IntWritable(i));
}
}
```
### 55.6 不使用 Reducer ,HBase MapReduce 匯總到 HBase
如果使用 HBase 作為 Reducer,也可以在沒有 Reducer 的情況下進行匯總.
匯總任務要求 HBase 目標表存在.表方法`incrementColumnValue`將被用作值的原子增長.從性能角度看,為每個 map-task 中那些會值增長的值保留一個 Map,并且在 mapper 執行`cleanup` 方法時每個 key 更新一次,這可能是有意義的.但是,您的里程可能會根據要處理的行數和惟一鍵的不同而有所不同。
最后,匯總結果在 HBase 中.
### 55.7\. HBase MapReduce 匯總到 RDBMS
有時,為 RDBMS 生成摘要更合適。對于這些情況,可以通過自定義 reducer 直接生成 RDBMS 的摘要。 `setup`方法可以連接到 RDBMS(連接信息可以通過上下文中的自定義參數傳遞),清理方法可以關閉連接。
一個 job 的 reducer 數量對匯總實現至關重要,您將必須將其設計到 reducer 中.具體來說,不管被設計成一個 reducer 還是多個 reducer,這沒有對錯之分,完全依賴于您的用例.指定給 job 的 reducer 越多,與 RDMS 建立的實時鏈接越多,這可以在一定程度上提高吞吐量.
```
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private Connection c = null;
public void setup(Context context) {
// create DB connection...
}
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// do summarization
// in this example the keys are Text, but this is just an example
}
public void cleanup(Context context) {
// close db connection
}
}
```
最終,匯總結果寫入到 RDMS 表中.
## 56\. 在 MapReduce 任務中訪問其他 HBase 表
盡管目前框架允許一個 HBase 表作為 MapReduce 作業的輸入,但其他 HBase 表只可以通過作為查找表(lookup tables)才能訪問,例如在 MapReduce 作業中通過 mapper 的 setup 方法創建 Table 實例.
```
public class MyMapper extends TableMapper<Text, LongWritable> {
private Table myOtherTable;
public void setup(Context context) {
// In here create a Connection to the cluster and save it or use the Connection
// from the existing table
myOtherTable = connection.getTable("myOtherTable");
}
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
// process Result...
// use 'myOtherTable' for lookups
}
```
## 57\. 推測執行(Speculative Execution)
通常建議關閉以 HBASE 為數據源的 MapReduce 作業的推測執行(speculative execution).關閉推測執行可以通過設置單個任務的屬性,也可以設置整個集群.特對是對于執行時間較長的任務,推測執行(speculative execution)為其創建一個重復任務,將進行雙重數據寫入,這可能不是你想要的.
查閱 [spec.ex](#spec.ex) 獲取更多信息.
## 58\. 級聯(Cascading)
[[Cascading](http://www.cascading.org/)是 MapReduce 的替代 API,本質上使用 MapReduce,但允許您以簡化的方式編寫 MapReduce 代碼。
下面的示例顯示了一個 Cascading“Flow”,它將數據“匯入”到 HBase 集群中。 同樣的`hBaseTap` API 也可用于“獲取”數據。
```
// read data from the default filesystem
// emits two fields: "offset" and "line"
Tap source = new Hfs( new TextLine(), inputFileLhs );
// store data in an HBase cluster
// accepts fields "num", "lower", and "upper"
// will automatically scope incoming fields to their proper familyname, "left" or "right"
Fields keyFields = new Fields( "num" );
String[] familyNames = {"left", "right"};
Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" ) };
Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );
// a simple pipe assembly to parse the input into fields
// a real app would likely chain multiple Pipes together for more complex processing
Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), " " ) );
// "plan" a cluster executable Flow
// this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );
// start the flow, and block until complete
parseFlow.complete();
// open an iterator on the HBase table we stuffed data into
TupleEntryIterator iterator = parseFlow.openSink();
while(iterator.hasNext())
{
// print out each tuple from HBase
System.out.println( "iterator.next() = " + iterator.next() );
}
iterator.close();
```
- HBase? 中文參考指南 3.0
- Preface
- Getting Started
- Apache HBase Configuration
- Upgrading
- The Apache HBase Shell
- Data Model
- HBase and Schema Design
- RegionServer Sizing Rules of Thumb
- HBase and MapReduce
- Securing Apache HBase
- Architecture
- In-memory Compaction
- Backup and Restore
- Synchronous Replication
- Apache HBase APIs
- Apache HBase External APIs
- Thrift API and Filter Language
- HBase and Spark
- Apache HBase Coprocessors
- Apache HBase Performance Tuning
- Troubleshooting and Debugging Apache HBase
- Apache HBase Case Studies
- Apache HBase Operational Management
- Building and Developing Apache HBase
- Unit Testing HBase Applications
- Protobuf in HBase
- Procedure Framework (Pv2): HBASE-12439
- AMv2 Description for Devs
- ZooKeeper
- Community
- Appendix