[TOC]

# Prepare the Data

~~~
Order_0000001,pd001,222.8
Order_0000001,pd005,25.8
Order_0000002,pd005,325.8
Order_0000002,pd003,522.8
Order_0000002,pd004,122.4
Order_0000003,pd001,222.8
Order_0000003,pd001,322.8
~~~

![](https://box.kancloud.cn/3cb59971cd2977453d2fb5ea5f490ba2_1874x1106.png)

Each record holds an order number, a product ID, and a transaction amount. The task is to extract the top-1 (and later the top-N) records for each order, which needs a grouping step:

1. Use "order id + amount" as the key, so that all order records read in the map phase are partitioned by order id, sorted by amount, and sent to reduce.
2. On the reduce side, use a GroupingComparator to gather the key-value pairs with the same order id into one group; the first record of each group is then the maximum.

# Top-1 Code

**OrderBean**

~~~
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private Text itemid;
    private DoubleWritable amount;

    public OrderBean() {
    }

    public OrderBean(Text itemid, DoubleWritable amount) {
        set(itemid, amount);
    }

    public void set(Text itemid, DoubleWritable amount) {
        this.itemid = itemid;
        this.amount = amount;
    }

    public Text getItemid() {
        return itemid;
    }

    public DoubleWritable getAmount() {
        return amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        // compare the order ids first
        int cmp = this.itemid.compareTo(o.getItemid());
        // if the order ids are equal, compare the amounts
        if (cmp == 0) {
            // the minus sign gives descending order
            cmp = -this.amount.compareTo(o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemid.toString());
        out.writeDouble(amount.get());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        String readUTF = in.readUTF();
        double readDouble = in.readDouble();
        this.itemid = new Text(readUTF);
        this.amount = new DoubleWritable(readDouble);
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "itemid=" + itemid +
                ", amount=" + amount +
                '}';
    }
}
~~~

**ItemIdPartitioner**

~~~
package com.top;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean key, NullWritable nullWritable, int numPartitions) {
        // modeled on the default partitioner: beans with the same order id always land in the same partition
        return (key.getItemid().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
~~~

**ItemidGroupingComparator**

~~~
package com.top;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ItemidGroupingComparator extends WritableComparator {
    protected ItemidGroupingComparator() {
        // the super call is required; pass in the key class you want to compare
        super(OrderBean.class, true);
    }

    // the framework passes in two of the keys defined above, i.e. two beans
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // cast them back to OrderBean
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        // if the two beans have the same order id, they belong to the same group
        return abean.getItemid().compareTo(bbean.getItemid());
    }
}
~~~
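Before wiring these classes into a job, it helps to see how the two comparators split the work: the bean's `compareTo` orders records (order ids ascending, amounts descending), while the grouping comparator only looks at the order id. Below is a small standalone sketch of that behavior; it is my addition rather than part of the original article, and the class name is made up.

~~~
package com.top;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;

// Hypothetical demo class, not part of the original project.
public class OrderBeanSortDemo {
    public static void main(String[] args) {
        OrderBean a = new OrderBean(new Text("Order_0000001"), new DoubleWritable(222.8));
        OrderBean b = new OrderBean(new Text("Order_0000001"), new DoubleWritable(25.8));
        OrderBean c = new OrderBean(new Text("Order_0000002"), new DoubleWritable(325.8));

        // Same order id: the larger amount sorts first, so a comes before b.
        System.out.println(a.compareTo(b));          // negative

        // Different order ids: the id decides, so Order_0000001 sorts before Order_0000002.
        System.out.println(a.compareTo(c));          // negative

        // The grouping comparator ignores the amount entirely.
        ItemidGroupingComparator grouper = new ItemidGroupingComparator();
        System.out.println(grouper.compare(a, b));   // 0 -> same reduce group
        System.out.println(grouper.compare(a, c));   // non-zero -> different groups
    }
}
~~~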
**TopOne**

~~~
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class TopOne {

    public static class TopOneMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
        OrderBean bean = new OrderBean();
        // Text itemid = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, ',');
            bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            context.write(bean, NullWritable.get());
        }
    }

    public static class TopOneReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TopOne.class);
        job.setMapperClass(TopOneMapper.class);
        job.setReducerClass(TopOneReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input"));

        // delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, out);

        // register the GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}
~~~
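A note on the reducer above: it never touches `values`. Within each group the composite keys arrive sorted by amount in descending order, so the key passed into `reduce()` is already the largest record for that order id, and writing it once per group yields the top-1. For comparison, here is a sketch (my addition, not from the article) of an equivalent reducer body that walks the group explicitly; it relies on Hadoop reusing the key object, whose fields are overwritten as the values iterator advances.

~~~
// Sketch only: an equivalent top-1 reducer body that iterates the group.
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException {
    for (NullWritable v : values) {
        // on the first iteration, key holds the largest amount for this order id
        context.write(key, NullWritable.get());
        break; // top-1: stop after the first record
    }
}
~~~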
# Top-N Code

Add the following to the bean:

~~~
@Override
public boolean equals(Object o) {
    OrderBean bean = (OrderBean) o;
    return bean.getItemid().equals(this.itemid);
}
~~~

The modified main class:

~~~
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

public class TopN {

    static class TopNMapper extends Mapper<LongWritable, Text, OrderBean, OrderBean> {
        OrderBean v = new OrderBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = StringUtils.split(line, ',');
            k.set(fields[0]);
            v.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
            context.write(v, v);
        }
    }

    static class TopNReducer extends Reducer<OrderBean, OrderBean, NullWritable, OrderBean> {
        int topn = 1;
        int count = 0;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            topn = Integer.parseInt(conf.get("topn"));
        }

        @Override
        protected void reduce(OrderBean key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {
            count = 0;
            for (OrderBean bean : values) {
                if ((count++) == topn) {
                    return;
                }
                context.write(NullWritable.get(), bean);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // to read the setting from a config file instead, do it like this:
        // conf.addResource("userconfig.xml");
        // System.out.println(conf.get("top.n"));

        // here I simply ask for the top 2 directly
        conf.set("topn", "2");

        Job job = Job.getInstance(conf);
        job.setJarByClass(TopN.class);

        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(OrderBean.class);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/index/input"));

        // delete the output directory if it already exists
        Path out = new Path("/Users/jdxia/Desktop/website/hdfs/index/output/");
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        // tell the framework where to write the results
        FileOutputFormat.setOutputPath(job, out);

        // register the GroupingComparator
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);

        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}
~~~
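The driver above hard-codes both `topn` and local macOS paths. If you would rather pass them on the command line, a common pattern is a `Tool`/`ToolRunner` driver, which lets `-D topn=3` flow into the Configuration automatically. The sketch below is my addition, not part of the article; the class name `TopNDriver` and the example command line are illustrative assumptions, and it reuses the `TopN` mapper/reducer defined above.

~~~
package com.top;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver, run for example as:
//   hadoop jar top.jar com.top.TopNDriver -D topn=3 <input> <output>
public class TopNDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();   // already holds any -D topn=... value
        conf.setIfUnset("topn", "2");     // default to top-2 when not given

        Job job = Job.getInstance(conf);
        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopN.TopNMapper.class);
        job.setReducerClass(TopN.TopNReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(OrderBean.class);
        job.setGroupingComparatorClass(ItemidGroupingComparator.class);
        job.setPartitionerClass(ItemIdPartitioner.class);
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new TopNDriver(), args));
    }
}
~~~

Note that this sketch omits the "delete the output directory if it exists" step from the original main method, so the output path must not already exist when the job is submitted.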