* In IntelliJ IDEA, create a new Maven project from the scala-simple archetype.

![](https://img.kancloud.cn/9c/bf/9cbf1e0d7b60ba2ba2307b0655afc073_844x470.png)

~~~xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.bizzbee.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.9</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
~~~

* Next, create a simple Kafka producer class.

~~~java
package com.bizzbee.spark.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/*
 * A simple producer example.
 */
public class SimpleKafkaProducer {

    public static void main(String[] args) throws InterruptedException {

        // Optional: require the topic name as a command-line argument.
        // if (args.length == 0) {
        //     System.out.println("Enter topic name");
        //     return;
        // }

        // Topic to write to.
        String topicName = "bizzbee-replicated-topic";

        // Producer configuration.
        Properties props = new Properties();

        // Brokers used for the initial connection.
        // Be sure to map the server IP to the host name in your hosts file! This one is a nasty trap.
        props.put("bootstrap.servers", "spark:9095,spark:9094,spark:9093");

        // Wait until all in-sync replicas have acknowledged each record.
        props.put("acks", "all");

        // If a request fails, the producer retries automatically (up to 3 times).
        props.put("retries", 3);

        // Batch size in bytes.
        //props.put("batch.size", 16384);

        // Wait up to 1 ms for more records so they can be batched into fewer requests.
        props.put("linger.ms", 1);

        // buffer.memory controls the total memory available to the producer for buffering.
        //props.put("buffer.memory", 33554432);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer when the loop is done.
        try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(topicName,
                        Integer.toString(i), Integer.toString(i)));
                Thread.sleep(1000);
                // send() is asynchronous: this only confirms the record was handed to the producer.
                System.out.println("Message sent successfully");
            }
            // Push out anything still buffered before closing.
            producer.flush();
        }
    }
}
~~~

* Be sure to map the server's IP address to its host name in your hosts file!
* Then check that ZooKeeper and Kafka are both running.
* Check which brokers host the topic.
* `kafka-topics.sh --describe --zookeeper spark:2181`

```
Topic:bizzbee	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: bizzbee	Partition: 0	Leader: 3	Replicas: 3	Isr: 3
Topic:bizzbee-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: bizzbee-replicated-topic	Partition: 0	Leader: 3	Replicas: 1,3,2	Isr: 3,2,1
Topic:bizzbee-topic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: bizzbee-topic	Partition: 0	Leader: 3	Replicas: 3	Isr: 3
Topic:bizzbee_topic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: bizzbee_topic	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
Topic:jjj	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: jjj	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
```

* The output shows that the topic bizzbee-replicated-topic is replicated on three brokers (1, 3 and 2) and that all three appear in the Isr list, so all of them are up. Records sent by the producer go to the partition leader (broker 3) and are replicated to the other two brokers.
* Then start a consumer in a terminal on the server. Listing any one of the three brokers is enough.

`kafka-console-consumer.sh --bootstrap-server spark:9093 --topic bizzbee-replicated-topic`

![](https://img.kancloud.cn/53/48/5348190aea2350323272afedcddd6c2c_844x470.png)
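
The hosts-file warning matters because the client has to resolve the host name `spark` used in `bootstrap.servers` before it can connect. A minimal pre-flight check can be sketched with the JDK alone. The class `BootstrapHostCheck` and its methods are hypothetical helpers written for this illustration and are not part of the Kafka client.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Kafka API): checks the host names in a bootstrap.servers value.
final class BootstrapHostCheck {

    // Extracts the distinct host names from a "host:port,host:port" list, in order of appearance.
    static List<String> hosts(String bootstrapServers) {
        List<String> hosts = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            String host = trimmed.substring(0, colon);
            if (!hosts.contains(host)) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    // Returns true if the local resolver (hosts file or DNS) knows the host name.
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }
}
```

Calling `resolves(host)` for each entry of `hosts("spark:9095,spark:9094,spark:9093")` before creating the producer gives a clear failure message when the hosts-file entry is missing, instead of a producer that appears to hang.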
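
The comparison of the Replicas and Isr columns in the `--describe` output can also be done mechanically. The sketch below, again JDK-only, parses one partition line and reports whether every replica is in sync. `IsrCheck` and its methods are illustrative names, and the regular expression assumes the line layout printed by `kafka-topics.sh --describe` as shown in this walkthrough.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Kafka API): compares the Replicas and Isr columns of a partition line.
final class IsrCheck {

    // Assumes the layout "... Replicas: 1,3,2 Isr: 3,2,1" (separated by spaces or tabs).
    private static final Pattern PARTITION_LINE =
            Pattern.compile("Replicas:\\s*([0-9,]+)\\s+Isr:\\s*([0-9,]+)");

    // Parses a comma-separated list of broker ids into a set, so order does not matter.
    static Set<Integer> parseIds(String csv) {
        Set<Integer> ids = new TreeSet<>();
        for (String part : csv.split(",")) {
            if (!part.isEmpty()) {
                ids.add(Integer.parseInt(part));
            }
        }
        return ids;
    }

    // Returns true when the in-sync replica set equals the full replica set.
    static boolean allReplicasInSync(String partitionLine) {
        Matcher m = PARTITION_LINE.matcher(partitionLine);
        if (!m.find()) {
            throw new IllegalArgumentException("Not a partition line: " + partitionLine);
        }
        return parseIds(m.group(1)).equals(parseIds(m.group(2)));
    }
}
```

For the bizzbee-replicated-topic line, the replica set {1, 2, 3} equals the in-sync set, so `allReplicasInSync` returns true. A line whose Isr column is missing a broker id would return false.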