# Storm Kafka 集成(0.10.x+)
## 使用 kafka-client jar 進行 Storm Apache Kafka 集成
這部分包含新的 Apache Kafka consumer API.
## 兼容性
Apache Kafka 版本 0.10+
## 寫入Kafka
您可以通過創建 org.apache.storm.kafka.bolt.KafkaBolt 實例并將其作為組件附加到您的topology.如果您使用 trident ,您可以通過使用以下對象完成 org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and org.apache.storm.kafka.trident.TridentKafkaUpdater.
您需要為以下兩個接口提供實現
### TupleToKafkaMapper 和 TridentTupleToKafkaMapper
這些接口有兩個抽象方法:
```
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
```
顧名思義,這兩個方法被調用將tuple映射到Kafka message的key和message本身. 如果你只想要一個字段 作為鍵和一個字段作為值,那么您可以使用提供的FieldNameBasedTupleToKafkaMapper.java 實現. 在KafkaBolt中,使用默認構造函數構造FieldNameBasedTupleToKafkaMapper需要一個字段名稱為"key"和"message"的字段以實現向后兼容. 或者,您也可以使用非默認構造函數指定不同的鍵和消息字段. 在使用TridentKafkaState 時你必須明確key和message的字段名稱,因為TridentKafkaState默認的構造函數沒有設置參數.在構造FieldNameBasedTupleToKafkaMapper的實例時應明確這些.
### KafkaTopicSelector 和 trident KafkaTopicSelector
這個接口只有一個方法:
```
public interface KafkaTopicSelector {
String getTopics(Tuple/TridentTuple tuple);
}
```
該接口的實現應該要根據tuple的 key/message 返回相應的Kafka的topic,如果返回 null 則該消息將被忽略掉.如果您只需要一個靜態topic名稱,那么可以使用 DefaultTopicSelector.java 并在構造函數中設置topic的名稱.
`FieldNameTopicSelector` 和 `FieldIndexTopicSelector` 用于選擇 tuple 要發送到的topic,用戶只需要指定tuple中存儲 topic名稱的字段名稱或字段索引即可(即tuple中的某個字段是kafka topic的名稱).當topic的名稱不存在時, `Field*TopicSelector` 會將tuple寫入到默認的topic.請確保默認topic已經在kafka中創建并且在`Field*TopicSelector`正確設置.
### 設置 Kafka producer 屬性
你可以在 topology 通過調用 `KafkaBolt.withProducerProperties()` 和 `TridentKafkaStateFactory.withProducerProperties()` 設置kafka producer的所有屬性. Kafka producer[配置](http://kafka.apache.org/documentation.html#newproducerconfigs) 選擇 "Important configuration properties for the producer" 查看更多詳情. 所有的kafka producer配置項的key都在 `org.apache.kafka.clients.producer.ProducerConfig`類中
### 使用通配符匹配 Kafka topic
通過添加如下屬性開啟通配符匹配(此功能是為了storm可以動態讀取多個kafka topic中的數據,并支持動態發現.看相關功能的實現需求[feture](https://issues.apache.org/jira/browse/STORM-817))
```
Config config = new Config();
config.put("kafka.topic.wildcard.match",true);
```
之后,您可以指定一個通配符topic,例如clickstream.*.log. 這將匹配clickstream.my.log,clickstream.cart.log等topic
### bolt 和 Trident 的Kafka Producer實現
For the bolt :
```
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
```
For Trident:
```
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
```
## 讀取Kafka (Spouts)
### 配置
spout通過使用`KafkaSpoutConfig`類來指定配置. 此類使用Builder模式,可以通過調用其中一個Builders構造函數或通過調用KafkaSpoutConfig類中的靜態方法創建一個Builder.創建builder的構造方法或靜態方法需要幾個鍵值(稍后可以更改),但這是啟動一個spout的所需的最小配置
`bootstrapServers`與Kafka Consumer Property"bootstrap.servers"相同. 配置項`topics' 配置的是spout將消費的kafka topic.可以是特定主題名稱(1個或多個)的集合列表或正則表達式"Pattern",它指定 任何與正則表達式匹配的主題都將被消費.
在構造函數的情況下,您可能還需要指定key deserializer和value deserializer. 這是為了通過使用Java泛型來保證類型安全. 默認值為"StringDeserializer",可以通過調用"setKeyDeserializer"和"setValueDeserializer"進行覆蓋.如果這些設置為null,代碼將回退到kafka屬性中設置的內容,但最好在這里明確,通過使用Java泛型來確保類型安全.
下面是一些需要特別注意的關鍵配置項.
`setFirstPollOffsetStrategy`允許你設置從哪里開始消費數據. 這在故障恢復和第一次啟動spout的情況下會被使用. 可選的的值包括:
* `EARLIEST` 無論之前的消費情況如何,spout會從每個kafka partition能找到的最早的offset開始的讀取
* `LATEST` 無論之前的消費情況如何,spout會從每個kafka partition當前最新的offset開始的讀取
* `UNCOMMITTED_EARLIEST` (默認值) spout 會從每個partition的最后一次提交的offset開始讀取. 如果offset不存在或者過期, 則會依照 `EARLIEST`進行讀取.
* `UNCOMMITTED_LATEST` spout 會從每個partition的最后一次提交的offset開始讀取, 如果offset不存在或者過期, 則會依照 `LATEST`進行讀取.
`setRecordTranslator`可以修改spout如何將Kafka消費者message轉換為tuple,以及將該tuple發布到哪個stream中.默認情況下,"topic","partition","offset","key"和"value"將被發送到"default"stream. 如果要將條目根據topic輸出到不同的stream中,Storm提供了"ByTopicRecordTranslator". 有關如何使用這些的更多示例,請參閱下文. `setProp`可用于設置kafka屬性. `setGroupId`可以讓您設置kafka使用者組屬性"group.id". `setSSLKeystore`和`setSSLTruststore`允許你配置SSL認證.
### 使用舉例
API是用java 8 lambda表達式寫的. 它也可以用于java7及更低的版本.
#### 創建一個簡單的不可靠spout
以下將消費kafka中"demo_topic"的所有消息,并將其發送到MyBolt,其中包含"topic","partition","offset","key","value".
```
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "demo_topic").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
...
```
#### 通配符 Topics
通配符 topics 將消費所有符合通配符的topics. 在下面的例子中 "topic", "topic_foo" 和 "topic_bar" 適配通配符 "topic.*", 但是 "not_my_topic" 并不適配.
```
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
...
```
#### 多個 Streams
這個案例使用 java 8 lambda 表達式.
```
final TopologyBuilder tp = new TopologyBuilder();
//默認情況下,spout 消費但未被match到的topic的message的"topic","key"和"value"將發送到"STREAM_1"
ByTopicRecordTranslator<String, String> byTopic = new ByTopicRecordTranslator<>(
(r) -> new Values(r.topic(), r.key(), r.value()),
new Fields("topic", "key", "value"), "STREAM_1");
//topic_2 所有的消息的 "key" and "value" 將發送到 "STREAM_2"中
byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2");
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", "topic_2", "topic_3").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2");
...
```
#### Trident
```
final TridentTopology tridentTopology = new TridentTopology();
final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()))
.parallelismHint(1)
...
```
Trident不支持多個stream且不支持設置將strem分發到多個output. 并且,如果每個output 的topic的字段不一致會拋出異常而不會繼續.
### 自定義 RecordTranslator(高級特性)
在大多數情況下,內置的SimpleRecordTranslator和ByTopicRecordTranslator應該滿足您的使用. 如果您遇到需要定制的情況那么這個文檔將會描述如何正確地做到這一點,涉及到一些不太常用的類.適用的要點是使用ConsumerRecord并將其轉換為可以emitted 的"List <object>". 難點是如何告訴spout將其發送到指定的stream中. 為此,您將需要返回一個"org.apache.storm.kafka.spout.KafkaTuple"的實例. 這提供了一個方法`routedTo`,它將說明tuple將要發送到哪個特定stream.</object>
For Example:
```
return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
```
將會使tuple發送到"bar" stream中.
在編寫自定義record translators時要小心,因為在Storm spout 中,它需要自我一致. `streams`方法應該返回這個translator將會嘗試發到streams的set列表. 另外,`getFieldsFor`應該為每一個stream 返回一個有效的Fields對象(就是說通過字段名稱可以拿到對應的正確的對象). 如果您使用Trident執行此操作,則Fields對象中指定字段的所有值必須在stream名稱的List中,否則trident拋出異常.(原文:If you are doing this for Trident a value must be in the List returned by apply for every field in the Fields object for that stream)
### 手動分區控制 (高級特性)
默認情況下,Kafka將自動將partition分配給當前的一組spouts. 它處理很多事情,但在某些情況下,您可能需要手動分配partition.當spout 掛掉并重新啟動,但如果處理不正確,可能會導致很多問題. 這可以通過子類化Subscription來處理,我們有幾個實現,您可以查看有關如何執行此操作的示例. ManualPartitionNamedSubscription和ManualPartitionPatternSubscription. 再次強調,使用這些或自己實現時請務必注意.
## 使用Maven Shade Plugin構建Uber Jar
Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml` `xml <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin>`
執行命令生成 uber jar:
`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
uber jar 文件會生成在如下目錄中:
`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
### 運行 Storm Topology
復制`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-*.jar` 到 `STORM_HOME/extlib`
使用kafka 命令行工具創建topic [test, test1, test2] 并使用 Kafka console producer 向topic添加數據
執行命令 `STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm/target/storm-kafka-client-*.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
開啟debug級別日志可以看到每個topic的消息根據設定的stream和設定的shuffle grouping被重定向到相應的spout.
## Using storm-kafka-client with different versions of kafka
Storm-kafka客戶端的Kafka依賴關系在maven中被定義為`provided`,這意味著它不會被拉入 作為傳遞依賴. 這允許您使用與您的kafka集群兼容的Kafka依賴版本.
當使用storm-kafka-client構建項目時,必須顯式添加Kafka clients依賴關系. 例如,使用Kafka client 0.10.0.0,您將使用以下依賴 `pom.xml`:
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
```
你也可以在使用maven build時通過指定參數`storm.kafka.client.version` 來指定 kafka clients 版本 e.g. `mvn clean install -Dstorm.kafka.client.version=0.10.0.0`
選擇kafka client版本時,您應該確保 - 1\. kafka api是兼容的. storm-kafka-client模塊僅支持** 0.10或更新的** kafka客戶端API. 對于舊版本, ? 您可以使用storm-kafka模塊 ([https://github.com/apache/storm/tree/master/external/storm-kafka](https://github.com/apache/storm/tree/master/external/storm-kafka)).
2\. 您選擇的kafka client 應與broker兼容. 例如 0.9.x client 將無法使用 ? 0.8.x broker
## Kafka Spout 性能調整
Kafka spout 提供了兩個內置參數來調節其性能. 參數可以通過 [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) 的 [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) 和 [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217). 方法進行設置
* "offset.commit.period.ms" 控制spout多久向kafka注冊一次offset
* "max.uncommitted.offsets" 控制沒讀取多少條message向kafka注冊一次offset
[Kafka consumer config](http://kafka.apache.org/documentation.html#consumerconfigs) 參數也可能對spout的性能產生影響. 以下Kafka參數可能是spout性能中影響最大的一些參數:
* "fetch.min.bytes"
* "fetch.max.wait.ms"
* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) Kafka spout 使用 [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) 的 [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)方法設置讀取數據的超時時間
根據您的Kafka群集的結構,數據的分布和數據的可用性,這些參數必須正確配置. 請參考關于Kafka參數調整的Kafka文檔.
### kafka spout配置默認值
目前 Kafka spout 有如下默認值,這在[blog post](https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)所述的測試環境中表現出了良好的性能
* poll.timeout.ms = 200
* offset.commit.period.ms = 30000 (30s)
* max.uncommitted.offsets = 10000000
## Kafka 自動提交offset模式
如果可靠性對您不重要 - 也就是說,您不關心在失敗情況下丟失tuple,并且要消除tuple跟蹤的開銷,那么您可以使用AutoCommitMode運行KafkaSpout.
你需要開啟自動提交模式: * 設置 Config.TOPOLOGY_ACKERS 為 0; * 在Kafka consumer 配置中開啟 _AutoCommitMode_ ;
下面是一個在KafkaSpout中開啟AutoCommitMode的例子:
```
KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
.builder(String bootstrapServers, String ... topics)
.setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
.build();
```
_請注意,由于Kafka消費者定期進行提交offset,所以并不是完全符合 `At-Most-Once`.因為在KafkaSpout崩潰時,可能會重讀某些tuple._
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度