<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # Storm Kinesis 集成 ## Storm Kinesis Spout 提供的核心storm spout(噴口),用戶從Amzon Kinesis Streams 中的流中消費數據。它存儲可以在zookeeper中提交的序列號,并在重新啟動后默認啟動消息記錄。下面是創建使用spout的示例拓撲的代碼示例。下面說明配置spout(噴口)時使用的每個對象。理想情況下,spout(噴口)任務的數量應等于運動時間碎片的數量。但是,每個任務都可以從多個分片中讀取。 ``` public class KinesisSpoutTopology { public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { String topologyName = args[0]; RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper(); KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2, 1000); ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000); KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON, recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L); KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig); TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("spout", kinesisSpout, 3); topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout"); Config topologyConfig = new Config(); topologyConfig.setDebug(true); topologyConfig.setNumWorkers(3); StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology()); } } ``` 就像你可以看到的,在它的構造函數中,引出了一個KinesisConfig對象。KinesisConfig的構造函數需要8個對象,如下所描述。 #### `String` streamName 用于消費數據的kinesis時間流的名稱 #### `ShardIteratorType` shardIteratorType 支持3種類型 - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. 默認情況下,如果分片狀態為this,則忽略此參數在zookeeper中尋找。所以它們將首次應用從拓撲開始。如果您想在拓撲的后續運行中使用任何這些拓撲,將需要清除用于存儲序列號的zookeeper節點的狀態。 #### `RecordToTupleMapper` recordToTupleMapper 一個 `RecordToTupleMapper` 接口的實現,該端口將會將kinesis記錄轉換為storm元組。它有兩種方法。getOutputFields 告訴spout 要從getTuple方法發出的元組中存在的數據。如果getTuple返回null,記錄將被確認。 `java Fields getOutputFields (); List&lt;Object&gt; getTuple (Record record);` #### `Date` timestamp 與 AT_TIMESTAMP sharedIteratorType參數結合使用。這將使得spout(噴口)從那時開始提取記錄。該kinesis時間使用的時間是與kinesis時間記錄相關的服務器端時間。 #### `FailedMessageRetryHadnler` failedMessageRetryHandler `FailedMessageRetryHandler` 接口的實現。 默認情況下,該模塊提供支持指數退避重試的實現失敗消息的機制。該實現有兩個構造函數。默認值沒有args 構造函數將配置在100毫秒的第一次重試隨后在Math.pow(2,i-1)中退出,其中i是范圍2中的重試次數到LONG.MAX_LONG。2表示指數函數的基數(秒)。另外一個構造函數以毫秒為單位進行重試間隔,作為第一個參數進行首次重試,以秒為單位的指數函數為第二個參數,重試作為第三個參數。這種接口的方法及其spout(噴口)的工作方式如下描述 `java boolean failed (KinesisMessageId messageId); KinesisMessageId getNextFailedMessageToRetry (); void failedMessageEmitted (KinesisMessageId messageId); void acked (KinesisMessageId messageId);` 將在每個發生故障的元組上調用失敗的方法。如果計劃重新嘗試失敗的消息,則返回true,否則返回false。 getNextFailedMessageToRetry 方法將被稱為第一件事,每次一個spout(噴口)想要發生一個元組。它應該返回一個應該重試的消息,如果有,否則為空。請注意,在該時間段內沒有任何重試信息的情況下,它可以返回null。但是,當該消息被調用失敗方法時,它最終將返回它返回true的每個消息 如果spout(噴口)成功的設法從kinesis時獲取記錄并發送,failedMessageEmitted將被調用。否則,當getNextFailedMessageToRetry方法被再次調用的時候,該實現應該會返回相同的消息 一但失敗的消息被重新發送并被spout(噴口)成功地確認,則會被呼叫。如果失敗,spout(噴口)失敗就會被再次呼叫。 #### `ZkInfo` zkInfo 封裝的對象信息用來zookeeper的交互。這個構造函數使用zkUrl作為第一個參數,它是一個逗號分隔的字符串zk host和端口,zkNode作為第二個參數將用作存儲提交序列號的根節點,會話超時以毫秒為單位,連接超時作為第四毫秒,提交間隔為提交序列號到zookeeper的毫秒數的五分之一,重試嘗試作為zk客戶端的第六個連接重試嘗試,重試間隔以毫秒為單位的等待時間,然后再重試連接。 #### `KinesisConnectionInfo` kinesisConnectionInfo 使用kinesis客戶端捕獲連接到kinesis的參數的對象。它有一個構造函數來實現 `AWSCredentialsProvider`作為第一個參數。該模塊提供了一個稱為 `CredentialsProviderChain` 的實現,它允許spout(噴口)使用以下之一進行kinesis檢測,這5個機制順序按照以下順序 - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, `InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`。它需要一個`ClientConfiguration` 對象作為配置。它需要一個 `ClientConfiguration` 對象作為配置運動的第二個參數客戶端,`Regions` 作為第三個參數,設置客戶端上要連接的區域,recordsLimit作為第四個參數,表示最大數量的記錄Kinesis客戶端將每個GetReocrds請求檢索。這個限制應該根據記錄的大小,運動時間來仔細選擇吞吐量限制和風暴中每個元組延遲的拓撲。另外如果一個任務將從多個分片讀取,那么這將影響選擇權限認證。 #### `Long` maxUncommittedRecords 這表示每個任務允許的最大未提交序列號數。一旦達到這個數字,spout就不會從kinesis中獲取任何新的記錄。未提交的序列號被定義為尚未提交給zookeeper的任務的所有消息的總和。這與拓撲級別最大待處理消息不同。例如,如果此值設置為10,并且spout將序列號從1發送到10。序號1正在等待,2到10次被告知。在這種情況下,未提交的序列的數量為10,因為1到10范圍內的序列號可以被提交到zk。但是,storm仍然可以在端口上調用下一個元組,因為只有一個等待消息。 ### Maven dependencies Aws sdk version that this was tested with is 1.10.77 ``` <dependencies> <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk</artifactId> <version>${aws-java-sdk.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${project.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>${curator.version}</version> <exclusions> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.googlecode.json-simple</groupId> <artifactId>json-simple</artifactId> </dependency> </dependencies> ``` ## 將來的工作 處理kinesis中的碎片的合并或分裂,Trident噴口實施和指標
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看