<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                ## 一個快速的例子 在我們進入如何編寫Spark Streaming程序的細節之前,讓我們快速地瀏覽一個簡單的例子。在這個例子中,程序從監聽TCP套接字的數據服務器獲取文本數據,然后計算文本中包含的單詞數。做法如下: 首先,我們導入Spark Streaming的相關類以及一些從StreamingContext獲得的隱式轉換到我們的環境中,為我們所需的其他類(如DStream)提供有用的方法。[StreamingContext](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext) 是Spark所有流操作的主要入口。然后,我們創建了一個具有兩個執行線程以及1秒批間隔時間(即以秒為單位分割數據流)的本地StreamingContext。 ```scala import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // Create a local StreamingContext with two working thread and batch interval of 1 second val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) ``` 利用這個上下文,我們能夠創建一個DStream,它表示從TCP源(主機位localhost,端口為9999)獲取的流式數據。 ```scala // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) ``` 這個`lines`變量是一個DStream,表示即將從數據服務器獲得的流數據。這個DStream的每條記錄都代表一行文本。下一步,我們需要將DStream中的每行文本都切分為單詞。 ```scala // Split each line into words val words = lines.flatMap(_.split(" ")) ``` `flatMap`是一個一對多的DStream操作,它通過把源DStream的每條記錄都生成多條新記錄來創建一個新的DStream。在這個例子中,每行文本都被切分成了多個單詞,我們把切分 的單詞流用`words`這個DStream表示。下一步,我們需要計算單詞的個數。 ```scala import org.apache.spark.streaming.StreamingContext._ // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ``` `words`這個DStream被mapper(一對一轉換操作)成了一個新的DStream,它由(word,1)對組成。然后,我們就可以用這個新的DStream計算每批數據的詞頻。最后,我們用`wordCounts.print()` 打印每秒計算的詞頻。 需要注意的是,當以上這些代碼被執行時,Spark Streaming僅僅準備好了它要執行的計算,實際上并沒有真正開始執行。在這些轉換操作準備好之后,要真正執行計算,需要調用如下的方法 ```scala ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate ``` 完整的例子可以在[NetworkWordCount](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala)中找到。 如果你已經下載和構建了Spark環境,你就能夠用如下的方法運行這個例子。首先,你需要運行Netcat作為數據服務器 ```shell $ nc -lk 9999 ``` 然后,在不同的終端,你能夠用如下方式運行例子 ```shell $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看