[TOC]
## **什么是Spark?**
Spark是**用于大規模數據處理的,基于內存計算的統一分析引擎**。Spark借鑒MapReduce發展而來,保留了其分布式并行計算的優點并改進了其明顯的缺陷。讓中間數據存儲在內存中提高了運行速度,并提供豐富的API提高了開發速度。Spark可以計算:**結構化、半結構化、非結構化等各種類型的數據結構**,支持使用Python、Java、Scala、R以及SQL語言去開發數據計算程序。
- Spark是Apache基金會旗下的頂級開源項目,用于對海量數據進行大規模分布式計算。
## **什么是PySpark**
Spark安裝目錄里面的bin/pyspark 程序,提供一個Python解釋器執行環境來運行Spark任務。我們現在說的PySpark, 是Spark官方提供的一個Python類庫,內置了完全的Spark API, 可以通過PySpark類庫來編寫Spark應用程序,并將其提交到Spark集群中運行
- PySpark是Spark的Python實現,是Spark為Python開發者提供的編程入口,用于以Python代碼完成Spark任務的開發
- PySpark不僅可以作為Python第三方庫使用,也可以將程序提交的Spark集群環境中,調度大規模集群進行執行。
## **PySpark安裝**
~~~
pip install pyspark
~~~
## **PySpark使用**
~~~python
# 導包
from pyspark import SparkConf, SparkContext
# 創建SparkConf對象
conf = SparkConf().setMaster("local[*]").setAppName("demo2")
# 基于SparkConf對象創建SparkContext對象
sc = SparkContext(conf=conf)
# 執行邏輯代碼
# 停止SparkContext對象運行(停止PySpark程序)
sc.stop()
~~~
:-: 
:-: 
- 通過SparkContext對象,完成數據輸入
- 輸入數據后得到RDD對象,對RDD對象進行迭代計算
- 最終通過RDD對象的成員方法,完成數據輸出工作
### **RDD對象**
**RDD全稱**:彈性分布式數據集(Resilient Distributed Datasets)
通過 SparkContext 對象的 `parallelize `成員方法,將`list`、`tuple`、`set`、`dict`、`str`轉換為PySpark的RDD對象
~~~python
from pyspark import SparkConf, SparkContext
# 創建SparkConf對象
conf = SparkConf().setMaster("local[*]").setSparkHome("test_spark")
# 基于SparkConf類對象創建SparkContext對象
sc = SparkContext(conf=conf)
# 通過parallelize方法將python對象加載到spark內,成為RDD對象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("方式腐惡哦吼")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2", })
# # 查看RDD中的內容
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
~~~
## **算子**
### **數據計算類**
#### **Map**
- 功能:map算子,是將RDD的數據一條條處理 處理的邏輯基于map算子中接收的處理函數 ),返回新的RDD
- 語法:

- 代碼:
~~~
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
print(rdd2.collect())
# 結果:
# [15, 25, 35, 45, 55]
~~~
#### **flatMap**
- 功能:對rdd執行map操作,然后進行**解除嵌套**操作

- 代碼
~~~
rdd = sc.parallelize(["fsfes gsg 45", "56 f omo", "65 dsfmles 5", "dsads 56 dd"])
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
# 結果:
# ['fsfes', 'gsg', '45', '56', 'f', 'omo', '65', 'dsfmles', '5', 'dsads', '56', 'dd']
~~~
#### **reducrByKey**
- 功能:針對key-value型RDD,自動按照key分組,然后根據你提供的聚合邏輯,完成**組內數據(value)**的聚合操作。
- 用法:


- 代碼:
~~~
rdd = sc.parallelize([('男', 6), ('女', 98), ('男', 87), ('男', 99), ('女', 78), ('女', 66)])
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
# 結果:
# [('男', 192), ('女', 242)]
~~~
#### **Filter**
- 功能:過濾想要的數據進行保留
- 用法:
~~~
rdd.filter(func)
# func:(T) -> bool 傳入一個隨機類型的參數,返回值必須是 True 或 False
~~~
- 用法:
~~~
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7])
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
# 結果:
# [2, 4, 6]
~~~
#### **Distinct**
- 功能:對RDD數據去重
- 代碼:
~~~
rdd = sc.parallelize([1, 2, 3, 2, 3, 4, 4, 5, 6, 7])
rdd2 = rdd.distinct()
print(rdd2.collect())
# 結果:
# [1, 2, 3, 4, 5, 6, 7]
~~~

