輸入1: TCP
安裝netcat
nc -l -p 9999
代碼
~~~
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("WordCount")
val ssc=new StreamingContext(conf,Seconds(10));
//通過 ssc 創建 Dstream 對象,得到 lines 變量從服務器獲得數據,每條記錄表示一行文本數據
val lines=ssc.socketTextStream("localhost",9999)
val words=lines.flatMap(_.split(" "))
//統計單詞個數,words 由 map+reduceByKey 兩個操作,pairs 類型 DStream
import org.apache.spark.streaming.StreamingContext._
val pairs=words.map(w=>(w,1))
val wordCounts=pairs.reduceByKey(_+_)
wordCounts.print()
//啟動上面計算并等待終止
ssc.start()
ssc.awaitTermination()
}
~~~
輸入2:文件
~~~
def main(args: Array[String]): Unit = {
println("heelllsssssssssssssssssssssssssssssssssssssss")
val conf=new SparkConf().setMaster("local").setAppName("WordCount")
val ssc=new StreamingContext(conf,Seconds(10));
//通過 ssc 創建 Dstream 對象,得到 lines 變量從服務器獲得數據,每條記錄表示一行文本數據
val lines=ssc.textFileStream("d:/aa")
val words=lines.flatMap(_.split(" "))
//統計單詞個數,words 由 map+reduceByKey 兩個操作,pairs 類型 DStream
import org.apache.spark.streaming.StreamingContext._
val pairs=words.map(w=>(w,1))
val wordCounts=pairs.reduceByKey(_+_)
wordCounts.print()
//啟動上面計算并等待終止
ssc.start()
ssc.awaitTermination()
}
~~~
輸入3:kafaka
1) 啟動zookeeper zkServer.sh start
查看zookeeper狀態 zkServer.sh status
2) 啟動kafka
bin/kafka-server-start.sh –daemon config/server.properties
3) 創建kafka topic
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 4 --topic test_1
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test_1

查看topic
bin/kafka-topics.sh --list --zookeeper master:2181
4) 啟動生產者消費者
在啟動 kafka-server 之后啟動,運行producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1
在另一個終端運行 consumer:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_1
5) 使用程序替換消費者
~~~
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("kafka-spark-demo")
val scc = new StreamingContext(sparkConf, Duration(5000))
scc.sparkContext.setLogLevel("ERROR")
scc.checkpoint(".") // 因為使用到了updateStateByKey,所以必須要設置checkpoint
val topics = Set("test") //我們需要消費的kafka數據的topic
val brokers = "192.168.80.3:9092"
val kafkaParam = Map[String, String](
"zookeeper.connect" -> "192.168.80.3:2181",
// "group.id" -> "test-consumer-group",
"metadata.broker.list" -> brokers, // kafka的broker list地址
"serializer.class" -> "kafka.serializer.StringEncoder"
)
val stream:InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)
stream.map(_._2) //取出value
.flatMap(_.split(" ")) // 將字符串使用空格分隔
.map(r => (r, 1)) // 每個單詞映射成一個pair
.updateStateByKey[Int](updateFunc) // 用當前batch的數據區更新已有的數據
.print() // 打印前10個數據
scc.start() // 真正啟動程序
scc.awaitTermination() //阻塞等待
}
val updateFunc = (currentValues: Seq[Int], preValue: Option[Int]) => {
val curr = currentValues.sum
val pre = preValue.getOrElse(0)
Some(curr + pre)
}
def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](scc, kafkaParam, topics)
}
~~~
- 空白目錄
- 第一章 Linux虛擬機安裝
- 第二章 SSH配置
- 第三章 jdk配置
- 第四章 Hadoop配置-單機
- 第五章 Hadoop配置-集群
- 第六章 HDFS
- 第七章 MapReduce
- 7.1 MapReduce(上)
- 7.2 MapReduce(下)
- 7.3 MapReduce實驗1 去重
- 7.4 MapReduce實驗2 單例排序
- 7.5 MapReduce實驗3 TopK
- 7.6 MapReduce實驗4 倒排索引
- 第八章 Hive
- Hive安裝
- 數據定義
- 數據操作
- 第九章 HBase
- 第十章 SaCa RealRec數據科學平臺
- 第十一章 Spark Core
- 第十二章 Spark Streaming
- 第十章 Spark測試題