<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                [TOC] ## **什么是Spark?** Spark是**用于大規模數據處理的,基于內存計算的統一分析引擎**。Spark借鑒MapReduce發展而來,保留了其分布式并行計算的優點并改進了其明顯的缺陷。讓中間數據存儲在內存中提高了運行速度,并提供豐富的API提高了開發速度。Spark可以計算:**結構化、半結構化、非結構化等各種類型的數據結構**,支持使用Python、Java、Scala、R以及SQL語言去開發數據計算程序。 - Spark是Apache基金會旗下的頂級開源項目,用于對海量數據進行大規模分布式計算。 ## **什么是PySpark** Spark安裝目錄里面的bin/pyspark 程序,提供一個Python解釋器執行環境來運行Spark任務。我們現在說的PySpark, 是Spark官方提供的一個Python類庫,內置了完全的Spark API, 可以通過PySpark類庫來編寫Spark應用程序,并將其提交到Spark集群中運行 - PySpark是Spark的Python實現,是Spark為Python開發者提供的編程入口,用于以Python代碼完成Spark任務的開發 - PySpark不僅可以作為Python第三方庫使用,也可以將程序提交的Spark集群環境中,調度大規模集群進行執行。 ## **PySpark安裝** ~~~ pip install pyspark ~~~ ## **PySpark使用** ~~~python # 導包 from pyspark import SparkConf, SparkContext # 創建SparkConf對象 conf = SparkConf().setMaster("local[*]").setAppName("demo2") # 基于SparkConf對象創建SparkContext對象 sc = SparkContext(conf=conf) # 執行邏輯代碼 # 停止SparkContext對象運行(停止PySpark程序) sc.stop() ~~~ :-: ![](https://img.kancloud.cn/12/d5/12d56463e5dd5479d7f82fb14af876ef_1713x250.png) :-: ![](https://img.kancloud.cn/8e/d8/8ed8115f9b1c66b6be0efbb0347ac8b8_1068x653.png) - 通過SparkContext對象,完成數據輸入 - 輸入數據后得到RDD對象,對RDD對象進行迭代計算 - 最終通過RDD對象的成員方法,完成數據輸出工作 ### **RDD對象** **RDD全稱**:彈性分布式數據集(Resilient Distributed Datasets) 通過 SparkContext 對象的 `parallelize `成員方法,將`list`、`tuple`、`set`、`dict`、`str`轉換為PySpark的RDD對象 ~~~python from pyspark import SparkConf, SparkContext # 創建SparkConf對象 conf = SparkConf().setMaster("local[*]").setSparkHome("test_spark") # 基于SparkConf類對象創建SparkContext對象 sc = SparkContext(conf=conf) # 通過parallelize方法將python對象加載到spark內,成為RDD對象 rdd1 = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = sc.parallelize((1, 2, 3, 4, 5)) rdd3 = sc.parallelize("方式腐惡哦吼") rdd4 = sc.parallelize({1, 2, 3, 4, 5}) rdd5 = sc.parallelize({"key1": "value1", "key2": "value2", }) # # 查看RDD中的內容 print(rdd1.collect()) print(rdd2.collect()) print(rdd3.collect()) print(rdd4.collect()) print(rdd5.collect()) ~~~ ## **算子** ### **數據計算類** #### **Map** - 功能:map算子,是將RDD的數據一條條處理 處理的邏輯基于map算子中接收的處理函數 ),返回新的RDD - 語法: ![](https://img.kancloud.cn/f6/ca/f6ca69dc1fd224e42dafbcfbbfdb331a_1151x361.png) - 代碼: ~~~ rdd = sc.parallelize([1, 2, 3, 4, 5]) rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5) print(rdd2.collect()) # 結果: # [15, 25, 35, 45, 55] ~~~ #### **flatMap** - 功能:對rdd執行map操作,然后進行**解除嵌套**操作 ![](https://img.kancloud.cn/0b/fe/0bfe704515d10d63eb27a51764b469e2_572x189.png) - 代碼 ~~~ rdd = sc.parallelize(["fsfes gsg 45", "56 f omo", "65 dsfmles 5", "dsads 56 dd"]) rdd2 = rdd.flatMap(lambda x: x.split(" ")) print(rdd2.collect()) # 結果: # ['fsfes', 'gsg', '45', '56', 'f', 'omo', '65', 'dsfmles', '5', 'dsads', '56', 'dd'] ~~~ #### **reducrByKey** - 功能:針對key-value型RDD,自動按照key分組,然后根據你提供的聚合邏輯,完成**組內數據(value)**的聚合操作。 - 用法: ![](https://img.kancloud.cn/0f/dc/0fdc013955413dfbfe5efaeba0d5b2e9_761x107.png) ![](https://img.kancloud.cn/c5/ac/c5ac9e209608b3ac8490c75b605e8ece_854x433.png) - 代碼: ~~~ rdd = sc.parallelize([('男', 6), ('女', 98), ('男', 87), ('男', 99), ('女', 78), ('女', 66)]) rdd2 = rdd.reduceByKey(lambda a, b: a + b) print(rdd2.collect()) # 結果: # [('男', 192), ('女', 242)] ~~~ #### **Filter** - 功能:過濾想要的數據進行保留 - 用法: ~~~ rdd.filter(func) # func:(T) -> bool 傳入一個隨機類型的參數,返回值必須是 True 或 False ~~~ - 用法: ~~~ rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]) rdd2 = rdd.filter(lambda num: num % 2 == 0) print(rdd2.collect()) # 結果: # [2, 4, 6] ~~~ #### **Distinct** - 功能:對RDD數據去重 - 代碼: ~~~ rdd = sc.parallelize([1, 2, 3, 2, 3, 4, 4, 5, 6, 7]) rdd2 = rdd.distinct() print(rdd2.collect()) # 結果: # [1, 2, 3, 4, 5, 6, 7] ~~~ ![](https://img.kancloud.cn/29/6e/296ef1907e97b5f8adf145bd232344a8_859x226.png) #### **sortBy** - 功能:對RDD數據排序 - 用法: ~~~ rdd.sortBy(func, ascending = False, numPartitions = 1) # func: (T) -> U:告知安裝RDD的哪個數據進行排序,例如:lambda x: x[1] 表示按照rdd的第二個元素進行排序 # ascending:True 升序、False 降序。默認 True # numPartitions:用多少分區排序 ~~~ - 代碼: ~~~ rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32]) rdd2 = rdd.sortBy(lambda x: x) print(rdd2.collect()) # 結果: # [2, 5, 5, 25, 32, 54, 85] ~~~ ### **數據輸出類** #### **Collect** - 功能:將RDD各個分區內的數據,統一收集到Driver中,形成一個List對象 - 用法: ~~~ rdd.collect() ~~~ #### **Reduce** - 功能:對RDD數據集按照傳入的邏輯進行聚合 - 代碼: ~~~ rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32]) rdd2 = rdd.reduce(lambda a, b: a + b) print(rdd2) # 結果 # 208 ~~~ #### **Take** - 功能:去RDD數據集中的前N個元素,組合成List返回 - 代碼: ~~~ rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32]) rdd2 = rdd.take(3) print(rdd2) # 結果 # [2, 5, 5] ~~~ #### **Count** - 功能:統計RDD共有多少條數據,返回值一個數字 - 代碼: ~~~ rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32]) rdd2 = rdd.count() print(rdd2) # 結果 # 7 ~~~ #### **saveAsTextFile** - 功能:將RDD數據寫入**文本文件**中 - 代碼: ~~~ rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32], numSlices=1) # 設置使用的分區為1() rdd.saveAsTextFile("num_list") ~~~ 結果: ![](https://img.kancloud.cn/36/07/3607d8bc49e943f0ce6c50ea19ac6f93_570x182.png) ![](https://img.kancloud.cn/64/a3/64a3078121c625dde8841e09ba7e33ba_301x314.png) > **注意**: > 調用保存文件的算子,需要配置**Hadoop**依賴 > - 下載Hadoop安裝包 > http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz > - 解壓到電腦任意位置 > - 在Python代碼中使用os模塊配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解壓文件夾路徑’ > - 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內 > https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe > - 下載hadoop.dll,并放入:C:/Windows/System32 文件夾內 > https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看