[TOC]
# MAPREDUCE加強
## 1. 自定義inputFormat
### 1.1 需求
> 無論hdfs還是mapreduce,對于小文件都有損效率,實踐中,又難免面臨處理大量小文件的場景,此時,就需要有相應解決方案
### 1.2 分析
> 小文件的優化無非以下幾種方式:
1) 在數據采集的時候,就將小文件或小批數據合成大文件再上傳HDFS
2) 在業務處理之前,在HDFS上使用mapreduce程序對小文件進行合并
3) 在mapreduce處理時,可采用combineInputFormat提高效率
### 1.3 實現
> 本節實現的是上述第二種方式
> 程序的核心機制:
> 自定義一個InputFormat
> 改寫RecordReader,實現一次讀取一個完整文件封裝為KV
> 在輸出時使用SequenceFileOutPutFormat輸出合并文件
> 代碼如下:
> 自定義InputFromat
~~~
public class WholeFileInputFormat extends
FileInputFormat<NullWritable, BytesWritable> {
//設置每個小文件不可分片,保證一個小文件生成一個key-value鍵值對
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
}
~~~
> 自定義RecordReader
~~~
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException {
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
// do nothing
}
}
~~~
> 定義mapreduce處理流程
~~~
public class SmallFilesToSequenceFileConverter extends Configured implements
Tool {
static class SequenceFileMapper extends
Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
private Text filenameKey;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
filenameKey = new Text(path.toString());
}
@Override
protected void map(NullWritable key, BytesWritable value,
Context context) throws IOException, InterruptedException {
context.write(filenameKey, value);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
System.setProperty("HADOOP_USER_NAME", "hdfs");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: combinefiles <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf,"combine small files to sequencefile");
// job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
job.setMapperClass(SequenceFileMapper.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),
args);
System.exit(exitCode);
}
}
~~~
## 2. 自定義outputFormat
### 2.1 需求
> 現有一些原始日志需要做增強解析處理,流程:
1) 從原始日志文件中讀取數據
2) 根據日志中的一個URL字段到外部知識庫中獲取信息增強到原始日志
3) 如果成功增強,則輸出到增強結果目錄;如果增強失敗,則抽取原始數據中URL字段輸出到待爬清單目錄
### 2.2 分析
> 程序的關鍵點是要在一個mapreduce程序中根據數據的不同輸出兩類結果到不同目錄,這類靈活的輸出需求可以通過自定義outputformat來實現
### 2.3 實現
> 實現要點:
1) 在mapreduce中訪問外部資源
2) 自定義outputformat,改寫其中的recordwriter,改寫具體輸出數據的方法write()
> 代碼實現如下:
> 數據庫獲取數據的工具
~~~
public class DBLoader {
public static void dbLoader(HashMap<String, String> ruleMap) {
Connection conn = null;
Statement st = null;
ResultSet res = null;
try {
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://hdp-node01:3306/urlknowledge", "root", "root");
st = conn.createStatement();
res = st.executeQuery("select url,content from urlcontent");
while (res.next()) {
ruleMap.put(res.getString(1), res.getString(2));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try{
if(res!=null){
res.close();
}
if(st!=null){
st.close();
}
if(conn!=null){
conn.close();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
public static void main(String[] args) {
DBLoader db = new DBLoader();
HashMap<String, String> map = new HashMap<String,String>();
db.dbLoader(map);
System.out.println(map.size());
}
}
~~~
> 自定義一個outputformat
~~~
public class LogEnhancerOutputFormat extends FileOutputFormat<Text, NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
FSDataOutputStream enhanceOut = fs.create(enhancePath);
FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
return new MyRecordWriter(enhanceOut,toCrawlOut);
}
static class MyRecordWriter extends RecordWriter<Text, NullWritable>{
FSDataOutputStream enhanceOut = null;
FSDataOutputStream toCrawlOut = null;
public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
this.enhanceOut = enhanceOut;
this.toCrawlOut = toCrawlOut;
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
//有了數據,你來負責寫到目的地 —— hdfs
//判斷,進來內容如果是帶tocrawl的,就往待爬清單輸出流中寫 toCrawlOut
if(key.toString().contains("tocrawl")){
toCrawlOut.write(key.toString().getBytes());
}else{
enhanceOut.write(key.toString().getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if(toCrawlOut!=null){
toCrawlOut.close();
}
if(enhanceOut!=null){
enhanceOut.close();
}
}
}
}
~~~
> 開發mapreduce處理流程
~~~
/**
* 這個程序是對每個小時不斷產生的用戶上網記錄日志進行增強(將日志中的url所指向的網頁內容分析結果信息追加到每一行原始日志后面)
*
* @author
*
*/
public class LogEnhancer {
static class LogEnhancerMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
HashMap<String, String> knowledgeMap = new HashMap<String, String>();
/**
* maptask在初始化時會先調用setup方法一次 利用這個機制,將外部的知識庫加載到maptask執行的機器內存中
*/
@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
DBLoader.dbLoader(knowledgeMap);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
try {
String url = fields[26];
// 對這一行日志中的url去知識庫中查找內容分析信息
String content = knowledgeMap.get(url);
// 根據內容信息匹配的結果,來構造兩種輸出結果
String result = "";
if (null == content) {
// 輸往待爬清單的內容
result = url + "\t" + "tocrawl\n";
} else {
// 輸往增強日志的內容
result = line + "\t" + content + "\n";
}
context.write(new Text(result), NullWritable.get());
} catch (Exception e) {
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogEnhancer.class);
job.setMapperClass(LogEnhancerMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 要將自定義的輸出格式組件設置到job中
job.setOutputFormatClass(LogEnhancerOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 雖然我們自定義了outputformat,但是因為我們的outputformat繼承自fileoutputformat
// 而fileoutputformat要輸出一個_SUCCESS文件,所以,在這還得指定一個輸出目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
System.exit(0);
}
}
~~~
## 3. 自定義GroupingComparator
### 3.1 需求
> 有如下訂單數據
| 訂單id | 商品id | 成交金額
| --- | --- |--- |
| Order_0000001| Pdt_01| 222.8|
| Order_0000001| Pdt_05| 25.8|
| Order_0000002| Pdt_03| 522.8|
| Order_0000002| Pdt_04| 122.4|
| Order_0000002| Pdt_05| 722.4|
| Order_0000003| Pdt_01| 222.8|
> 現在需要求出每一個訂單中成交金額最大的一筆交易
### 3.2 分析
1) 利用“訂單id和成交金額”作為key,可以將map階段讀取到的所有訂單數據按照id分區,按照金額排序,發送到reduce
2) 在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,然后取第一個即是最大值
### 3.3 實現
> 自定義groupingcomparator
~~~
/**
* 用于控制shuffle過程中reduce端對kv對的聚合邏輯
* @author duanhaitao@itcast.cn
*
*/
public class ItemidGroupingComparator extends WritableComparator {
protected ItemidGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean abean = (OrderBean) a;
OrderBean bbean = (OrderBean) b;
//將item_id相同的bean都視為相同,從而聚合為一組
return abean.getItemid().compareTo(bbean.getItemid());
}
}
~~~
> 定義訂單信息bean
~~~
/**
* 訂單信息bean,實現hadoop的序列化機制
* @author duanhaitao@itcast.cn
*
*/
public class OrderBean implements WritableComparable<OrderBean>{
private Text itemid;
private DoubleWritable amount;
public OrderBean() {
}
public OrderBean(Text itemid, DoubleWritable amount) {
set(itemid, amount);
}
public void set(Text itemid, DoubleWritable amount) {
this.itemid = itemid;
this.amount = amount;
}
public Text getItemid() {
return itemid;
}
public DoubleWritable getAmount() {
return amount;
}
@Override
public int compareTo(OrderBean o) {
int cmp = this.itemid.compareTo(o.getItemid());
if (cmp == 0) {
cmp = -this.amount.compareTo(o.getAmount());
}
return cmp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(itemid.toString());
out.writeDouble(amount.get());
}
@Override
public void readFields(DataInput in) throws IOException {
String readUTF = in.readUTF();
double readDouble = in.readDouble();
this.itemid = new Text(readUTF);
this.amount= new DoubleWritable(readDouble);
}
@Override
public String toString() {
return itemid.toString() + "\t" + amount.get();
}
}
~~~
> 編寫mapreduce處理流程
~~~
/**
* 利用secondarysort機制輸出每種item訂單金額最大的記錄
* @author duanhaitao@itcast.cn
*
*/
public class SecondarySort {
static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
context.write(bean, NullWritable.get());
}
}
static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
//在設置了groupingcomparator以后,這里收到的kv數據 就是: <1001 87.6>,null <1001 76.5>,null ....
//此時,reduce方法中的參數key就是上述kv組中的第一個kv的key:<1001 87.6>
//要輸出同一個item的所有訂單中最大金額的那一個,就只要輸出這個key
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定shuffle所使用的GroupingComparator類
job.setGroupingComparatorClass(ItemidGroupingComparator.class);
//指定shuffle所使用的partitioner類
job.setPartitionerClass(ItemIdPartitioner.class);
job.setNumReduceTasks(3);
job.waitForCompletion(true);
}
}
~~~
## 4. Mapreduce中的DistributedCache應用
### 4.1 Map端join案例
#### 4.1.1 需求
> 實現兩個“表”的join操作,其中一個表數據量小,一個表很大,這種場景在實際中非常常見,比如“訂單日志” join “產品信息”
#### 4.1.2 分析
> --原理闡述
1. 適用于關聯表中有小表的情形;
2. 可以將小表分發到所有的map節點,這樣,map節點就可以在本地對自己所讀到的大表數據進行join并輸出最終結果
3. 可以大大提高join操作的并發度,加快處理速度
1. --示例:先在mapper類中預先定義好小表,進行join
2. --并用distributedcache機制將小表的數據分發到每一個maptask執行節點,從而每一個maptask節點可以從本地加載到小表的數據,進而在本地即可實現join
#### 4.1.3 實現
~~~
public class TestDistributedCache {
static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
FileReader in = null;
BufferedReader reader = null;
HashMap<String,String> b_tab = new HashMap<String, String>();
String localpath =null;
String uirpath = null;
//是在map任務初始化的時候調用一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//通過這幾句代碼可以獲取到cache file的本地絕對路徑,測試驗證用
Path[] files = context.getLocalCacheFiles();
localpath = files[0].toString();
URI[] cacheFiles = context.getCacheFiles();
//緩存文件的用法——直接用本地IO來讀取
//這里讀的數據是map task所在機器本地工作目錄中的一個小文件
in = new FileReader("b.txt");
reader =new BufferedReader(in);
String line =null;
while(null!=(line=reader.readLine())){
String[] fields = line.split(",");
b_tab.put(fields[0],fields[1]);
}
IOUtils.closeStream(reader);
IOUtils.closeStream(in);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//這里讀的是這個map task所負責的那一個切片數據(在hdfs上)
String[] fields = value.toString().split("\t");
String a_itemid = fields[0];
String a_amount = fields[1];
String b_name = b_tab.get(a_itemid);
// 輸出結果 1001 98.9 banan
context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TestDistributedCache.class);
job.setMapperClass(TestDistributedCacheMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//這里是我們正常的需要處理的數據所在路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//不需要reducer
job.setNumReduceTasks(0);
//分發一個文件到task進程的工作目錄
job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
//分發一個歸檔文件到task進程的工作目錄
// job.addArchiveToClassPath(archive);
//分發jar包到task節點的classpath下
// job.addFileToClassPath(jarfile);
job.waitForCompletion(true);
}
}
~~~
## 5. Mapreduce的其他補充
### 5.1 計數器應用
> 在實際生產代碼中,常常需要將數據處理過程中遇到的不合規數據行進行全局計數,類似這種需求可以借助mapreduce框架中提供的全局計數器來實現
> 示例代碼如下:
~~~
public class MultiOutputs {
//通過枚舉形式定義自定義計數器
enum MyCounter{MALFORORMED,NORMAL}
static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(",");
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
//對枚舉定義的自定義計數器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
//通過動態設置自定義計數器加1
context.getCounter("counterGroupa", "countera").increment(1);
}
}
~~~
?
### 5.2 多job串聯
> 一個稍復雜點的處理邏輯往往需要多個mapreduce程序串聯處理,多job的串聯可以借助mapreduce框架的JobControl實現
> 示例代碼:
~~~
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);
// 設置作業依賴關系
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);
JobControl jobControl = new JobControl("RecommendationJob");
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);
// 新建一個線程來運行已加入JobControl中的作業,開始進程并等待結束
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
Thread.sleep(500);
}
jobControl.stop();
return 0;
~~~
### 5.3 Configuration對象高級應用
## 6. mapreduce參數優化
### 1.MapReduce重要配置參數
#### 1.1 資源相關參數
> //以下參數是在用戶自己的mr應用程序中配置就可以生效
1. mapreduce.map.memory.mb: 一個Map Task可使用的資源上限(單位:MB),默認為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。
2. mapreduce.reduce.memory.mb: 一個Reduce Task可使用的資源上限(單位:MB),默認為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。
3. mapreduce.map.cpu.vcores: 每個Map task可使用的最多cpu core數目, 默認值: 1
4. mapreduce.reduce.cpu.vcores: 每個Reduce task可使用的最多cpu core數目, 默認值: 1
5. mapreduce.map.java.opts: Map Task的JVM參數,你可以在此配置默認的java heap size等參數, e.g.“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc” (@taskid@會被Hadoop框架自動換為相應的taskid), 默認值: “”
6. mapreduce.reduce.java.opts: Reduce Task的JVM參數,你可以在此配置默認的java heap size等參數, e.g.“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默認值: “”
> //應該在yarn啟動之前就配置在服務器的配置文件中才能生效
7) yarn.scheduler.minimum-allocation-mb 1024 給應用程序container分配的最小內存
8) yarn.scheduler.maximum-allocation-mb 8192 給應用程序container分配的最大內存
9) yarn.scheduler.minimum-allocation-vcores 1
10) yarn.scheduler.maximum-allocation-vcores 32
11) yarn.nodemanager.resource.memory-mb 8192
> //shuffle性能優化的關鍵參數,應在yarn啟動之前就配置好
12) mapreduce.task.io.sort.mb 100 //shuffle的環形緩沖區大小,默認100m
13) mapreduce.map.sort.spill.percent 0.8 //環形緩沖區溢出的閾值,默認80%
#### 1.2 容錯相關參數
1) mapreduce.map.maxattempts: 每個Map Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。
2) mapreduce.reduce.maxattempts: 每個Reduce Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。
3) mapreduce.map.failures.maxpercent: 當失敗的Map Task失敗比例超過該值為,整個作業則失敗,默認值為0. 如果你的應用程序允許丟棄部分輸入數據,則該該值設為一個大于0的值,比如5,表示如果有低于5%的Map Task失敗(如果一個Map Task重試次數超過mapreduce.map.maxattempts,則認為這個Map Task失敗,其對應的輸入數據將不會產生任何結果),整個作業扔認為成功。
4) mapreduce.reduce.failures.maxpercent: 當失敗的Reduce Task失敗比例超過該值為,整個作業則失敗,默認值為0.
5) mapreduce.task.timeout: Task超時時間,經常需要設置的一個參數,該參數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認為該task處于block狀態,可能是卡住了,也許永遠會卡主,為了防止因為用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是300000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫,通過網絡拉取數據等),建議將該參數調大,該參數過小常出現的錯誤提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
#### 1.3 本地運行mapreduce 作業
> 設置以下幾個參數:
> mapreduce.framework.name=local
> mapreduce.jobtracker.address=local
> fs.defaultFS=local
#### 1.4 效率和穩定性相關參數
1) mapreduce.map.speculative: 是否為Map Task打開推測執行機制,默認為false
2) mapreduce.reduce.speculative: 是否為Reduce Task打開推測執行機制,默認為false
3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:當同一個class同時出現在用戶jar包和hadoop jar中時,優先使用哪個jar包中的class,默認為false,表示優先使用hadoop jar中的class。
4) mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片時的最小切片大小,(5)mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片時的最大切片大小(切片的默認大小就等于blocksize,即 134217
- hadoop
- linux基礎
- Linux入門
- Linux進階
- shell
- Zookeeper
- Zookeeper簡介及部署
- Zookeeper使用及API
- Redis
- Redis簡介安裝部署
- Redis使用及API
- Java高級增強
- Java多線程增強
- Maven簡介及搭建
- Hive
- Hive簡介及安裝
- Hive操作
- HIve常用函數
- Hive數據類型
- Flume
- Flume簡介及安裝
- flume 攔截器(interceptor)
- azkaban
- azKaban簡介及安裝
- Sqoop
- Sqoop簡介及安裝
- HDFS
- HDFS原理
- HDFS操作API
- MAPREDUCE原理
- MAPREDUCE圖片資源
- MAPREDUCE加強
- HBASE
- HBASE簡介及安裝
- HBASE操作及API
- HBASE內部原理
- Storm
- Storm簡介及安裝
- Storm原理
- kafka
- kafka簡介及安裝
- kafka常用操作及API
- kafka原理
- kafka配置詳解
- Scala
- Scala簡介及安裝
- Scala基礎語法
- Scala實戰