<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Storm Hive 集成 Hive 提供了 streaming API, 它允許將數據連續地寫入 Hive. 傳入的數據可以用小批量 record 的方式連續提交到現有的 Hive partition 或 table 中. 一旦提交了數據,它就可以立即顯示給所有的 hive 查詢. 有關 Hive Streaming API 的更多信息請參閱 [https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest](https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest) 在 Hive Streaming API 的幫助下, HiveBolt 和 HiveState 允許用戶將 Storm 中的數據直接傳輸到 Hive 中. 要使用 Hive streaming API, 用戶需要創建一個使用了 ORC 格式的 bucketed table. 如下所示 ``` create table test_table ( id INT, name STRING, phone STRING, street STRING) partitioned by (city STRING, state STRING) stored as orc tblproperties ("orc.compress"="NONE"); ``` ## HiveBolt (org.apache.storm.hive.bolt.HiveBolt) HiveBolt 控制 tuples 直接流入到 Hive 中. 使用 Hive 事務寫入 Tuples. HiveBolt 將流式傳輸的分區可以創建或預先創建,或者也可用 HiveBolt 來創建它們,如果它們不存在的話. ``` DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)); HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper); HiveBolt hiveBolt = new HiveBolt(hiveOptions); ``` ### RecordHiveMapper 該 class 將 Tuple 的字段名映射到 Hive table 的列名. * DelimitedRecordHiveMapper (org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper) * JsonRecordHiveMapper (org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper) ``` DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withPartitionFields(new Fields(partNames)); or DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withTimeAsPartitionField("YYYY/MM/DD"); ``` | Arg(參數) | Description(描述) | Type(類型) | | --- | --- | --- | | withColumnFields | tuple 中要被映射到 table 列名的字段名稱 | Fields (必需的) | | withPartitionFields | tuple 中要被映射到 hive table partition 的字段名稱 | Fields | | withTimeAsPartitionField | 用戶可以使用系統時間作為 hive table 的 partition | String . Date format | ### HiveOptions (org.apache.storm.hive.common.HiveOptions) HiveBolt 將 HiveOptions 作為一個構造參數. ``` HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10) ``` HiveOptions 參數 | Arg(參數) | Description(描述) | Type(類型) | | --- | --- | --- | | metaStoreURI | hive meta store URI (可以在 hive-site.xml 中找到) | String (必需的) | | dbName | 數據庫名 | String (必需的) | | tblName | 表名 | String (必需的) | | mapper | Mapper class, 映射 Tuple 的字段名稱到 Table 的列名稱 | DelimitedRecordHiveMapper 或 JsonRecordHiveMapper (必需的) | | withTxnsPerBatch | Hive 向 HiveBolt 的流客戶端授予 _一批事務_ 而不是單個事務. 此設置配置每個事務批處理所需的事務數. 來自單個批次中所有事務的數據最終在單個文件中. Flume 將在批處理中的每個事務中寫入最大的 batchSize 事件. 與 batchSize 配合使用的設置可以控制每個文件的大小. 請注意, 最終 Hive 將透明地將這些文件壓縮成較大的文件. | Integer . 默認 100 | | withMaxOpenConnections | 只允許這個數量的 open connections. 如果超過該數量, 則最近最少使用的 connection 將被 closed. | Integer . 默認 100 | | withBatchSize | 在單個 Hive 事務中寫入 Hive 的最大事件數 | Integer. 默認 15000 | | withCallTimeout | (In milliseconds) 針對 Hive & HDFS I/O operations 的超時, 例如 openTxn, write, commit, abort. | Integer. 默認 10000 | | withHeartBeatInterval | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats. | Integer. 默認 240 | | withAutoCreatePartitions | HiveBolt 將自動創建必要的 Hive partition 以流式傳輸. | Boolean. 默認 true | | withKerberosPrinicipal | Kerberos user principal 用于安全的訪問 Hive | String | | withKerberosKeytab | Kerberos keytab 用戶安全的訪問 Hive | String | | withTickTupleInterval | (In seconds) 如果 &gt; 0, 則 Hive Bolt 將定期刷新事務批次. 建議啟用此功能, 以避免在等待批次阻塞時出現元組超時. | Integer. 默認 0 | ## HiveState (org.apache.storm.hive.trident.HiveTrident) Hive Trident state 也遵循 HiveBolt 類似的模式, 它以 HiveOptions 作為參數. ``` DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() .withColumnFields(new Fields(colNames)) .withTimeAsPartitionField("YYYY/MM/DD"); HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) .withTxnsPerBatch(10) .withBatchSize(1000) .withIdleTimeout(10) StateFactory factory = new HiveStateFactory().withOptions(hiveOptions); TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields()); ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看