RDD的transformation和action可以組成起來完成復雜的計算。 比如查找包含最多單詞的一行:
~~~
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
~~~
第一步map一行包含的單詞數到一個整數, 第二步調用reduce得到最大的單詞數。map和reduce的參數都是lambda表達式(closures), 可以調用 Scala/Java庫. 例如我們很容易的調用在其它地方聲明的方法。 這里我們使用`Math.max()`函數簡化代碼:
~~~
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
~~~
一個通用的數據流模式就是MapReduce,在Hadoop中相當流行. Spark實現MapReduce流很容易:
~~~
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
~~~
此處我們使用flatMap, map 和 reduceByKey轉換來計算文件中每個單詞的頻度。 為了收集單詞頻度結果,我們可以調用collect action:
~~~
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
~~~