<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                # 一個快速的例子 在我們進入如何編寫Spark Streaming程序的細節之前,讓我們快速地瀏覽一個簡單的例子。在這個例子中,程序從監聽TCP套接字的數據服務器獲取文本數據,然后計算文本中包含的單詞數。做法如下: 首先,我們導入Spark Streaming的相關類以及一些從StreamingContext獲得的隱式轉換到我們的環境中,為我們所需的其他類(如DStream)提供有用的方法。[StreamingContext](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.StreamingContext)是Spark所有流操作的主要入口。然后,我們創建了一個具有兩個執行線程以及1秒批間隔時間(即以秒為單位分割數據流)的本地StreamingContext。 ~~~ import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ // Create a local StreamingContext with two working thread and batch interval of 1 second val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) ~~~ 利用這個上下文,我們能夠創建一個DStream,它表示從TCP源(主機位localhost,端口為9999)獲取的流式數據。 ~~~ // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) ~~~ 這個`lines`變量是一個DStream,表示即將從數據服務器獲得的流數據。這個DStream的每條記錄都代表一行文本。下一步,我們需要將DStream中的每行文本都切分為單詞。 ~~~ // Split each line into words val words = lines.flatMap(_.split(" ")) ~~~ `flatMap`是一個一對多的DStream操作,它通過把源DStream的每條記錄都生成多條新記錄來創建一個新的DStream。在這個例子中,每行文本都被切分成了多個單詞,我們把切分的單詞流用`words`這個DStream表示。下一步,我們需要計算單詞的個數。 ~~~ import org.apache.spark.streaming.StreamingContext._ // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ~~~ `words`這個DStream被mapper(一對一轉換操作)成了一個新的DStream,它由(word,1)對組成。然后,我們就可以用這個新的DStream計算每批數據的詞頻。最后,我們用`wordCounts.print()`打印每秒計算的詞頻。 需要注意的是,當以上這些代碼被執行時,Spark Streaming僅僅準備好了它要執行的計算,實際上并沒有真正開始執行。在這些轉換操作準備好之后,要真正執行計算,需要調用如下的方法 ~~~ ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate ~~~ 完整的例子可以在[NetworkWordCount](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala)中找到。 如果你已經下載和構建了Spark環境,你就能夠用如下的方法運行這個例子。首先,你需要運行Netcat作為數據服務器 ~~~ $ nc -lk 9999 ~~~ 然后,在不同的終端,你能夠用如下方式運行例子 ~~~ $ ./bin/run-example streaming.NetworkWordCount localhost 9999 ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看