[TOC]
# 準備數據
flow.txt
~~~
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
~~~
是手機號碼跟后面的上下行流量
我們要統計每個手機號碼后面的流量
# 代碼
**FlowBean**
~~~
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;
//序列化框架在反序列化的時候創建對象的實例會去調用我們的無參構造函數
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow, long sumFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
//序列化的方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//反序列化的方法
//注意:字段的反序列化的順序跟序列化的順序必須保持一致
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
/**
* 這里進行我們自定義比較大小的規則
* 在reduce中會進行自動排序
*/
@Override
public int compareTo(FlowBean o) {
return (int) (o.getSumFlow() - this.getSumFlow());
}
//getter和setter方法
~~~
流量求和類
里面包含map,reduce,還有
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class FlowSum {
//在kv中傳輸我們自定義的對象是可以的,不過必須要實現hadoop的序列化機制,也就是implement writable
//輸入的LongWritable,Text
//輸出 Text,FlowBean
public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將抽取到的每一行數據進行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我們業務所需要的字段,
String phoneNum = fields[1];
//取上下行流量
long upFlow = Long.parseLong(fields[fields.length -3]);
long downFlow = Long.parseLong(fields[fields.length -2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
//賦值一次就序列化出去了,不會數據都是一致的
context.write(k, v);
}
}
public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
FlowBean v = new FlowBean();
//這里reduce方法接收到的key就是某一組<手機號,bean><手機號,bean><手機號,bean>當中一個的手機號
//這里的reduce方法接收到的value就是這一組kv對中的所有bean的一個迭代器
//reduce會把手機號碼歸類
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for (FlowBean bean : values) {
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
v.set(upFlowCount, downFlowCount);
context.write(key, v);
}
}
//job驅動
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己
job.setJarByClass(FlowSum.class);
//告訴框架,我們程序所用的mapper類和reduce類是什么
job.setMapperClass(FlowSumMapper.class);
job.setReducerClass(FlowSumReducer.class);
//告訴框架我們程序輸出的類型,
// 如果map階段和最終輸出結果是一樣的,這2行可以不寫
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件
//程序默認的輸出組件就是TextOutputFormat,下面那個可以注釋
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告訴框架,我們要處理的數據文件在那個路徑下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
//告訴框架我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));
//這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了
// job.submit();
//提交后,然后等待服務器端返回值,看是不是true
boolean res = job.waitForCompletion(true);
//設置成功就退出碼為0
System.exit(res ? 0 : 1);
}
}
~~~
# 按總量排序需求
MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),排序的依據是map輸出的key
所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:WritableComparable
然后重寫key的compareTo方法
排序默認是按照字典排序
自定義排序,他自己他自定義的每個類里面都有compareTo方法
比如LongWritable
或者一些類實現或繼承了一些比較接口
如果是我們自己定義的類呢?
如下

然后代碼
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
// 實現流量匯總并按照流量大小的倒序排序
public class FlowSumSort {
public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
FlowBean k = new FlowBean();
Text v = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將抽取到的每一行數據進行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我們業務所需要的字段
String phoNum = fields[0];
long upFlow = Long.parseLong(fields[1]);
long downFlow = Long.parseLong(fields[2]);
k.set(upFlow, downFlow);
v.set(phoNum);
//賦值一次就序列化出去了,不會數據都是一致的
context.write(k, v);
}
}
public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
@Override
protected void reduce(FlowBean bean, Iterable<Text> PhoneNum, Context context) throws IOException, InterruptedException {
//這邊寫的時候會自動排序的
context.write(PhoneNum.iterator().next(), bean);
}
}
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumSort.class);
//告訴程序,我們的程序所用的mapper類和reducer類是什么
job.setMapperClass(FlowSumSortMapper.class);
job.setReducerClass(FlowSumSortReducer.class);
//告訴框架,我們程序輸出的數據類型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//告訴框架,我們程序使用的數據讀取組件 結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件 準確的說 叫做 讀取文本文件的輸入組件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告訴框架,我們要處理的數據文件在那個路勁下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
//告訴框架,我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
~~~
# 一次性完成,統計和排序
我們看下繼承的reduce

