<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # Kafka實戰二 ### 前言 在上一章 [Kafka實戰](http://blog.csdn.net/u013291394/article/details/50231681) 中我們在局域網中搭建了一個Kafka節點,并嘗試了通過命令行腳本來實現本地消息的發布與接收,了解了主從節點之間的關系等。這一章主要實現在本機通過Java代碼實現對局域網中的Kafka節點進行消息的發布與接收。 ### 準備工作 在Java中進行Kafka編程需要依賴kafka和kafka-clients兩個包,下面直接提供maven配置文件pom.xml,不要忘記修改工程名: ~~~ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>Test</groupId> <artifactId>Test</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.9.0.0</version> </dependency> </dependencies> </project> ~~~ ### 生產者 生產者有兩種發布方式,同步和異步。異步方式增加了一個Callback參數來實現在消息成功發送后,開展后續的工作。 ~~~ import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; public class Producer extends Thread { private final KafkaProducer<Integer, String> producer; private final String topic; // 是否需要異步發送 private final Boolean isAsync; // 裝有Kafka的機器的IP地址 private final String serverIp = "10.64.***.***"; public Producer(String topic, Boolean isAsync) { Properties props = new Properties(); props.put("bootstrap.servers", serverIp+":9092"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<Integer, String>(props); this.topic = topic; this.isAsync = isAsync; } public void run() { int messageNo = 1; while(true) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); if (isAsync) { producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { try { producer.send(new ProducerRecord<Integer, String>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } ++messageNo; } } } class DemoCallBack implements Callback { private long startTime; private int key; private String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * 當異步發送完成后需要進行的處理 **/ public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println( "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } } ~~~ 調用方式: ~~~ // 開啟生產者線程后,會向Kafka節點中對應的topic發送Message_**類型的消息 boolean isAsync = true; Producer producerThread = new Producer(KafkaProperties.topic, isAsync); producerThread.start(); ~~~ ### 消費者 消費者用來接收特定話題的消息。 ~~~ import java.util.Collections; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import kafka.utils.ShutdownableThread; public class Consumer extends ShutdownableThread { private final KafkaConsumer<Integer, String> consumer; private final String topic; private final String serverIp = "10.64.***.***"; public Consumer(String topic) { super("KafkaConsumerExample", false); Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverIp+":9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); this.topic = topic; } @Override public void doWork() { consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Integer, String> records = consumer.poll(1000); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } } @Override public String name() { return null; } @Override public boolean isInterruptible() { return false; } } ~~~ 調用方式: ~~~ //開啟消費者線程后,會接收到之前生產者發送的消息 Consumer consumerThread = new Consumer(KafkaProperties.topic); consumerThread.start(); ~~~ ### 總結 通過上面的簡單的例子,我們就可以在自己的工程中向Kafka發送消息,并接收到自己訂閱的消息了。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看