<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                # **任務:** #!/bin/bash workPath=/data/bigdata/job/callLog-export dataDate=`date ?+"%Y-%m-%d" -d ?"-1 days"` if \[ -n "$1" \] ;then dataDate=$1 fi readFileName=/data/data\_center/cus/call\_log\_record/call\_log\_record-${dataDate}\* resultDataDir=/tmp/sunzm/callLog-tmp/${dataDate}/ companyId=0302fbb6bf6b4fb1bad1416ab641202c echo "處理的數據日期: $collechDate" echo "讀取的文件為: ${readFileName}" echo "處理結果保存路徑: ${resultDataDir}" echo "處理的公司Id為: ${companyId}" /var/lib/hadoop-hdfs/softwares/spark/bin/spark-submit \\ \-class com.sobot.offlineJobProcess.handler.temporary.job.hdfstocsv.HDFSDataToCSVJob?\\ \-master spark://192.168.30.178:7077?\\ \-total-executor-cores 18 \\ \-executor-memory 4G?\\ \-executor-cores 3?\\ \-driver-cores 3 \\ \-driver-memory 4G?\\ \-conf spark.sql.shuffle.partitions=16 \\ \-conf spark.default.parallelism=16 \\ ${workPath}/offlineJobProcess-es6-1.0-SNAPSHOT.jar \\ \-readFileName ${readFileName} \\ \-resultFileDir ${resultDataDir} \\ \-companyId ${companyId} \\ * \-companyIdFieldName${companyIdFieldName} \\ \-fields startTime,caller,voiceAliyunUrl \\ \-headers 通話開始時間,主叫號碼,錄音地址 \\ \-whereCondition callFlag=1 \\ \-selectExpr "date\_format(cast(startTime/1000 as timestamp), 'yyyy-MM-dd HH:mm:ss') AS startTime,caller,voiceAliyunUrl" \\ > ${workPath}/logs/callLog-filter.log # **Spark任務提交:** **StandaLone與Yarn的任務執行情況不同在于資源分配管理者不同,獨立模式下是有Master負責管理,yarn模式是ResourceManager負責調度。** 1、當集群啟動后,Worker會向Master匯報資源,然后Master就會掌握Worker的集群信息。 2、當啟動集群后,NodeManager會向RsourceManager匯報資源,而RM就掌握了集群的資源。 任務提交模式有一種是client客戶端,另一種是cluster集群方式,由shell命令指定模式。 ## **Client:** 1.當在客戶端提交SparkApplication時,Driver會在客戶端啟動,客戶端會向RM申請啟動ApplicationMaster。 2.RM 收到請求會向隨機找一個滿足資源的NM啟動Application Master,AM啟動后,會向RM申請資源用于啟動executor,RM會返回一批NM節點給AM,AM收到返回結果后,會真正的向NM中去啟動executor,每個executor中會有線程池。 3.executor啟動后會向Driver注冊,Driver會向executor發送task,并且監控task執行,收回task執行的結果。 它的問題在于:因為Driver運行在本地,Driver會與yarn集群中的Executor進行大量的通信,會造成客戶機網卡流量的大量增加,可以在客戶端看到task的執行和結果.Yarn-client模式同樣是適用于測試。 ## **Cluster:** 1.當客戶端提交Spark Appliction。會向RM申請啟動ApplictaionMaster,而RM會隨機找到一個滿足資源的NM去啟動AM。 2.當AM啟動之后它負責任務調度,所以這里就不啟動Driver,而AM就相當于Driver一樣的功能存在。 3.AM啟動后會向RM申請啟動Executor,每個Executor會由線程池,RM會返回一批滿足資源的NM節點。 4.AM接收到返回結果會找到相應的NM,啟動Executor,executor啟動后會向AM注冊,而AM會將task發送到executor去執行,并且監控task,回收task'處理的結果。 它的問題在于:Yarn-Cluster主要用于生產環境中,因為Driver運行在Yarn集群中某一臺nodeManager中,每次提交任務的Driver所在的機器都是隨機的,不會產生某一臺機器網卡流量激增的現象,缺點是任務提交后不能看到日志。只能通過yarn查看日志。 # **spark算子相關** ## **UpdateStateByKey(基于磁盤讀寫)** UpdateStateBykey會統計全局的key的狀態,不管有沒有數據輸入,它會在每一個批次間隔返回之前的key的狀態。updateStateBykey會對已存在的key進行state的狀態更新,同時還會對每個新出現的key執行相同的更新函數操作。如果通過更新函數對state更新后返回來為none,此時刻key對應的state狀態會刪除(state可以是任意類型的數據結構)。 ### **適用場景:** UpdataStateBykey可以用來統計歷史數據,每次輸出所有的key值。列如統計不同時間段用戶平均消費金額,消費次數,消費總額,網站的不同時間段的返回量等指標。 ### **適用實例條件:** 1. 首先會以DStream中的數據進行按key做reduce操作,然后再對各個批次的數據進行累加。 2. updataStateByKey要求必須設置checkpoint點(設置中間結果文件夾) 3. updataStateByKey方法中updataFunc就要傳入的參數,Seq\[V\]表示當前key對應的所有值,Option\[S\]是當前key的歷史狀態,返回的是新的封裝的數據。 ## **MapWithState(基于磁盤存儲+緩存)** mapWithState也是用于對于全局統計key的狀態,但是它如果沒有數據輸入,便不會返回之前的key的狀態,類型于增量的感覺。使用場景mapWithState可以用于一些實時性較高,延遲較少的一些場景,例如你在某寶上下單買了個東西,付款之后返回你賬戶里余額信息。 ### **適用實例條件:** 1. 如果有初始化的值得需要,可以使用initialState(RDD)來初始化key的值 2. 還可以指定timeout函數,該函數的作用是,如果一個key超過timeout設定的時間沒有更新值,那么這個key將會失效。這個控制需要在fun中實現,必須使用state.isTimingOut()來判斷失效的key值。如果在失效時間之后,這個key又有新的值了,則會重新計算。如果沒有使用isTimingOut,則會報錯。3.?checkpoint不會必須的 ### **區別:** updataeStateByKey可以在指定的批次間隔內返回之前的全部歷史數據,包括新增的,改變的和沒有改變的。由于updateStateByKey在使用的時候一定要做checkpoint,當數據量過大的時候,checkpoint會占據龐大的數據量,會影響性能,效率不高。 mapWithState只返回變化后的key的值,這樣做的好處是,我們可以只關心那些已經發生的變化的key,對于沒有數據輸入,則不會返回那些沒有變化的key 的數據。這樣的話,即使數據量很大,checkpint也不會updateBykey那樣,占用太多的存儲,效率比較高(再生產環境中建議使用這個)。 詳細使用:https://www.jianshu.com/p/a54b142067e5 ## **Map和MapPartition的區別:** map是對RDD的每一個元素使用一個方法操作,mapPartitions是對每個partition的迭代器使用一個方法操作。 ## **MapPartitions的優點:** 使用MapPartitions操作之后,一個task僅僅會執行一次function,function一次接收所有的partition數據。只要執行一次就可以了,性能比較高。通常體現在map過程中需要頻繁創建額外的對象(例如將rdd中的數據通過jdbc寫入數據庫,map需要為每個元素創建一個鏈接而mapPartition為每個partition創建一個鏈接),則mapPartitions效率比map高的多。SparkSql或DataFrame默認會對程序進行mapPartition的優化。 ## **MapPartitions的缺點:** 如果是普通的map操作,一次function的執行就處理一條數據,可以將已經處理完的數據從內存里面釋放掉。所以說普通的map操作通常不會導致內存的OOM異常。 但是MapPartitions操作,對于大量數據來說,如果直接將迭代器中數據取出來放內存,可能就OOM(內存溢出)。 ## **Foreach和ForeachPartition的區別:** ### **foreachPartition:** foreachPartition是spark-core的action算子,foreachPartition是對每個partition中的iterator分別處理,通過將iterator傳入function對進行數據的處理,也就是說在foreachPartition中函數處理的是分區迭代器,而非具體的數據,源碼中的注釋是:Applies a function func to each parition of this RDD.(將函數func應用于此RDD的每個分區) ### **foreach:** foreach也是spark-core的action算子,與foreachPartition類似的是,foreach也是對每個partition中的iterator分別處理,通過對每個iterator迭代獲取數據傳給function進行數據的處理,也就是說在foreach中函數處理的是具體的數據,源碼中的注釋是:Applies a function fun to all elements of this RDD.(將函數func用于此RDD的所有元素). ### **foreachRDD與上面兩個的區別:** foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不會觸發立即處理,必須在碰到sparkcore的foreach或者foreachPartition算子后,才會觸發action動作。同時要注意,function的應用在的driver端進行,而不是Executor端進行。 ## **GoupByKey和ReduceByKey的區別:** ![](images/screenshot_1639970836485.png) ![](images/screenshot_1639970842720.png) ### GroupByKey:只是將鍵相同的值給歸納到一個序列,沒有其它函數操作。不同分區數據直接shuffle到新分區在聚合,不會先再shuffle前聚合,造成shuffle過程IO壓力更大 ProcessRDD.GroupByKey() ### ReduceByKey:將鍵相同的值使用同一個函數聚合操作,先再shuffle前使用函數局部聚合,再shuffle到不同分區后繼續使用函數聚合 ProcessRDD.ReduceByKey(“函數(U,U) => U”) aggregateByKey:將鍵相同的值在兩端使用不同函數操作,一端是shuffle前局部聚合使用的一個函數,另一端是shuffle后聚合使用的另一個函數,相對ReduceBykey可以提前指定初始值,并依據初始值類型返回值 ### ProcessRDD.aggregateByKey("初始值K類型")(“函數1(K,U) => K”, “函數2(K,K) => K”) **這三種都可以指定shuffle時的分區數,假如分區數不夠,實際會出現不同鍵在同一物理分區,但spark處理時還是認為不同鍵是屬于不同區的。既aggregateByKey初始值K是會給每個鍵初始時加上的。** # **SparkSQL相關:** ## **SQL 解析:** SQL Query,需要經過詞法和語法解析,由字符串轉換為,樹形的抽象語法樹。 * 通過遍歷抽象語法樹生成未解析的邏輯語法樹(unresolved logic plan),對應SQL解析后的一種樹形結構,本身不包含任務數據信息。 * 需要經過一次遍歷之后,轉換成成包含解析后的邏輯算子樹(Analyzed LogicPlan),本身攜帶了各種信息。 * 最后經過優化后得到最終的邏輯語法樹(Optimized LogicPlan)。 不管解析被劃分為幾步,在Spark 執行環境中,都要轉化成RDD的調用代碼,才能被spark core所執行,示意圖如下: ## **創建視圖:** createOrReplaceTempView?的作用是創建一個臨時的表?,?一旦創建這個表的會話關閉?,?這個表也會立馬消失?其他的SparkSession?不能共享應已經創建的臨時表createOrReplaceGlobalTempView創建一個全局的臨時表?,?這個表的生命周期是?整個Spark應用程序?,只要Spark?應用程序不關閉?,?那么.?這個臨時表依然是可以使用的?,并且這個表對其他的SparkSession共享(**要?global\_temp.‘tablename’ 使用**) ## **分組語句:** GROUP BY : 使用時不像MySQL分組,MySQL可以返回不在group by條件中的列的第一條數據作為該列返回值,spark sql 和 hive sql 類似,不能這樣隨機返回,能返回聚合字段和聚合函數,但可以通過指定Frist(col)或者 Last(col)。 ## **窗口函數:** function OVER (PARITION BY … ORDER BY … FRAME\_TYPE ??BETWEEN … AND …) function :對窗口內所有行都處理的函數 PARITION BY : 依據指定列進行分窗處理 ORDER BY : 窗口內依據指定字段排序 FRAME\_TYPE :FRAME是當前分區的一個子集,子句用來定義子集的規則,通常用來作為滑動窗口使用。主要用來控制每行數據在應用窗口函數時,這個窗口函數的作用范圍。 ( FRAME\_TYPE rows 指定函數基于當前行的窗口范圍 rows between …T… and …T… T 如下: unbounded preceding 前面所有行 unbounded following 后面所有行 current row 當前行 n following 后面n行 n preceding 前面n行 )
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看