## Receiver-based integration

* Start ZooKeeper first
* Start Kafka
```
[bizzbee@spark bin]$ ./kafka-server-start.sh -daemon /home/bizzbee/app/kafka_2.11-2.1.1/config/server.properties
```
* Create the topic
```
[bizzbee@spark bin]$ ./kafka-topics.sh --create --zookeeper spark:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
```
* List the existing topics
```
[bizzbee@spark bin]$ ./kafka-topics.sh --list --zookeeper spark:2181
__consumer_offsets
bizzbee
bizzbee-replicated-topic
bizzbee-topic
bizzbee_topic
jjj
kafka_steaming_topic
```
* Start a console producer
```
[bizzbee@spark bin]$ ./kafka-console-producer.sh --broker-list spark:9092 --topic kafka_streaming_topic
>
```
* Start a console consumer
```
kafka-console-consumer.sh --bootstrap-server spark:9092 --topic kafka_streaming_topic
# Older consumer versions connect through ZooKeeper instead
kafka-console-consumer.sh --zookeeper spark:2181 --topic kafka_streaming_topic
```
* Write the program.

**Use the integration package for Kafka 0.8 and later**
~~~
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0
~~~
**Program arguments** (in order: `zkQuorum`, `group`, `topics`, `numThreads`)
```
spark:2181 test kafka_streaming_topic 1
```
~~~
package com.bizzbee.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1) // stop here; the destructuring below would otherwise throw a MatchError
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Map each topic name to the number of threads that consume it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // How Spark Streaming connects to Kafka: a receiver-based stream of (key, value) pairs
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // print() returns Unit, so it is called directly rather than wrapped in println.
    // The comment above each call shows the output observed for that step.

    // (null,ddd ddd ddd)
    messages.print()

    // ddd ddd ddd
    messages.map(_._2).print()

    // aaa
    // aaa
    // aaa
    // aaa
    // aaa
    messages.map(_._2).flatMap(_.split(" ")).print()

    // (aaa,1)
    // (aaa,1)
    // (aaa,1)
    // (aaa,1)
    // (aaa,1)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).print()

    // Exercise: check for yourself why the second element is taken
    // (the first element is the message key, which is null in the output above).
    // (aaa,5)
    messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~
* Then send messages from the console producer.

### Package and run on the server

* Package
```
mvn clean package -DskipTests
```
* Upload
```
scp target/spark-train-1.0.jar bizzbee@spark:~/lib
```
* Start Spark Streaming
```
spark-submit --class com.bizzbee.spark.streaming.KafkaReceiverWordCount \
  --master local[2] \
  --name KafkaReceiverWordCount \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  /home/bizzbee/lib/spark-train-1.0.jar spark:2181 test kafka_streaming_topic 1
```
* Open the Streaming UI. **It is served on port 4040 of the server.**

![](https://img.kancloud.cn/0c/6c/0c6c43485aae51f5f018335ad610b34a_1006x476.png)
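
The transformation chain in `KafkaReceiverWordCount` can also be tried without Kafka or Spark, which may make it easier to see why the second tuple element is taken. The following is only a sketch on an ordinary Scala collection: `LocalWordCountSketch` and `wordCount` are hypothetical names, the input batch is hand-written to resemble the `(null,...)` pairs printed above, and `groupBy` plus a sum stands in for `reduceByKey`, which plain collections do not have.

~~~scala
object LocalWordCountSketch {

  // Mirrors the DStream pipeline on a plain Seq of (key, value) pairs.
  def wordCount(batch: Seq[(String, String)]): Map[String, Int] =
    batch
      .map(_._2)              // keep the value; the key is null in this sample
      .flatMap(_.split(" "))  // split each line into words
      .map((_, 1))            // pair every word with a count of 1
      .groupBy(_._1)          // stand-in for reduceByKey: group by word...
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // ...then sum the counts

  def main(args: Array[String]): Unit = {
    // Hand-written sample batch: a null key and the text typed into the producer
    val batch: Seq[(String, String)] = Seq((null, "aaa aaa aaa aaa aaa"))
    println(wordCount(batch)) // prints Map(aaa -> 5)
  }
}
~~~

Using `map(_._1)` instead would yield only the null keys, so there would be no text left to split into words.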