[TOC]

# Preparing the data

flow.txt

~~~
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
~~~

Each record contains a phone number followed (among other fields) by its upstream and downstream traffic. We want to total the traffic for each phone number.

# Code

**FlowBean**

~~~
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // the serialization framework calls the no-arg constructor when it creates an instance during deserialization
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public FlowBean(long upFlow, long downFlow) {
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // deserialization
    // note: fields must be read back in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * Our custom comparison rule: descending by total flow.
     * The framework sorts map output keys with this before they reach reduce.
     * Long.compare avoids the overflow that casting a long subtraction to int could cause.
     */
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(o.getSumFlow(), this.getSumFlow());
    }

    // getters and setters
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
}
~~~

The flow-sum job class, containing the mapper, the reducer, and the job driver:

~~~
package com.folwsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class FlowSum {

    // a custom object can be carried in the kv stream, but it must implement Hadoop's
    // serialization mechanism, i.e. implement Writable
    // input:  LongWritable, Text
    // output: Text, FlowBean
    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split each input line into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, ' ');

            // pick out the fields our business logic needs
            String phoneNum = fields[1];

            // upstream and downstream traffic
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);
            v.set(upFlow, downFlow);

            // each write() serializes the current contents immediately, so reusing k and v does not make the records identical
            context.write(k, v);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        // the key passed to reduce is the phone number shared by one group of <phone, bean> pairs
        // the values parameter is an iterator over all the beans of that group
        // reduce therefore groups the records by phone number
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;

            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                downFlowCount += bean.getDownFlow();
            }
            v.set(upFlowCount, downFlowCount);
            context.write(key, v);
        }
    }

    // job driver
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // use this class's class loader to locate the jar, instead of hard-coding a jar path that breaks when it moves
        job.setJarByClass(FlowSum.class);

        // tell the framework which mapper and reducer classes the job uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // tell the framework the output types
        // if the map output types are the same as the final output types, the two map-output lines can be omitted
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // tell the framework which input and output components to use
        // TextInputFormat is the built-in component for reading plain text input
        // TextOutputFormat is already the default output component, so that line could be left out
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
        // where the result should be written
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));

        // we don't use job.submit(): it returns as soon as the job is handed over, the client disconnects and we never see the result
        // job.submit();
        // waitForCompletion submits and then waits for the server to report whether the job succeeded
        boolean res = job.waitForCompletion(true);

        // exit code 0 on success
        System.exit(res ? 0 : 1);
    }
}
~~~
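To see the Writable contract in isolation, here is a minimal round-trip sketch (not part of the original job, just an illustration assuming the FlowBean class above): write() serializes the three longs to a byte stream and readFields() restores them in the same order.

~~~
package com.folwsum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        // write() serializes the three longs in a fixed order
        FlowBean original = new FlowBean(2481, 24681);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // readFields() must read them back in exactly the same order
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // prints: 2481	24681	27162
        System.out.println(copy);
    }
}
~~~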
# Sorting by total flow

An MR program sorts the data while processing it: the map output kv pairs are sorted before they are transferred to reduce, and the sort is based on the map output key. So if we need our own sort order, we can put the sort factor into the key, make the key implement the WritableComparable interface, and override its compareTo method.

The default sort is lexicographic. The built-in key types such as LongWritable each carry their own compareTo method, because they implement or inherit the comparison interfaces. What about a class we define ourselves? As shown below:

![](https://box.kancloud.cn/df7eb02fcce7fecb56fbe7e500199823_1258x680.png)

And the code:

~~~
package com.folwsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

// sum up the flow and sort by total flow in descending order
public class FlowSumSort {

    public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split each input line into fields
            // (the fields here follow the FlowSum output layout: phone, upFlow, downFlow, sumFlow)
            String line = value.toString();
            String[] fields = StringUtils.split(line, ' ');

            // pick out the fields we need
            String phoNum = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);

            k.set(upFlow, downFlow);
            v.set(phoNum);

            // each write() serializes the current contents immediately, so reusing k and v is safe
            context.write(k, v);
        }
    }

    public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean bean, Iterable<Text> PhoneNum, Context context) throws IOException, InterruptedException {
            // the keys arrive here already sorted by FlowBean.compareTo
            context.write(PhoneNum.iterator().next(), bean);
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumSort.class);

        // tell the framework which mapper and reducer classes the job uses
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        // tell the framework the output types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // tell the framework which input and output components to use
        // TextInputFormat is the built-in component for reading plain text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
        // where the result should be written
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
~~~
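The descending order comes entirely from FlowBean.compareTo: the framework sorts the map output keys with it before reduce runs. As a quick stand-alone illustration (a hypothetical snippet, assuming the FlowBean above), sorting a plain list with that same method puts the largest total first:

~~~
package com.folwsum;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CompareToDemo {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<FlowBean>();
        beans.add(new FlowBean(264, 0));      // total 264
        beans.add(new FlowBean(2481, 24681)); // total 27162
        beans.add(new FlowBean(132, 1512));   // total 1644

        // Collections.sort uses compareTo, which orders by sumFlow descending
        Collections.sort(beans);
        for (FlowBean b : beans) {
            System.out.println(b);
        }
        // prints the 27162 bean first, then 1644, then 264
    }
}
~~~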
# Summing and sorting in one job

Look at the Reducer base class we extend:

![](https://box.kancloud.cn/acb5180a8191603e0b6892998eaf1603_1656x826.png)

The framework drives a reducer through its run method, which calls reduce for each group and finally calls cleanup. If we wrote each result to the output file as soon as it was reduced, the groups arriving later could no longer be sorted together with the ones already written. So instead we collect the results in a TreeMap inside reduce, and only write them out, already sorted, in cleanup.

The code mainly changes the Reducer subclass:

~~~
package com.folwsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class OneStepFlowSumSort {

    public static class OneStepFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split each input line into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, ' ');

            // pick out the fields we need
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }

    public static class OneStepFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        // reduce-side local cache: results are buffered here instead of being written immediately
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

        // the key passed to reduce is the phone number shared by one group of <phone, bean> pairs,
        // e.g. <phoneA, bean><phoneA, bean>  <phoneB, bean><phoneB, bean>
        // the values parameter is an iterator over all the beans of that group
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;

            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                downFlowCount += bean.getDownFlow();
            }

            // a fresh bean per group, because the TreeMap keeps a reference to it
            FlowBean sumbean = new FlowBean();
            sumbean.set(upFlowCount, downFlowCount);
            Text text = new Text(key.toString());
            treeMap.put(sumbean, text);
        }

        // the final, globally sorted output happens here
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Set<Map.Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            for (Map.Entry<FlowBean, Text> ent : entrySet) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(OneStepFlowSumSort.class);

        // tell the framework which mapper and reducer classes the job uses
        job.setMapperClass(OneStepFlowSumMapper.class);
        job.setReducerClass(OneStepFlowSumReducer.class);

        // tell the framework the output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // tell the framework which input and output components to use
        // TextInputFormat is the built-in component for reading plain text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/"));
        // where the result should be written
        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
~~~
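One caveat with this buffering trick: the TreeMap orders its keys with FlowBean.compareTo, so cleanup writes the totals in descending order, but two phone numbers whose totals compare as equal collide and the later put overwrites the earlier value. A small stand-alone sketch (hypothetical, assuming the FlowBean above) shows both behaviours:

~~~
package com.folwsum;

import org.apache.hadoop.io.Text;

import java.util.Map;
import java.util.TreeMap;

public class TreeMapOrderDemo {
    public static void main(String[] args) {
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();
        treeMap.put(new FlowBean(2481, 24681), new Text("13726230503")); // total 27162
        treeMap.put(new FlowBean(132, 1512), new Text("13926435656"));   // total 1644
        treeMap.put(new FlowBean(264, 1380), new Text("13826544101"));   // total 1644 again: overwrites the value of the first 1644 entry

        // iteration order follows compareTo: largest total first
        for (Map.Entry<FlowBean, Text> ent : treeMap.entrySet()) {
            System.out.println(ent.getValue() + "\t" + ent.getKey());
        }
        // prints only two lines: the 27162 entry, then the single surviving 1644 entry
    }
}
~~~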
# Splitting different phone numbers into different files

The default partition rule:

![](https://box.kancloud.cn/5a3c3c0506d330fbe7231416bbe2cd3c_1626x838.png)

## Partitioner class

MapReduce groups the map output kv pairs by key and then dispatches each group to a reduce task. The default dispatch rule is `hashcode % reducetask` count of the key; a rough sketch of this default rule is shown below. So if we want the data dispatched (grouped) according to our own needs, we have to replace the dispatch (grouping) component, the Partitioner: define a CustomPartitioner that extends the abstract class Partitioner, then register it on the job with `job.setPartitionerClass(CustomPartitioner.class)`.
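For reference, the default rule described above behaves roughly like the following sketch; it is a simplified stand-in for Hadoop's built-in HashPartitioner, not part of the code in this post:

~~~
package com.folwsum;

import org.apache.hadoop.io.Text;

public class DefaultPartitionRuleDemo {
    // the default behaviour is essentially: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
    static int defaultPartition(Text key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] phones = {"13726230503", "13826544101", "13926435656", "13926251106"};
        for (String phone : phones) {
            // with the default rule, which reduce task a phone lands in depends only on its hash, not on its prefix
            System.out.println(phone + " -> partition " + defaultPartition(new Text(phone), 6));
        }
    }
}
~~~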
We extend this Partitioner class to implement our own partitioning:

~~~
package com.folwsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

public class ProvivcePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        // map phone-number prefixes to partitions up front
        provinceMap.put("135", 0);
        provinceMap.put("136", 1);
        provinceMap.put("137", 2);
        provinceMap.put("138", 3);
        provinceMap.put("139", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // look up the partition defined above by the first 3 digits of the phone number
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        // anything not matched goes to partition 5
        return 5;
    }
}
~~~

## Job class

Mainly the main method matters here:

~~~
package com.folwsum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class FlowSumProvince {

    public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split each input line into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, ' ');

            // pick out the fields we need
            String phoneNum = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);

            k.set(phoneNum);
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }

    public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long upFlowCount = 0;
            long downFlowCount = 0;

            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                downFlowCount += bean.getDownFlow();
            }

            FlowBean sumbean = new FlowBean();
            sumbean.set(upFlowCount, downFlowCount);
            context.write(key, sumbean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumProvince.class);

        // tell the framework which mapper and reducer classes the job uses
        job.setMapperClass(ProvinceFlowSumMapper.class);
        job.setReducerClass(ProvinceFlowSumReducer.class);

        // tell the framework the output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // plug our custom partition component into the shuffle
        job.setPartitionerClass(ProvivcePartitioner.class);

        // set the number of reduce tasks; by default one partition maps to one reduce task and one output file
        // if reduce tasks < partitions, the job fails with "Illegal partition"
        // if reduce tasks > partitions, it still runs, but empty output files are produced
        // if reduce tasks == 1, the partitioner is effectively ignored and nothing is partitioned
        // we use 6 here because unmatched phone numbers go to partition 5
        job.setNumReduceTasks(6);

        // tell the framework which input and output components to use
        // TextInputFormat is the built-in component for reading plain text files
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // where the input data lives
        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input"));

        // delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // where the result should be written
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
~~~
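With 6 reduce tasks, each partition ends up in its own output file (part-r-00000 through part-r-00005). A quick stand-alone check (a hypothetical demo, assuming the ProvivcePartitioner above; 15013685858 is just a made-up number with an unmatched prefix) of where each prefix lands:

~~~
package com.folwsum;

import org.apache.hadoop.io.Text;

public class ProvincePartitionCheck {
    public static void main(String[] args) {
        ProvivcePartitioner partitioner = new ProvivcePartitioner();
        String[] phones = {"13726230503", "13826544101", "13926435656", "15013685858"};

        for (String phone : phones) {
            // this partitioner only looks at the key, so an empty FlowBean is enough as the value
            int partition = partitioner.getPartition(new Text(phone), new FlowBean(), 6);
            // each partition number corresponds to one output file: part-r-0000<partition>
            System.out.println(phone + " -> part-r-0000" + partition);
        }
        // 137... -> 2, 138... -> 3, 139... -> 4, 150... -> 5 (the catch-all partition)
    }
}
~~~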