[TOC]
# 思路

# 準備數據
1.txt
~~~
hello tom hello jdxia hello allen
tom allen jdxia hello
~~~
2.txt
~~~
hello allen hello tom allen
hello jack hello
~~~
然后在hdfs上創建目錄
~~~
hadoop fs -mkdir -p /worldCount/input
~~~
然后把這2個文件傳到hdfs上
~~~
hadoop fs -put 1.txt 2.txt /worldCount/input
~~~
# 代碼
## map代碼
~~~
package com.hadooprpc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* mapper繼承mapreduce的mapper
* Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
* KEYIN:是值框架讀取到的數據的key類型
* 在默認的讀取數據組件InputFormat下,讀取的key是一行文本的偏移量,所以key的類型是long類型的
* <p>
* VALUEIN是指框架讀取到的數據的value類型
* 在默認的讀取數據組件InputFormat下,讀到的value就是一行文本的內容,所以value的類型就是String類型的
* <p>
* KEYOUT是指用戶自定義的邏輯方法返回的數據中的key的類型,
* 這個是由用戶業務邏輯決定的,在我們單詞統計中,我們輸出的是單詞作為key,所以類型是String
* <p>
* VALUEOUT是指用戶自定義邏輯方法返回的數據中value類型,這個是由用戶業務邏輯決定的
* 在我們的單詞統計業務中,我們輸出的是單詞數量作為value,所以類型是Integer
* <p>
* 但是,String,Long都是jdk自帶的數據類型,在序列化的時候,效率比較低,hadoop為了提高效率,他就自定義了一套數據結構
* 所以說我們的hadoop程序中,如果該數據需要進行序列化(寫磁盤,或者網絡傳輸),就一定要用實現了hadoop虛擬化框架的數據類型
* <p>
* Long -----> LongWritable
* String ---> Text
* Integer ----> IntWritable
* null -----> nullWritable
*/
public class WorldCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 這個map方法就是mapreduce主體程序,MapTask所調用的用戶業務邏輯方法
* Maptask會驅動我們讀取數據組件InputFormat去讀取數據(KEYIN,VALUEIN),每讀取一個(K,V),他就會傳入這個用戶寫的map方法中調用一次
* 在默認的inputFormat實現中,此處的key就是一行的起始偏移量,value就是一行的內容
* 這個方法會被調用一次,當他key/value傳進來的時候,傳一次調用一次
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//獲取每一行的文本內容,首先要把他轉為jdk的類型
String lines = value.toString();
//按照空格切割,成為一個string數組
String[] words = lines.split(" ");
for (String world : words) {
//如果每行中都出現個相同單詞呢
//單詞是遍歷的world,標記為1,只要出現一次就標記一次
//單詞是string類型,但是hadoop有自己的類型Text
context.write(new Text(world),new IntWritable(1));
}
}
}
~~~
## reduce代碼
~~~
package com.hadooprpc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reduce類
* reducetask在調用我們的reduce方法
* reducetask應該接收到map階段(第一階段)中所有的maptask輸出的數據中的一部分,比如(hello world先統計hello)
* 如何進行數據分發
* (key.hashcode%numReduceTask==本ReduceTask編號)numReduceTask是機器的個數,這個表示數據要分為幾份
* <p>
* reducetask將接收到的kv數量拿來處理時,是這樣調用我們的reduce方法的
* 先將自己接收到的所有的kv對接k分組(根據k是否相同)
* 然后將一組kv中的k傳給我們的reduce方法的key變量,把這一組kv中的所有v用一個迭代器傳給reduce方法的變量values
* <p>
* map的輸出就是這里的輸入
* <p>
* Reducer<Text, IntWritable, Text, IntWritable>
* 這個和map那邊對應
*/
public class WorldCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : values) {
//把v進行疊加,就是單詞的數量
count += v.get();
}
context.write(key,new IntWritable(count));
}
}
~~~
## 運行的jar包類
~~~
package com.hadooprpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* 本類是客戶端用來指定WorldCount job程序運行時所需要的很多參數
* 比如:指定那個類作為map階段的業務邏輯,那個類作為reduce階段的業務邏輯類
* 指定那個組件作為數據的讀取組件,數據結果輸出組件
* ....
* 以及其他各種所需要的參數
*/
public class WorldCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//在哪里運行就在哪里拿配置
//機器上和hadoop相關的配置文件讀取過來
//這是在hadoop服務器上運行
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//告訴框架,我們程序的位置
job.setJar("/root/wordCount.jar");
//告訴框架,我們程序所用的mapper類和reduce類是什么
job.setMapperClass(WorldCountMapper.class);
job.setReducerClass(WorldCountReducer.class);
//告訴框架我們程序輸出的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件
job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class);
//告訴框架,我們要處理的數據文件在那個路徑下
FileInputFormat.setInputPaths(job,new Path("/worldCount/input"));
//告訴框架我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job,new Path("/worldCount/output"));
//這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了
// job.submit();
//提交后,然后等待服務器端返回值,看是不是true
boolean res = job.waitForCompletion(true);
//設置成功就退出碼為0
System.exit(res?0:1);
}
}
~~~
# 集群服務器上運行
maven把運行的jar類,WorldCountDriver這個類打成一個jar包
然后上傳到服務器上
執行
~~~
hadoop jar wordCount.jar com.hadooprpc.WorldCountDriver
~~~
網頁

