# Storm Kinesis 集成
## Storm Kinesis Spout
提供的核心storm spout(噴口),用戶從Amzon Kinesis Streams 中的流中消費數據。它存儲可以在zookeeper中提交的序列號,并在重新啟動后默認啟動消息記錄。下面是創建使用spout的示例拓撲的代碼示例。下面說明配置spout(噴口)時使用的每個對象。理想情況下,spout(噴口)任務的數量應等于運動時間碎片的數量。但是,每個任務都可以從多個分片中讀取。
```
public class KinesisSpoutTopology {
public static void main (String args[]) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
String topologyName = args[0];
RecordToTupleMapper recordToTupleMapper = new TestRecordToTupleMapper();
KinesisConnectionInfo kinesisConnectionInfo = new KinesisConnectionInfo(new CredentialsProviderChain(), new ClientConfiguration(), Regions.US_WEST_2,
1000);
ZkInfo zkInfo = new ZkInfo("localhost:2181", "/kinesisOffsets", 20000, 15000, 10000L, 3, 2000);
KinesisConfig kinesisConfig = new KinesisConfig(args[1], ShardIteratorType.TRIM_HORIZON,
recordToTupleMapper, new Date(), new ExponentialBackoffRetrier(), zkInfo, kinesisConnectionInfo, 10000L);
KinesisSpout kinesisSpout = new KinesisSpout(kinesisConfig);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", kinesisSpout, 3);
topologyBuilder.setBolt("bolt", new KinesisBoltTest(), 1).shuffleGrouping("spout");
Config topologyConfig = new Config();
topologyConfig.setDebug(true);
topologyConfig.setNumWorkers(3);
StormSubmitter.submitTopology(topologyName, topologyConfig, topologyBuilder.createTopology());
}
}
```
就像你可以看到的,在它的構造函數中,引出了一個KinesisConfig對象。KinesisConfig的構造函數需要8個對象,如下所描述。
#### `String` streamName
用于消費數據的kinesis時間流的名稱
#### `ShardIteratorType` shardIteratorType
支持3種類型 - TRIM_HORIZON(beginning of shard), LATEST and AT_TIMESTAMP. 默認情況下,如果分片狀態為this,則忽略此參數在zookeeper中尋找。所以它們將首次應用從拓撲開始。如果您想在拓撲的后續運行中使用任何這些拓撲,將需要清除用于存儲序列號的zookeeper節點的狀態。
#### `RecordToTupleMapper` recordToTupleMapper
一個 `RecordToTupleMapper` 接口的實現,該端口將會將kinesis記錄轉換為storm元組。它有兩種方法。getOutputFields 告訴spout 要從getTuple方法發出的元組中存在的數據。如果getTuple返回null,記錄將被確認。 `java Fields getOutputFields (); List<Object> getTuple (Record record);`
#### `Date` timestamp
與 AT_TIMESTAMP sharedIteratorType參數結合使用。這將使得spout(噴口)從那時開始提取記錄。該kinesis時間使用的時間是與kinesis時間記錄相關的服務器端時間。
#### `FailedMessageRetryHadnler` failedMessageRetryHandler
`FailedMessageRetryHandler` 接口的實現。 默認情況下,該模塊提供支持指數退避重試的實現失敗消息的機制。該實現有兩個構造函數。默認值沒有args 構造函數將配置在100毫秒的第一次重試隨后在Math.pow(2,i-1)中退出,其中i是范圍2中的重試次數到LONG.MAX_LONG。2表示指數函數的基數(秒)。另外一個構造函數以毫秒為單位進行重試間隔,作為第一個參數進行首次重試,以秒為單位的指數函數為第二個參數,重試作為第三個參數。這種接口的方法及其spout(噴口)的工作方式如下描述 `java boolean failed (KinesisMessageId messageId); KinesisMessageId getNextFailedMessageToRetry (); void failedMessageEmitted (KinesisMessageId messageId); void acked (KinesisMessageId messageId);` 將在每個發生故障的元組上調用失敗的方法。如果計劃重新嘗試失敗的消息,則返回true,否則返回false。
getNextFailedMessageToRetry 方法將被稱為第一件事,每次一個spout(噴口)想要發生一個元組。它應該返回一個應該重試的消息,如果有,否則為空。請注意,在該時間段內沒有任何重試信息的情況下,它可以返回null。但是,當該消息被調用失敗方法時,它最終將返回它返回true的每個消息
如果spout(噴口)成功的設法從kinesis時獲取記錄并發送,failedMessageEmitted將被調用。否則,當getNextFailedMessageToRetry方法被再次調用的時候,該實現應該會返回相同的消息
一但失敗的消息被重新發送并被spout(噴口)成功地確認,則會被呼叫。如果失敗,spout(噴口)失敗就會被再次呼叫。
#### `ZkInfo` zkInfo
封裝的對象信息用來zookeeper的交互。這個構造函數使用zkUrl作為第一個參數,它是一個逗號分隔的字符串zk host和端口,zkNode作為第二個參數將用作存儲提交序列號的根節點,會話超時以毫秒為單位,連接超時作為第四毫秒,提交間隔為提交序列號到zookeeper的毫秒數的五分之一,重試嘗試作為zk客戶端的第六個連接重試嘗試,重試間隔以毫秒為單位的等待時間,然后再重試連接。
#### `KinesisConnectionInfo` kinesisConnectionInfo
使用kinesis客戶端捕獲連接到kinesis的參數的對象。它有一個構造函數來實現 `AWSCredentialsProvider`作為第一個參數。該模塊提供了一個稱為 `CredentialsProviderChain` 的實現,它允許spout(噴口)使用以下之一進行kinesis檢測,這5個機制順序按照以下順序 - `EnvironmentVariableCredentialsProvider`, `SystemPropertiesCredentialsProvider`, `ClasspathPropertiesFileCredentialsProvider`, `InstanceProfileCredentialsProvider`, `ProfileCredentialsProvider`。它需要一個`ClientConfiguration` 對象作為配置。它需要一個 `ClientConfiguration` 對象作為配置運動的第二個參數客戶端,`Regions` 作為第三個參數,設置客戶端上要連接的區域,recordsLimit作為第四個參數,表示最大數量的記錄Kinesis客戶端將每個GetReocrds請求檢索。這個限制應該根據記錄的大小,運動時間來仔細選擇吞吐量限制和風暴中每個元組延遲的拓撲。另外如果一個任務將從多個分片讀取,那么這將影響選擇權限認證。
#### `Long` maxUncommittedRecords
這表示每個任務允許的最大未提交序列號數。一旦達到這個數字,spout就不會從kinesis中獲取任何新的記錄。未提交的序列號被定義為尚未提交給zookeeper的任務的所有消息的總和。這與拓撲級別最大待處理消息不同。例如,如果此值設置為10,并且spout將序列號從1發送到10。序號1正在等待,2到10次被告知。在這種情況下,未提交的序列的數量為10,因為1到10范圍內的序列號可以被提交到zk。但是,storm仍然可以在端口上調用下一個元組,因為只有一個等待消息。
### Maven dependencies
Aws sdk version that this was tested with is 1.10.77
```
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>${aws-java-sdk.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
</dependencies>
```
## 將來的工作
處理kinesis中的碎片的合并或分裂,Trident噴口實施和指標
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度