# Storm Kafka Integration
提供核心的 Storm 和Trident 的spout實現,用來從Apache Kafka 0.8x版本消費數據.
## Spouts
我們支持 Trident 和 core Storm 的spout.對于這兩種spout實現,我們使用BorkerHosts接口來跟蹤Kafka broker host partition 映射關系,用KafkaConfig來控制Kafka 相關參數.
### BrokerHosts
為了初始化 Kafka spout/emitter,你需要構造一個 BrokerHosts 標記接口的實例。當前,我們支持以下兩種實現方式.
#### ZkHosts
如果你想要動態的跟蹤Kafka broker partition 映射關系,你應該使用ZkHosts。這個類使用 Kafka Zookeeper實體跟蹤 brokerHost->分區映射. 你可以調用下面的方法來得到一個實例. `java public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)` ZkStr 字符串格式是 ip:port(例如:localhost:2181).brokerZkPath 是存儲所有 topic 和 partition信息的zk 根路徑.默認情況下,Kafka使用 /brokers路徑.
默認情況下,broker-partition 映射關系60s秒從Zookeeper刷新一次.如果你想要改變這個時間,你需要設置 host.refreshFreqSecs 配置.
#### StaticHosts
這是一種可替代的實現,broker->partition 信息是靜態的.要構造這個類的實例,你需要先構造一個 GlobalPartitionInformation 的實例.
```
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
```
### KafkaConfig
構造一個KafkaSpout的實例,第二件事情就是要實例化KafkaConfig。 `java public KafkaConfig(BrokerHosts hosts, String topic) public KafkaConfig(BrokerHosts hosts, String topic, String clientId)`
BrokerHosts可以通過多個BrokerHosts接口實現.topic 就是Kafka topic 的名稱.可選擇的ClientId就是當前消費的offset存儲的zk的路徑.
有兩個KafkaConfig 繼承類正在被使用.
Spoutconfig是KafkaConfig的擴展,它支持Zookeeper 連接信息的其他字段,并且可以控制KafkaSpout的行為.Zkroot就是用來存儲消費者offset信息的根路徑.id是唯一的,用來標識spout. `java public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); public SpoutConfig(BrokerHosts hosts, String topic, String id);` 除此之外,SpoutConfig包含下面這些字段,用來控制KafkaSpout的行為: ```java // setting for how often to save the current Kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000;
```
// Retry strategy for failed messages
public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
// Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
// Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
// Failed message will be retried infinitely if retryLimit is less than zero.
public int retryLimit = -1;
```
```
核心KafkaSpout只接口一個SpoutConfig實例
TridentKafkaConfig是KafkaConfig的另外一個擴展.
TridentKafkaEmitter只接受TridentKafkaConfig作為參數.
KafkaConfig類也有一些公共變量來控制你的應用程序的行為。以下是默認值:
```java
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
```
除MultiScheme之外,大部分都可以讀命名就可以理解。
### MultiScheme
MultiScheme 是一個用來規定 ByteBuffer 如何Kafka 消費,并轉換成一個 storm tuple.并且會控制 output field的命名.
```
public Iterable<List<Object>> deserialize(ByteBuffer ser);
public Fields getOutputFields();
```
默認的 `RawMultiScheme` 接受 `ByteBuffer` 參數,并返回一個 tuple.就是將ByteBuffer 轉換成 `byte[]`.outPutField 的名稱是 “bytes”。還有可替換的的實現,像 `SchemeAsMultiScheme` 和 `KeyValueSchemeAsMultiScheme`,他們會將 `ByteBuffer` 轉換成 `String`.
當然還有個`SchemeAsMultiScheme` 的擴展類,`MessageMetadataSchemeAsMultiScheme`,MessageMetadataSchemeAsMultiScheme有一個額外的反序列化方法,會接受ByteBuffer 信息,還會伴隨著`Partition` 和 `offset` 信息.
```
public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
```
上面這個方法對于審計/重新處理Kafka topic上任意一個點的消息非常有用,保存了每條消息的partition和offset,而不是保留整個消息.
### Failed message retry
FailedMsgRetryManager是一個定義失敗消息的重試策略的接口。默認實現是ExponentialBackoffMsgRetryManager,它在連續重試之間以指數延遲重試。要使用自定義實現,請將SpoutConfig.failedMsgRetryManagerClass設置為完整的實現類名稱。下面是接口: ```java // Spout initialization can go here. This can be called multiple times during lifecycle of a worker. void prepare(SpoutConfig spoutConfig, Map stormConf);
```
// Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
void failed(Long offset);
// Message corresponding to offset has been acked.
void acked(Long offset);
// Message corresponding to the offset, has been re-emitted and under transit.
void retryStarted(Long offset);
/**
* The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
* and resend them, except completed messages.
*/
Long nextFailedMessageToRetry();
/**
* @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
*/
boolean shouldReEmitMsg(Long offset);
/**
* Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
* spout will called failed(offset) in next call and acked(offset) otherwise
*/
boolean retryFurther(Long offset);
/**
* Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
*/
Set<Long> clearOffsetsBefore(Long kafkaOffset);
```
```
#### Version incompatibility
在1.0之前的Storm版本中,MultiScheme方法接受一個 `byte []` 而不是 `ByteBuffer`。 MultScheme和相關的方案apis在版本1.0中被更改為接受ByteBuffer而不是byte []。
這意味著,在1.0版及更高版本之前,1.0版的kafka spouts將無法使用。在Storm 1.0及更高版本中運行拓撲時,必須確保storm-kafka版本至少為1.0。1.0之前的 topology jar 必須重新和storm-kafka 1.0版本構建,以便在Storm 1.0及更高版本的群集中運行。
### Examples
#### Core Spout
```java
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```
#### Trident Spout
```
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
```
### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
如上面的KafkaConfig屬性所示,您可以通過設置 `KafkaConfig.startOffsetTime` 來控制從Kafka topic 的哪個端口開始讀取,如下所示:
1. `kafka.api.OffsetRequest.EarliestTime()`: 從topic 初始位置讀取消息 (例如,從最老的那個消息開始)
2. `kafka.api.OffsetRequest.LatestTime()`: 從topic尾部開始讀取消息 (例如,新寫入topic的信息)
3. 一個Unix時間戳,從當前 epoch 開始.(例如,可以通過System.currentTimeMillis()),具體的可以查看Kafka FAQ中的 [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) .
當topology(拓撲)運行Kafka Spout ,并跟蹤讀取和發送的offset,并將狀態信息存儲到zk path `SpoutConfig.zkRoot+ "/" + SpoutConfig.id`.在故障的情況下,它會從ZooKeeper的最后一次寫入偏移中恢復。
> **Important:** 新部署topology(拓撲)時,請確保`SpoutConfig.zkRoot`和`SpoutConfig.id`的設置未被修改, 否則spout將無法從ZooKeeper中讀取以前的消費者狀態信息(即偏移量)導致意外的行為和/或數據丟失,具體取決于您的用例。
這意味著當topology(拓撲)運行一旦設置`KafkaConfig.startOffsetTime`將不會對 topology(拓撲)的后續運行產生影響, 因為現在 topology(拓撲)將依賴于ZooKeeper中的消費者狀態信息(偏移量)來確定從哪里開始(更多準確地:簡歷)閱讀。 如果要強制該端口忽略存儲在ZooKeeper中的任何消費者狀態信息,則應將參數`KafkaConfig.ignoreZkOffsets` 設置為true。如果為`true`, 則如上所述,spout 將始終從`KafkaConfig.startOffsetTime`定義的偏移量開始讀取。
## Using storm-kafka with different versions of Kafka
Storm-kafka的Kafka依賴關系在maven中scope 定義為 `provided` ,這意味著它不會被作為傳遞依賴。這允許您使用與Kafka集群兼容的Kafka依賴關系版本。
當使用storm-kafka構建項目時,必須明確地添加Kafka依賴項。例如,要使用針對Scala 2.10構建的Kafka 0.8.1.1,您將在 `pom.xml` 中使用以下依賴關系:
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
```
請注意,排除ZooKeeper和log4j依賴關系以防止與Storm的依賴關系發生版本沖突。
您還可以覆蓋從maven構建的kafka依賴關系版本,其中包含參數`storm.kafka.version`和`storm.kafka.artifact.id`,例如`mvn clean install -Dstorm.kafka.artifact.id = kafka_2.11 -Dstorm.kafka.version = 0.9.0.1`
選擇kafka依賴版本時,您應該確保 - 1\. kafka api與storm-kafka兼容。目前,storm-kafka模塊僅支持0.9.x和0.8.x客戶端API。如果要使用更高版本,應該使用storm-kafka-client模塊替換。 2\. 您選擇的kafka客戶端應與 broker 兼容。例如0.9.x客戶端將無法使用0.8.x broker。
## Writing to Kafka as part of your topology
您可以創建一個org.apache.storm.kafka.bolt.KafkaBolt的實例,并將其作為組件附加到 topology(拓撲)中,或者如果您使用Trident,則可以使用org.apache.storm.kafka.trident.TridentState,org.apache .storm.kafka.trident.TridentStateFactory和org.apache.storm.kafka.trident.TridentKafkaUpdater。
您需要提供以下2個接口的實現:
### TupleToKafkaMapper and TridentTupleToKafkaMapper
這個接口有下面兩個方法:
```
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
```
顧名思義,這些方法被稱為將 tuple 映射到Kafka key 和Kafka消息。 如果您只需要一個字段作為鍵和一個字段作為值,則可以使用提供的FieldNameBasedTupleToKafkaMapper.java實現。 在KafkaBolt中,如果使用默認構造函數構造FieldNameBasedTupleToKafkaMapper,則實現始終會查找字段名稱為“key”和“message”的字段,以實現向后兼容性的原因。 或者,您也可以使用非默認構造函數指定不同的鍵和消息字段。在TridentKafkaState中,您必須指定鍵和消息的字段名稱,因為沒有默認構造函數。 在構造FieldNameBasedTupleToKafkaMapper實例時應該指定這些。
### KafkaTopicSelector and trident KafkaTopicSelector
This interface has only one method `java public interface KafkaTopicSelector { String getTopics(Tuple/TridentTuple tuple); }` 該接口的實現應該返回要發送 tuple的密鑰/消息映射的topic,您可以返回一個null,該消息將被忽略。 如果您有一個靜態的topic 名稱,那么可以使用DefaultTopicSelector.java并在構造函數中設置主題的名稱。 `FieldNameTopicSelector`和`FieldIndexTopicSelector`用于支持決定哪個topic 應該從tuple 送消息。 用戶可以在tuple中指定字段名稱或字段索引,selector將使用該值作為發布消息的topic 名稱。 當找不到topic 名稱時,`KafkaBolt`會將消息寫入默認topic。請確保已創建默認topic。
### Specifying Kafka producer properties
`TridentKafkaStateFactory.withProducerProperties()`來提供Storm拓撲中的所有生產屬性。有關詳細信息,請參閱[http://kafka.apache.org/documentation.html#newproducerconfigs“](http://kafka.apache.org/documentation.html#newproducerconfigs%E2%80%9C) producer 的重要配置屬性”部分。
### Using wildcard kafka topic match
您可以通過添加以下配置來進行通配符 topic 匹配 ``` Config config = new Config(); config.put("kafka.topic.wildcard.match",true);
```
之后,您可以指定一個通配符 topic ,以匹配例如點擊流。*記錄。這將匹配所有流匹配clickstream.my.log,clickstream.cart.log等
###Putting it all together
對于bolt:
```java
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
```
對于 Trident:
```
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
```
### Committer Sponsors
P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度