reduce需要調用run方法,run方法中不僅執行了reduce最后還執行了cleanup
因為map不斷的提交給reduce,reduce排序好了就要寫,但是這時候一旦寫到文件中,后面再來任務,再寫的話,就不能和前面一起排序了
所以我們寫到一個treeMap中,然后在cleanup中做treeMap做排序
代碼主要把繼承reduce中的那個類改了下
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class OneStepFlowSumSort {
public static class OneStepFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將讀取到的每一行數據進行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我們業務所需要的字段
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
context.write(k, v);
}
}
public static class OneStepFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
//在這里進行reduce端的局部緩存TreeMap
TreeMap<FlowBean,Text> treeMap = new TreeMap<FlowBean, Text>();
//這里reduce方法接收到的key就是某一組《a手機號,bean》《a手機號,bean》 《b手機號,bean》《b手機號,bean》當中的第一個手機號
//這里reduce方法接收到的values就是這一組kv對中的所以bean的一個迭代器
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for(FlowBean bean : values){
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
FlowBean sumbean = new FlowBean();
sumbean.set(upFlowCount, downFlowCount);
Text text = new Text(key.toString());
treeMap.put(sumbean, text);
}
//這里進行我們全局的最終輸出
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Set<Map.Entry<FlowBean,Text>> entrySet = treeMap.entrySet();
for(Map.Entry<FlowBean,Text> ent :entrySet){
context.write(ent.getValue(), ent.getKey());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(OneStepFlowSumSort.class);
//告訴程序,我們的程序所用的mapper類和reducer類是什么
job.setMapperClass(OneStepFlowSumMapper.class);
job.setReducerClass(OneStepFlowSumReducer.class);
//告訴框架,我們程序輸出的數據類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//告訴框架,我們程序使用的數據讀取組件 結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件 準確的說 叫做 讀取文本文件的輸入組件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告訴框架,我們要處理的數據文件在那個路勁下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
//告訴框架,我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
}
~~~
# 對不同的手機號碼分成不同文件
默認的分區規則

