<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 依賴的maven ~~~ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.1</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>1.4.3</version> <type>pom</type> </dependency> ~~~ # CURD代碼 注意: 全表掃描不是對某個時間點表的快照的掃描.如果掃描已經開始,但是在行R被掃描器對象讀出之前,行R被改變了,那么掃描器讀出行R更新后的版本.但是掃描器讀出的數據是一致的,得到R更新后的完整行 ## 前置操作 ~~~ package com.hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; public class HbaseDemo { private Configuration conf = null; private Connection conn = null; @Before public void init() throws IOException { //構建個配置 conf = HBaseConfiguration.create(); //對于hbase的客戶端來說,只需要知道hbase所使用的zookeeper集群就可以了 //因為hbase的客戶端找hbase讀寫數據完全不用經過hmaster conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181"); conn = ConnectionFactory.createConnection(conf); } } ~~~ 添加測試數據 ~~~ //添加數據 @Test public void testPut() throws IOException { Table table = conn.getTable(TableName.valueOf("t_user_info")); ArrayList<Put> puts = new ArrayList<Put>(); //構建一個put對象(kv),指定行鍵 Put put01 = new Put(Bytes.toBytes("user001")); put01.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhangsan")); Put put02 = new Put("user001".getBytes()); put02.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("password"), Bytes.toBytes("123456")); Put put03 = new Put("user002".getBytes()); put03.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("lisi")); put03.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put04 = new Put("zhang_sh_01".getBytes()); put04.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang01")); put04.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put05 = new Put("zhang_sh_02".getBytes()); put05.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang02")); put05.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put06 = new Put("liu_sh_01".getBytes()); put06.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("liu01")); put06.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put07 = new Put("zhang_bj_01".getBytes()); put07.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang03")); put07.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); Put put08 = new Put("zhang_bj_01".getBytes()); put08.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username"), Bytes.toBytes("zhang04")); put08.addColumn(Bytes.toBytes("extra_info"), Bytes.toBytes("married"), Bytes.toBytes("false")); puts.add(put01); puts.add(put02); puts.add(put03); puts.add(put04); puts.add(put05); puts.add(put06); puts.add(put07); puts.add(put08); table.put(puts); table.close(); conn.close(); } ~~~ ## 表是否存在 ~~~ @Test public boolean testExists(String tableName) throws IOException { //老API //HBaseAdmin admin = new HBaseAdmin(conf); //新API Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); return admin.tableExists(TableName.valueOf(tableName)); } ~~~ ## 創建表 這是不需要命名空間的 ~~~ //建表 @Test public void testCreate() throws IOException { //獲取一個表管理器 Admin admin = conn.getAdmin(); //構造一個表描述器,并指定表名 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t_user_info")); //構造一個列族描述器,并指定列族名 HColumnDescriptor hcd1 = new HColumnDescriptor("base_info"); //為該列族設定一個布隆過濾器類型參數/版本數量 hcd1.setBloomFilterType(BloomType.ROW).setVersions(1, 3); //構造第二個列族描述器,并指定列族名 HColumnDescriptor hcd2 = new HColumnDescriptor("extra_info"); //為該列族設定一個布隆過濾器類型參數/版本數量 hcd2.setBloomFilterType(BloomType.ROW).setVersions(1, 3); //將列族描述器添加到表描述器中 htd.addFamily(hcd1).addFamily(hcd2); admin.createTable(htd); admin.close(); conn.close(); } ~~~ ## 命名空間管理 命名空間可以被創建、移除、修改。 表和命名空間的隸屬關系在在創建表時決定,通過以下格式指定: `<namespace>:<table>` Example:hbase shell中創建命名空間、創建命名空間中的表、移除命名空間、修改命名空間 ~~~ #Create a namespace create_namespace 'my_ns' ~~~ ~~~ #create my_table in my_ns namespace create 'my_ns:my_table', 'fam' ~~~ ~~~ #drop namespace drop_namespace 'my_ns' ~~~ ~~~ #alter namespace alter_namespace 'my_ns', {METHOD => 'set', 'PROPERTY_NAME' => 'PROPERTY_VALUE'} ~~~ 預定義的命名空間 有兩個系統內置的預定義命名空間: * hbase:系統命名空間,用于包含hbase的內部表 * default:所有未指定命名空間的表都自動進入該命名空間 Example:指定命名空間和默認命名空間 ~~~ #namespace=foo and table qualifier=bar create 'foo:bar', 'fam' ~~~ ~~~ #namespace=default and table qualifier=bar create 'bar', 'fam' ~~~ **代碼** ~~~ Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); //create namespace named "my_ns" admin.createNamespace(NamespaceDescriptor.create("my_ns").build()); //create tableDesc, with namespace name "my_ns" and table name "mytable" HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("my_ns:mytable")); tableDesc.setDurability(Durability.SYNC_WAL); //add a column family "mycf" HColumnDescriptor hcd = new HColumnDescriptor("mycf"); tableDesc.addFamily(hcd); admin.createTable(tableDesc); admin.close(); ~~~ **關鍵知識點** 1. 必須將HBase集群的hbase-site.xml文件添加進工程的classpath中,或者通過Configuration對象設置相關屬性,否則程序獲取不到集群相關信息,也就無法找到集群,運行程序時會報錯; 2. HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf("my_ns:mytable"))代碼是描述表mytable,并將mytable放到了my_ns命名空間中,前提是該命名空間已存在,如果指定的是不存在命名空間,則會報錯org.apache.hadoop.hbase.NamespaceNotFoundException; 3. 命名空間一般在建模階段通過命令行創建,在java代碼中通過admin.createNamespace(NamespaceDescriptor.create("my_ns").build())創建的機會不多; 4. **創建HBaseAdmin對象時就已經建立了客戶端程序與HBase集群的connection**,所以在程序執行完成后,務必通過admin.close()關閉connection; 5. 可以**通過HTableDescriptor對象設置表的特性,比如:通過tableDesc.setMaxFileSize(512)設置一個region中的store文件的最大size,當一個region中的最大store文件達到這個size時,region就開始分裂;通過tableDesc.setMemStoreFlushSize(512)設置region內存中的memstore的最大值**,當memstore達到這個值時,開始往磁盤中刷數據。更多特性請自行查閱官網API; 6. 可以通過HColumnDescriptor對象設置列族的特性,比如:**通過hcd.setTimeToLive(5184000)設置數據保存的最長時間;通過hcd.setInMemory(true)設置數據保存在內存中以提高響應速度;通過 hcd.setMaxVersions(10)設置數據保存的最大版本數;通過hcd.setMinVersions(5)設置數據保存的最小版本數(配合TimeToLive使用)**。更多特性請自行查閱官網API; 7. 數據的版本數只能通過HColumnDescriptor對象設置,不能通過HTableDescriptor對象設置; 8. 由于HBase的數據是先寫入內存,數據累計達到內存閥值時才往磁盤中flush數據,所以,如果在數據還沒有flush進硬盤時,regionserver down掉了,內存中的數據將丟失。要想解決這個場景的問題就需要用到WAL(Write-Ahead-Log),tableDesc.setDurability(Durability.SYNC_WAL)就是設置寫WAL日志的級別,示例中設置的是同步寫WAL,該方式安全性較高,但無疑會一定程度影響性能,請根據具體場景選擇使用; 9. setDurability(Durability d)方法可以在相關的三個對象中使用,分別是:HTableDescriptor,Delete,Put(其中Delete和Put的該方法都是繼承自父類org.apache.hadoop.hbase.client.Mutation)。分別針對表、插入操作、刪除操作設定WAL日志寫入級別。需要注意的是,D**elete和Put并不會繼承Table的Durability級別(已實測驗證)**。Durability是一個枚舉變量,可選值參見4.2節。如果不通過該方法指定WAL日志級別,則為默認USE_DEFAULT級別。 ## 刪除表 刪除表沒創建表那么多學問,直接上代碼 ~~~ Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); String tablename = "my_ns:mytable"; if(admin.tableExists(tablename)) { try { if (! admin.isTableDisabled(TableName.valueOf(tableName))) { admin.disableTable(tablename); } admin.deleteTable(tablename); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } admin.close(); ~~~ 刪除表前必須先disable表 ## 刪除表中的數據 Delete類用于刪除表中的一行數據,通過HTable.delete來執行該動作。 在執行Delete操作時,HBase并不會立即刪除數據,而是對需要刪除的數據打上一個“墓碑”標記,直到當Storefile合并時,再清除這些被標記上“墓碑”的數據。 如果希望刪除整行,用行鍵來初始化一個Delete對象即可。如果希望進一步定義刪除的具體內容,可以使用以下這些Delete對象的方法: * 為了刪除指定的列族,可以使用deleteFamily * 為了刪除指定列的多個版本,可以使用deleteColumns * 為了刪除指定列的指定版本,可以使用deleteColumn,這樣的話就只會刪除版本號(時間戳)與指定版本相同的列。如果不指定時間戳,默認只刪除最新的版本 **構造函數** 1. 指定要刪除的行鍵 ~~~ Delete(byte[] row) ~~~ 刪除行鍵指定行的數據。 如果沒有進一步的操作,使用該構造函數將刪除行鍵指定的行中**所有列族中所有列的所有版本!** 2. 指定要刪除的行鍵和時間戳 ~~~ Delete(byte[] row, long timestamp) ~~~ 刪除行鍵和時間戳共同確定行的數據。 如果沒有進一步的操作,使用該構造函數將刪除行鍵指定的行中,所有列族中所有列的**時間戳小于等于指定時間戳的數據版本**。 注意:該時間戳僅僅和刪除行有關,如果需要進一步指定列族或者列,你必須分別為它們指定時間戳。 3. 給定一個字符串,目標行鍵的偏移,截取的長度 ~~~ Delete(byte[] rowArray, int rowOffset, int rowLength) ~~~ 4. 給定一個字符串,目標行鍵的偏移,截取的長度,時間戳 ~~~ Delete(byte[] rowArray, int rowOffset, int rowLength, long ts) ~~~ **常用方法** * `Delete deleteColumn(byte[] family, byte[] qualifier)` 刪除指定列的**最新版本**的數據。 * `Delete deleteColumns(byte[] family, byte[] qualifier) ` 刪除指定列的**所有版本**的數據。 * `Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp)` 刪除指定列的**指定版本**的數據。 * `Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp)` 刪除指定列的,時間戳**小于等于給定時間戳**的**所有**版本的數據。 * `Delete deleteFamily(byte[] family)` 刪除指定列族的所有列的**所有**版本數據。 * `Delete deleteFamily(byte[] family, long timestamp)` 刪除指定列族的所有列中**時間戳小于等于指定時間戳**的所有數據。 * `Delete deleteFamilyVersion(byte[] family, long timestamp)` 刪除指定列族中所有**列的時間戳等于指定時間戳**的版本數據。 * `void setTimestamp(long timestamp)` 為Delete對象設置時間戳。 **實例代碼** **刪除整行的所有列族、所有行、所有版本** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("000")); table.delete(delete); table.close(); ~~~ **刪除指定列的最新版本** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("100003")); delete.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("address")); table.delete(delete); table.close(); ~~~ **刪除指定列的所有版本** 接以上場景,執行以下代碼: ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("100003")); delete.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("address")); table.delete(delete); table.close(); ~~~ **刪除指定列族中所有列的時間戳等于指定時間戳的版本數據** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Delete delete = new Delete(Bytes.toBytes("100003")); delete.deleteFamilyVersion(Bytes.toBytes("info"), 1405390959464L); table.delete(delete); table.close(); ~~~ ## 修改表 修改現有列族的屬性 ~~~ @Test public void testModify() throws IOException { Admin admin = conn.getAdmin(); // admin.disableTable(TableName.valueOf("t_user_info")); // 修改已有的ColumnFamily HTableDescriptor table = admin.getTableDescriptor(TableName.valueOf("t_user_info")); HColumnDescriptor f2 = table.getFamily("extra_info".getBytes()); //設置布隆過濾器 f2.setBloomFilterType(BloomType.ROWCOL); //設置版本 f2.setVersions(1, 5); // 添加新的ColumnFamily table.addFamily(new HColumnDescriptor("other_info")); //將修改后的描述對象應用到目標表 admin.modifyTable(TableName.valueOf("t_user_info"), table); admin.close(); conn.close(); } ~~~ 修改表,刪除三個列族,新增一個列族 ~~~ Configuration conf = HBaseConfiguration.create(); HBaseAdmin admin = new HBaseAdmin(conf); String tablename = "rd_ns:itable"; if(admin.tableExists(tablename)) { try { admin.disableTable(tablename); //get the TableDescriptor of target table HTableDescriptor newtd = admin.getTableDescriptor(Bytes.toBytes("rd_ns:itable")); //remove 3 useless column families newtd.removeFamily(Bytes.toBytes("note")); newtd.removeFamily(Bytes.toBytes("newcf")); newtd.removeFamily(Bytes.toBytes("sysinfo")); //create HColumnDescriptor for new column family HColumnDescriptor newhcd = new HColumnDescriptor("action_log"); newhcd.setMaxVersions(10); newhcd.setKeepDeletedCells(true); //add the new column family(HColumnDescriptor) to HTableDescriptor newtd.addFamily(newhcd); //modify target table struture admin.modifyTable(Bytes.toBytes("rd_ns:itable"),newtd); admin.enableTable(tablename); } catch (Exception e) { e.printStackTrace(); } } admin.close(); ~~~ 邏輯很簡單: 1. 通過admin.getTableDescriptor(Bytes.toBytes("rd_ns:itable"))取得目標表的描述對象,應該就是取得指向該對象的指針了; 2. 修改目標表描述對象; 3. 通過admin.modifyTable(Bytes.toBytes("rd_ns:itable"),newtd)將修改后的描述對象應用到目標表。 ## 添加數據 **新增、更新數據Put** **常用構造函數** 1. 指定行鍵 ~~~ public Put(byte[] row) ~~~ 參數:row 行鍵 2. 指定行鍵和時間戳 ~~~ public Put(byte[] row, long ts) ~~~ 參數:row 行鍵,ts 時間戳 3. 從目標字符串中提取子串,作為行鍵 ~~~ Put(byte[] rowArray, int rowOffset, int rowLength) ~~~ 4. 從目標字符串中提取子串,作為行鍵,并加上時間戳 ~~~ Put(byte[] rowArray, int rowOffset, int rowLength, long ts) ~~~ **常用方法** 1. 指定列族、限定符,添加值 ~~~ add(byte[] family, byte[] qualifier, byte[] value) ~~~ 2. 指定列族、限定符、時間戳,添加值 ~~~ add(byte[] family, byte[] qualifier, long ts, byte[] value) ~~~ 3. 設置寫WAL(Write-Ahead-Log)的級別 ~~~ public void setDurability(Durability d) ~~~ 參數是一個枚舉值,可以有以下幾種選擇: * ASYNC_WAL : 當數據變動時,異步寫WAL日志 * SYNC_WAL : 當數據變動時,同步寫WAL日志 * FSYNC_WAL : 當數據變動時,同步寫WAL日志,并且,強制將數據寫入磁盤 * SKIP_WAL : 不寫WAL日志 * USE_DEFAULT : 使用HBase全局默認的WAL寫入級別,即SYNC_WAL **實例代碼** **插入行** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Put put = new Put(Bytes.toBytes("100001")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("lion")); put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("shangdi")); put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("30")); put.setDurability(Durability.SYNC_WAL); table.put(put); table.close(); ~~~ **更新行** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Put put = new Put(Bytes.toBytes("100001")); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("lee")); put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("longze")); put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("31")); put.setDurability(Durability.SYNC_WAL); table.put(put); table.close(); ~~~ 注意: 1. **Put的構造函數都需要指定行鍵,如果是全新的行鍵,則新增一行;如果是已有的行鍵,則更新現有行** 2. **創建Put對象及put.add過程都是在構建一行的數據,創建Put對象時相當于創建了行對象,add的過程就是往目標行里添加cell,直到table.put才將數據插入表格**; 3. 以上代碼創建Put對象用的是構造函數1,也可用構造函數2,第二個參數是時間戳; 4. Put還有別的構造函數,請查閱官網API。 **從目標字符串中提取子串,作為行鍵,構建Put** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Put put = new Put(Bytes.toBytes("100001_100002"),7,6); put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("show")); put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes("caofang")); put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("30")); table.put(put); table.close(); ~~~ 注意,關于:Put put = new Put(Bytes.toBytes("100001_100002"),7,6) **第二個參數是偏移量,也就是行鍵從第一個參數的第幾個字符開始截取;** **第三個參數是截取長度;** **這個代碼實際是從 100001_100002 中截取了100002子串作為目標行的行鍵** ## 讀取數據 讀取,get一次讀取一行 ~~~ @Test public void testGet() throws IOException { Table table = conn.getTable(TableName.valueOf("t_user_info")); //構造一個get查詢對象.指定要get的是那一行 Get get = new Get("user001".getBytes()); Result result = table.get(get); CellScanner cellScanner = result.cellScanner(); //迭代 while (cellScanner.advance()) { Cell current = cellScanner.current(); //列族名 byte[] familyArray = current.getFamilyArray(); //列標識符的名稱 byte[] qualifierArray = current.getQualifierArray(); //具體的值 byte[] valueArray = current.getValueArray(); //獲取有用字符 System.out.printf(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength())); System.out.printf(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength())); System.out.printf(" " + new String(valueArray, current.getValueOffset(), current.getValueLength())); System.out.println(); } table.close(); conn.close(); } ~~~ 批量查詢數據 ~~~ @Test public void testScan() throws IOException { Table t_user_info = conn.getTable(TableName.valueOf("t_user_info")); //表是liu_sh_01,row key是zhang_bj_01 //數據(字典排序,從liu_sh_01到zhang_bj_01之間的row key全部遍歷)("\000"不加這個是包頭不包尾,加了是全部包,原因是這個字段排序是排在zhang_bj_01后面),因為永遠不知道下一個rowkey是什么,就加個\000來表示下一個rowkey Scan scan = new Scan(Bytes.toBytes("liu_sh_01"), Bytes.toBytes("zhang_bj_01" + "\000")); ResultScanner scanner = t_user_info.getScanner(scan); //迭代器 Iterator<Result> iter = scanner.iterator(); while (iter.hasNext()) { //獲取一行記錄 Result result = iter.next(); //獲取到每一個cell CellScanner cellScanner = result.cellScanner(); //遍歷cell while (cellScanner.advance()) { Cell current = cellScanner.current(); byte[] familyArray = current.getFamilyArray(); byte[] valueArray = current.getValueArray(); byte[] qualifierArray = current.getQualifierArray(); byte[] rowArray = current.getRowArray(); System.out.print(new String(rowArray, current.getRowOffset(), current.getRowLength())+" "); System.out.print(new String(familyArray, current.getFamilyOffset(), current.getFamilyLength())); System.out.print(":" + new String(qualifierArray, current.getQualifierOffset(), current.getQualifierLength())); System.out.print(" " + new String(valueArray, current.getValueOffset(), current.getValueLength())); System.out.println(); } System.out.println("-----------------------------"); } } ~~~ 讀取指定的列,多版本 ~~~ @Test public void testGetColumn() throws IOException { Table table = conn.getTable(TableName.valueOf("t_user_info")); //構造一個get查詢對象.指定要get的是那一行 Get get = new Get("zhang_sh_02".getBytes()); //設置一次性取多少個版本的數據 get.setMaxVersions(4); // 獲取指定列族和列修飾符對應的列 get.addColumn(Bytes.toBytes("base_info"), Bytes.toBytes("username")); Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out.println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toString(kv.getValue())); System.out.println("Timestamp:" + kv.getTimestamp()); System.out.println("-------------------------------------------"); } table.close(); conn.close(); } ~~~ ## Get獲取單行 如果希望獲取整行數據,用行鍵初始化一個Get對象就可以,如果希望進一步縮小獲取的數據范圍,可以使用Get對象的以下方法: * 如果希望取得指定列族的所有列數據,使用**addFamily**添加所有的目標列族即可; * 如果希望取得指定列的數據,使用**addColumn**添加所有的目標列即可; * 如果希望取得目標列的指定時間戳范圍的數據版本,使用**setTimeRange**; * 如果僅希望獲取目標列的指定時間戳版本,則使用**setTimestamp**; * 如果希望限制每個列返回的版本數,使用**setMaxVersions**; * 如果希望添加過濾器,使用**setFilte** 上述講述了如何使用Get從HBase中獲取數據,并將數據進行展示,其實Get對象中的很多屬性可以控制在進行查詢時的細節控制,從而控制數據從HBase服務器返回時的數據量,從而可以進行數據優化 1. `Get(byte[] row) / Get(byte[] row, RowLock lock)` 初始化函數。在初始化函數時必須要指定Get將要獲取的行鍵,第二個函數則是允許用戶自己對Get上一個行鎖,但是系統并不贊成用戶這么使用。因為在多個客戶端進行操作,且都上了自定義的行鎖以后,可能會出現因為彼此的行鎖需要對方的資源而死鎖現象。但是兩個客戶端的長時間等待與系統連接資源的占用。 2. `addFamily(byte[] family) / addColumn(byte[] family, byte[] qualifier)` 添加列簇 / 添加列函數。通過該函數Get在數據獲取時,獲取的數據范圍:兩個函數都不設定時獲取正行的所有數據。 使用 addFamily時獲取制定列簇的所有列的數據。 addColumn則獲取制定列的數據 3. `setTimeStamp(long timestamp)` 設置獲取數據的時間戳 4. `setTimeRange(long minTime,long maxTime)` 設置獲取數據的時間戳范圍 5. `setMaxVersion(int version) / setMaxVersion()` 在默認情況下,Get方法之獲取一列的最新的版本。但是有時需要的話則會一次獲取多個版本的數據。 第一個函數可以指定確切的返回的版本數量。第二個函數則相當于setMaxVersion(Integer.MAX_VALUE)。即獲取列中所有版本的 數據。 6. `setCacheBlock(boolean open)` 是否打開服務器端快緩存。設置該Get獲取的數據是否緩存在內存中 在HBase中,整個表以region分塊的方式被分布式的存在不同的region服務器中。每一個region服務器將會維護多個region。而在每一個region中都會存在快緩存區域。當每次去讀某一個KeyValue數據塊時,則會將整個數據加載到緩存區中。又因為加載的數據遠大于一個KeyValue所含的數據大小。所以一般情況下緩存區域內都會存放當前KeyValue對象的連續的數據。但是如果在隨機讀寫的程序中,這種數據加載進入緩存區并沒有任何的作用,反而會因為在家時間而使得數據獲取時間增長。因此我們要根據實際情況去選擇是否開啟region上的緩存區。連續讀寫時,開始緩存區可以增加搜索速度。在隨機讀寫時,關閉緩存區可以縮小讀取時間。 7. `setFilter(Filter f)` 添加過濾器。因為HBase并沒有原聲的SQL指定環境,因此在SQL語句中的where條件語句就需要通過特定的借口去實現,而Filter則就是頂替了where 語句的作用。能夠實現在在數據查詢中的一些精細的控制。 8. 設置獲取數據的版本 `Get setMaxVersions(int maxVersions)` 設定獲取數據的版本數 `Get setMaxVersions()` 設定獲取數據的所有版本 **代碼** **獲取行鍵指定行的所有列族、所有列的最新版本數據** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Get get = new Get(Bytes.toBytes("100003")); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell)) ); } table.close(); ~~~ **獲取行鍵指定行中,指定列的最新版本數據** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Get get = new Get(Bytes.toBytes("100003")); get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell)) ); } table.close(); ~~~ **獲取行鍵指定的行中,指定時間戳的數據** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:leetable"); Get get = new Get(Bytes.toBytes("100003")); get.setTimeStamp(1405407854374L); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell)) ); } table.close(); ~~~ **獲取行鍵指定的行中,所有版本的數據** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Get get = new Get(Bytes.toBytes("100003")); get.setMaxVersions(); Result r = table.get(get); for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } table.close(); ~~~ 注意: **能輸出多版本數據的前提是當前列族能保存多版本數據,列族可以保存的數據版本數通過HColumnDescriptor的setMaxVersions(Int)方法設置** ## scan獲取多行 Scan對象可以返回滿足給定條件的多行數據。如果希望獲取所有的行,直接初始化一個Scan對象即可。如果希望限制掃描的行范圍,可以使用以下方法: * 如果希望獲取指定列族的所有列,可使用addFamily方法來添加所有希望獲取的列族 * 如果希望獲取指定列,使用addColumn方法來添加所有列 * 通過setTimeRange方法設定獲取列的時間范圍 * 通過setTimestamp方法指定具體的時間戳,只返回該時間戳的數據 * 通過setMaxVersions方法設定最大返回的版本數 * 通過setBatch方法設定返回數據的最大行數 * 通過setFilter方法為Scan對象添加過濾器,過濾器詳解請參見:http://blog.csdn.net/u010967382/article/details/37653177 * Scan的結果數據是可以緩存在內存中的,可以通過getCaching()方法來查看當前設定的緩存條數,也可以通過setCaching(int caching)來設定緩存在內存中的行數,緩存得越多,以后查詢結果越快,同時也消耗更多內存。此外,通過setCacheBlocks方法設置是否緩存Scan的結果數據塊,默認為true * 我們可以通過setMaxResultSize(long)方法來設定Scan返回的結果行數 **常用構造函數** 1. 創建掃描所有行的Scan ~~~ Scan() ~~~ 2. 創建Scan,從指定行開始掃描 ~~~ Scan(byte[] startRow) ~~~ 參數:startRow行鍵 注意:如果指定行不存在,從下一個最近的行開始 3. 創建Scan,指定起止行 ~~~ Scan(byte[] startRow, byte[] stopRow) ~~~ 參數:startRow起始行,stopRow終止行 注意:`startRow <= 結果集 < stopRow` 4. 創建Scan,指定起始行和過濾器 ~~~ Scan(byte[] startRow, Filter filter) ~~~ 參數:startRow起始行,filter過濾器 注意:過濾器的功能和構造參見http://blog.csdn.net/u010967382/article/details/37653177 **常用方法** * `Scan setStartRow(byte[] startRow)` 設置Scan的開始行,**默認結果集包含該行**。如果希望結果集不包含該行,可以在行鍵末尾加上0。 * `Scan setStopRow(byte[] stopRow)` 設置Scan的結束行,**默認結果集不包含該行**。如果希望結果集包含該行,可以在行鍵末尾加上0。 * `Scan setTimeRange(long minStamp, long maxStamp)` 掃描指定**時間范圍**的數據 * `Scan setTimeStamp(long timestamp)` 掃描指定**時間**的數據 * `Scan addColumn(byte[] family, byte[] qualifier)` 指定掃描的列 * `Scan addFamily(byte[] family)` 指定掃描的列族 * `Scan setFilter(Filter filter)` 為Scan設置過濾器 * `Scan setReversed(boolean reversed)` 設置Scan的掃描順序,默認是正向掃描(false),可以設置為逆向掃描(true)。注意:該方法0.98版本以后才可用!! * `Scan setMaxVersions()` 獲取所有版本的數據 * `Scan setMaxVersions(int maxVersions)` 設置獲取的最大版本數 * `void setCaching(int caching)` 設定緩存在內存中的行數,緩存得越多,以后查詢結果越快,同時也消耗更多內存 * `void setRaw(boolean raw)` 激活或者禁用raw模式。如果raw模式被激活,Scan將返回所有已經被**打上刪除標記但尚未被真正刪除的數據**。該功能僅用于激活了KEEP_DELETED_ROWS的列族,即列族開啟了hcd.setKeepDeletedCells(true)。Scan激活raw模式后,就不能指定任意的列,否則會報錯 **代碼** **掃描表中的所有行的最新版本數據** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Scan s = new Scan(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ **掃描指定行鍵范圍,通過末尾加0,使得結果集包含StopRow** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Scan s = new Scan(); s.setStartRow(Bytes.toBytes("100001")); s.setStopRow(Bytes.toBytes("1000020")); ResultScanner rs = table.getScanner(s); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ **返回所有已經被打上刪除標記但尚未被真正刪除的數據** 然而,使用Scan強大的s.setRaw(true)方法,可以獲得所有已經被打上刪除標記但尚未被真正刪除的數據。 代碼如下: ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); Scan s = new Scan(); s.setStartRow(Bytes.toBytes("100003")); s.setRaw(true); s.setMaxVersions(); ResultScanner rs = table.getScanner(s); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ **結合過濾器,獲取所有age在25到30之間的行** ~~~ Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, "rd_ns:itable"); FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); SingleColumnValueFilter filter1 = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("25") ); SingleColumnValueFilter filter2 = new SingleColumnValueFilter( Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.LESS_OR_EQUAL, Bytes.toBytes("30") ); filterList.addFilter(filter1); filterList.addFilter(filter2); Scan scan = new Scan(); scan.setFilter(filterList); ResultScanner rs = table.getScanner(scan); for (Result r : rs) { for (Cell cell : r.rawCells()) { System.out.println( "Rowkey : "+Bytes.toString(r.getRow())+ " Familiy:Quilifier : "+Bytes.toString(CellUtil.cloneQualifier(cell))+ " Value : "+Bytes.toString(CellUtil.cloneValue(cell))+ " Time : "+cell.getTimestamp() ); } } table.close(); ~~~ ## 計數器 計數器可以-1也可以是0 **在mapreduce中要注意,mapreduce任務失敗可能會重試,而導致如果用這個可能會不準.因為在mapreduce中可能不是冪等運算** ### 單計數器 ~~~ Table table = conn.getTable(TableName.valueOf("t_user_info")); //記住這個值初始的時候不要用put去設置,會導致后面的錯誤 原因是'1'會轉換成Bytes.toBytes() long rel = table.incrementColumnValue(Bytes.toBytes("user001"), Bytes.toBytes("base_info"), Bytes.toBytes("hit"), 2L); //返回這一列的結果 System.out.println(rel); //存儲成功會變成 //column=base_info:hit, timestamp=1532337393697, value=\x00\x00\x00\x00\x00\x00\x00\x01 table.close(); ~~~ ### 復合計數器 ~~~ Table table = connection.getTable(TableName.valueOf("counters")); Increment increment1 = new Increment(Bytes.toBytes("20160101")); increment1.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("clicks"),1); increment1.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("hits"),1); increment1.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("clicks"),10); increment1.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"),10); Result result = table.increment(increment1); for(Cell cell:result.rawCells()){ System.out.println("Cell: " + cell + " Value: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(),cell.getValueLength())); } Increment increment2 = new Increment(Bytes.toBytes("20160101")); increment2.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("clicks"), 5); increment2.addColumn(Bytes.toBytes("daily"),Bytes.toBytes("hits"), 1); increment2.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("clicks"), 0); increment2.addColumn(Bytes.toBytes("weekly"),Bytes.toBytes("hits"), -5); Result result2 = table.increment(increment2); for (Cell cell : result2.rawCells()) { System.out.println("Cell: " + cell + " Value: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } table.close(); connection.close(); ~~~ ### 獲取計數器的值 ~~~ @Test public void testGet()throws Exception{ HTable table = new HTable(conf,"wc"); Get get =new Get("apple01".getBytes()); get.addColumn("cf".getBytes(),"hits".getBytes()); Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out .println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toLong(kv.getValue())); 計數器的值獲取 } table.close(); } ~~~ # 掃描器緩存 在Hbase的設置里掃描每次RPC調用得到一批數據.這可以在掃描對象上使用setCaching(int)在每個掃描器(scanner)層次上設置,也可以在hbase-site.xml配置文件里使用HBase.client.scanner.caching屬性來設置. 如果緩存值設置為n,每次RPC調用掃描器返回n行,然后這些數據緩存在客戶端.這個設置的默認值是1,這意味著客戶端對HBase的每次RPC調用在掃描整張表后僅僅返回一行.這個數字很保守,可以調整它以獲得更好的性能. 但是該值設置過高意味著客戶端和hbase的交互會出現較長的暫停,這會導致hbase端的超時. ResultScanner接口也有一個next(int)調用,你可以用來要求返回掃描的下面n行.這是在API層面提供的遍歷,與為了獲取那n行數據客戶端對HBase的RPC調用次數無關. 在內部機制中,ResultScanner使用了多次RPC調用來滿足這個請求,每次RPC調用返回的行數只取決于為掃描器設置的緩存值
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看