<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 需求 對2張表進行合并,并對產品id進行排序 ![](https://box.kancloud.cn/ca40b538d618ee50001cc0b9b4cc28bc_1638x314.png) **maptask** map中處理的事情 1. 獲取輸入文件類型 2. 獲取輸入數據 3. 不同文件分別處理 4. 封裝bean對象輸出 ![](https://box.kancloud.cn/3b969fd60aff3bd0f6e5d7d40d34cc0a_391x233.png) 加個flage標記是那個表 默認對產品id排序 ![](https://box.kancloud.cn/0ca068e7701e036fdb479dce5243649b_400x302.png) **reducetask** reduce方法緩存訂單數據集合和產品表,然后合并 ![](https://box.kancloud.cn/7df32b8c8ad4e04d86ac20e0ebe186ec_390x257.png) # 準備數據 order.txt ~~~ 1001 01 1 1002 02 2 1003 03 3 1001 01 1 1002 02 2 1003 03 3 ~~~ pd.txt ~~~ 01 小米 02 華為 03 格力 ~~~ # 代碼 ## bean ~~~ import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class TableBean implements Writable { //訂單id private String order_id; //產品id private String pid; //產品數量 private int amount; //產品名稱 private String pName; //標記是訂單表(0)還是產品表(1) private String flag; public TableBean() { super(); } public TableBean(String order_id, String pid, int amount, String pName, String flag) { super(); this.order_id = order_id; this.pid = pid; this.amount = amount; this.pName = pName; this.flag = flag; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(order_id); out.writeUTF(pid); out.writeInt(amount); out.writeUTF(pName); out.writeUTF(flag); } @Override public void readFields(DataInput in) throws IOException { order_id = in.readUTF(); pid = in.readUTF(); amount = in.readInt(); pName = in.readUTF(); flag = in.readUTF(); } //getter/setter/toString } ~~~ ## map階段 ~~~ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> { Text k = new Text(); TableBean v = new TableBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //區分兩張表 FileSplit split = (FileSplit) context.getInputSplit(); String name = split.getPath().getName(); //獲取一行 String line = value.toString(); if (name.startsWith("order")) { //訂單表 String[] fields = line.split(" "); v.setOrder_id(fields[0]); v.setPid(fields[1]); v.setAmount(Integer.parseInt(fields[2])); v.setpName(""); v.setFlag("0"); k.set(fields[1]); } else { //產品表 String[] fields = line.split(" "); v.setOrder_id(""); v.setPid(fields[0]); v.setAmount(0); v.setpName(fields[1]); v.setFlag("1"); k.set(fields[0]); } context.write(k, v); } } ~~~ ## reduce階段 ~~~ import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; public class TableReucer extends Reducer<Text, TableBean, TableBean, NullWritable> { @Override protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException { //準備存儲訂單的集合 ArrayList<TableBean> orderBeans = new ArrayList<>(); //準備bean對象 TableBean pdBean = new TableBean(); //把訂單表放到集合中,產品表放到bean中 for (TableBean bean : values) { if ("0".equals(bean.getFlag())) { //訂單表 //拷貝傳遞過來的每條訂單數據到集合中 TableBean orderBean = new TableBean(); try { BeanUtils.copyProperties(orderBean, bean); } catch (Exception e) { e.printStackTrace(); } //如果不放拷貝的在這邊的話,放bean的話,這邊會一直都是最后一個值 orderBeans.add(orderBean); } else { //產品表 try { //拷貝傳遞過來的產品表到bean中 BeanUtils.copyProperties(pdBean, bean); } catch (Exception e) { e.printStackTrace(); } } } //拼接表,循環鏈表,拼接數據 for (TableBean tableBean : orderBeans) { tableBean.setpName(pdBean.getpName()); context.write(tableBean, NullWritable.get()); } } } ~~~ ## 驅動類 ~~~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class TableDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TableDriver.class); job.setMapperClass(TableMapper.class); job.setReducerClass(TableReucer.class); //告訴框架,我們程序輸出的數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); //設置輸入文件和輸出路徑 FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/data/input")); FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/data/output")); job.waitForCompletion(true); } } ~~~ # 缺點 合并的操作是在reduce階段完成的,reduce端處理壓力太大,map節點的運算負載則很低,資源利用率不高,而且在reduce階段容易產生數據傾斜
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看