## Push-based approach
* Copy an existing configuration file and modify the copy.
```
cp exec-memory-avro.conf flume_push_streaming.conf
```
* Configuration file contents
```
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel
simple-agent.sources.netcat-source.type = netcat
# Where the data comes from:
# read data from port 44444 on this machine (hostname spark)
simple-agent.sources.netcat-source.bind = spark
simple-agent.sources.netcat-source.port = 44444
simple-agent.sinks.avro-sink.type = avro
# Machine that receives the data:
# for this experiment, the address of my Mac development machine
simple-agent.sinks.avro-sink.hostname = 192.168.31.131
simple-agent.sinks.avro-sink.port = 41414
# Use a channel which buffers events in memory
simple-agent.channels.memory-channel.type = memory
# Bind the source and sink to the channel
simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
```
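A frequent reason an agent starts but moves no data is a wiring typo: each source lists its channels under `sources.<name>.channels` (plural) and each sink names exactly one channel under `sinks.<name>.channel` (singular). As a minimal sketch, the hypothetical `FlumeWiringCheck` helper below (not part of Flume) loads a config like the one above with `java.util.Properties` and reports wiring problems for one agent; it assumes the simple one-agent layout used here and checks nothing else.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper, not part of Flume: checks source/sink -> channel wiring only.
object FlumeWiringCheck {
  // Splits a space-separated Flume list value such as "memory-channel c2".
  private def names(props: Properties, key: String): Seq[String] =
    Option(props.getProperty(key))
      .map(_.trim.split("\\s+").filter(_.nonEmpty).toSeq)
      .getOrElse(Seq.empty)

  /** Returns human-readable problems; an empty result means the wiring looks consistent. */
  def check(configText: String, agent: String): Seq[String] = {
    val props = new Properties()
    props.load(new StringReader(configText)) // handles "#" comments and "key = value"
    val channels = names(props, s"$agent.channels").toSet

    val sourceProblems = names(props, s"$agent.sources").flatMap { src =>
      val used = names(props, s"$agent.sources.$src.channels")
      if (used.isEmpty) Seq(s"source $src has no channels")
      else used.filterNot(channels).map(ch => s"source $src uses undeclared channel $ch")
    }

    val sinkProblems = names(props, s"$agent.sinks").flatMap { sink =>
      names(props, s"$agent.sinks.$sink.channel") match {
        case Seq(ch) if channels(ch) => Seq.empty
        case Seq(ch)                 => Seq(s"sink $sink uses undeclared channel $ch")
        case _                       => Seq(s"sink $sink must name exactly one channel")
      }
    }

    sourceProblems ++ sinkProblems
  }
}
```

For example, `FlumeWiringCheck.check(scala.io.Source.fromFile("flume_push_streaming.conf").mkString, "simple-agent")` should return an empty `Seq` for the file above.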
* Write the Spark Streaming application on the Mac development machine
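~~~
package com.bizzbee.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
    // spark-submit passes "<hostname> <port>"; a local run without args listens on 0.0.0.0:41414
    val (hostname, port) = if (args.length == 2) (args(0), args(1).toInt) else ("0.0.0.0", 41414)
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5)) // 5-second batches
    // Receiver that accepts the events pushed by Flume's avro sink
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1))
      .reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
~~~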
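Each 5-second batch is essentially a collection of event bodies run through `map` → `flatMap` → `map((_, 1))` → `reduceByKey`. As a sketch that needs no Spark on the classpath, the hypothetical `WordCountSketch.countBatch` below applies the same chain to a plain Scala `Seq`, with `groupBy` plus `sum` standing in for `reduceByKey`; it is only a way to check the parsing logic before wiring up Flume, not how Spark executes the job.

```scala
// Hypothetical local stand-in for one batch of FlumePushWordCount (no Spark involved).
object WordCountSketch {
  /** Same steps as the DStream pipeline: trim, split on " ", pair with 1, sum per word. */
  def countBatch(eventBodies: Seq[String]): Map[String, Int] =
    eventBodies
      .map(_.trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1) // plays the role of reduceByKey's shuffle by key
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }
}
```

Note that `split(" ")` keeps empty tokens when words are separated by several spaces, in both this sketch and the streaming job; splitting on `"\\s+"` would avoid that.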
* Then run it; the application listens on port 41414 of the development machine.
* Start Flume on the Linux machine.
```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console &
```
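* On the Linux machine, send data to port 44444 of the same host.
* Note: connect using the hostname from the config file (`spark`); with another name such as `localhost` the connection fails. The likely reason is that the netcat source binds only to the address given in `bind`, which resolves to 192.168.31.70 rather than to the loopback address.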
```
[bizzbee@spark conf]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
```
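To script a repeatable test instead of typing into telnet, a small client can play the same role. The sketch below uses a hypothetical `NetcatSender` object built on plain JDK sockets: it writes newline-terminated lines to the netcat source and reads back one reply line per event. It assumes the source keeps its default per-event acknowledgement, which is what produces the `OK` lines in the telnet session, and that you connect to the `bind` host (here `spark`).

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of Flume: a scripted stand-in for the telnet session.
object NetcatSender {
  /** Sends each line as one event and returns the reply line the source sends back for it. */
  def send(host: String, port: Int, lines: Seq[String]): List[String] = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8))
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      lines.toList.map { line =>
        out.write(line + "\n") // the netcat source turns each newline-terminated line into an event
        out.flush()
        in.readLine() // "OK" with the source's default per-event acknowledgement
      }
    } finally socket.close()
  }
}
```

Usage, from a machine that can reach the agent: `NetcatSender.send("spark", 44444, Seq("s", "s k"))`.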
### Submitting to run in production
* Package
```
mvn clean package -DskipTests
```
* Upload
```
scp spark-train-1.0.jar bizzbee@192.168.31.70:~/lib
```
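* Modify the Flume configuration file: the Spark application now runs on the Linux machine, so point the avro sink at `spark`.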
```
simple-agent.sinks.avro-sink.hostname = spark
```
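* Launch the application
* `--packages` gives the Maven coordinates of the dependency to download (here `spark-streaming-flume`); the first download can take a long time. The trailing `spark 41414` are passed to `main` as the hostname and port arguments.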
```
spark-submit --class com.bizzbee.spark.streaming.FlumePushWordCount --master local[2] --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 spark-train-1.0.jar spark 41414
```
* Start Flume
```
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_push_streaming.conf -Dflume.root.logger=INFO,console &
```
* Send data to port 44444
```
[bizzbee@spark conf]$ telnet spark 44444
Trying 192.168.31.70...
Connected to spark.
Escape character is '^]'.
啊
OK
s
OK
s
OK
OK
```
* Sample output printed by the application for one batch:
```
-------------------------------------------
Time: 1587472795000 ms
-------------------------------------------
(sk,2)
(ks,3)
('kk,1)
(k,6)
(jk,1)
```
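The `Time: ... ms` header that `print()` writes is the batch time in epoch milliseconds. As a small sketch, the hypothetical `BatchTime` helper below converts it with `java.time.Instant` and checks that it falls on a multiple of the 5-second batch interval, which is how Spark Streaming schedules batch times as far as I know.

```scala
import java.time.Instant

// Hypothetical helper for reading DStream.print() headers.
object BatchTime {
  /** Converts the "Time: <n> ms" value to a UTC instant. */
  def toInstant(batchTimeMs: Long): Instant = Instant.ofEpochMilli(batchTimeMs)

  /** True if the batch time is a multiple of the batch interval (5000 ms in FlumePushWordCount). */
  def isAligned(batchTimeMs: Long, batchIntervalMs: Long): Boolean =
    batchTimeMs % batchIntervalMs == 0
}
```

For the batch above, `BatchTime.toInstant(1587472795000L)` is `2020-04-21T12:39:55Z`.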