# **任務:**
#!/bin/bash
workPath=/data/bigdata/job/callLog-export
dataDate=`date ?+"%Y-%m-%d" -d ?"-1 days"`
if \[ -n "$1" \] ;then
dataDate=$1
fi
readFileName=/data/data\_center/cus/call\_log\_record/call\_log\_record-${dataDate}\*
resultDataDir=/tmp/sunzm/callLog-tmp/${dataDate}/
companyId=0302fbb6bf6b4fb1bad1416ab641202c
echo "處理的數據日期: $collechDate"
echo "讀取的文件為: ${readFileName}"
echo "處理結果保存路徑: ${resultDataDir}"
echo "處理的公司Id為: ${companyId}"
/var/lib/hadoop-hdfs/softwares/spark/bin/spark-submit \\
\-class com.sobot.offlineJobProcess.handler.temporary.job.hdfstocsv.HDFSDataToCSVJob?\\
\-master spark://192.168.30.178:7077?\\
\-total-executor-cores 18 \\
\-executor-memory 4G?\\
\-executor-cores 3?\\
\-driver-cores 3 \\
\-driver-memory 4G?\\
\-conf spark.sql.shuffle.partitions=16 \\
\-conf spark.default.parallelism=16 \\
${workPath}/offlineJobProcess-es6-1.0-SNAPSHOT.jar \\
\-readFileName ${readFileName} \\
\-resultFileDir ${resultDataDir} \\
\-companyId ${companyId} \\
* \-companyIdFieldName${companyIdFieldName} \\
\-fields startTime,caller,voiceAliyunUrl \\
\-headers 通話開始時間,主叫號碼,錄音地址 \\
\-whereCondition callFlag=1 \\
\-selectExpr "date\_format(cast(startTime/1000 as timestamp), 'yyyy-MM-dd HH:mm:ss') AS startTime,caller,voiceAliyunUrl" \\
> ${workPath}/logs/callLog-filter.log
# **Spark任務提交:**
**StandaLone與Yarn的任務執行情況不同在于資源分配管理者不同,獨立模式下是有Master負責管理,yarn模式是ResourceManager負責調度。**
1、當集群啟動后,Worker會向Master匯報資源,然后Master就會掌握Worker的集群信息。
2、當啟動集群后,NodeManager會向RsourceManager匯報資源,而RM就掌握了集群的資源。
任務提交模式有一種是client客戶端,另一種是cluster集群方式,由shell命令指定模式。
## **Client:**
1.當在客戶端提交SparkApplication時,Driver會在客戶端啟動,客戶端會向RM申請啟動ApplicationMaster。
2.RM 收到請求會向隨機找一個滿足資源的NM啟動Application Master,AM啟動后,會向RM申請資源用于啟動executor,RM會返回一批NM節點給AM,AM收到返回結果后,會真正的向NM中去啟動executor,每個executor中會有線程池。
3.executor啟動后會向Driver注冊,Driver會向executor發送task,并且監控task執行,收回task執行的結果。
它的問題在于:因為Driver運行在本地,Driver會與yarn集群中的Executor進行大量的通信,會造成客戶機網卡流量的大量增加,可以在客戶端看到task的執行和結果.Yarn-client模式同樣是適用于測試。
## **Cluster:**
1.當客戶端提交Spark Appliction。會向RM申請啟動ApplictaionMaster,而RM會隨機找到一個滿足資源的NM去啟動AM。
2.當AM啟動之后它負責任務調度,所以這里就不啟動Driver,而AM就相當于Driver一樣的功能存在。
3.AM啟動后會向RM申請啟動Executor,每個Executor會由線程池,RM會返回一批滿足資源的NM節點。
4.AM接收到返回結果會找到相應的NM,啟動Executor,executor啟動后會向AM注冊,而AM會將task發送到executor去執行,并且監控task,回收task'處理的結果。
它的問題在于:Yarn-Cluster主要用于生產環境中,因為Driver運行在Yarn集群中某一臺nodeManager中,每次提交任務的Driver所在的機器都是隨機的,不會產生某一臺機器網卡流量激增的現象,缺點是任務提交后不能看到日志。只能通過yarn查看日志。
# **spark算子相關**
## **UpdateStateByKey(基于磁盤讀寫)**
UpdateStateBykey會統計全局的key的狀態,不管有沒有數據輸入,它會在每一個批次間隔返回之前的key的狀態。updateStateBykey會對已存在的key進行state的狀態更新,同時還會對每個新出現的key執行相同的更新函數操作。如果通過更新函數對state更新后返回來為none,此時刻key對應的state狀態會刪除(state可以是任意類型的數據結構)。
### **適用場景:**
UpdataStateBykey可以用來統計歷史數據,每次輸出所有的key值。列如統計不同時間段用戶平均消費金額,消費次數,消費總額,網站的不同時間段的返回量等指標。
### **適用實例條件:**
1. 首先會以DStream中的數據進行按key做reduce操作,然后再對各個批次的數據進行累加。
2. updataStateByKey要求必須設置checkpoint點(設置中間結果文件夾)
3. updataStateByKey方法中updataFunc就要傳入的參數,Seq\[V\]表示當前key對應的所有值,Option\[S\]是當前key的歷史狀態,返回的是新的封裝的數據。
## **MapWithState(基于磁盤存儲+緩存)**
mapWithState也是用于對于全局統計key的狀態,但是它如果沒有數據輸入,便不會返回之前的key的狀態,類型于增量的感覺。使用場景mapWithState可以用于一些實時性較高,延遲較少的一些場景,例如你在某寶上下單買了個東西,付款之后返回你賬戶里余額信息。
### **適用實例條件:**
1. 如果有初始化的值得需要,可以使用initialState(RDD)來初始化key的值
2. 還可以指定timeout函數,該函數的作用是,如果一個key超過timeout設定的時間沒有更新值,那么這個key將會失效。這個控制需要在fun中實現,必須使用state.isTimingOut()來判斷失效的key值。如果在失效時間之后,這個key又有新的值了,則會重新計算。如果沒有使用isTimingOut,則會報錯。3.?checkpoint不會必須的
### **區別:**
updataeStateByKey可以在指定的批次間隔內返回之前的全部歷史數據,包括新增的,改變的和沒有改變的。由于updateStateByKey在使用的時候一定要做checkpoint,當數據量過大的時候,checkpoint會占據龐大的數據量,會影響性能,效率不高。
mapWithState只返回變化后的key的值,這樣做的好處是,我們可以只關心那些已經發生的變化的key,對于沒有數據輸入,則不會返回那些沒有變化的key 的數據。這樣的話,即使數據量很大,checkpint也不會updateBykey那樣,占用太多的存儲,效率比較高(再生產環境中建議使用這個)。
詳細使用:https://www.jianshu.com/p/a54b142067e5
## **Map和MapPartition的區別:**
map是對RDD的每一個元素使用一個方法操作,mapPartitions是對每個partition的迭代器使用一個方法操作。
## **MapPartitions的優點:**
使用MapPartitions操作之后,一個task僅僅會執行一次function,function一次接收所有的partition數據。只要執行一次就可以了,性能比較高。通常體現在map過程中需要頻繁創建額外的對象(例如將rdd中的數據通過jdbc寫入數據庫,map需要為每個元素創建一個鏈接而mapPartition為每個partition創建一個鏈接),則mapPartitions效率比map高的多。SparkSql或DataFrame默認會對程序進行mapPartition的優化。
## **MapPartitions的缺點:**
如果是普通的map操作,一次function的執行就處理一條數據,可以將已經處理完的數據從內存里面釋放掉。所以說普通的map操作通常不會導致內存的OOM異常。 但是MapPartitions操作,對于大量數據來說,如果直接將迭代器中數據取出來放內存,可能就OOM(內存溢出)。
## **Foreach和ForeachPartition的區別:**
### **foreachPartition:**
foreachPartition是spark-core的action算子,foreachPartition是對每個partition中的iterator分別處理,通過將iterator傳入function對進行數據的處理,也就是說在foreachPartition中函數處理的是分區迭代器,而非具體的數據,源碼中的注釋是:Applies a function func to each parition of this RDD.(將函數func應用于此RDD的每個分區)
### **foreach:**
foreach也是spark-core的action算子,與foreachPartition類似的是,foreach也是對每個partition中的iterator分別處理,通過對每個iterator迭代獲取數據傳給function進行數據的處理,也就是說在foreach中函數處理的是具體的數據,源碼中的注釋是:Applies a function fun to all elements of this RDD.(將函數func用于此RDD的所有元素).
### **foreachRDD與上面兩個的區別:**
foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不會觸發立即處理,必須在碰到sparkcore的foreach或者foreachPartition算子后,才會觸發action動作。同時要注意,function的應用在的driver端進行,而不是Executor端進行。
## **GoupByKey和ReduceByKey的區別:**
 
