### Direct approach
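This approach does not use a receiver to pull in data. Instead, it periodically queries Kafka for the latest offset in each topic+partition and uses those offsets to define the offset range that each batch will process.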
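To make the offset-range idea concrete, here is a minimal, Spark-free Scala sketch of how one batch could be planned. It assumes the latest offsets have already been fetched from Kafka into a map. `TopicPartitionId`, `OffsetRange` and `planBatch` are hypothetical names for this illustration only; Spark's own `OffsetRange` class (in `org.apache.spark.streaming.kafka`) is not used here. A partition with no stored position starts at the latest offset, which mirrors the default `auto.offset.reset=largest` behaviour of the 0.8 integration.

```scala
// Hypothetical, simplified model of how the direct approach plans one batch.
// None of these types come from Spark; they only illustrate the idea.
final case class TopicPartitionId(topic: String, partition: Int)

// Half-open range [fromOffset, untilOffset) of messages read for one partition.
final case class OffsetRange(tp: TopicPartitionId, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object DirectBatchPlanner {
  /** Builds the offset ranges for one batch.
    * @param consumed next offset to read per partition (where the previous batch ended)
    * @param latest   latest offsets just queried from Kafka
    */
  def planBatch(
      consumed: Map[TopicPartitionId, Long],
      latest: Map[TopicPartitionId, Long]): Seq[OffsetRange] =
    latest.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .map { case (tp, end) =>
        // Unknown partition: start at the latest offset, so the range is empty this batch.
        val from = consumed.getOrElse(tp, end)
        OffsetRange(tp, from, end)
      }

  def main(args: Array[String]): Unit = {
    val p0 = TopicPartitionId("kafka_streaming_topic", 0)
    val p1 = TopicPartitionId("kafka_streaming_topic", 1)

    // Batch 1: previously consumed up to 0 and 10; Kafka now reports 5 and 12.
    val batch1 = planBatch(Map(p0 -> 0L, p1 -> 10L), Map(p0 -> 5L, p1 -> 12L))
    batch1.foreach(r => println(s"${r.tp.topic}-${r.tp.partition}: [${r.fromOffset}, ${r.untilOffset})"))

    // Batch 2 starts exactly where batch 1 ended.
    val consumedAfter1 = batch1.map(r => r.tp -> r.untilOffset).toMap
    val batch2 = planBatch(consumedAfter1, Map(p0 -> 9L, p1 -> 12L))
    batch2.foreach(r => println(s"${r.tp.topic}-${r.tp.partition}: ${r.count} new messages"))
  }
}
```

Feeding each batch's `untilOffset` values back in as the next batch's starting positions is what lets the direct approach track its progress without a receiver.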
* Code
**Program arguments** (`<brokers> <topics>`)
```
spark:9092 kafka_streaming_topic
```
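~~~
package com.bizzbee.spark.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Word count over Kafka using the Direct (receiver-less) approach, spark-streaming-kafka-0-8 API. */
object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    // A master hard-coded here overrides spark-submit's --master; remove setMaster for cluster runs
    val sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5)) // 5-second batches

    val topicsSet = topics.split(",").toSet
    // The direct API talks to the brokers itself, so it needs the broker list rather than ZooKeeper
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    // Each element of the stream is a (key, value) pair decoded as strings
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.map(_._2)      // keep only the message value
      .flatMap(_.split(" ")) // split lines into words
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
~~~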
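The transformation chain applied to each batch can be checked without Spark or Kafka. The sketch below runs the same steps on a plain Scala collection; `LocalBatchWordCount` and the sample messages are made up for illustration, and `groupBy` stands in locally for what `reduceByKey` does across partitions.

```scala
// Hypothetical local stand-in for one batch of the streaming word count.
object LocalBatchWordCount {
  /** Same steps as messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_). */
  def wordCount(batch: Seq[(String, String)]): Map[String, Int] =
    batch
      .map(_._2)             // keep only the message value
      .flatMap(_.split(" ")) // split each line into words
      .map((_, 1))           // pair every word with a count of 1
      .groupBy(_._1)         // local substitute for reduceByKey's shuffle
      .map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    // Keys are often null for messages sent with the console producer.
    val batch = Seq((null: String) -> "hello spark", (null: String) -> "hello kafka")
    wordCount(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```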
* Package the project into a jar and upload it to the server
* Launch the streaming job
```
spark-submit \
  --class com.bizzbee.spark.streaming.KafkaDirectWordCount \
  --master local[2] \
  --name KafkaDirectWordCount \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  /home/bizzbee/lib/spark-train-1.0.jar spark:9092 kafka_streaming_topic
```