[TOC]

# 1. Serialization notes

(1) The class must implement the `Writable` interface.

(2) During deserialization the framework reflectively calls the no-argument constructor, so the class must have one.

```java
public FlowBean() {
    super();
}
```

(3) Override the serialization method.

```java
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
}
```

(4) Override the deserialization method.

```java
@Override
public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
}
```

(5) **The field order used for deserialization must exactly match the field order used for serialization.**

(6) To display the result in the output file, override `toString()`; the fields can be separated with `\t`.

(7) If the custom bean is to be transmitted as a key, it must also implement the `Comparable` interface and its `compareTo()` method, because the shuffle phase of the MapReduce framework always sorts by key (a fuller sketch of such a bean follows below).

```java
@Override
public int compareTo(FlowBean o) {
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
```
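In practice, point (7) usually means declaring the bean as `WritableComparable` instead of plain `Writable`. The class below is a minimal sketch of that variant; the class name `FlowBeanKey` is hypothetical, and the fields mirror the `FlowBean` used later in this post.

```java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch: a bean used as a key implements WritableComparable rather than Writable,
// so the shuffle phase can sort map output by this bean.
public class FlowBeanKey implements WritableComparable<FlowBeanKey> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBeanKey() { }              // no-arg constructor required for reflection

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();            // read fields in the same order they were written
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBeanKey o) {
        // descending order by total flow, matching the compareTo() snippet above
        return this.sumFlow > o.sumFlow ? -1 : 1;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```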
<br/>

# 2. Case study: aggregating mobile phone traffic

**1. Requirement**

The given file contains mobile phone traffic records. For each phone number, compute the total upstream traffic, total downstream traffic, and total traffic.

<br/>

**2. Sample data `phone_data.txt`**

Input data format:

```txt
1368544993057	13568795243	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1116	954	200
```

The second field is the phone number; the third-to-last and second-to-last fields are the upstream and downstream traffic. Fields are separated by `\t`. Required output format:

```txt
13568795243	1116	954	2070
```

That is: phone number, upstream traffic, downstream traffic, total traffic.

**3. Analysis**

Map phase:

(1) Read each line of the file and split it into fields on `\t`.
(2) Extract the phone number, upstream traffic, and downstream traffic.
(3) Output the phone number as the key and a bean object as the value, i.e. `context.write(phone number, bean)`.

Reduce phase:

(1) Accumulate the upstream and downstream traffic per phone number to get the total traffic.
(2) Implement a custom bean to encapsulate the traffic fields and transmit it as the output value.
(3) The MapReduce program sorts the data while processing it (map output k/v pairs are sorted before being transferred to the reducers), and the sort is based on the map output key.

<br/>

**4. Writing the MapReduce program**

(1) The FlowBean class for the traffic statistics

```java
package com.exa.mapreduce001.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Date: 2020/12/30
 */
public class FlowBean implements Writable {

    // traffic fields
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // serialization method
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // deserialization method: read fields in the same order they were written
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // convenience setter that also computes the total
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
}
```

(2) FlowCountMapper

```java
package com.exa.mapreduce001.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Date: 2020/12/30
 *
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN:    input key
 * VALUEIN:  input value
 * KEYOUT:   output key
 * VALUEOUT: output value
 *
 * 1368544993057	13568795243	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1116	954	200
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1368544993057	13568795243	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99	18	15	1116	954	200
        String[] split = value.toString().split("\t");
        String phone = split[1];
        long upFlow = Long.parseLong(split[split.length - 3]);
        long downFlow = Long.parseLong(split[split.length - 2]);

        k.set(phone);
        v.set(upFlow, downFlow);
        context.write(k, v);
    }
}
```

(3) FlowCountReducer

```java
package com.exa.mapreduce001.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Date: 2020/12/30
 *
 * Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // accumulate the upstream and downstream traffic of this phone number
        for (FlowBean flowBean : values) {
            sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();
        }

        v.set(sum_upFlow, sum_downFlow);
        context.write(key, v);
    }
}
```

(4) The driver class

```java
package com.exa.mapreduce001.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Date: 2020/12/30
 */
public class FlowsumDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration information and a job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Specify the jar that contains this program by its driver class
        job.setJarByClass(FlowsumDriver.class);

        // 3. Specify the Mapper/Reducer classes used by this job
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4. Specify the mapper output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Specify the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Specify the input/output directories of the job
        // The input directory must already exist
        FileInputFormat.setInputPaths(job, new Path("file:///E:\\hadoop\\input"));
        // The output directory must not already exist
        FileOutputFormat.setOutputPath(job, new Path("file:///E:\\hadoop\\output"));

        // 7. Submit the job's configuration, along with the jar containing its classes, to run
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

<br/>

**5. Resulting output**

*`E:\hadoop\output\part-r-00000`*

```txt
13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	3597	25635	29232
13560439658	2034	5892	7926
```
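The driver above hardcodes local `file://` paths, which is convenient for running inside an IDE. A common variant (a sketch, not part of the original example) is to take the paths from the command line so the same jar also works on a cluster; this would replace the two path lines in step 6 of `FlowsumDriver`:

```java
// Sketch: read input/output directories from the command-line arguments
// instead of hardcoding them. args[0] = input dir, args[1] = output dir
// (the output dir must not exist yet).
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```

The packaged jar could then be submitted with something like `hadoop jar flow.jar com.exa.mapreduce001.flow.FlowsumDriver /input /output` and the result inspected with `hdfs dfs -cat /output/part-r-00000` (the jar name and paths here are illustrative).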