### GroupByKey:只是將鍵相同的值給歸納到一個序列,沒有其它函數操作。不同分區數據直接shuffle到新分區在聚合,不會先再shuffle前聚合,造成shuffle過程IO壓力更大
ProcessRDD.GroupByKey()
### ReduceByKey:將鍵相同的值使用同一個函數聚合操作,先再shuffle前使用函數局部聚合,再shuffle到不同分區后繼續使用函數聚合
ProcessRDD.ReduceByKey(“函數(U,U) => U”)
aggregateByKey:將鍵相同的值在兩端使用不同函數操作,一端是shuffle前局部聚合使用的一個函數,另一端是shuffle后聚合使用的另一個函數,相對ReduceBykey可以提前指定初始值,并依據初始值類型返回值
### ProcessRDD.aggregateByKey("初始值K類型")(“函數1(K,U) => K”, “函數2(K,K) => K”)
**這三種都可以指定shuffle時的分區數,假如分區數不夠,實際會出現不同鍵在同一物理分區,但spark處理時還是認為不同鍵是屬于不同區的。既aggregateByKey初始值K是會給每個鍵初始時加上的。**
# **SparkSQL相關:**
## **SQL 解析:**
SQL Query,需要經過詞法和語法解析,由字符串轉換為,樹形的抽象語法樹。
* 通過遍歷抽象語法樹生成未解析的邏輯語法樹(unresolved logic plan),對應SQL解析后的一種樹形結構,本身不包含任務數據信息。
* 需要經過一次遍歷之后,轉換成成包含解析后的邏輯算子樹(Analyzed LogicPlan),本身攜帶了各種信息。
* 最后經過優化后得到最終的邏輯語法樹(Optimized LogicPlan)。
不管解析被劃分為幾步,在Spark 執行環境中,都要轉化成RDD的調用代碼,才能被spark core所執行,示意圖如下:
## **創建視圖:**
createOrReplaceTempView?的作用是創建一個臨時的表?,?一旦創建這個表的會話關閉?,?這個表也會立馬消失?其他的SparkSession?不能共享應已經創建的臨時表createOrReplaceGlobalTempView創建一個全局的臨時表?,?這個表的生命周期是?整個Spark應用程序?,只要Spark?應用程序不關閉?,?那么.?這個臨時表依然是可以使用的?,并且這個表對其他的SparkSession共享(**要?global\_temp.‘tablename’ 使用**)
## **分組語句:**
GROUP BY : 使用時不像MySQL分組,MySQL可以返回不在group by條件中的列的第一條數據作為該列返回值,spark sql 和 hive sql 類似,不能這樣隨機返回,能返回聚合字段和聚合函數,但可以通過指定Frist(col)或者 Last(col)。
## **窗口函數:**
function OVER (PARITION BY … ORDER BY … FRAME\_TYPE ??BETWEEN … AND …)
function :對窗口內所有行都處理的函數
PARITION BY : 依據指定列進行分窗處理
ORDER BY : 窗口內依據指定字段排序
FRAME\_TYPE :FRAME是當前分區的一個子集,子句用來定義子集的規則,通常用來作為滑動窗口使用。主要用來控制每行數據在應用窗口函數時,這個窗口函數的作用范圍。
(
FRAME\_TYPE rows 指定函數基于當前行的窗口范圍
rows between …T… and …T…
T 如下:
unbounded preceding 前面所有行
unbounded following 后面所有行
current row 當前行
n following 后面n行
n preceding 前面n行
)