<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 多線程 Consumer Instance ## Kafka Java Consumer 設計原理 * Kafka Java Consumer 是單線程設計 * 從 Kafka V0.10.1.0,KafkaConsumer 是雙線程:用戶主線程 & 心跳線程 * 用戶主線程 * 啟動 Consumer 應用 main 方法的線程 * 心跳線程 * 只負責定期給對應的 Broker 發送心跳,標示 Consumer 的存活性(liveness) * 新版本設計:單線程 + 輪詢機制: * 實現非阻塞式的消息獲取 ## 多線程方案 * KafkaConsumer 類不是 thread-safe * 所有的網絡 IO 處理都是發生在用戶主線程中 * 不能在多個線程中共享同一個 KafkaConsumer 實例 * 可以使用 `KafkaConsumer.wakeup()` 在其他線程中喚醒 Consumer 基于非 thread-safe,兩套多線程方案 * 消費者程序啟動多個線程,每個線程維護專屬的 KafkaConsumer Instance,負責完整的消息獲取、消息處理流程 * 消費者程序使用單或多線程獲取消息,同時創建多個消費線程執行消息處理邏輯 * 處理消息交由特定的線程池來做 * 將消息獲取與處理解耦 ![](https://img.kancloud.cn/40/70/4070c15055bf275c44cb7b470fb1f850_696x326.jpeg) ## Code ### 方案 1 ``` public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); // 執行消息處理邏輯 } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } ``` ### 方案 2 ``` private final KafkaConsumer<String, String> consumer; private ExecutorService executors; ... private int workerNum = ...; executors = new ThreadPoolExecutor( workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); ... while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); for (final ConsumerRecord record : records) { executors.submit(new Worker(record)); } } .. ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看