命令行

然后我們看下結果
在hdfs`/worldCount/output`會有結果
可以先在網頁中看這個結果的文件名字叫什么
~~~
[root@master ~]# hadoop fs -cat /worldCount/output/part-r-00000
allen 4
hello 8
jack 1
jdxia 2
tom 3
~~~
# 本地運行(一般是本地開發,方便debug調試)
## 不提交到yarn上
我們先在本地創建文件夾`worldCount/input`
output文件夾不要創建
input文件夾里面還是寫1.txt,2.txt
然后我們要在WorldCountDriver類中把jar運行的路徑改下,還有input,output
~~~
package com.hadooprpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
* 本類是客戶端用來指定WorldCount job程序運行時所需要的很多參數
* 比如:指定那個類作為map階段的業務邏輯,那個類作為reduce階段的業務邏輯類
* 指定那個組件作為數據的讀取組件,數據結果輸出組件
* ....
* 以及其他各種所需要的參數
*/
public class WorldCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//告訴框架,我們程序的位置
// job.setJar("/root/wordCount.jar");
//上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己
job.setJarByClass(WorldCountDriver.class);
//告訴框架,我們程序所用的mapper類和reduce類是什么
job.setMapperClass(WorldCountMapper.class);
job.setReducerClass(WorldCountReducer.class);
//告訴框架我們程序輸出的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件
job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class);
//告訴框架,我們要處理的數據文件在那個路徑下
FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/hdfs/worldCount/input/"));
//告訴框架我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/hdfs/worldCount/output/"));
//這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了
// job.submit();
//提交后,然后等待服務器端返回值,看是不是true
boolean res = job.waitForCompletion(true);
//設置成功就退出碼為0
System.exit(res?0:1);
}
}
~~~
然后我們運行下main方法就行
由于在本地運行

他這邊找不到運行的配置會找包下的默認配置,發現這邊的framework是local是本地,他就不會提交到yarn上

還有這個默認配置

沒有配置他會找這個,這是本地的文件系統
## 提交到yarn上
注意配置些環境變量,不然會報一些類找不到
~~~
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
//設置權限,也可以在vm那邊偽造
System.setProperty("HADOOP_USER_NAME", "root");
conf.set("fs.defaultFS","hdfs://master:9000");
conf.set("mapreduce.framework.name","yarn");
conf.set("yarn.resourcemanager.hostname","master");
Job job = Job.getInstance(conf);
//告訴框架,我們程序的位置
// job.setJar("/root/wordCount.jar");
//上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己
job.setJarByClass(WorldCountDriver.class);
//告訴框架,我們程序所用的mapper類和reduce類是什么
job.setMapperClass(WorldCountMapper.class);
job.setReducerClass(WorldCountReducer.class);
//告訴框架,我們程序所用的mapper類和reduce類是什么
job.setMapperClass(WorldCountMapper.class);
job.setReducerClass(WorldCountReducer.class);
//告訴框架我們程序輸出的類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件
job.setInputFormatClass(TextInputFormat.class);
//告訴框架,我們要處理的數據文件在那個路徑下
FileInputFormat.setInputPaths(job,new Path("/worldCount/input/"));
//告訴框架我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job,new Path("/worldCount/output/"));
//這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了
// job.submit();
//提交后,然后等待服務器端返回值,看是不是true
boolean res = job.waitForCompletion(true);
//設置成功就退出碼為0
System.exit(res?0:1);
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介