Input 1: TCP

Install netcat and listen on port 9999 (if netcat is unavailable, a Scala stand-in is sketched at the end of this page):

nc -l -p 9999

Code:

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver occupies one thread, so at least two are needed
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Create a DStream through ssc; lines receives data from the server, one record per line of text
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    // Word count via map + reduceByKey; pairs is a DStream of (word, 1)
    import org.apache.spark.streaming.StreamingContext._
    val pairs = words.map(w => (w, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation above and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

Input 2: File

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val ssc = new StreamingContext(conf, Seconds(10))
    // Monitor a directory; only files placed into d:/aa after startup are picked up
    val lines = ssc.textFileStream("d:/aa")
    val words = lines.flatMap(_.split(" "))
    // Word count via map + reduceByKey, as in Input 1
    import org.apache.spark.streaming.StreamingContext._
    val pairs = words.map(w => (w, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    // Start the computation above and wait for termination
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~

Input 3: Kafka

1) Start ZooKeeper:

zkServer.sh start

Check ZooKeeper status:

zkServer.sh status

2) Start Kafka:

bin/kafka-server-start.sh --daemon config/server.properties

3) Create a Kafka topic (the first form assumes at least two brokers; the second works on a single broker):

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 4 --topic test_1

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test_1

![](https://box.kancloud.cn/4c2987f3c2ae07507d113b8460a13983_576x61.png)

List topics:

bin/kafka-topics.sh --list --zookeeper master:2181

4) Start a producer and a consumer. After kafka-server is up, run the producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1

Run the consumer in another terminal:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_1

5) Replace the console consumer with a program (a receiver-based alternative is sketched after the code):

~~~
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}

object KafkaSparkDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local").setAppName("kafka-spark-demo")
    val scc = new StreamingContext(sparkConf, Duration(5000))
    scc.sparkContext.setLogLevel("ERROR")
    scc.checkpoint(".") // updateStateByKey is used below, so a checkpoint directory must be set
    val topics = Set("test_1") // the kafka topic to consume (test_1, as created in step 3)
    val brokers = "192.168.80.3:9092"
    val kafkaParam = Map[String, String](
      "zookeeper.connect" -> "192.168.80.3:2181",
      // "group.id" -> "test-consumer-group",
      "metadata.broker.list" -> brokers, // kafka broker list
      "serializer.class" -> "kafka.serializer.StringEncoder"
    )
    val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)
    stream.map(_._2)                     // take the value
      .flatMap(_.split(" "))             // split each line on spaces
      .map(r => (r, 1))                  // map each word to a pair
      .updateStateByKey[Int](updateFunc) // fold the current batch into the running totals
      .print()                           // print the first 10 records
    scc.start()            // actually start the job
    scc.awaitTermination() // block and wait
  }

  // Add the current batch's count to the previous total for each word
  val updateFunc = (currentValues: Seq[Int], preValue: Option[Int]) => {
    val curr = currentValues.sum
    val pre = preValue.getOrElse(0)
    Some(curr + pre)
  }

  def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](scc, kafkaParam, topics)
  }
}
~~~
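The `createStream` helper above wraps `KafkaUtils.createDirectStream`, the receiver-less ("direct") API: it reads offsets itself from the broker list, which is why no `group.id` is required. For contrast, here is a minimal sketch of the older receiver-based API from the same spark-streaming-kafka 0.8 artifact, where offsets are tracked in ZooKeeper under a consumer group. The ZooKeeper address and topic match the values used above; the object name and the group id `test-consumer-group` are illustrative assumptions, not from the original tutorial.

~~~
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: receiver-based Kafka input (spark-streaming-kafka 0.8 API).
object KafkaReceiverDemo {
  def main(args: Array[String]): Unit = {
    // A receiver occupies one thread, so at least two local threads are required
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafka-receiver-demo")
    val ssc = new StreamingContext(conf, Seconds(5))
    val stream = KafkaUtils.createStream(
      ssc,
      "192.168.80.3:2181",   // zookeeper quorum, same address as above
      "test-consumer-group", // consumer group id (illustrative name)
      Map("test_1" -> 1)     // topic -> number of receiver threads
    )
    stream.map(_._2).print() // values only; first 10 records per batch
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~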
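Back to Input 1: if netcat is not installed, a few lines of Scala can stand in for `nc -l -p 9999` and feed the socket stream. This is a convenience sketch, not part of the original tutorial; the emitted sentence and the one-second interval are arbitrary choices.

~~~
import java.io.PrintWriter
import java.net.ServerSocket

// Stand-in for `nc -l -p 9999`: accept one connection, write a line every second.
object WordServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val socket = server.accept() // blocks until the Spark receiver connects
    val out = new PrintWriter(socket.getOutputStream, true) // autoflush each line
    while (true) {
      out.println("spark streaming word count demo")
      Thread.sleep(1000)
    }
  }
}
~~~

Start `WordServer` first, then the `WordCount` job from Input 1; each 10-second batch should print counts for the repeated words.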