By using the join condition as the key of the map output, records from both tables that satisfy the join condition are sent, tagged with the file they came from, to the same reduce task; the two tables are then stitched together in the reduce phase.

1. **Create the bean class for the merged customer and order records.**

```java
package com.kgc.mapreduce.entry;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrders implements Writable {
    // customer id
    private String customerId;
    // order id
    private String orderId;
    // customer name
    private String customerName;
    // order status
    private String orderStatus;
    // flag marking the source table: 1 = order record, 0 = customer record
    private int flag;

    public CustomerOrders() {
    }

    public CustomerOrders(String customerId, String orderId, String customerName, String orderStatus, int flag) {
        this.customerId = customerId;
        this.orderId = orderId;
        this.customerName = customerName;
        this.orderStatus = orderStatus;
        this.flag = flag;
    }

    /**
     * Serialization
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(customerId);
        dataOutput.writeUTF(customerName);
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(orderStatus);
        dataOutput.writeInt(flag);
    }

    /**
     * Deserialization: fields must be read in the same order they were written
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customerId = dataInput.readUTF();
        this.customerName = dataInput.readUTF();
        this.orderId = dataInput.readUTF();
        this.orderStatus = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }

    @Override
    public String toString() {
        return orderId + "\t" + customerName + "\t" + orderStatus;
    }

    public String getCustomerId() { return customerId; }

    public void setCustomerId(String customerId) { this.customerId = customerId; }

    public String getOrderId() { return orderId; }

    public void setOrderId(String orderId) { this.orderId = orderId; }

    public String getCustomerName() { return customerName; }

    public void setCustomerName(String customerName) { this.customerName = customerName; }

    public String getOrderStatus() { return orderStatus; }

    public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; }

    public int getFlag() { return flag; }

    public void setFlag(int flag) { this.flag = flag; }
}
```

2. **Write the CustomerOrderMapper program.**

```java
package com.kgc.mapreduce.mapper;

import com.kgc.mapreduce.entry.CustomerOrders;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class CustomerOrderMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {
    private String name;
    private CustomerOrders customerOrders = new CustomerOrders();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Get the input split to learn which file this map task is reading
        FileSplit fileInput = (FileSplit) context.getInputSplit();
        name = fileInput.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        if (name.startsWith("order")) { // order file
            customerOrders.setCustomerId(split[2]);
            customerOrders.setOrderId(split[0]);
            customerOrders.setOrderStatus(split[3]);
            customerOrders.setFlag(1);
            customerOrders.setCustomerName("");
        } else { // customer file
            customerOrders.setCustomerId(split[0]);
            customerOrders.setCustomerName(split[1]);
            customerOrders.setFlag(0);
            customerOrders.setOrderId("");
            customerOrders.setOrderStatus("");
        }
        // Key by customer id so matching records from both tables reach the same reducer
        context.write(new Text(customerOrders.getCustomerId()), customerOrders);
    }
}
```
3. **Write the COReducer program.**

```java
package com.kgc.mapreduce.reducer;

import com.kgc.mapreduce.entry.CustomerOrders;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;

public class COReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        // 1. Collection that stores the order records for this customer id
        ArrayList<CustomerOrders> orderBeans = new ArrayList<>();
        // 2. Bean that stores the matching customer record
        CustomerOrders cusBean = new CustomerOrders();
        for (CustomerOrders bean : values) {
            if (1 == bean.getFlag()) { // order table
                // Hadoop reuses the value object, so copy each order record into the list
                CustomerOrders orderBean = new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else { // customer table
                try {
                    // Copy the customer record into memory
                    BeanUtils.copyProperties(cusBean, bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        // 3. Join the customer record with each order record
        for (CustomerOrders bean : orderBeans) {
            bean.setCustomerName(cusBean.getCustomerName());
            // 4. Emit the joined record
            context.write(bean, NullWritable.get());
        }
    }
}
```

4. **Write the CODriver program.**

```java
package com.kgc.mapreduce.driver;

import com.kgc.mapreduce.entry.CustomerOrders;
import com.kgc.mapreduce.mapper.CustomerOrderMapper;
import com.kgc.mapreduce.reducer.COReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CODriver {
    public static void main(String[] args) throws Exception {
        // 1. Get the configuration and create the job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2. Specify the jar that contains this program
        job.setJarByClass(CODriver.class);
        // 3. Specify the mapper and reducer classes for this job
        job.setMapperClass(CustomerOrderMapper.class);
        job.setReducerClass(COReducer.class);
        // 4. Specify the key/value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);
        // 5. Specify the key/value types of the final output
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Specify the input and output directories
        FileInputFormat.setInputPaths(job, new Path("d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\output"));
        // 7. Submit the job configuration and its jar to YARN and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

Drawback: with this approach the join itself happens in the reduce phase, so the reducers carry almost all of the processing load while the map nodes do little work; resource utilization is poor, and the reduce phase is highly prone to data skew (a hot join key funnels all of its records into a single reduce task).

Solution: perform the join on the map side, as sketched below.
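The following is a minimal sketch of that map-side join, assuming the customer table is small enough to fit in memory; the file name `customers.csv` and the cache path in the usage note are assumptions of this sketch, not part of the original walkthrough. The small table is shipped to every map task through the distributed cache and loaded into a `HashMap` in `setup()`, so each order record is joined as it streams through `map()` and no shuffle or reduce phase is needed.

```java
package com.kgc.mapreduce.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Map-side join: the customer table is cached in memory per map task,
// so the join finishes in the map phase and data skew in reduce is avoided.
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private Map<String, String> customerMap = new HashMap<>();
    private Text outValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The customer file is registered with job.addCacheFile(), so YARN
        // symlinks it into the task's working directory under its own name.
        // "customers.csv" is an assumed name for this sketch.
        try (BufferedReader reader = new BufferedReader(new FileReader("customers.csv"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                // customerId -> customerName
                customerMap.put(fields[0], fields[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The job input is now the order files only, with the same field
        // layout as above: orderId at [0], customerId at [2], orderStatus at [3]
        String[] split = value.toString().split(",");
        String customerName = customerMap.getOrDefault(split[2], "");
        outValue.set(split[0] + "\t" + customerName + "\t" + split[3]);
        context.write(outValue, NullWritable.get());
    }
}
```

In the driver, register the small file with `job.addCacheFile(new java.net.URI("/cache/customers.csv"))` (a hypothetical path), point the input path at the order files only, and disable the reduce phase with `job.setNumReduceTasks(0)`. Because every map task holds the full customer map, the join work is spread evenly across mappers and the skew problem of the reduce-side join disappears.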