<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 對不同的手機號碼分成不同文件 接著上面的例子 默認的分區規則 ![](https://box.kancloud.cn/1b730c49dd2e0a7cd8e609328db5aba3_928x469.png) ## 分區類 ~~~ package com.folwsum; import org.apache.hadoop.mapreduce.Partitioner; import java.util.HashMap; public class ProvivcePartitioner extends Partitioner { private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>(); static{ //我們這邊就提前設置手機號碼對應的分區 provinceMap.put("135", 0); provinceMap.put("136", 1); provinceMap.put("137", 2); provinceMap.put("138", 3); provinceMap.put("139", 4); } @Override public int getPartition(Object o, Object o2, int numPartitions) { //根據手機的前3位,進行取他的值,就是上面定義的 Integer code = provinceMap.get(o.toString().substring(0, 3)); if(code != null){ return code; } //沒有取到就分到5去 return 5; } } ~~~ ## 任務類 主要是main方法里面的 ~~~ package com.folwsum; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.StringUtils; import java.io.IOException; public class FlowSumProvince { public static class ProvinceFlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將讀取到的每一行數據進行字段的切分 String line = value.toString(); String[] fields = StringUtils.split(line, ' '); //抽取我們業務所需要的字段 String phoneNum = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(phoneNum); v.set(upFlow, downFlow); context.write(k, v); } } public static class ProvinceFlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long upFlowCount = 0; long downFlowCount = 0; for(FlowBean bean : values){ upFlowCount += bean.getUpFlow(); downFlowCount += bean.getDownFlow(); } FlowBean sumbean = new FlowBean(); sumbean.set(upFlowCount, downFlowCount); context.write(key, sumbean); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumProvince.class); //告訴程序,我們的程序所用的mapper類和reducer類是什么 job.setMapperClass(ProvinceFlowSumMapper.class); job.setReducerClass(ProvinceFlowSumReducer.class); //告訴框架,我們程序輸出的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //設置我們的shuffer的分區組件使用我們自定義的組件 job.setPartitionerClass(ProvivcePartitioner.class); //這里設置我們的reduce task個數 默認是一個partition分區對應一個reduce task 輸出文件也是一對一 //如果我們的Reduce task個數 < partition分區數 就會報錯Illegal partition //如果我們的Reduce task個數 > partition分區數 不會報錯,會有空文件產生 //如果我們的Reduce task個數 = 1 partitoner組件就無效了 不存在分區的結果 //這邊設置為6,因為沒有匹配到的就到第5個 job.setNumReduceTasks(6); //告訴框架,我們程序使用的數據讀取組件 結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件 準確的說 叫做 讀取文本文件的輸入組件 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路勁下 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/input")); //如果有這個文件夾就刪除 Path out = new Path("/Users/jdxia/Desktop/website/hdfs/flowsum/output"); FileSystem fileSystem = FileSystem.get(conf); if (fileSystem.exists(out)) { fileSystem.delete(out, true); } //告訴框架,我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job, out); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看