<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??碼云GVP開源項目 12k star Uniapp+ElementUI 功能強大 支持多語言、二開方便! 廣告
                ********前面 Transformationt 算子的測試都是在本地開發環境中直接跑代碼,這里 Actions 算子的測試主要在 spark-shell 中進行操作。需要說明的 Actions 算子如下: #### 下面來具體說明: **(1)reduce** 通過函數 func 聚集數據集中的所有元素。Func 函數接受 2 個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的并發執行。 關于 reduce 的執行過程,可以對比 scala 中類似的 reduce 函數。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29 scala> val ret = listRDD.reduce((v1, v2) => v1 + v2) ... ret: Int = 21 ~~~ 注意:需要注意的是,不同于 Transformation 算子,其結果仍然是 RDD,但是`執行Actions算子之后,其結果不再是RDD,而是一個標量。` **(2)collect** 在 Driver 的程序中,以數組的形式,返回數據集的所有元素。**這通常會在使用 filter 或者其它操作后**,返回一個足夠小的數據子集再使用,直接將整個 RDD 集 Collect 返回,很可能會讓 Driver 程序 OOM,這點尤其需要注意。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29 scala> val ret = listRDD.collect() ... ret: Array[Int] = Array(1, 2, 3, 4, 5, 6) ~~~ **(3)count** 返回數據集的元素個數 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:29 scala> val ret = listRDD.count() ... ret: Long = 6 ~~~ **(4)take** 返回一個數組,由數據集的前 n 個元素組成。注意,這個操作目前并非在多個節點上,并行執行,而是 Driver 程序所在機器,單機計算所有的元素 (Gateway 的內存壓力會增大,需要謹慎使用)。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:29 scala> listRDD.take(3) ... res7: Array[Int] = Array(1, 2, 3) ~~~ **(5)first** 返回數據集的第一個元素(類似于 take(1)) ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:29 scala> listRDD.first() ... res8: Int = 1 ~~~ **(6)saveAsTextFile** 將數據集的元素,以 textfile 的形式,保存到本地文件系統,hdfs 或者任何其它 hadoop 支持的文件系統。Spark 將會調用每個元素的 toString 方法,并將它轉換為文件中的`一行文本`。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:29 scala> listRDD.saveAsTextFile("file:///home/uplooking/data/spark/action") ... ~~~ 可以在文件系統中查看到保存的文件: ~~~ [root@WGH action]$ pwd /home/uplooking/data/spark/action [root@WGH action]$ ls part-00000 part-00001 part-00002 part-00003 _SUCCESS ~~~ 其實可以看到,保存的跟 Hadoop 的格式是一樣的。 當然因為我的 spark 集群中已經做了跟 hadoop 相關的配置,所以也可以把文件保存到 hdfs 中: ~~~ scala> listRDD.saveAsTextFile("hdfs://ns1/output/spark/action") ... ~~~ 然后就可以在 hdfs 中查看到保存的文件: ~~~ [root@WGH action]$ hdfs dfs -ls /output/spark/action 18/04/27 10:27:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 5 items -rw-r--r-- 3 root supergroup 0 2018-04-27 10:25 /output/spark/action/_SUCCESS -rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00000 -rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00001 -rw-r--r-- 3 root supergroup 2 2018-04-27 10:25 /output/spark/action/part-00002 -rw-r--r-- 3 root supergroup 4 2018-04-27 10:25 /output/spark/action/part-00003 ~~~ 可以看到,保存的格式跟保存到本地文件系統是一樣的。 **(7)foreach** 在數據集的每一個元素上,運行函數 func。這通常用于更新一個累加器變量,或者和外部存儲系統做交互。 ~~~ scala> val list = List(1, 2, 3, 4, 5, 6) list: List[Int] = List(1, 2, 3, 4, 5, 6) scala> val listRDD = sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:29 scala> listRDD.foreach(println) ... ~~~ **(8)saveAsNewAPIHadoopFile** 也就是將數據保存到 Hadoop HDFS 中,但是需要注意的是,前面使用 saveAsTextFile 也可以進行相關操作,其使用的就是 saveAsNewAPIHadoopFile 或者 saveAsHadoopFile 這兩個 API,而其兩者的區別是: saveAsHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapred 中的早期的類 saveAsNewAPIHadoopFile 的 OutputFormat 使用的:org.apache.hadoop.mapreduce 中的新的類。但不管使用哪一個,都是可以完成工作的。 測試代碼如下: ~~~ import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat import org.apache.spark.{SparkConf, SparkContext} /** * Spark算子操作之Action * saveAsNewAPIHAdoopFile * * saveAsHadoopFile * 和saveAsNewAPIHadoopFile的唯一區別就在于OutputFormat的不同 * saveAsHadoopFile的OutputFormat使用的:org.apache.hadoop.mapred中的早期的類 * saveAsNewAPIHadoopFile的OutputFormat使用的:org.apache.hadoop.mapreduce中的新的類 * 使用哪一個都可以完成工作 * * 前面在使用saveAsTextFile時也可以保存到hadoop文件系統中,注意其源代碼也是使用上面的操作的 * * Caused by: java.net.UnknownHostException: ns1 ... 35 more 找不到ns1,因為我們在本地沒有配置,無法正常解析,就需要將hadoop的配置文件信息給我們加載進來 hdfs-site.xml.heihei,core-site.xml.heihei */ object _05SparkActionOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName(_05SparkActionOps.getClass.getSimpleName) val sc = new SparkContext(conf) val list = List("hello you", "hello he", "hello me") val listRDD = sc.parallelize(list) val pairsRDD = listRDD.map(word => (word, 1)) val retRDD = pairsRDD.reduceByKey((v1, v2) => v1 + v2) retRDD.saveAsNewAPIHadoopFile( "hdfs://ns1/spark/action", // 保存的路徑 classOf[Text], // 相當于mr中的k3 classOf[IntWritable], // 相當于mr中的v3 classOf[TextOutputFormat[Text, IntWritable]] // 設置(k3, v3)的outputFormatClass ) } } ~~~ 之后我們可以在 hdfs 中查看到相應的文件輸出: ~~~ [root@WGH ~]$ hdfs dfs -ls /spark/action 18/04/27 12:07:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 3 items -rw-r--r-- 3 Administrator supergroup 0 2018-04-27 12:07 /spark/action/_SUCCESS -rw-r--r-- 3 Administrator supergroup 13 2018-04-27 12:07 /spark/action/part-r-00000 -rw-r--r-- 3 Administrator supergroup 11 2018-04-27 12:07 /spark/action/part-r-00001 [root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00000 18/04/27 12:08:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable hello 3 me 1 [root@WGH ~]$ hdfs dfs -text /spark/action/part-r-00001 18/04/27 12:08:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable you 1 he 1 ~~~ ## Actions 下面的表格列了 Spark 支持的一些常用 actions。詳細內容請參閱 RDD API 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html), [Python](https://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html)) 和 PairRDDFunctions 文檔([Scala](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))。 **Action 動作算子**: | 動作算子 | 含義 | | --------------------------------------- | ------------------------------------------------------------ | | reduce(func) | 通過 func 函數聚集 RDD 中的所有元素,這個功能必須是可交換且可并聯的 | | collect() | 在驅動程序中,以數組的形式返回數據集的所有元素 | | count() | 返回 RDD 的元素個數 | | first() | 返回 RDD 的第一個元素 (類似于 take(1)) | | take(n) | 返回一個由數據集的前 n 個元素組成的數組 | | takeSample(withReplacement,num, [seed]) | 返回一個數組,該數組由從數據集中隨機采樣的 num 個元素組成,可以選擇是否用隨機數替換不足的部分,seed 用于指定隨機數生成器種子 | | takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個元素 | | **saveAsTextFile**(path) | 將數據集的元素以 textfile 的形式保存到 HDFS 文件系統或者其他支持的文件系統,對于每個元素,Spark 將會調用 toString 方法,將它裝換為文件中的文本 | | **saveAsSequenceFile**(path) | 將數據集中的元素以 Hadoop sequencefile 的格式保存到指定的目錄下,可以使 HDFS 或者其他 Hadoop 支持的文件系統 | | saveAsObjectFile(path) | 將數據集的元素,以 Java 序列化的方式保存到指定的目錄下 | | **countByKey**() | 針對 (K,V) 類型的 RDD,返回一個 (K,Int) 的 map,表示每一個 key 對應的元素個數 | | foreach(func) | 在數據集的每一個元素上,運行函數 func 進行更新 | | **foreachPartition**(func) | 在數據集的每一個分區上,運行函數 func | **統計操作**: | 算子 | 含義 | | -------------- | -------------------------- | | count | 個數 | | mean | 均值 | | sum | 求和 | | max | 最大值 | | min | 最小值 | | variance | 方差 | | sampleVariance | 從采樣中計算方差 | | stdev | 標準差: 衡量數據的離散程度 | | sampleStdev | 采樣的標準差 | | stats | 查看統計結果 |
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看