<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] # 攔截器原理 producer攔截器(interceptor)是在kafka 0.10版本被引入的,主要用于實現clients端的定制化控制邏輯. 對于producer而言,interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如**修改消息**等. 同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain). Interceptor的實現接口是`org.apache.kafka.clients.producerInterceptor`,其定義的方法包括: 1. configure(configs) 獲取配置信息和初始化數據時調用 2. onSend(ProducerRecord) 該方法封裝進`KafkaProducer.send`方法中,即它運行在用戶主線程中。Producer確保在消息被序列化以及計算分區前調用該方法。**用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區**,否則會影響目標分區的計算 3. onAcknowledgement(RecordMetadata, Exception) **該方法會在消息被應答或消息發送失敗時調用**,并且通常都是在producer回調邏輯觸發之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發送效率 4. close **關閉interceptor,主要用于執行一些資源清理工作** 如前所述,interceptor可能被運行在多個線程中,因此在具體實現時用戶需要自行確保線程安全。另外**倘若指定了多個interceptor,則producer將按照指定順序調用它們**,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞。這在使用過程中要特別留意。 # 例子 需求: 實現一個簡單的雙interceptor組成的攔截鏈.第一個interceptor會在消息發送前將時間戳信息加到value的最前部位.第二個interceptor會在消息發送后更新成功發送消息數或失敗發送消息數. ![](https://box.kancloud.cn/e9eebc0cb26c1a680e953d65cf416820_1193x562.png) **TimeInterceptor** ~~~ import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor<String, String> { //消息會傳到這,還沒有進入kafka集群 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { //一般不改變原來的主題和分區,我們給他的value加上時間 return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), System.currentTimeMillis() + producerRecord.value()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } } ~~~ **CounterInterceptor** ~~~ import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class CounterInterceptor implements ProducerInterceptor<String, String> { //成功個數統計 private long successCount = 0; //失敗個數統計 private long errorCount = 0; //消息不做改變 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { return producerRecord; } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if (e == null) { successCount++; } else { errorCount++; } } @Override public void close() { System.out.println("成功的個數" + successCount); System.out.println("失敗的個數" + errorCount); } @Override public void configure(Map<String, ?> map) { } } ~~~ 添加到kafka的生產者中 ![](https://box.kancloud.cn/5477050eaae124b0643e66b8d164235d_1618x764.png)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看