<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                [TOC] # HBASE操作及API ## 1 命令行演示 ### 1.1 基本shell命令 > 進入hbase命令行 ~~~ ./hbase shell ~~~ > 顯示hbase中的表 ~~~ list ~~~ > 創建user表,包含info、data兩個列族 ~~~ create 'user', 'info1', 'data1' create 'user', {NAME => 'info', VERSIONS => '3'} ~~~ > 向user表中插入信息,row key為rk0001,列族info中添加name列標示符,值為zhangsan ~~~ put 'user', 'rk0001', 'info:name', 'zhangsan' ~~~ > 向user表中插入信息,row key為rk0001,列族info中添加gender列標示符,值為female ~~~ put 'user', 'rk0001', 'info:gender', 'female' ~~~ > 向user表中插入信息,row key為rk0001,列族info中添加age列標示符,值為20 ~~~ put 'user', 'rk0001', 'info:age', 20 ~~~ > 向user表中插入信息,row key為rk0001,列族data中添加pic列標示符,值為picture ~~~ put 'user', 'rk0001', 'data:pic', 'picture' ~~~ > 獲取user表中row key為rk0001的所有信息 ~~~ get 'user', 'rk0001' ~~~ > 獲取user表中row key為rk0001,info列族的所有信息 ~~~ get 'user', 'rk0001', 'info' ~~~ > 獲取user表中row key為rk0001,info列族的name、age列標示符的信息 ~~~ get 'user', 'rk0001', 'info:name', 'info:age' ~~~ > 獲取user表中row key為rk0001,info、data列族的信息 ~~~ get 'user', 'rk0001', 'info', 'data' get 'user', 'rk0001', {COLUMN => ['info', 'data']} get 'user', 'rk0001', {COLUMN => ['info:name', 'data:pic']} ~~~ > 獲取user表中row key為rk0001,列族為info,版本號最新5個的信息 ~~~ get 'user', 'rk0001', {COLUMN => 'info', VERSIONS => 2} get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5} get 'user', 'rk0001', {COLUMN => 'info:name', VERSIONS => 5, TIMERANGE => [1392368783980, 1392380169184]} ~~~ > 獲取user表中row key為rk0001,cell的值為zhangsan的信息 ~~~ get 'people', 'rk0001', {FILTER => "ValueFilter(=, 'binary:圖片')"} ~~~ > 獲取user表中row key為rk0001,列標示符中含有a的信息 ~~~ get 'people', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"} put 'user', 'rk0002', 'info:name', 'fanbingbing' put 'user', 'rk0002', 'info:gender', 'female' put 'user', 'rk0002', 'info:nationality', '中國' get 'user', 'rk0002', {FILTER => "ValueFilter(=, 'binary:中國')"} ~~~ > 查詢user表中的所有信息 ~~~ scan 'user' ~~~ > 查詢user表中列族為info的信息 ~~~ scan 'user', {COLUMNS => 'info'} scan 'user', {COLUMNS => 'info', RAW => true, VERSIONS => 5} scan 'persion', {COLUMNS => 'info', RAW => true, VERSIONS => 3} ~~~ > 查詢user表中列族為info和data的信息 ~~~ scan 'user', {COLUMNS => ['info', 'data']} scan 'user', {COLUMNS => ['info:name', 'data:pic']} ~~~ > 查詢user表中列族為info、列標示符為name的信息 ~~~ scan 'user', {COLUMNS => 'info:name'} ~~~ > 查詢user表中列族為info、列標示符為name的信息,并且版本最新的5個 ~~~ scan 'user', {COLUMNS => 'info:name', VERSIONS => 5} ~~~ > 查詢user表中列族為info和data且列標示符中含有a字符的信息 ~~~ scan 'user', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"} ~~~ > 查詢user表中列族為info,rk范圍是[rk0001, rk0003)的數據 ~~~ scan 'people', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'} ~~~ > 查詢user表中row key以rk字符開頭的 ~~~ scan 'user',{FILTER=>"PrefixFilter('rk')"} ~~~ > 查詢user表中指定范圍的數據 ~~~ scan 'user', {TIMERANGE => [1392368783980, 1392380169184]} ~~~ > 刪除數據 > 刪除user表row key為rk0001,列標示符為info:name的數據 ~~~ delete 'people', 'rk0001', 'info:name' ~~~ > 刪除user表row key為rk0001,列標示符為info:name,timestamp為1392383705316的數據 ~~~ delete 'user', 'rk0001', 'info:name', 1392383705316 ~~~ > 清空user表中的數據 ~~~ truncate 'people' ~~~ > 修改表結構 > 首先停用user表(新版本不用) ~~~ disable 'user' ~~~ > 添加兩個列族f1和f2 ~~~ alter 'people', NAME => 'f1' alter 'user', NAME => 'f2' ~~~ > 啟用表 ~~~ enable 'user' ~~~ > ###disable 'user'(新版本不用) > 刪除一個列族: ~~~ alter 'user', NAME => 'f1', METHOD => 'delete' 或 alter 'user', 'delete' => 'f1' ~~~ > 添加列族f1同時刪除列族f2 ~~~ alter 'user', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'} ~~~ > 將user表的f1列族版本號改為5 ~~~ alter 'people', NAME => 'info', VERSIONS => 5 ~~~ > 啟用表 ~~~ enable 'user' ~~~ > 刪除表 ~~~ disable 'user' drop 'user' get 'person', 'rk0001', {FILTER => "ValueFilter(=, 'binary:中國')"} get 'person', 'rk0001', {FILTER => "(QualifierFilter(=,'substring:a'))"} scan 'person', {COLUMNS => 'info:name'} scan 'person', {COLUMNS => ['info', 'data'], FILTER => "(QualifierFilter(=,'substring:a'))"} scan 'person', {COLUMNS => 'info', STARTROW => 'rk0001', ENDROW => 'rk0003'} scan 'person', {COLUMNS => 'info', STARTROW => '20140201', ENDROW => '20140301'} scan 'person', {COLUMNS => 'info:name', TIMERANGE => [1395978233636, 1395987769587]} delete 'person', 'rk0001', 'info:name' alter 'person', NAME => 'ffff' alter 'person', NAME => 'info', VERSIONS => 10 get 'user', 'rk0002', {COLUMN => ['info:name', 'data:pic']} ~~~ ## 2 hbase代碼開發(基本,過濾器查詢) ### 2.1 基本增刪改查java實現 ~~~ public class HbaseDemo { private Configuration conf = null; @Before public void init(){ conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "weekend05,weekend06,weekend07"); } @Test public void testDrop() throws Exception{ HBaseAdmin admin = new HBaseAdmin(conf); admin.disableTable("account"); admin.deleteTable("account"); admin.close(); } @Test public void testPut() throws Exception{ HTable table = new HTable(conf, "person_info"); Put p = new Put(Bytes.toBytes("person_rk_bj_zhang_000002")); p.add("base_info".getBytes(), "name".getBytes(), "zhangwuji".getBytes()); table.put(p); table.close(); } @Test public void testDel() throws Exception{ HTable table = new HTable(conf, "user"); Delete del = new Delete(Bytes.toBytes("rk0001")); del.deleteColumn(Bytes.toBytes("data"), Bytes.toBytes("pic")); table.delete(del); table.close(); } @Test public void testGet() throws Exception{ HTable table = new HTable(conf, "person_info"); Get get = new Get(Bytes.toBytes("person_rk_bj_zhang_000001")); get.setMaxVersions(5); Result result = table.get(get); List<Cell> cells = result.listCells(); for(Cell c:cells){ } //result.getValue(family, qualifier); 可以從result中直接取出一個特定的value //遍歷出result中所有的鍵值對 List<KeyValue> kvs = result.list(); //kv ---> f1:title:superise.... f1:author:zhangsan f1:content:asdfasldgkjsldg for(KeyValue kv : kvs){ String family = new String(kv.getFamily()); System.out.println(family); String qualifier = new String(kv.getQualifier()); System.out.println(qualifier); System.out.println(new String(kv.getValue())); } table.close(); } ~~~ ### 2.2 過濾器查詢 > 引言:過濾器的類型很多,但是可以分為兩大類——比較過濾器,專用過濾器 > 過濾器的作用是在服務端判斷數據是否滿足條件,然后只將滿足條件的數據返回給客戶端; #### 1 hbase過濾器的比較運算符: ~~~ LESS < LESS_OR_EQUAL <= EQUAL = NOT_EQUAL <> GREATER_OR_EQUAL >= GREATER > NO_OP 排除所有 ~~~ #### 2 Hbase過濾器的比較器(指定比較機制): | BinaryComparator| 按字節索引順序比較指定字節數組,采用Bytes.compareTo(byte[])| | --- | --- | | BinaryPrefixComparator | 跟前面相同,只是比較左端的數據是否相同| | NullComparator | 判斷給定的是否為空| | BitComparator| 按位比較| | RegexStringComparator | 提供一個正則的比較器,僅支持 EQUAL 和非EQUAL| | SubstringComparator | 判斷提供的子串是否出現在value中。| #### 3 Hbase的過濾器分類 1) 比較過濾器 > 1.1 行鍵過濾器RowFilter ~~~ Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22"))); scan.setFilter(filter1); ~~~ > 1.2 列族過濾器FamilyFilter ~~~ Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3"))); scan.setFilter(filter1); ~~~ > 1.3 列過濾器QualifierFilter ~~~ filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2"))); scan.setFilter(filter1); ~~~ > 1.4 值過濾器 ValueFilter ~~~ Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4") ); scan.setFilter(filter1); ~~~ 2) 專用過濾器 > 2.1 單列值過濾器 SingleColumnValueFilter ----會返回滿足條件的整行 ~~~ SingleColumnValueFilter filter = new SingleColumnValueFilter( Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"), CompareFilter.CompareOp.NOT_EQUAL, new SubstringComparator("val-5")); filter.setFilterIfMissing(true); //如果不設置為true,則那些不包含指定column的行也會返回 scan.setFilter(filter1); ~~~ > 2.2 SingleColumnValueExcludeFilter > 與上相反 > 2.3 前綴過濾器 PrefixFilter----針對行鍵 ~~~ Filter filter = new PrefixFilter(Bytes.toBytes("row1")); scan.setFilter(filter1); ~~~ > 2.4 列前綴過濾器 ColumnPrefixFilter ~~~ Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2")); scan.setFilter(filter1); ~~~ > 2.5分頁過濾器 PageFilter ~~~ public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "spark01:2181,spark02:2181,spark03:2181"); String tableName = "testfilter"; String cfName = "f1"; final byte[] POSTFIX = new byte[] { 0x00 }; HTable table = new HTable(conf, tableName); Filter filter = new PageFilter(3); byte[] lastRow = null; int totalRows = 0; while (true) { Scan scan = new Scan(); scan.setFilter(filter); if(lastRow != null){ //注意這里添加了POSTFIX操作,用來重置掃描邊界 byte[] startRow = Bytes.add(lastRow,POSTFIX); scan.setStartRow(startRow); } ResultScanner scanner = table.getScanner(scan); int localRows = 0; Result result; while((result = scanner.next()) != null){ System.out.println(localRows++ + ":" + result); totalRows ++; lastRow = result.getRow(); } scanner.close(); if(localRows == 0) break; } System.out.println("total rows:" + totalRows); } /** * 多種過濾條件的使用方法 * @throws Exception */ @Test public void testScan() throws Exception{ HTable table = new HTable(conf, "person_info".getBytes()); Scan scan = new Scan(Bytes.toBytes("person_rk_bj_zhang_000001"), Bytes.toBytes("person_rk_bj_zhang_000002")); //前綴過濾器----針對行鍵 Filter filter = new PrefixFilter(Bytes.toBytes("rk")); //行過濾器 ---針對行鍵 ByteArrayComparable rowComparator = new BinaryComparator(Bytes.toBytes("person_rk_bj_zhang_000001")); RowFilter rf = new RowFilter(CompareOp.LESS_OR_EQUAL, rowComparator); /** * 假設rowkey格式為:創建日期_發布日期_ID_TITLE * 目標:查找 發布日期 為 2014-12-21 的數據 * sc.textFile("path").flatMap(line=>line.split("\t")).map(x=>(x,1)).reduceByKey(_+_).map((_(2),_(1))).sortByKey().map((_(2),_(1))).saveAsTextFile("") * * */ rf = new RowFilter(CompareOp.EQUAL , new SubstringComparator("_2014-12-21_")); //單值過濾器1完整匹配字節數組 new SingleColumnValueFilter("base_info".getBytes(), "name".getBytes(), CompareOp.EQUAL, "zhangsan".getBytes()); //單值過濾器2 匹配正則表達式 ByteArrayComparable comparator = new RegexStringComparator("zhang."); new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator); //單值過濾器3匹配是否包含子串,大小寫不敏感 comparator = new SubstringComparator("wu"); new SingleColumnValueFilter("info".getBytes(), "NAME".getBytes(), CompareOp.EQUAL, comparator); //鍵值對元數據過濾-----family過濾----字節數組完整匹配 FamilyFilter ff = new FamilyFilter( CompareOp.EQUAL , new BinaryComparator(Bytes.toBytes("base_info")) //表中不存在inf列族,過濾結果為空 ); //鍵值對元數據過濾-----family過濾----字節數組前綴匹配 ff = new FamilyFilter( CompareOp.EQUAL , new BinaryPrefixComparator(Bytes.toBytes("inf")) //表中存在以inf打頭的列族info,過濾結果為該列族所有行 ); //鍵值對元數據過濾-----qualifier過濾----字節數組完整匹配 filter = new QualifierFilter( CompareOp.EQUAL , new BinaryComparator(Bytes.toBytes("na")) //表中不存在na列,過濾結果為空 ); filter = new QualifierFilter( CompareOp.EQUAL , new BinaryPrefixComparator(Bytes.toBytes("na")) //表中存在以na打頭的列name,過濾結果為所有行的該列數據 ); //基于列名(即Qualifier)前綴過濾數據的ColumnPrefixFilter filter = new ColumnPrefixFilter("na".getBytes()); //基于列名(即Qualifier)多個前綴過濾數據的MultipleColumnPrefixFilter byte[][] prefixes = new byte[][] {Bytes.toBytes("na"), Bytes.toBytes("me")}; filter = new MultipleColumnPrefixFilter(prefixes); //為查詢設置過濾條件 scan.setFilter(filter); scan.addFamily(Bytes.toBytes("base_info")); //一行 // Result result = table.get(get); //多行的數據 ResultScanner scanner = table.getScanner(scan); for(Result r : scanner){ /** for(KeyValue kv : r.list()){ String family = new String(kv.getFamily()); System.out.println(family); String qualifier = new String(kv.getQualifier()); System.out.println(qualifier); System.out.println(new String(kv.getValue())); } */ //直接從result中取到某個特定的value byte[] value = r.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("name")); System.out.println(new String(value)); } table.close(); } ~~~ ## 3.Hbase高級應用 ### 3.1建表高級屬性 > 下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性 1) BLOOMFILTER 默認是NONE 是否使用布隆過慮及使用何種方式布隆過濾可以每列族單獨啟用。 > 使用 HColumnDescriptor.setBloomFilterType(NONE | ROW | ROWCOL) 對列族單獨啟用布隆。 1. Default = ROW 對行進行布隆過濾。 2. 對 ROW,行鍵的哈希在每次插入行時將被添加到布隆。 3. 對 ROWCOL,行鍵 + 列族 + 列族修飾的哈希將在每次插入行時添加到布隆 * 使用方法: create 'table',{BLOOMFILTER =>'ROW'} * 啟用布隆過濾可以節省讀磁盤過程,可以有助于降低讀取延遲 2) VERSIONS 默認是1 這個參數的意思是數據保留1個 版本,如果我們認為我們的數據沒有這么大的必要保留這么多,隨時都在更新,而老版本的數據對我們毫無價值,那將此參數設為1 能節約2/3的空間 ~~~ 使用方法: create 'table',{VERSIONS=>'2'} ~~~ > 附:MIN_VERSIONS => '0'是說在compact操作執行之后,至少要保留的版本 3) COMPRESSION 默認值是NONE 即不使用壓縮 > 這個參數意思是該列族是否采用壓縮,采用什么壓縮算法 ~~~ 使用方法: create 'table',{NAME=>'info',COMPRESSION=>'SNAPPY'} ~~~ * 建議采用SNAPPY壓縮算法 * HBase中,在Snappy發布之前(Google 2011年對外發布Snappy),采用的LZO算法,目標是達到盡可能快的壓縮和解壓速度,同時減少對CPU的消耗; * 在Snappy發布之后,建議采用Snappy算法(參考《HBase: The Definitive Guide》),具體可以根據實際情況對LZO和Snappy做過更詳細的對比測試后再做選擇。 | Algorithm | % remaining| Encoding| Decoding| | --- | --- | --- | --- | | GZIP| 13.4% | 21 MB/s | 118 MB/s| | LZO | 20.5% | 135 MB/s | 410 MB/s| | Zippy/Snappy | 22.2% | 172 MB/s| 409 MB/s| > 如果建表之初沒有壓縮,后來想要加入壓縮算法,可以通過alter修改schema 4) alter > 使用方法: > 如 修改壓縮算法 ~~~ disable 'table' alter 'table',{NAME=>'info',COMPRESSION=>'snappy'} enable 'table' ~~~ > 但是需要執行major_compact 'table' 命令之后 才會做實際的操作。 5) TTL > 默認是 2147483647 即:Integer.MAX_VALUE 值大概是68年 > 這個參數是說明該列族數據的存活時間,單位是s > 這個參數可以根據具體的需求對數據設定存活時間,超過存過時間的數據將在表中不在顯示,待下次major compact的時候再徹底刪除數據 > 注意的是TTL設定之后 MIN_VERSIONS=>'0' 這樣設置之后,TTL時間戳過期后,將全部徹底刪除該family下所有的數據,如果MIN_VERSIONS 不等于0那將保留最新的MIN_VERSIONS個版本的數據,其它的全部刪除,比如MIN_VERSIONS=>'1' 屆時將保留一個最新版本的數據,其它版本的數據將不再保存。 6) describe 'table' 這個命令查看了create table 的各項參數或者是默認值。 7) disable_all 'toplist.*' disable_all 支持正則表達式,并列出當前匹配的表的如下: ~~~ toplist_a_total_1001 toplist_a_total_1002 toplist_a_total_1008 toplist_a_total_1009 toplist_a_total_1019 toplist_a_total_1035 ... Disable the above 25 tables (y/n)? 并給出確認提示 ~~~ 8) drop_all 這個命令和disable_all的使用方式是一樣的 9) hbase 表預分區----手動分區 > 默認情況下,在創建HBase表的時候會自動創建一個region分區,當導入數據的時候,所有的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先創建一些空的regions,這樣當數據寫入HBase時,會按照region分區情況,在集群內做數據的負載均衡。 > 命令方式: ~~~ create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'} ~~~ > 也可以使用api的方式: ~~~ bin/hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f info 參數: test_table是表名 HexStringSplit 是split 方式 -c 是分10個region -f 是family ~~~ > 可在UI上查看結果,如圖: ![](https://box.kancloud.cn/4d436820799190a23949e467b079d9d3_554x622.png) > 這樣就可以將表預先分為15個區,減少數據達到storefile 大小的時候自動分區的時間消耗,并且還有以一個優勢,就是合理設計rowkey 能讓各個region 的并發請求平均分配(趨于均勻) 使IO 效率達到最高,但是預分區需要將filesize 設置一個較大的值,設置哪個參數呢 hbase.hregion.max.filesize 這個值默認是10G 也就是說單個region 默認大小是10G > 這個參數的默認值在0.90 到0.92到0.94.3各版本的變化:256M--1G--10G >但是如果MapReduce Input類型為TableInputFormat 使用hbase作為輸入的時候,就要注意了,每個region一個map,如果數據小于10G 那只會啟用一個map 造成很大的資源浪費,這時候可以考慮適當調小該參數的值,或者采用預分配region的方式,并將檢測如果達到這個值,再手動分配region。 ### 3.2 hbase應用案例看行鍵設計 > 表結構設計 1) 列族數量的設定 > 以用戶信息為例,可以將必須的基本信息存放在一個列族,而一些附加的額外信息可以放在另一列族; 4) 行鍵的設計 > 語音詳單: ~~~ 13877889988-20150625 13877889988-20150625 13877889988-20150626 13877889988-20150626 13877889989 13877889989 13877889989 ~~~ > ----將需要批量查詢的數據盡可能連續存放 > CMS系統----多條件查詢 > 盡可能將查詢條件關鍵詞拼裝到rowkey中,查詢頻率最高的條件盡量往前靠 > 20150230-zhangsan-category… > 20150230-lisi-category… > (每一個條件的值長度不同,可以通過做定長映射來提高效率) > 參考:《hbase 實戰》----詳細講述了facebook /GIS等系統的表結構設計 ### 3.3 Hbase和mapreduce結合 > 為什么需要用mapreduce去訪問hbase的數據? > ——加快分析速度和擴展分析能力 > Mapreduce訪問hbase數據作分析一定是在離線分析的場景下應用 ![](https://box.kancloud.cn/444eb31438c9511d4268a531f71c2025_555x309.png) #### 3.3.1 從Hbase中讀取數據、分析,寫入hdfs ~~~ /** public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { } * @author duanhaitao@itcast.cn * */ public class HbaseReader { public static String flow_fields_import = "flow_fields_import"; static class HdfsSinkMapper extends TableMapper<Text, NullWritable>{ @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { byte[] bytes = key.copyBytes(); String phone = new String(bytes); byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes()); String url = new String(urlbytes); context.write(new Text(phone + "\t" + url), NullWritable.get()); } } static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable>{ @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "spark01"); Job job = Job.getInstance(conf); job.setJarByClass(HbaseReader.class); // job.setMapperClass(HdfsSinkMapper.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job); job.setReducerClass(HdfsSinkReducer.class); FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output")); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.waitForCompletion(true); } } ~~~ #### 3.3.2 從hdfs中讀取數據寫入Hbase ~~~ /** public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> { } * @author duanhaitao@itcast.cn * */ public class HbaseSinker { public static String flow_fields_import = "flow_fields_import"; static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String phone = fields[0]; String url = fields[1]; FlowBean bean = new FlowBean(phone,url); context.write(bean, NullWritable.get()); } } static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable>{ @Override protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { Put put = new Put(key.getPhone().getBytes()); put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes()); context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put); } } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "spark01"); HBaseAdmin hBaseAdmin = new HBaseAdmin(conf); boolean tableExists = hBaseAdmin.tableExists(flow_fields_import); if(tableExists){ hBaseAdmin.disableTable(flow_fields_import); hBaseAdmin.deleteTable(flow_fields_import); } HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import)); HColumnDescriptor hColumnDescriptor = new HColumnDescriptor ("f1".getBytes()); desc.addFamily(hColumnDescriptor); hBaseAdmin.createTable(desc); Job job = Job.getInstance(conf); job.setJarByClass(HbaseSinker.class); job.setMapperClass(HbaseSinkMrMapper.class); TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job); FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data")); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Mutation.class); job.waitForCompletion(true); } } ~~~ ### 3.3 hbase高級編程 #### 3.3.1 協處理器---- Coprocessor > 協處理器有兩種:observer和endpoint * Observer允許集群在正常的客戶端操作過程中可以有不同的行為表現 * Endpoint允許擴展集群的能力,對客戶端應用開放新的運算命令 1) Observer協處理器 * 正常put請求的流程: ![](https://box.kancloud.cn/bc0a4513b34a5e24cc9ec438c2bc02a9_293x201.png) * ? 加入Observer協處理后的put流程: ![](https://box.kancloud.cn/e111e371f3a0d373986650af5891240f_377x275.png) 1) 客戶端發出put請求 2) 該請求被分派給合適的RegionServer和region 3) coprocessorHost攔截該請求,然后在該表上登記的每個RegionObserver上調用prePut() 4) 如果沒有被prePut()攔截,該請求繼續送到region,然后進行處理 5) region產生的結果再次被CoprocessorHost攔截,調用postPut() 6) 假如沒有postPut()攔截該響應,最終結果被返回給客戶端 2) Observer的類型 1) RegionObs——這種Observer鉤在數據訪問和操作階段,所有標準的數據操作命令都可以被pre-hooks和post-hooks攔截 2) WALObserver——WAL所支持的Observer;可用的鉤子是pre-WAL和post-WAL 3) MasterObserver——鉤住DDL事件,如表創建或模式修改 3) Observer應用場景示例 > 見下節; > Endpoint—參考《Hbase 權威指南》 #### 3.3.2 二級索引 > row key在HBase中是以B+ tree結構化有序存儲的,所以scan起來會比較效率。單表以row key存儲索引,column value存儲id值或其他數據 ,這就是Hbase索引表的結構。 > 由于HBase本身沒有二級索引(Secondary Index)機制,基于索引檢索數據只能單純地依靠RowKey,為了能支持多條件查詢,開發者需要將所有可能作為查詢條件的字段一一拼接到RowKey中,這是HBase開發中極為常見的做法 > 比如,現在有一張1億的用戶信息表,建有出生地和年齡兩個索引,我想得到一個條件是在杭州出生,年齡為20歲的按用戶id正序排列前10個的用戶列表。 > 有一種方案是,系統先掃描出生地為杭州的索引,得到一個用戶id結果集,這個集合的規模假設是10萬。然后掃描年齡,規模是5萬,最后merge這些用戶id,去重,排序得到結果。 > 這明顯有問題,如何改良? > 保證出生地和年齡的結果是排過序的,可以減少merge的數據量?但Hbase是按row key排序,value是不能排序的。 > 變通一下——將用戶id冗余到row key里?OK,這是一種解決方案了,這個方案的圖示如下: ![](https://box.kancloud.cn/202f53957657e08d21beb5ff7915ea84_301x209.png) > merge時提取交集就是所需要的列表,順序是靠索引增加了_id,以字典序保證的。 2) 按索引查詢種類建立組合索引。 > 在方案1的場景中,想象一下,如果單索引數量多達10個會怎么樣?10個索引,就要merge 10次,性能可想而知。 ![](https://box.kancloud.cn/03dd8f0da0451db5eb394b1ea74bf5a0_330x224.png) > 解決這個問題需要參考RDBMS的組合索引實現。 > 比如出生地和年齡需要同時查詢,此時如果建立一個出生地和年齡的組合索引,查詢時效率會高出merge很多。 > 當然,這個索引也需要冗余用戶id,目的是讓結果自然有序。結構圖示如下: ![](https://box.kancloud.cn/19544ce804313f37013bd9df92e57d17_201x265.png) > 這個方案的優點是查詢速度非常快,根據查詢條件,只需要到一張表中檢索即可得到結果list。缺點是如果有多個索引,就要建立多個與查詢條件一一對應的組合索引 > 而索引表的維護如果交給應用客戶端,則無疑增加了應用端開發的負擔 > 通過協處理器可以將索引表維護的工作從應用端剝離 * 利用Observer自動維護索引表示例 > 在社交類應用中,經常需要快速檢索各用戶的關注列表t_guanzhu,同時,又需要反向檢索各種戶的粉絲列表t_fensi,為了實現這個需求,最佳實踐是建立兩張互為反向的表: 1) 一個表為正向索引關注表 “t_guanzhu”: ~~~ Rowkey: A-B f1:From f1:To ~~~ 2) 另一個表為反向索引粉絲表:“t_fensi”: ~~~ Rowkey: B—A f1:From f1:To ~~~ > 插入一條關注信息時,為了減輕應用端維護反向索引表的負擔,可用Observer協處理器實現: ![](https://box.kancloud.cn/b71a6204b0473f73b210a95b4951f1ea_556x269.png) 1) 編寫自定義RegionServer ~~~ public class InverIndexCoprocessor extends BaseRegionObserver { @Override public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { // set configuration Configuration conf = HBaseConfiguration.create(); // need conf.set... HTable table = new HTable(conf, "t_fensi"); Cell fromCell = put.get("f1".getBytes(), "From".getBytes()).get(0); Cell toCell = put.get("f1".getBytes(), "To".getBytes()).get(0); byte[] valueArray = fromCell.getValue(); String from = new String(valueArray); valueArray = toCell.getValue(); String to = new String(valueArray); Put putIndex = new Put((to+"-"+from).getBytes()); putIndex.add("f1".getBytes(), "From".getBytes(),from.getBytes()); putIndex.add("f1".getBytes(), "To".getBytes(),to.getBytes()); table.put(putIndex); table.close(); } } ~~~ 2) 打成jar包“fensiguanzhu.jar”上傳hdfs ~~~ hadoop fs -put fensiguanzhu.jar /demo/ ~~~ 3) 修改t_fensi的schema,注冊協處理器 ~~~ hbase(main):017:0> alter ' t_fensi ',METHOD => 'table_att','coprocessor'=>'hdfs://spark01:9000/demo/ fensiguanzhu.jar|cn.itcast.bigdata.hbasecoprocessor. InverIndexCoprocessor|1001|' Updating all regions with the new schema... 0/1 regions updated. 1/1 regions updated. Done. ~~~ 4) 檢查是否注冊成功 ~~~ hbase(main):018:0> describe 'ff' DESCRIPTION ENABLED 'ff', {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://spark01:9000/demo/fensiguanzhu.jar|cn.itcast.bi true gdata.hbasecoprocessor.TestCoprocessor|1001|'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMF ILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0 ', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', B LOCKCACHE => 'true'}, {NAME => 'f2', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATIO N_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KE EP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'} 1 row(s) in 0.0250 seconds ~~~ 5) 向正向索引表中插入數據進行驗證
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看