<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] # 簡介 ![](https://box.kancloud.cn/7f1280423308af086da8edc222ef5755_1287x997.png) # 準備數據 flow.txt ~~~ 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200 1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200 ~~~ 是手機號碼跟后面的上下行流量 我們要統計每個手機號碼后面的流量 # 代碼 **FlowBean** ~~~ import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FlowBean implements WritableComparable<FlowBean> { private long upFlow; private long downFlow; private long sumFlow; //序列化框架在反序列化的時候創建對象的實例會去調用我們的無參構造函數 public FlowBean() { } public FlowBean(long upFlow, long downFlow, long sumFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } public FlowBean(long upFlow, long downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } //序列化的方法 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //反序列化的方法 //注意:字段的反序列化的順序跟序列化的順序必須保持一致 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.downFlow = in.readLong(); this.sumFlow = in.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } /** * 這里進行我們自定義比較大小的規則 * 在reduce中會進行自動排序 */ @Override public int compareTo(FlowBean o) { return (int) (o.getSumFlow() - this.getSumFlow()); } //getter和setter方法 ~~~ ## 流量求和類 里面包含map,reduce,還有 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class FlowSum { //在kv中傳輸我們自定義的對象是可以的,不過必須要實現hadoop的序列化機制,也就是implement writable //輸入的LongWritable,Text //輸出 Text,FlowBean public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將抽取到的每一行數據進行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我們業務所需要的字段, String phoneNum = fields[1]; //取上下行流量 long upFlow = Long.parseLong(fields[fields.length -3]); long downFlow = Long.parseLong(fields[fields.length -2]); k.set(phoneNum); v.set(upFlow, downFlow); //賦值一次就序列化出去了,不會數據都是一致的 context.write(k, v); } } public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> { FlowBean v = new FlowBean(); //這里reduce方法接收到的key就是某一組<手機號,bean><手機號,bean><手機號,bean>當中一個的手機號 //這里的reduce方法接收到的value就是這一組kv對中的所有bean的一個迭代器 //reduce會把手機號碼歸類 @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for (FlowBean bean : values) { upFlowCount += bean.getUpFlow(); downFlowCount += bean.getDownFlow(); } v.set(upFlowCount, downFlowCount); context.write(key, v); } } //job驅動 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己 job.setJarByClass(FlowSum.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(FlowSumMapper.class); job.setReducerClass(FlowSumReducer.class); //告訴框架我們程序輸出的類型, // 如果map階段和最終輸出結果是一樣的,這2行可以不寫 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 //程序默認的輸出組件就是TextOutputFormat,下面那個可以注釋 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res ? 0 : 1); } } ~~~ ## 按總量排序需求 MR程序在處理數據的過程中會對數據排序(map輸出的kv對傳輸到reduce之前,會排序),**排序的依據是map輸出的key** 所以,我們如果要實現自己需要的排序規則,則可以考慮將排序因素放到key中,讓key實現接口:WritableComparable 然后重寫key的compareTo方法 排序默認是按照字典排序 自定義排序,他自己他自定義的每個類里面都有compareTo方法 比如LongWritable 或者一些類實現或繼承了一些比較接口 如果是我們自己定義的類呢? 如下 ![](https://box.kancloud.cn/4273ecbde88a979f9d49a5dcceb64b8f_862x473.png) 然后代碼 這邊主要是把Bean當做key,就可以用到bean中自定義的compareTo ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class FlowSum { public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> { FlowBean k = new FlowBean(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將抽取到每一行數據按照字段劃分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我們需要的業務字段 String phoneNum = fields[1]; //取上下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(upFlow, downFlow); v.set(phoneNum); //賦值一次就序列化出去了,不會數據都是一致的 context.write(k, v); } } public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> { @Override protected void reduce(FlowBean bean, Iterable<Text> PhoneNum, Context context) throws IOException, InterruptedException { //這邊寫的時候會自動排序的 context.write(PhoneNum.iterator().next(), bean); //for (Text text : PhoneNum) { // context.write(text, bean); //} } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //用類加載器加載自己 job.setJarByClass(FlowSum.class); //告訴程序,我們的程序所用的mapper類和reducer類是什么 job.setMapperClass(FlowSumSortMapper.class); job.setReducerClass(FlowSumSortReducer.class); //告訴框架,我們程序輸出的數據類型 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //告訴框架,我們程序使用的數據讀取組件 結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件 準確的說 叫做 讀取文本文件的輸入組件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據在哪個路徑下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output/")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res ? 0 : 1); } } ~~~ # 注意 reducer中 ~~~ Iterable<FlowBean> values ~~~ 這個循環遍歷的話,他是對遍歷出來的對象不斷的重新賦值 如果把遍歷出來的對象存到鏈表中,永遠是最后一個遍歷出來的值是一 個 這時如果你需要把遍歷出來的對象保存起來,那需要自己每次new出個對象,然后把值賦值給自己的對象,然后保存起來
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看