# Storm Kafka Integration

Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.

## Spouts

We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHosts interface to track the Kafka broker host to partition mapping, and a KafkaConfig to control the Kafka-related parameters.

### BrokerHosts

In order to initialize your Kafka spout/emitter, you need to construct an instance of the marker interface BrokerHosts. Currently, we support the following two implementations.

#### ZkHosts

ZkHosts is what you should use if you want to dynamically track the Kafka broker to partition mapping. This class uses Kafka's ZooKeeper entries to track the brokerHost -> partition mapping. You can obtain an instance by calling:

```java
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
```

Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the zk root path under which all topic and partition information is stored. By default, Kafka uses /brokers.

By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change this interval, you should set the host.refreshFreqSecs configuration.

#### StaticHosts

This is an alternative implementation where the broker -> partition information is static. In order to construct an instance of this class, you need to first construct an instance of GlobalPartitionInformation.

```java
Broker brokerForPartition0 = new Broker("localhost");        // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);  // localhost:9092, but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");   // localhost:9092 specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);  // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);  // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);  // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
```

### KafkaConfig

The second thing needed for constructing a KafkaSpout is an instance of KafkaConfig.

```java
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
```

The BrokerHosts can be any implementation of the BrokerHosts interface as described above. The topic is the name of the Kafka topic. The optional clientId is used as part of the zk path where the spout's current consumption offset is stored.

There are two extensions of KafkaConfig currently in use.
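As a minimal sketch of how the pieces above fit together, a ZkHosts instance can be passed straight into a KafkaConfig. The ZooKeeper connect string, topic name, and client id here are illustrative placeholders, not values from this document:

```java
// Sketch: wiring a ZkHosts instance into a KafkaConfig.
// "localhost:2181", "sensor-events", and "my-client-id" are hypothetical placeholders.
BrokerHosts hosts = new ZkHosts("localhost:2181");  // uses the default /brokers zk path
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "sensor-events", "my-client-id");
```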
SpoutConfig is an extension of KafkaConfig that supports additional fields for the ZooKeeper connection info and for controlling the behavior specific to KafkaSpout. The zkRoot is used as the root path under which the consumer's offset is stored. The id should uniquely identify the spout.

```java
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
public SpoutConfig(BrokerHosts hosts, String topic, String id);
```

In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:

```java
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;

// Retry strategy for failed messages
public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();

// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
// Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
// Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
// Failed message will be retried infinitely if retryLimit is less than zero.
public int retryLimit = -1;
```

The core KafkaSpout only accepts an instance of SpoutConfig.

TridentKafkaConfig is another extension of KafkaConfig. TridentKafkaEmitter only accepts a TridentKafkaConfig as parameter.

The KafkaConfig class also has a number of public variables that control your application's behavior. Here are the defaults:

```java
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
```

Most of them are self-explanatory except MultiScheme.

### MultiScheme

MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a storm tuple. It also controls the naming of the output fields.
```java
public Iterable<List<Object>> deserialize(ByteBuffer ser);
public Fields getOutputFields();
```

The default RawMultiScheme takes the ByteBuffer and returns a tuple with the ByteBuffer converted to a byte[]. The name of the output field is "bytes". There are alternative implementations like SchemeAsMultiScheme and KeyValueSchemeAsMultiScheme, which can convert the ByteBuffer to a String.

There is also an extension of SchemeAsMultiScheme, MessageMetadataSchemeAsMultiScheme, which has an additional deserialize method that accepts the message ByteBuffer along with the Partition and offset associated with the message:

```java
public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
```

This is useful for auditing/reprocessing messages from arbitrary points on a Kafka topic, since it saves the partition and offset of each message instead of persisting the entire message.

### Failed message retry

FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponential delays between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the full class name of the implementation. Here is the interface:

```java
// Spout initialization can go here. This can be called multiple times during lifecycle of a worker.
void prepare(SpoutConfig spoutConfig, Map stormConf);

// Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.
void failed(Long offset);

// Message corresponding to offset has been acked.
void acked(Long offset);

// Message corresponding to the offset, has been re-emitted and under transit.
void retryStarted(Long offset);

/**
 * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
 * and resend them, except completed messages.
 */
Long nextFailedMessageToRetry();

/**
 * @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
 */
boolean shouldReEmitMsg(Long offset);

/**
 * Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
 * spout will call failed(offset) in next call and acked(offset) otherwise
 */
boolean retryFurther(Long offset);

/**
 * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
 */
Set<Long> clearOffsetsBefore(Long kafkaOffset);
```

#### Version incompatibility

In Storm versions prior to 1.0, the MultiScheme methods accepted a byte[] instead of a ByteBuffer. MultiScheme and the related scheme APIs were changed in version 1.0 to accept a ByteBuffer instead of a byte[].

This means that pre-1.0 Kafka spouts will not work with Storm versions 1.0 and later. When running topologies in Storm version 1.0 and later, you must ensure that the storm-kafka version is at least 1.0. Pre-1.0 topology jars must be rebuilt against storm-kafka version 1.0 in order to run in clusters with Storm 1.0 and later.

### Examples

#### Core Spout

```java
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
```

#### Trident Spout

```java
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
```

### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures

As shown in the KafkaConfig properties above, you can control from where in a Kafka topic the spout begins its read by setting `KafkaConfig.startOffsetTime` as follows:

1. `kafka.api.OffsetRequest.EarliestTime()`: read from the beginning of the topic (i.e. from the oldest messages onwards)
2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic)
3. A Unix timestamp since the epoch (e.g. via System.currentTimeMillis()); see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) in the Kafka FAQ for details.
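As a sketch, the three options above map onto `startOffsetTime` like this; the `spoutConfig` variable is assumed to be an existing SpoutConfig instance, and the one-hour lookback is an arbitrary illustrative value:

```java
// Option 1: start from the oldest message still retained in the topic
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

// Option 2: start from the tail, i.e. only messages written after startup
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

// Option 3: start from the offset closest to a given Unix timestamp in ms
// (here: roughly one hour ago, a hypothetical choice)
spoutConfig.startOffsetTime = System.currentTimeMillis() - 3600 * 1000L;
```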
As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information under the ZooKeeper path `SpoutConfig.zkRoot + "/" + SpoutConfig.id`. In the case of failures, it recovers from the last written offset in ZooKeeper.

> **Important:** When re-deploying a topology, make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id` are unchanged; otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper, which may lead to unexpected behavior and/or data loss, depending on your use case.

This means that once a topology has run, the setting `KafkaConfig.startOffsetTime` will not have an effect for subsequent runs of the topology, because the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter `KafkaConfig.ignoreZkOffsets` to `true`. If `true`, the spout will always begin reading from the offset defined by `KafkaConfig.startOffsetTime`, as described above.

## Using storm-kafka with different versions of Kafka

Storm-kafka's Kafka dependency is defined with scope `provided` in Maven, meaning it will not be pulled in as a transitive dependency. This allows you to use a version of the Kafka dependency that is compatible with your Kafka cluster.

When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.

You can also override the Kafka dependency version while building from Maven with the parameters `storm.kafka.version` and `storm.kafka.artifact.id`, e.g. `mvn clean install -Dstorm.kafka.artifact.id=kafka_2.11 -Dstorm.kafka.version=0.9.0.1`.

When selecting a Kafka dependency version, you should ensure that:

1. The Kafka API is compatible with storm-kafka. Currently, the storm-kafka module only supports the 0.9.x and 0.8.x client APIs. If you want to use a higher version, the storm-kafka-client module should be used instead.
2. The Kafka client you select should be wire-compatible with your broker. For example, a 0.9.x client will not work with a 0.8.x broker.

## Writing to Kafka as part of your topology

You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology, or if you are using Trident you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and org.apache.storm.kafka.trident.TridentKafkaUpdater.

You need to provide implementations of the following two interfaces:

### TupleToKafkaMapper and TridentTupleToKafkaMapper

These interfaces have the following two methods:

```java
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
```

As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java implementation. In the KafkaBolt, if the FieldNameBasedTupleToKafkaMapper is constructed with the default constructor, the implementation always looks for fields named "key" and "message", for backward-compatibility reasons. Alternatively, you can specify different key and message fields by using the non-default constructor. In the TridentKafkaState, you must specify the field names for the key and the message, as there is no default constructor. These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.

### KafkaTopicSelector and trident KafkaTopicSelector

This interface has only one method:

```java
public interface KafkaTopicSelector {
    String getTopics(Tuple/TridentTuple tuple);
}
```

The implementation of this interface should return the topic to which the tuple's key/message mapping should be published. You can return null, in which case the message will be ignored. If you have a static topic name, then you can use DefaultTopicSelector.java and set the topic name in the constructor.

`FieldNameTopicSelector` and `FieldIndexTopicSelector` can be used to decide which topic a tuple's message should be sent to. The user can specify a field name or a field index in the tuple, and the selector will use that value as the topic name to publish the message to. When the topic name is not found, the `KafkaBolt` will write the message to a default topic. Please make sure the default topic has been created.

### Specifying Kafka producer properties

You can provide all the producer properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` or, for Trident, `TridentKafkaStateFactory.withProducerProperties()`. See the "Important configuration properties for the producer" section of [http://kafka.apache.org/documentation.html#newproducerconfigs](http://kafka.apache.org/documentation.html#newproducerconfigs) for details.

### Using wildcard kafka topic match

You can do wildcard topic matching by adding the following config:

```java
Config config = new Config();
config.put("kafka.topic.wildcard.match", true);
```

After this you can specify a wildcard topic for matching, e.g. `clickstream.*.log`. This will match all streams such as clickstream.my.log, clickstream.cart.log, etc.

### Putting it all together

For the bolt:

```java
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
    new Values("storm", "1"),
    new Values("trident", "1"),
    new Values("needs", "1"),
    new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);

// set producer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaBolt bolt = new KafkaBolt()
    .withProducerProperties(props)
    .withTopicSelector(new DefaultTopicSelector("test"))
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
```

For Trident:

```java
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
    new Values("storm", "1"),
    new Values("trident", "1"),
    new Values("needs", "1"),
    new Values("javadoc", "1")
);
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

// set producer properties
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withProducerProperties(props)
    .withKafkaTopicSelector(new DefaultTopicSelector("test"))
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
```

### Committer Sponsors

P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))