<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                [TOC] # 為什么要有kafka stream spark和storm都是流式處理框架,而kafka stream提供的是一個基于kafka的流式處理類庫.框架要求開發者按照特定的方式去開發邏輯部分,供框架調用.開發者很難了解框架的具體運行方式,從而使得調試成本高,并且使用受限. 而kafka stream作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試. 就流式處理系統而言,基本都支持kafka作為數據源.例如,storm具有專門的kafka-spout,而spark也提供專門的spark-streaming-kafka模塊.事實上,kafka基本上是主流的流式處理系統的標準數據源,換而言之,大部分流式系統中都已部署了kafka,此時使用kafka stream的成本非常低 使用Storm或Spark Streaming時,需要為框架本身的進程預留資源,如Storm的supervisor和Spark on YARN的node manager。即使對于應用實例而言,框架本身也會占用部分資源,如Spark Streaming需要為shuffle和storage預留內存。但是Kafka作為類庫不占用系統資源。 由于Kafka本身提供數據持久化,因此Kafka Stream提供滾動部署和滾動升級以及重新計算的能力。 由于Kafka Consumer Rebalance機制,Kafka Stream可以在線動態調整并行度 # 例子 **需求** 實時處理單詞帶有">>>"前綴的內容.例如輸入"123>>>abc",最終處理成abc 需求分析 數據清洗案例 ![](https://box.kancloud.cn/07de74fcbf992bcc671732e6c07d8c19_821x168.png) **代碼** 主類 ~~~ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; //泛型一般用序列化的字節數組不用string public class LogProcessor implements Processor<byte[], byte[]> { private ProcessorContext context; //初始化 @Override public void init(ProcessorContext processorContext) { this.context = processorContext; } //處理,實時處理,每條都調用這個 @Override public void process(byte[] bytes, byte[] bytes2) { //原來數據 String inputOri = new String(bytes2); //如果包含>>>則去除 if (inputOri.contains(">>>")) { inputOri = inputOri.split(">>>")[1].trim(); } //把數據寫出去 context.forward(bytes, inputOri.getBytes()); } //周期性調用 @Override public void punctuate(long l) { } //釋放資源 @Override public void close() { } } ~~~ 啟動類 ~~~ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; import java.util.Properties; public class Application { public static void main(String[] args) { //來自的topic String fromTopic = "test2"; //目的的topic String toTopic = "test3"; //設置參數 Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092,slave2:9092"); //實例化StreamConfig StreamsConfig config = new StreamsConfig(props); //構建拓撲 TopologyBuilder builder = new TopologyBuilder(); //定義kafka stream處理數據的來源,名字隨便起 builder.addSource("SOURCE", fromTopic) .addProcessor("PROCESSOR", new ProcessorSupplier<byte[], byte[]>() { @Override public Processor get() { //我們定義的processor return new LogProcessor(); } }, "SOURCE") .addSink("SINK", toTopic, "PROCESSOR"); //根據StreamConfig對象以及用于構建拓補的Builder對象實例化kafka stream KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); } } ~~~ **測試** 生產者 ~~~ kafka-console-producer.sh --broker-list master:9092.slave1:9092,slave2:9092 --topic test2 ~~~ 消費者 ~~~ kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic test3 ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看