<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] # 老API ~~~ import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class OldProducer { @SuppressWarnings("deprecation") public static void main(String[] args) { Properties properties = new Properties(); properties.put("metadata.broker.list", "master:9092"); properties.put("request.required.acks", "1"); properties.put("serializer.class", "kafka.serializer.StringEncoder"); Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties)); KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world"); producer.send(message ); } } ~~~ # 新API ## 創建生產者 ~~~ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; //生產者,這個api是只管發 public class CustomProducer { public static void main(String[] args) { Properties props = new Properties(); //kafka服務端的主機名和端口號 props.put("bootstrap.servers", "master:9092"); //等待所有副本節點的應答,如果你想3個副本應答就可以的話,這邊也可以寫3 props.put("acks", "all"); //消息發送最大嘗試次數 props.put("retries", 0); //一批消息處理大小,批量大小,有點緩存的意思 props.put("batch.size", 16384); //請求延時,一直請求 props.put("linger.ms", 1); //發送緩存區內存大小,攢到多少才寫到磁盤上 props.put("buffer.memory", 33554432); //key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //可以在new producer之前添加自定義攔截器 //ArrayList<Object> arrayList = new ArrayList<>(); //arrayList.add("com.jdxia.interceptor.TimeInterceptor"); //arrayList.add("com.jdxia.interceptor.CounterInterceptor"); //props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, arrayList); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 50; i++) { //發送數據,消息發送到test2這個主題,后面參數是key和value producer.send(new ProducerRecord<>("test1", Integer.toString(i), "hello world-" + i)); //還可以指定分區,這邊是指定第0個分區 //producer.send(new ProducerRecord<String, String>("test2", 0 ,Integer.toString(i), "hello world-" + i)); } producer.close(); } } ~~~ ## 創建生產者(新API) ~~~ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; //生產者 public class CallBackProducer { public static void main(String[] args) { Properties props = new Properties(); //kafka服務端的主機名和端口號 props.put("bootstrap.servers", "master:9092"); /* * request.required.acks,設置發送數據是否需要服務端的反饋,有三個值0,1,-1 * 0,意味著producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。 * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些數據。 * 1,意味著在leader replica已經接收到數據后,producer會得到一個ack。 * 這個選項提供了更好的持久性,因為在server確認請求成功處理后,client才會返回。 * 如果剛寫到leader上,還沒來得及復制leader就掛了,那么消息才可能會丟失。 * -1,意味著在所有的ISR都接收到數據后,producer才得到一個ack。 * 這個選項提供了最好的持久性,只要還有一個replica存活,那么數據就不會丟失 */ //props.put("request.required.acks", "1"); //等待所有副本節點的應答,如果你想3個副本應答就可以的話,這邊也可以寫3 props.put("acks", "all"); //消息發送最大嘗試次數 props.put("retries", 0); //一批消息處理大小,批量大小,有點緩存的意思 props.put("batch.size", 16384); //請求延時,一直請求 props.put("linger.ms", 1); //發送緩存區內存大小,攢到多少才寫到磁盤上 props.put("buffer.memory", 33554432); //key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定義分區,可以指定自定義分區,也可以不指定 //props.put("partitioner.class", "com.jdxia.kafka.CustomPartitioner"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props); for (int i = 0; i < 50; i++) { //也可以指定分區h和key,這邊分區是第0個分區,key是aaa //kafkaProducer.send(new ProducerRecord<String, String>("test1", 0, "aaa", "hello" + i), new Callback() { //test1是topic,后面是發送的消息 kafkaProducer.send(new ProducerRecord<String, String>("test1", "hello" + i), new Callback() { //發送完成的方法,如果有異常可以拿到異常的 @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (recordMetadata != null) { //打印發送這條數據,在哪個分區和他的偏移量 System.out.printf(recordMetadata.partition() + "---" + recordMetadata.offset()); } } }); } kafkaProducer.close(); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看