<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 思路 ![](https://box.kancloud.cn/0eafbe667c1274b34a72936bc7f500df_3114x1238.png) # 準備數據 1.txt ~~~ hello tom hello jdxia hello allen tom allen jdxia hello ~~~ 2.txt ~~~ hello allen hello tom allen hello jack hello ~~~ 然后在hdfs上創建目錄 ~~~ hadoop fs -mkdir -p /worldCount/input ~~~ 然后把這2個文件傳到hdfs上 ~~~ hadoop fs -put 1.txt 2.txt /worldCount/input ~~~ # 代碼 ## map代碼 ~~~ package com.hadooprpc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * mapper繼承mapreduce的mapper * Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> * KEYIN:是值框架讀取到的數據的key類型 * 在默認的讀取數據組件InputFormat下,讀取的key是一行文本的偏移量,所以key的類型是long類型的 * <p> * VALUEIN是指框架讀取到的數據的value類型 * 在默認的讀取數據組件InputFormat下,讀到的value就是一行文本的內容,所以value的類型就是String類型的 * <p> * KEYOUT是指用戶自定義的邏輯方法返回的數據中的key的類型, * 這個是由用戶業務邏輯決定的,在我們單詞統計中,我們輸出的是單詞作為key,所以類型是String * <p> * VALUEOUT是指用戶自定義邏輯方法返回的數據中value類型,這個是由用戶業務邏輯決定的 * 在我們的單詞統計業務中,我們輸出的是單詞數量作為value,所以類型是Integer * <p> * 但是,String,Long都是jdk自帶的數據類型,在序列化的時候,效率比較低,hadoop為了提高效率,他就自定義了一套數據結構 * 所以說我們的hadoop程序中,如果該數據需要進行序列化(寫磁盤,或者網絡傳輸),就一定要用實現了hadoop虛擬化框架的數據類型 * <p> * Long -----> LongWritable * String ---> Text * Integer ----> IntWritable * null -----> nullWritable */ public class WorldCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 這個map方法就是mapreduce主體程序,MapTask所調用的用戶業務邏輯方法 * Maptask會驅動我們讀取數據組件InputFormat去讀取數據(KEYIN,VALUEIN),每讀取一個(K,V),他就會傳入這個用戶寫的map方法中調用一次 * 在默認的inputFormat實現中,此處的key就是一行的起始偏移量,value就是一行的內容 * 這個方法會被調用一次,當他key/value傳進來的時候,傳一次調用一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //獲取每一行的文本內容,首先要把他轉為jdk的類型 String lines = value.toString(); //按照空格切割,成為一個string數組 String[] words = lines.split(" "); for (String world : words) { //如果每行中都出現個相同單詞呢 //單詞是遍歷的world,標記為1,只要出現一次就標記一次 //單詞是string類型,但是hadoop有自己的類型Text context.write(new Text(world),new IntWritable(1)); } } } ~~~ ## reduce代碼 ~~~ package com.hadooprpc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * reduce類 * reducetask在調用我們的reduce方法 * reducetask應該接收到map階段(第一階段)中所有的maptask輸出的數據中的一部分,比如(hello world先統計hello) * 如何進行數據分發 * (key.hashcode%numReduceTask==本ReduceTask編號)numReduceTask是機器的個數,這個表示數據要分為幾份 * <p> * reducetask將接收到的kv數量拿來處理時,是這樣調用我們的reduce方法的 * 先將自己接收到的所有的kv對接k分組(根據k是否相同) * 然后將一組kv中的k傳給我們的reduce方法的key變量,把這一組kv中的所有v用一個迭代器傳給reduce方法的變量values * <p> * map的輸出就是這里的輸入 * <p> * Reducer<Text, IntWritable, Text, IntWritable> * 這個和map那邊對應 */ public class WorldCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable v : values) { //把v進行疊加,就是單詞的數量 count += v.get(); } context.write(key,new IntWritable(count)); } } ~~~ ## 運行的jar包類 ~~~ package com.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 本類是客戶端用來指定WorldCount job程序運行時所需要的很多參數 * 比如:指定那個類作為map階段的業務邏輯,那個類作為reduce階段的業務邏輯類 * 指定那個組件作為數據的讀取組件,數據結果輸出組件 * .... * 以及其他各種所需要的參數 */ public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //在哪里運行就在哪里拿配置 //機器上和hadoop相關的配置文件讀取過來 //這是在hadoop服務器上運行 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 job.setJar("/root/wordCount.jar"); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/worldCount/input")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/worldCount/output")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } } ~~~ # 集群服務器上運行 maven把運行的jar類,WorldCountDriver這個類打成一個jar包 然后上傳到服務器上 執行 ~~~ hadoop jar wordCount.jar com.hadooprpc.WorldCountDriver ~~~ 網頁 ![](https://box.kancloud.cn/7360ec2e9e4481af60bb634e0b285597_2996x1132.png) 命令行 ![](https://box.kancloud.cn/ee569954529756e7545e29288358f54d_3270x1204.png) 然后我們看下結果 在hdfs`/worldCount/output`會有結果 可以先在網頁中看這個結果的文件名字叫什么 ~~~ [root@master ~]# hadoop fs -cat /worldCount/output/part-r-00000 allen 4 hello 8 jack 1 jdxia 2 tom 3 ~~~ # 本地運行(一般是本地開發,方便debug調試) ## 不提交到yarn上 我們先在本地創建文件夾`worldCount/input` output文件夾不要創建 input文件夾里面還是寫1.txt,2.txt 然后我們要在WorldCountDriver類中把jar運行的路徑改下,還有input,output ~~~ package com.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 本類是客戶端用來指定WorldCount job程序運行時所需要的很多參數 * 比如:指定那個類作為map階段的業務邏輯,那個類作為reduce階段的業務邏輯類 * 指定那個組件作為數據的讀取組件,數據結果輸出組件 * .... * 以及其他各種所需要的參數 */ public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 // job.setJar("/root/wordCount.jar"); //上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己 job.setJarByClass(WorldCountDriver.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/hdfs/worldCount/input/")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/hdfs/worldCount/output/")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } } ~~~ 然后我們運行下main方法就行 由于在本地運行 ![](https://box.kancloud.cn/e915f8b251c3c72489173da3395dde73_2582x400.png) 他這邊找不到運行的配置會找包下的默認配置,發現這邊的framework是local是本地,他就不會提交到yarn上 ![](https://box.kancloud.cn/0c1a4a6a65f8627509ed4f43d4c109f6_2842x936.png) 還有這個默認配置 ![](https://box.kancloud.cn/bee0de7ea8020dc8c255e88a3f834453_2780x624.png) 沒有配置他會找這個,這是本地的文件系統 ## 提交到yarn上 注意配置些環境變量,不然會報一些類找不到 ~~~ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //設置權限,也可以在vm那邊偽造 System.setProperty("HADOOP_USER_NAME", "root"); conf.set("fs.defaultFS","hdfs://master:9000"); conf.set("mapreduce.framework.name","yarn"); conf.set("yarn.resourcemanager.hostname","master"); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 // job.setJar("/root/wordCount.jar"); //上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己 job.setJarByClass(WorldCountDriver.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/worldCount/input/")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/worldCount/output/")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看