## 分區類
Mapreduce中會將map輸出的kv對,按照相同key分組,然后分發給不同的reducetask
默認的分發規則為:根據key的`hashcode%reducetask`數來分發
所以:如果要按照我們自己的需求進行分組,則需要改寫數據分發(分組)組件Partitioner
自定義一個CustomPartitioner繼承抽象類:Partitioner
然后在job對象中,設置自定義`partitioner: job.setPartitionerClass(CustomPartitioner.class)`
我們需要繼承Partitioner這個分區類,來實現我們自己的分區
~~~
package com.folwsum;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;
public class ProvivcePartitioner extends Partitioner {
private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();
static{
//我們這邊就提前設置手機號碼對應的分區
provinceMap.put("135", 0);
provinceMap.put("136", 1);
provinceMap.put("137", 2);
provinceMap.put("138", 3);
provinceMap.put("139", 4);
}
@Override
public int getPartition(Object o, Object o2, int numPartitions) {
//根據手機的前3位,進行取他的值,就是上面定義的
Integer code = provinceMap.get(o.toString().substring(0, 3));
if(code != null){
return code;
}
//沒有取到就分到5去
return 5;
}
}
~~~
## 任務類
主要是main方法里面的
~~~
package com.folwsum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
public class FlowSumProvince {
public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//將讀取到的每一行數據進行字段的切分
String line = value.toString();
String[] fields = StringUtils.split(line, ' ');
//抽取我們業務所需要的字段
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
context.write(k, v);
}
}
public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for(FlowBean bean : values){
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
FlowBean sumbean = new FlowBean();
sumbean.set(upFlowCount, downFlowCount);
context.write(key, sumbean);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumProvince.class);
//告訴程序,我們的程序所用的mapper類和reducer類是什么
job.setMapperClass(ProvinceFlowSumMapper.class);
job.setReducerClass(ProvinceFlowSumReducer.class);
//告訴框架,我們程序輸出的數據類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//設置我們的shuffer的分區組件使用我們自定義的組件
job.setPartitionerClass(ProvivcePartitioner.class);
//這里設置我們的reduce task個數 默認是一個partition分區對應一個reduce task 輸出文件也是一對一
//如果我們的Reduce task個數 < partition分區數 就會報錯Illegal partition
//如果我們的Reduce task個數 > partition分區數 不會報錯,會有空文件產生
//如果我們的Reduce task個數 = 1 partitoner組件就無效了 不存在分區的結果
//這邊設置為6,因為沒有匹配到的就到第5個
job.setNumReduceTasks(6);
//告訴框架,我們程序使用的數據讀取組件 結果輸出所用的組件是什么
//TextInputFormat是mapreduce程序中內置的一種讀取數據組件 準確的說 叫做 讀取文本文件的輸入組件
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
//告訴框架,我們要處理的數據文件在那個路勁下
FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input"));
//如果有這個文件夾就刪除
Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output");
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(out)) {
fileSystem.delete(out, true);
}
//告訴框架,我們的處理結果要輸出到什么地方
FileOutputFormat.setOutputPath(job, out);
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
~~~
- linux
- 常用命令
- 高級文本命令
- 面試題
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推薦
- java高級特性
- 多線程
- 實現線程的三種方式
- 同步關鍵詞
- 讀寫鎖
- 鎖的相關概念
- 多線程的join
- 有三個線程T1 T2 T3,保證順序執行
- java五種線程池
- 守護線程與普通線程
- ThreadLocal
- BlockingQueue消息隊列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty簡介
- 案例一發送字符串
- 案例二發送對象
- 輕量級RPC開發
- 簡介
- spring(IOC/AOP)
- spring初始化順序
- 通過ApplicationContextAware加載Spring上下文
- InitializingBean的作用
- 結論
- 自定義注解
- zk在框架中的應用
- hadoop
- 簡介
- hadoop集群搭建
- hadoop單機安裝
- HDFS簡介
- hdfs基本操作
- hdfs環境搭建
- 常見問題匯總
- hdfs客戶端操作
- mapreduce工作機制
- 案列-單詞統計
- 局部聚合Combiner
- 案列-流量統計(分區,排序,比較)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法實現
- 案例-求topN(分組)
- 自定義inputFormat
- 自定義outputFormat
- 框架運算全流程
- mapreduce的優化方案
- HA機制
- Hive
- 安裝
- DDL操作
- 創建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 嚴格模式
- 數據類型
- shell參數
- 函數
- 內置運算符
- 內置函數
- 自定義函數
- Transform實現
- 特殊分割符處理
- 案例
- 級聯求和accumulate
- flume
- 簡介
- 安裝
- 常用的組件
- 攔截器
- 案例
- 采集目錄到HDFS
- 采集文件到HDFS
- 多個agent串聯
- 日志采集和匯總
- 自定義攔截器
- 高可用配置
- 使用注意
- sqoop
- 安裝
- 數據導入
- 導入數據到HDFS
- 導入關系表到HIVE
- 導入表數據子集
- 增量導入
- 數據導出
- 作業
- 原理
- azkaban
- 簡介
- 安裝
- 案例
- 簡介
- command類型單一job
- command類型多job工作流flow
- HDFS操作任務
- mapreduce任務
- hive腳本任務
- hbase
- 簡介
- 安裝
- 命令行
- 基本CURD
- 過濾器查詢
- 系統架構
- 物理存儲
- 尋址機制
- 讀寫過程
- Region管理
- master工作機制
- 建表高級屬性
- 與mapreduce結合
- 協處理器
- 點擊流平臺開發
- 簡介
- storm
- 簡介
- 安裝
- 集群啟動及任務過程分析
- 單詞統計
- 并行度
- ACK容錯機制
- ACK簡介