<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] # 常用數據序列化類型 常用的數據類型對應的hadoop數據序列化類型 | java類型 | Hadoop Writable類型 | | --- | --- | | boolean | BooleanWritable | | byte | ByteWritable | | int | IntWritable | | float | FloatWritable | | long | LongWritable | | double | DoubleWritable | | string | Text | | map | MapWritable | | array | ArrayWritable | # 思路 ![](https://box.kancloud.cn/8dea64dd5a1554cdd52fd740e5501e0e_1837x746.png) # 準備數據 1.txt ~~~ hello tom hello jdxia hello allen tom allen jdxia hello ~~~ 2.txt ~~~ hello allen hello tom allen hello jack hello ~~~ 然后在hdfs上創建目錄 ~~~ hadoop fs -mkdir -p /worldCount/input ~~~ 然后把這2個文件傳到hdfs上 ~~~ hadoop fs -put 1.txt 2.txt /worldCount/input ~~~ # 代碼 ## map代碼 ~~~ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * mapper繼承mapreduce的mapper * Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> * KEYIN:是值框架讀取到的數據的key類型 * 在默認的讀取數據組件InputFormat下,讀取的key是一行文本的偏移量,所以key的類型是long類型的 * <p> * VALUEIN是指框架讀取到的數據的value類型 * 在默認的讀取數據組件InputFormat下,讀到的value就是一行文本的內容,所以value的類型就是String類型的 * <p> * KEYOUT是指用戶自定義的邏輯方法返回的數據中的key的類型, * 這個是由用戶業務邏輯決定的,在我們單詞統計中,我們輸出的是單詞作為key,所以類型是String * <p> * VALUEOUT是指用戶自定義邏輯方法返回的數據中value類型,這個是由用戶業務邏輯決定的 * 在我們的單詞統計業務中,我們輸出的是單詞數量作為value,所以類型是Integer * <p> * 但是,String,Long都是jdk自帶的數據類型,在序列化的時候,效率比較低,hadoop為了提高效率,他就自定義了一套數據結構 * 所以說我們的hadoop程序中,如果該數據需要進行序列化(寫磁盤,或者網絡傳輸),就一定要用實現了hadoop虛擬化框架的數據類型 * <p> * Long -----> LongWritable * String ---> Text * Integer ----> IntWritable * null -----> nullWritable */ public class WorldCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { Text k = new Text(); IntWritable v = new IntWritable(1); /** * 這個map方法就是mapreduce主體程序,MapTask所調用的用戶業務邏輯方法 * Maptask會驅動我們讀取數據組件InputFormat去讀取數據(KEYIN,VALUEIN),每讀取一個(K,V),他就會傳入這個用戶寫的map方法中調用一次 * 在默認的inputFormat實現中,此處的key就是一行的起始偏移量,value就是一行的內容 * 這個方法會被調用一次,當他key/value傳進來的時候,傳一次調用一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //獲取每一行的文本內容,首先要把他轉為jdk的類型 String lines = value.toString(); //按照空格切割,成為一個string數組 String[] words = lines.split(" "); for (String world : words) { //如果每行中都出現個相同單詞呢 //單詞是遍歷的world,標記為1,只要出現一次就標記一次 //單詞是string類型,但是hadoop有自己的類型Text k.set(world); //context.write(new Text(world),new IntWritable(1)); context.write(k, v); } } } ~~~ ## reduce代碼 ~~~ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * reduce類 * reducetask在調用我們的reduce方法 * reducetask應該接收到map階段(第一階段)中所有的maptask輸出的數據中的一部分,比如(hello world先統計hello) * 如何進行數據分發 * (key.hashcode%numReduceTask==本ReduceTask編號)numReduceTask是機器的個數,這個表示數據要分為幾份 * <p> * reducetask將接收到的kv數量拿來處理時,是這樣調用我們的reduce方法的 * 先將自己接收到的所有的kv對接k分組(根據k是否相同) * 然后將一組kv中的k傳給我們的reduce方法的key變量,把這一組kv中的所有v用一個迭代器傳給reduce方法的變量values * <p> * map的輸出就是這里的輸入 * <p> * Reducer<Text, IntWritable, Text, IntWritable> * 這個和map那邊對應 */ public class WorldCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable v : values) { //把v進行疊加,就是單詞的數量 count += v.get(); } context.write(key,new IntWritable(count)); } } ~~~ ## 運行的jar包類 ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 本類是客戶端用來指定WorldCount job程序運行時所需要的很多參數 * 比如:指定那個類作為map階段的業務邏輯,那個類作為reduce階段的業務邏輯類 * 指定那個組件作為數據的讀取組件,數據結果輸出組件 * .... * 以及其他各種所需要的參數 */ public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //在哪里運行就在哪里拿配置 //機器上和hadoop相關的配置文件讀取過來 //這是在hadoop服務器上運行 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 job.setJar("/root/wordCount.jar"); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/worldCount/input")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/worldCount/output")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } } ~~~ # 集群服務器上運行 maven把運行的jar類,WorldCountDriver這個類打成一個jar包 然后上傳到服務器上 執行 ~~~ hadoop jar wordCount.jar com.hadooprpc.WorldCountDriver ~~~ 網頁 ![](https://box.kancloud.cn/d41e983f0fa494673c0563e3c75191da_1843x641.png) 命令行 ![](https://box.kancloud.cn/5b8396b07af2ed129bb160dfd347828f_1621x640.png) 然后我們看下結果 在hdfs`/worldCount/output`會有結果 可以先在網頁中看這個結果的文件名字叫什么 ~~~ [root@master ~]# hadoop fs -cat /worldCount/output/part-r-00000 allen 4 hello 8 jack 1 jdxia 2 tom 3 ~~~ # 本地運行(一般是本地開發,方便debug調試) ## 不提交到yarn上 我們先在本地創建文件夾`worldCount/input` output文件夾不要創建 input文件夾里面還是寫1.txt,2.txt 然后我們要在WorldCountDriver類中把jar運行的路徑改下,還有input,output ~~~ package com.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; /** * 本類是客戶端用來指定WorldCount job程序運行時所需要的很多參數 * 比如:指定那個類作為map階段的業務邏輯,那個類作為reduce階段的業務邏輯類 * 指定那個組件作為數據的讀取組件,數據結果輸出組件 * .... * 以及其他各種所需要的參數 */ public class WorldCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 // job.setJar("/root/wordCount.jar"); //上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己 job.setJarByClass(WorldCountDriver.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //job.setOutputFormatClass(TextOutputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/Users/jdxia/Desktop/website/hdfs/worldCount/input/")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/Users/jdxia/Desktop/website/hdfs/worldCount/output/")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } } ~~~ 然后我們運行下main方法就行 由于在本地運行 ![](https://box.kancloud.cn/f4b44219fa641471d1abdf5114a54e35_1755x283.png) 他這邊找不到運行的配置會找包下的默認配置,發現這邊的framework是local是本地,他就不會提交到yarn上 ![](https://box.kancloud.cn/b6e12230fc444068dd222045e99e70b2_1506x490.png) 還有這個默認配置 ![](https://box.kancloud.cn/e9a2349ed15980115aa7b36fe4033a63_1423x334.png) 沒有配置他會找這個,這是本地的文件系統 ## 提交到yarn上 注意配置些環境變量,不然會報一些類找不到 ~~~ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //設置權限,也可以在vm那邊偽造 System.setProperty("HADOOP_USER_NAME", "root"); conf.set("fs.defaultFS","hdfs://master:9000"); conf.set("mapreduce.framework.name","yarn"); conf.set("yarn.resourcemanager.hostname","master"); Job job = Job.getInstance(conf); //告訴框架,我們程序的位置 // job.setJar("/root/wordCount.jar"); //上面這樣寫,不好,換了路徑又要重新寫,我們改為用他的類加載器加載他自己 job.setJarByClass(WorldCountDriver.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架,我們程序所用的mapper類和reduce類是什么 job.setMapperClass(WorldCountMapper.class); job.setReducerClass(WorldCountReducer.class); //告訴框架我們程序輸出的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //告訴框架,我們程序使用的數據讀取組件,結果輸出所用的組件是什么 //TextInputFormat是mapreduce程序中內置的一種讀取數據組件,準備的叫做讀取文本的輸入組件 job.setInputFormatClass(TextInputFormat.class); //告訴框架,我們要處理的數據文件在那個路徑下 FileInputFormat.setInputPaths(job,new Path("/worldCount/input/")); //告訴框架我們的處理結果要輸出到什么地方 FileOutputFormat.setOutputPath(job,new Path("/worldCount/output/")); //這邊不用submit,因為一提交就和我這個沒關系了,我這就斷開了就看不見了 // job.submit(); //提交后,然后等待服務器端返回值,看是不是true boolean res = job.waitForCompletion(true); //設置成功就退出碼為0 System.exit(res?0:1); } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看