[TOC]

# Analysis

Why access HBase data through MapReduce? To speed up analysis and to scale out analysis capacity.

Using MapReduce to read HBase data for analysis is always an offline-analysis scenario.

![](https://box.kancloud.cn/197b9d0d56a8fd2e8d80d1070b2a3e15_737x418.png)

# Code

## Reading data from HBase, analyzing it, and writing the result to HDFS

~~~
package com.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HbaseReader {

    public static String t_user_info = "t_user_info";

    // the generics here declare the mapper's output key/value types
    static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {

        // key is the row key, value is the Result holding that row
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            byte[] bytes = key.copyBytes();
            // turn the row key into a String
            String rowkey = new String(bytes);
            byte[] usernameBytes = value.getValue("base_info".getBytes(), "username".getBytes());
            String username = new String(usernameBytes);
            context.write(new Text(rowkey + "\t" + username), NullWritable.get());
        }
    }

    // the reducer takes the mapper's output and writes it out unchanged
    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseReader.class);

        Scan scan = new Scan();
        // initTableMapperJob also sets the mapper class, so no separate setMapperClass call is needed
        TableMapReduceUtil.initTableMapperJob(t_user_info, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HdfsSinkReducer.class);

        FileOutputFormat.setOutputPath(job, new Path("/Users/jdxia/Desktop/website/hdfs/output"));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
~~~

## Reading data from HDFS and writing it into HBase

~~~
package com.study;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class HbaseSinker {

    public static String flow_fields_import = "flow_fields_import";

    // a plain file-reading mapper: each input line is "phone url", separated by a space
    static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split(" ");
            String phone = fields[0];
            String url = fields[1];
            FlowBean bean = new FlowBean(phone, url);
            context.write(bean, NullWritable.get());
        }
    }

    // the reducer writes each record into HBase as a Put
    static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            Put put = new Put(key.getPhone().getBytes());
            put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());
            context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave:2181");

        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);

        // create the target table; drop it first if it already exists
        boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
        if (tableExists) {
            hBaseAdmin.disableTable(flow_fields_import);
            hBaseAdmin.deleteTable(flow_fields_import);
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("f1".getBytes());
        desc.addFamily(hColumnDescriptor);
        hBaseAdmin.createTable(desc);

        Job job = Job.getInstance(conf);
        job.setJarByClass(HbaseSinker.class);

        job.setMapperClass(HbaseSinkMrMapper.class);
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("/Users/jdxia/Desktop/website/hdfs/data"));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        job.waitForCompletion(true);
    }
}
~~~
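The job above uses a `FlowBean` class as the map output key, but its source is not shown on this page. A minimal sketch of what it might look like is below: the `phone` and `url` fields, `getPhone()`/`getUrl()` accessors, and the two-argument constructor are inferred from the mapper and reducer code; the package name and the sort order in `compareTo` are assumptions. Since it is used as a map output key, it must implement `WritableComparable`.

~~~
package com.study;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch of the FlowBean referenced by HbaseSinker (not shown in the original post).
public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private String url;

    // Hadoop requires a no-arg constructor for deserialization
    public FlowBean() {
    }

    public FlowBean(String phone, String url) {
        this.phone = phone;
        this.url = url;
    }

    public String getPhone() {
        return phone;
    }

    public String getUrl() {
        return url;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public int compareTo(FlowBean o) {
        // assumed ordering: sort/group records by phone number
        return this.phone.compareTo(o.phone);
    }

    @Override
    public String toString() {
        return phone + "\t" + url;
    }
}
~~~

Given the mapper's `line.split(" ")`, the input files under `/Users/jdxia/Desktop/website/hdfs/data` are expected to contain one record per line, with the phone number and URL separated by a single space.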