<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                ## MapReduce 編程模型 ![](https://img.kancloud.cn/10/50/10504daab5da985afd79efd0eb6987f9_918x407.png) > 業務被分成map階段和reduce階段。 ## Mapreduce執行步驟 ![](https://img.kancloud.cn/d7/c0/d7c022a25de361c148150aaa8cf15baa_837x713.png) ## 自定義mapper步驟 ~~~ package com.bizzbee.bigdata.hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* * Mapper是一個范型類 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> * KEYIN:map 任務讀數據的key類型 long * VALUEIN :map讀數據的value類型 string * * 詞頻統計 * hello world welcome * hello welcome * * 輸出--》(word,1) * * KEYOUT key輸出類型 String * VALUEOUT value輸出類型 integer * * *因為Long String Integer是Java里面的數據類型。 * Hadoop自定義類型:序列化和反序列化 * * * */ public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /* * 重寫map方法 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //每一行數據用tab分割拆開 String[] words = value.toString().split("\t"); for(String word:words){ context.write(new Text(word),new IntWritable(1)); } } } ~~~ ## 自定義reduce ~~~ package com.bizzbee.bigdata.hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /* * 輸入: (hello,1)(hello,1) * (welcome,1) * map輸出到reduce,是按照相同的key分發到一個reduce上去執行。 * * reduce1:(hello,1)(hello,1)(hello,1)==》(hello,<1,1,1>) * */ public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count =0; Iterator<IntWritable> iterator = values.iterator(); while (iterator.hasNext()){ IntWritable value = iterator.next(); //get就是把IntWritable轉換回int count +=value.get(); } context.write(key,new IntWritable(count)); } } ~~~ ## 創建driver運行統計 ~~~ package com.bizzbee.bigdata.hadoop.mr.wc; import com.bizzbee.bigdata.hadoop.hdfs.Constants; import com.bizzbee.bigdata.hadoop.hdfs.ParamsUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.net.URI; import java.util.Properties; public class WordCountApp { public static void main(String[] args) throws Exception{ //加載配置文件 Properties properties = ParamsUtils.getProperties(); //填寫hdfs上面的用戶,不加這句報錯 System.setProperty("HADOOP_USER_NAME","bizzbee"); Configuration configuration = new Configuration(); configuration.set("dfs.client.use.datanode.hostname", "true"); configuration.set("fs.defaultFS","hdfs://tencent2:8020"); Job job = Job.getInstance(configuration); job.setJarByClass(WordCountApp.class); // 設置Job對應的參數: 主類 job.setJarByClass(WordCountApp.class); // 設置Job對應的參數: 設置自定義的Mapper和Reducer處理類 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 設置Job對應的參數: Mapper輸出key和value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置Job對應的參數: Reduce輸出key和value的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 如果輸出目錄已經存在,則先刪除 FileSystem fileSystem = FileSystem.get(new URI("hdfs://tencent2:8020"),configuration, "bizzbee"); Path outputPath = new Path("/bizzbee/output/result"); if(fileSystem.exists(outputPath)) { fileSystem.delete(outputPath,true); } // 設置Job對應的參數: Mapper輸出key和value的類型:作業輸入和輸出的路徑 FileInputFormat.setInputPaths(job, new Path("/bizzbee/input/article")); FileOutputFormat.setOutputPath(job, outputPath); // 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : -1); } } ~~~ ![](https://img.kancloud.cn/c1/3a/c13aec83757736335a9de5a5cfe753c1_174x691.png) ## combiner combiner是在map階段線進行一次聚合操作。 優點:減少io,提高性能,可以節省網絡帶寬。 缺點:求平均數等幾種操作不可用。 ![](https://img.kancloud.cn/7b/80/7b80f62278f1e890c750ff82356f9444_975x521.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看