<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                **和push方式相反,pull方式要先啟動flume,然后啟動spark streaming** ## 配置文件 ``` simple-agent.sources = netcat-source simple-agent.sinks = spark-sink simple-agent.channels = memory-channel simple-agent.sources.netcat-source.type = netcat simple-agent.sources.netcat-source.bind = spark simple-agent.sources.netcat-source.port = 44444 simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink simple-agent.sinks.spark-sink.hostname = spark simple-agent.sinks.spark-sink.port = 41414 simple-agent.channels.memory-channel.type = memory simple-agent.sources.netcat-source.channels = memory-channel simple-agent.sinks.spark-sink.channel = memory-channel ``` ## 啟動flume ``` flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_pull_streaming.conf -Dflume.root.logger=INFO,console & ``` ## 啟動telnet ``` [bizzbee@spark ~]$ telnet spark 44444 Trying 192.168.31.70... Connected to spark. Escape character is '^]'. ``` ## 運行streaming 代碼 * 運行參數是: spark 41414 * linux主機名 和flume sink的端口 ~~~ object FlumePullWordCount { def main(args: Array[String]): Unit = { if(args.length != 2) { System.err.println("Usage: FlumePushWordCount <hostname> <port>") System.exit(1) } val Array(hostname, port) = args val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount") val ssc = new StreamingContext(sparkConf, Seconds(5)) //TODO... 如何使用SparkStreaming整合Flume val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt) flumeStream.map(x=> new String(x.event.getBody.array()).trim) .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } } ~~~ ## 向spark主機44444端口發送數據即可。 ## 生產環境運行 * 代碼中注釋掉setMaster。 * 打包。 ``` wangyijiadeMacBook-Air:sparktrain bizzbee$ mvn clean package -DskipTests ``` * 上傳。 ``` scp spark-train-1.0.jar bizzbee@192.168.31.70:~/lib/ ``` * 服務器端啟動streaming
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看