[TOC]

# High-Level API

A basic consumer built with the high-level consumer API:

~~~
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

// Consumer
public class CustomConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker addresses; there is no need to list every broker in the cluster
        props.put("bootstrap.servers", "master:9092");
        // Consumer group
        props.put("group.id", "g1");
        // Whether offsets are committed automatically
        props.put("enable.auto.commit", "true");
        // Interval between automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        // Key deserializer class
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer class
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to topics; several topics can be subscribed at once
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            // Poll for data; each poll times out after 100 ms
            ConsumerRecords<String, String> records = consumer.poll(100);
            // Messages do not arrive one by one: each poll returns a batch, so iterate over it
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Topic = %s, PartitionId = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
~~~

Creating consumers with multiple threads (using the old high-level consumer API):

~~~
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaConsumerSimple implements Runnable {

    public String title;
    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        // Record this consumer's label and the KafkaStream it should consume
        this.title = title;
        this.stream = stream;
    }

    @Override
    public void run() {
        System.out.println("Starting " + title);
        // Get an iterator over the KafkaStream
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /**
         * Keep reading newly arriving messages from the stream; hasNext() blocks
         * while waiting for new messages. If ConsumerConnector#shutdown is called,
         * hasNext() returns false.
         */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.err.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception {
        // Prepare the configuration
        Properties props = new Properties();
        props.put("group.id", "testGroup");
        props.put("zookeeper.connect", "master:2181,slave1:2181,slave2:2181");
        // Start consuming from the latest offset
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);
        // The topic to consume
        String topic = "order";
        // Create a consumer connector; as long as the ConsumerConnector is alive,
        // the consumer keeps waiting for new messages and does not exit on its own
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        // Build the topicCountMap: how many KafkaStreams (shards) to create per topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        // Get the KafkaStreams for each topic;
        // the map key is the topic, the value is its list of KafkaStreams
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap =
                consumerConn.createMessageStreams(topicCountMap);
        // Take the streams for our topic
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
        // Create a thread pool with capacity 4
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Submit one consumer task per stream
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("Consumer " + (i + 1), streams.get(i)));
    }
}
~~~
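The first consumer above commits offsets automatically (`enable.auto.commit=true`), so a crash between a poll and the next automatic commit can lose or repeat work depending on timing. For at-least-once processing, a common variation is to disable auto-commit and commit only after a batch has been processed. The sketch below is not from the original article; it reuses the same broker address and topic purely for illustration:

~~~
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

// Variant of CustomConsumer that commits offsets manually after processing each batch
public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("group.id", "g1");
        // Turn off automatic commits; offsets only advance via commitSync() below
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            // Commit only after the whole batch has been processed; a crash before
            // this line means the batch is re-delivered on restart (at-least-once)
            consumer.commitSync();
        }
    }
}
~~~

Placing `commitSync()` after the processing loop trades possible duplicates for no data loss.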
# Low-Level API

Use the low-level API to read data from a specified topic, at a specified partition and a specified offset.

The main steps for a consumer using the low-level API:

| Step | Work performed |
|---|---|
| 1 | Find the leader replica for the specified partition from the topic metadata |
| 2 | Fetch the latest consumption offset for the partition |
| 3 | Pull the partition's messages from the leader replica |
| 4 | Detect leader replica changes and retry |

Method descriptions:

| Method | Description |
|---|---|
| findLeader() | The client sends a topic-metadata request to the seed brokers and adds the replica set to the backup broker list |
| getLastOffset() | The consumer client sends an offset request to fetch the partition's most recent offset |
| run() | The main method in which the low-level API consumer pulls messages |
| findNewLeader() | When the partition's leader broker fails, the client finds the new leader replica |

~~~
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MySimpleConsumer {

    // Consume data from a specified topic, partition, and offset
    public static void main(String[] args) {
        // Kafka cluster
        ArrayList<String> brokers = new ArrayList<>();
        brokers.add("master");
        brokers.add("slave1");
        brokers.add("slave2");
        // Port
        int port = 9092;
        // Topic
        String topic = "second";
        // Partition number
        int partition = 0;
        // Offset
        long offset = 15;
        MySimpleConsumer mySimpleConsumer = new MySimpleConsumer();
        mySimpleConsumer.getData(brokers, port, topic, partition, offset);
    }

    public void getData(List<String> brokers, int port, String topic, int partition, long offset) {
        PartitionMetadata partitionMetadata = getLeader(brokers, port, topic, partition);
        // Host name of the leader for the specified partition
        String leader = partitionMetadata.leader().host();
        // Create the consumer
        SimpleConsumer consumer = new SimpleConsumer(leader, port, 1000, 1024 * 4, "client" + topic);
        // Build the fetch request
        FetchRequest fetchRequest = new FetchRequestBuilder()
                .clientId("client" + topic)
                .addFetch(topic, partition, offset, 1000)
                .build();
        // Pull the data
        FetchResponse fetchResponse = consumer.fetch(fetchRequest);
        ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
        // Print the messages: copy each payload out of its ByteBuffer
        for (MessageAndOffset messageAndOffset : messageAndOffsets) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println("offset:" + messageAndOffset.offset() + "---" + new String(bytes));
        }
        consumer.close();
    }

    // Find the partition leader
    public PartitionMetadata getLeader(List<String> brokers, int port, String topic, int partition) {
        SimpleConsumer consumer = null;
        for (String broker : brokers) {
            consumer = new SimpleConsumer(broker, port, 1000, 1024 * 4, "client");
            TopicMetadataRequest topicMetadataRequest =
                    new TopicMetadataRequest(Collections.singletonList(topic));
            // Fetch the topic metadata
            TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);
            List<TopicMetadata> topicMetadata = topicMetadataResponse.topicsMetadata();
            for (TopicMetadata topicMetadatum : topicMetadata) {
                // Metadata for every partition of the topic
                List<PartitionMetadata> partitionMetadata = topicMetadatum.partitionsMetadata();
                for (PartitionMetadata partitionMetadatum : partitionMetadata) {
                    if (partitionMetadatum.partitionId() == partition) {
                        consumer.close();
                        return partitionMetadatum;
                    }
                }
            }
        }
        assert consumer != null;
        consumer.close();
        return null;
    }
}
~~~
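Step 2 of the table above (fetching the partition's latest offset) is not exercised by `MySimpleConsumer`, which hardcodes `offset = 15`. Below is a minimal sketch of `getLastOffset()`, modeled on the standard 0.8-era SimpleConsumer example rather than taken from this article; pass `kafka.api.OffsetRequest.EarliestTime()` or `LatestTime()` as `whichTime`:

~~~
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

import java.util.HashMap;
import java.util.Map;

public class OffsetLookup {
    // Ask the partition leader for the offset before the given time:
    // EarliestTime() yields the first available offset,
    // LatestTime() yields the next offset to be written
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Request a single offset entry for this partition
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching offset: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }
}
~~~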
On top of this, features such as finding the new leader after a broker failure can be added, as listed in the method table above; a sketch of `findNewLeader()` follows.
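This is a minimal sketch following the pattern of the standard SimpleConsumer example; the class name and the retry count of 3 are illustrative, not from the original. It reuses `getLeader()` from `MySimpleConsumer` above:

~~~
import kafka.javaapi.PartitionMetadata;

import java.util.List;

public class LeaderFailover {
    // After a fetch error, retry leader discovery a few times; on the first
    // attempt the old leader is rejected, giving ZooKeeper time to elect
    // and publish a new one
    public static String findNewLeader(MySimpleConsumer consumer, String oldLeader,
                                       List<String> brokers, int port,
                                       String topic, int partition) throws Exception {
        for (int i = 0; i < 3; i++) {
            PartitionMetadata metadata = consumer.getLeader(brokers, port, topic, partition);
            boolean retry = metadata == null
                    || metadata.leader() == null
                    || (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0);
            if (!retry) {
                return metadata.leader().host();
            }
            Thread.sleep(1000);
        }
        throw new Exception("Unable to find new leader after broker failure");
    }
}
~~~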