<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                自定義數據源即自定義 Receiver。自定義接收器必須通過實現兩個方法來擴展 Receiver 抽象類。 * onStart():開始接收數據要做的事情。 * onStop():停止接收數據的操作。 onStart()和 onStop()不能無限阻塞。通常,onStart()將啟動負責接收數據的線程,而 onStop()將確保這些接收數據的線程被停止。接收線程也可以使用 isStopped() 方法來檢查它們是否應該停止接收數據。 <br/> 一旦接收到數據,就可以通過調用 store(data)將數據存儲在 Spark 中,這是Receiver 類提供的方法。store()有多種形式,可以一次存儲接收到的數據記錄,也可以作為對象/序列化字節的整個集合。注意,用于實現接收器的 store() 的風格會影響其可靠性和容錯語義。 <br/> 接收線程中的任何異常都應該被捕獲并正確處理,以避免接收方的無聲故障。restart() 將通過異步調用 onStop()和延遲后調用 onStart()來重新啟動接收器。stop()將調用 onStop()并終止接收方。此外reportError() 在不停止/重新啟動接收器的情況下向驅動程序報告錯誤消息(在日志和 UI 中可見)。 <br/> 下面是一個自定義 Socket 接收器示例。 ```scala import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.receiver.Receiver class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { //啟動接收線程 new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** 創建 Socket 連接并接收數據直到 receiver 停止 */ private def receive() { var socket: Socket = null var userInput: String = null try { // 連接到 host:port socket = new Socket(host, port) //讀 Socket val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while (!isStopped && userInput != null) { store(userInput) //存儲接收到的數據,抽象類已實現 userInput = reader.readLine() } reader.close() socket.close() // 重新啟動,當服務器再次激活時嘗試重新連接 restart("Trying to connect again") } catch { case e: java.net.ConnectException => // 如果無法連接到服務器,重新啟動 restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // 如果有任何其他錯誤,重新啟動 restart("Error receiving data", t) } } } object SparkDemo1 { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]"); // Seconds(5)是批處理間隔,即將5秒內新收集的數據作為一個單位進行處理 val ssc: StreamingContext = new StreamingContext(conf, Seconds(5)) val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) val words = customReceiverStream.flatMap(_.split(" ")) } } ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看