<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                Spark Streaming 通過 Push 和 Pull 兩種方式對接 Flume 數據源。以 Spark Streaming 的角度來看,Push 方式屬于推送(由 Flume 向 Spark 推送)而 Pull 屬于拉取(Spark 拉取 Flume 的輸出)。<br/> 不論以何種方式,開發過程類似,都是由 Spark Streaming 對接 Flume 數據流,Flume 做為 Spark Streaming 的數據源。Push 和 Pull 兩者的差別主要體現在Flume Sink 的不同,而 Flume Source 與 Channel 不會受影響。在演示示例時,Flume Source 以 nectcat 為例,Channel 為 memory,重點關注 Sink 的變化。在下文中也是如此。 [TOC] # 1. Push方式 1. 編寫Flume的配置文件 ```conf # 定義 source, channel, 和sink的名字 a1.sources = s1 a1.channels = c1 a1.sinks = avroSink # 對source的一些設置 a1.sources.s1.type = netcat a1.sources.s1.bind = localhost a1.sources.s1.port = 5678 a1.sources.s1.channels = c1 # 對channel的一些設置 a1.channels.c1.type = memory # 對sink的一些設置 a1.sinks.avroSink.type = avro a1.sinks.avroSink.channel = c1 a1.sinks.avroSink.hostname = hadoop101 a1.sinks.avroSink.port = 9999 ``` 2. 編寫Spark Streaming程序 ```scala package streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object FlumePushWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 創建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批處理間隔,即將5秒內新收集的數據作為一個單位進行處理 val ssc = new StreamingContext(conf,Seconds(5)) /** ********* 2. 加載數據 ************/ // FlumeUtils.createStream(StreamingContext, hostname, port) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(ssc,"hadoop101",9999) val lines: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()).trim) /** ************ 3. 進行統計 **************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) result.print() // 打印輸出 /** ********* 4. 啟動Streaming程序 ************/ ssc.start() /** *********** 5. 等待應用程序終止 ****************/ ssc.awaitTermination() } } ``` 3. 將上面的Spark Streaming打成jar包,上傳到集群運行 ```shell [root@hadoop101 spark]# bin/spark-submit --class streaming.FlumePushWordCount /opt/software/streaming-1.0-SNAPSHOT-jar-with-dependencies.jar ``` 4. 啟動Flume ```shell [root@hadoop101 flume]# bin/flume-ng agent --name a1 -f myconf/sink_spark-push.conf -Dflume.root.logger=INFO,console ``` 5. 啟動`telnet`并輸入數據 ```shell [root@hadoop101 /]# telnet localhost 5678 Trying ::1... telnet: connect to address ::1: Connection refused Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello world spark hadoop hello OK ``` 查看Spark Streaming程序的輸出結果如下 ```txt ------------------------------------------- Time: 1611154690000 ms ------------------------------------------- (spark,1) (hadoop,1) (hello,2) (world,1) ``` <br/> # 2. Pull方式 1. 編寫Flume的配置文件 ```conf # 定義 source, channel, 和sink的名字 a1.sources = s1 a1.channels = c1 a1.sinks = spark # 對source的一些設置 a1.sources.s1.type = netcat a1.sources.s1.bind = localhost a1.sources.s1.port = 5678 a1.sources.s1.channels = c1 # 對channel的一些設置 a1.channels.c1.type = memory # 對sink的一些設置 # 需要將spark-streaming-flume_2.11-2.4.4.jar包拷貝到$FLUME_HOME/lib a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.spark.hostname = hadoop101 a1.sinks.spark.port = 9999 a1.sinks.spark.channel = c1 ``` 2. 上傳Flume sink必要的包到`$FLUME_HOME/lib`目錄【一共6個】 ```txt avro-1.8.2.jar avro-ipc-1.8.2.jar commons-lang3-3.5.jar scala-library-2.11.8.jar spark-streaming-flume_2.11-2.4.4.jar spark-streaming-flume-sink_2.11-2.4.4.jar ``` 我安裝的flume默認已經有下面版本的jar包,為了防止沖突所以需要將其刪除 ```shell [root@hadoop101 lib]# rm -rf avro-1.7.4.jar [root@hadoop101 lib]# rm -rf avro-ipc-1.7.4.jar [root@hadoop101 lib]# rm -rf commons-lang-2.5.jar [root@hadoop101 lib]# rm -rf scala-library-2.10.5.jar ``` ```shell [root@hadoop101 flume-sink]# ls | grep spark spark-streaming-flume_2.11-2.4.4.jar spark-streaming-flume-sink_2.11-2.4.4.jar ``` 3. 編寫Spark Streaming程序 ```scala package streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * @Date 2021/2/8 */ object FlumePullWordCount { def main(args: Array[String]): Unit = { /** ********* 1. 創建StreamingContext ************/ val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]") // Seconds(5)是批處理間隔,即將5秒內新收集的數據作為一個單位進行處理 val ssc = new StreamingContext(conf, Seconds(5)) /** ********* 2. 加載數據 ************/ // FlumeUtils.createPollingStream(StreamingContext, hostname, port) val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, "hadoop101", 9999) val lines: DStream[String] = flumeStream.map(x => new String(x.event.getBody.array()).trim) /** ************ 3. 進行統計 **************/ val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) result.print() // 打印輸出 /** ********* 4. 啟動Streaming程序 ************/ ssc.start() /** *********** 5. 等待應用程序終止 ****************/ ssc.awaitTermination() } } ``` 4. 啟動Flume ```shell [root@hadoop101 flume]# bin/flume-ng agent --name a1 -f myconf/sink_spark-pull.conf -Dflume.root.logger=INFO,console ``` 5. 將上面的Spark Streaming打成jar包,上傳到集群運行 ```shell [root@hadoop101 spark]# bin/spark-submit --class streaming.FlumePullWordCount /opt/software/streaming-1.0-SNAPSHOT-jar-with-dependencies.jar ``` 6. 啟動`telnet`并輸入數據 ```shell [root@hadoop101 /]# telnet localhost 5678 Trying ::1... telnet: connect to address ::1: Connection refused Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. hello world spark hadoop hello OK ``` 查看Spark Streaming程序的輸出結果如下 ```txt ------------------------------------------- Time: 1611154690000 ms ------------------------------------------- (spark,1) (hadoop,1) (hello,2) (world,1) ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看