#### **sortBy**
- 功能:對RDD數據排序
- 用法:
~~~
rdd.sortBy(func, ascending = False, numPartitions = 1)
# func: (T) -> U:告知安裝RDD的哪個數據進行排序,例如:lambda x: x[1] 表示按照rdd的第二個元素進行排序
# ascending:True 升序、False 降序。默認 True
# numPartitions:用多少分區排序
~~~
- 代碼:
~~~
rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32])
rdd2 = rdd.sortBy(lambda x: x)
print(rdd2.collect())
# 結果:
# [2, 5, 5, 25, 32, 54, 85]
~~~
### **數據輸出類**
#### **Collect**
- 功能:將RDD各個分區內的數據,統一收集到Driver中,形成一個List對象
- 用法:
~~~
rdd.collect()
~~~
#### **Reduce**
- 功能:對RDD數據集按照傳入的邏輯進行聚合
- 代碼:
~~~
rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32])
rdd2 = rdd.reduce(lambda a, b: a + b)
print(rdd2)
# 結果
# 208
~~~
#### **Take**
- 功能:去RDD數據集中的前N個元素,組合成List返回
- 代碼:
~~~
rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32])
rdd2 = rdd.take(3)
print(rdd2)
# 結果
# [2, 5, 5]
~~~
#### **Count**
- 功能:統計RDD共有多少條數據,返回值一個數字
- 代碼:
~~~
rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32])
rdd2 = rdd.count()
print(rdd2)
# 結果
# 7
~~~
#### **saveAsTextFile**
- 功能:將RDD數據寫入**文本文件**中
- 代碼:
~~~
rdd = sc.parallelize([2, 5, 5, 54, 25, 85, 32], numSlices=1) # 設置使用的分區為1()
rdd.saveAsTextFile("num_list")
~~~
結果:


> **注意**:
> 調用保存文件的算子,需要配置**Hadoop**依賴
> - 下載Hadoop安裝包
> http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
> - 解壓到電腦任意位置
> - 在Python代碼中使用os模塊配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解壓文件夾路徑’
> - 下載winutils.exe,并放入Hadoop解壓文件夾的bin目錄內
> https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
> - 下載hadoop.dll,并放入:C:/Windows/System32 文件夾內
> https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
- PHP
- PHP 核心架構
- PHP 生命周期
- PHP-FPM 詳解
- PHP-FPM 配置優化
- PHP 命名空間和自動加載
- PHP 運行模式
- PHP 的 Buffer(緩沖區)
- php.ini 配置文件參數優化
- 常見面試題
- 常用函數
- 幾種排序算法
- PHP - 框架
- Laravel
- Laravel 生命周期
- ThinkPHP
- MySQL
- 常見問題
- MySQL 索引
- 事務
- 鎖機制
- Explain 使用分析
- MySQL 高性能優化規范
- UNION 與 UNION ALL
- MySQL報錯:sql_mode=only_full_group_by
- MySQL 默認的 sql_mode 詳解
- 正則表達式
- Redis
- Redis 知識
- 持久化
- 主從復制、哨兵、集群
- Redis 緩存擊穿、穿透、雪崩
- Redis 分布式鎖
- RedisBloom
- 網絡
- 計算機網絡模型
- TCP
- UDP
- HTTP
- HTTPS
- WebSocket
- 常見幾種網絡攻擊方式
- Nginx
- 狀態碼
- 配置文件
- Nginx 代理+負載均衡
- Nginx 緩存
- Nginx 優化
- Nginx 配置 SSL 證書
- Linux
- 常用命令
- Vim 常用操作命令
- Supervisor 進程管理
- CentOS與Ubuntu系統區別
- Java
- 消息隊列
- 運維
- RAID 磁盤陣列
- 邏輯分區管理 LVM
- 業務
- 標準通信接口設計
- 業務邏輯開發套路的三板斧
- 微信小程序登錄流程
- 7種Web實時消息推送方案
- 用戶簽到
- 用戶注冊-短信驗證碼
- SQLServer 刪除同一天用戶重復簽到
- 軟件研發完整流程
- 前端
- Redux
- 其他
- 百度云盤大文件下載
- 日常報錯記錄
- GIT
- SSL certificate problem: unable to get local issuer certificate
- NPM
- reason: connect ECONNREFUSED 127.0.0.1:31181
- SVN
- SVN客戶端無法連接SVN服務器,主機積極拒絕
- Python
- 基礎
- pyecharts圖表
- 對象
- 數據庫
- PySpark
- 多線程
- 正則
- Hadoop
- 概述
- HDFS