<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                **1. RDD創建方式** 使用集合創建RDD、使用外部存儲創建 RDD。 使用外部存儲創建 RDD方式包括:本地文件系統、HDFS、HBase、Cassandra 等。 [TOC] # 1. 使用集合創建RDD 通過集合創建 RDD 有兩種方法:parallelize 與 makeRDD。 <br/> 兩種方法基本上是一樣的,不同的是 makeRDD 還有一個重載方法,該重載方法會分配一系列本地 Scala 集合形成一個 RDD,<ins>可以為每個集合對象創建一個分區,并指定優先位置便于在運行中優化調度</ins>。<br/> 使用本地集合創建 RDD 的問題在于:由于這種方法需要用到一臺機器中集合的全部數據,所以這種方式在測試和原型構造之外很少使用。<br/> 示例代碼: ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} /** * 通過集合創建RDD * Date: 2020/1/4 */ object CreateRDDByCollection { def main(args: Array[String]): Unit = { // 程序入口方式1 val conf: SparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getName) val sc: SparkContext = SparkContext.getOrCreate(conf) /* // 程序入口方式2 val spark = SparkSession.builder.master("local[2]") .appName("appName") .getOrCreate() val sc:SparkContext = spark.sparkContext */ // 通過集合創建RDD val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6)) // 獲取元素個數 println(rdd1.count()) // 6 // 獲取分區數 println(rdd1.getNumPartitions) // 4 // 1、Spark默認會根據集群的情況來設置分區的數量,也可以通過parallelize()第二參數來指定 // 2、Spark會為每一個分區運行一個任務進行處理 val rdd2: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6), 5) println(rdd2.count()) // 6 println(rdd2.getNumPartitions) // 5 val rdd3: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6)) println(rdd3.count()) // 6 println(rdd3.getNumPartitions) // 4 } } ``` <br/> # 2. 使用外部存儲創建 RDD 任何 Hadoop 支持的存儲類型都可以用于創建 RDD,包括:本地文件系統、HDFS、HBase、Cassandra 等。 <br/> **1. 加載本地文件或hdfs創建RDD** 對于本地文件、HDFS 及 Hadoop 支持的文件系統使用 textFile 創建 RDD,文件中每一行作為 RDD 中的一條數據。 ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 通過本地文件或hdfs創建RDD * Date: 2021/1/4 */ object CreateRDDByFile { def main(args: Array[String]): Unit = { // 編程入口方式1 val conf:SparkConf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getName) val sc:SparkContext = new SparkContext(conf) /* // 編程入口方式2 val spark = SparkSession.builder.master("local[2]") .appName("appName") .getOrCreate() val sc:SparkContext = spark.sparkContext */ // 通過本地文件創建RDD val rdd1:RDD[String] = sc.textFile("file:///E:\\hadoop\\input\\hello.txt") rdd1.foreach(println) // 會按行輸出hello.txt文件的內容 // 通過hdfs創建RDD val rdd2:RDD[String] = sc.textFile("hdfs://hadoop101:9000/spark/hello.txt") rdd2.foreach(println) } } ``` ```scala // 文件中的一行文本作為RDD的一個元素 val distFile=sc.textFile("file:///home/hadoop/data/hello.txt") // 支持目錄、壓縮文件以及通配符 sc.textFile("/my/directory") sc.textFile("/my/directory/*.txt") sc.textFile("/my/directory/*.gz") ``` 1、Spark默認訪問的是HDFS; 2、Spark默認為HDFS文件的每一個數據塊創建一個分區,也可以通過 textFile() 第二個參數指定,但只能比數據塊數量多; 3、默認分區數量情況下不能超過 2。原因如下圖所示。 ![](https://img.kancloud.cn/a9/11/a91149edd30fefa0569b06b815586683_999x264.png) 其中,totalCoreCount 是一個來跟蹤集群中的核心總數原子變量。 <br/> **2. 通過PairRDD創建RDD** SparkContext.wholeTextFiles():可以針對一個目錄中的大量小文件返回`<filename,fileContent>`作為PairRDD。 * 普通RDD:org.apache.spark.rdd.RDD[data_type] * PairRDD:org.apache.spark.rdd.RDD[(key_type, value_type)] <br/> Spark 可以為包含鍵值對類型的 RDD 提供了一些專有的操作,比如:reduceByKey()、groupByKey()等。 也可以通過鍵值對集合創建PairRDD:sc.parallelize(List((1,2),(1,3))) <br/> 示例代碼: ```scala package spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 通過PairRDD方式創建RDD * Date: 2021/1/4 */ object CreateRDDByPairRDD { def main(args: Array[String]): Unit = { // 編程入口方式1 val conf: SparkConf = new SparkConf().setMaster("local[4]") .setAppName(this.getClass.getName) val sc: SparkContext = SparkContext.getOrCreate(conf) /* // 編程入口方式2 val spark = SparkSession.builder.master("local[2]") .appName("appName") .getOrCreate() val sc:SparkContext = spark.sparkContext */ // 通過PairRDD創建RDD val rdd:RDD[(String, String)] = sc.wholeTextFiles("hdfs://hadoop101:9000/spark/hello.txt") rdd.foreach(x=>println(s"文件名:${x._1}, 文件內容:${x._2}")) } } ``` <br/> **3. 其他創建RDD的方法** ```scala SparkContext.sequenceFile[K,V]() 提供對Hadoop SequenceFile的讀寫支持; SparkContext.hadoopRDD()、newAPIHadoopRDD()從Hadoop接口API創建RDD; SparkContext.objectFile() 是RDD.saveAsObjectFile()的逆操作; ``` 從已有 RDD 創建新的 RDD 通過轉換算子實現。通常涉及:map,map, filter, count, distinct, flatMap 等。這些操作與 Scala 集合操作類似。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看