<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 一次性完成,統計和排序 我們看下繼承的reduce ![](https://box.kancloud.cn/0c0e45166309ed1600b9542a73179364_772x453.png) reduce需要調用run方法,run方法中不僅執行了reduce最后還執行了cleanup 因為map不斷的提交給reduce,reduce排序好了就要寫,但是這時候一旦寫到文件中,后面再來任務,再寫的話,就不能和前面一起排序了 所以我們寫到一個treeMap中,然后在cleanup中做treeMap做排序 代碼主要把繼承reduce中的那個類改了下 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.TreeMap; public class OneStepFlowSumSort { public static class OneStepFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將讀取到的每一行數據進行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我們業務所需要的字段 String phoneNum = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(phoneNum); v.set(upFlow, downFlow); context.write(k, v); } } public static class OneStepFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> { //在這里進行reduce端的局部緩存TreeMap TreeMap<FlowBean,Text> treeMap = new TreeMap<FlowBean, Text>(); //這里reduce方法接收到的key就是某一組《a手機號,bean》《a手機號,bean》 《b手機號,bean》《b手機號,bean》當中的第一個手機號 //這里reduce方法接收到的values就是這一組kv對中的所以bean的一個迭代器 @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for(FlowBean bean : values){ upFlowCount += bean.getUpFlow(); downFlowCount += bean.getDownFlow(); } FlowBean sumbean = new FlowBean(); sumbean.set(upFlowCount, downFlowCount); Text text = new Text(key.toString()); treeMap.put(sumbean, text); } //這里進行我們全局的最終輸出 @Override protected void cleanup(Context context) throws IOException, InterruptedException { Set<Map.Entry<FlowBean,Text>> entrySet = treeMap.entrySet(); for(Map.Entry<FlowBean,Text> ent :entrySet){ context.write(ent.getValue(), ent.getKey()); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OneStepFlowSumSort.class); //告訴程序,我們的程序所用的mapper類和reducer類是什么 job.setMapperClass(OneStepFlowSumMapper.class); job.setReducerClass(OneStepFlowSumReducer.class); //告訴框架,我們程序輸出的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //告訴框架,我們程序使用的數據讀取組件 結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件 準確的說 叫做 讀取文本文件的輸入組件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路勁下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input/")); //告訴框架,我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output/")); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看