# **任務:**
currentTime=$(date "+%Y%m%d%H%M%S")
source /etc/profile &> /dev/null
FLINK\_HOME=/var/lib/hadoop-hdfs/softwares/flink
${FLINK\_HOME}/bin/flink run -t yarn-per-job \\
\--detached \\
\-c com.sobot.icall.analysis.job.ICallDataAnalysisJob \\
\-p 2 \\
\-D taskmanager.numberOfTaskSlots=1 \\
\-D yarn.appmaster.vcores=1 \\
\-D yarn.containers.vcores=1 \\
\-D yarn.application.name=ICallDataAnalysisJob \\
\-D taskmanager.memory.process.size=1024m \\
\-D jobmanager.memory.process.size=1024m \\
\-s hdfs://cdh.test.ten.sobot.com:8020/data/flink/stream/icall-data-anasysis/savepoint/savepoint-92d9ba-63e37de2e4d1 \\
\--allowNonRestoredState \\
/var/lib/hadoop-hdfs/flinkJob/icall-data-analysis.jar \\
\--systemScheme hdfs://cdh.test.ten.sobot.com:8020 \\
\--kafkaParallelism 3 \\
\> /var/lib/hadoop-hdfs/flinkJob/icall-data-analysis/logs/icall-data-analysis-${currentTime}.log 2>&1 &
參數解釋:
\-c == --class 指定jar包運行類
\-p == --parallelism 指定并行度
\-D 設定任務運行相關參數
\-s == --fromSavepoint 使用原有保存快照來重起工作
\--allowNonRestoredState只允許 job從狀態快照(保存點或 checkpoints)啟動,該快照包含在正在啟動的 job中無處可還原的狀態。換言之,一些 state被撤銷了。
\--kafkaParallelism 3 寫入Kafka并行度 ,jar包自定義傳入參數
2>&1 : 對于command>a 2>&1這條命令,等價于command 1>a 2>&1
2就是標準錯誤,1是標準輸出,那么這條命令不就是相當于把標準錯誤重定向到標準輸出(0 表示stdin標準輸入,1 表示stdout標準輸出,2 表示stderr標準錯誤)
# **1、日志的配置:**
log4j-cli.properties:由 Flink 命令行客戶端使用(例如?flink run)(不包括在集群上執行的代碼)。這個文件是我們使用flink run提交任務時,任務提交到集群前打印的日志所需的配置。
log4j-session.properties:Flink 命令行客戶端在啟動 YARN 或 Kubernetes session 時使用(yarn-session.sh,kubernetes-session.sh)。
log4j.properties:作為 JobManager/TaskManager 日志配置使用(standalone 和 YARN 兩種模式下皆使用)
日志文件可以設置滾動策略,具體設置如下:(log4j.properties)
\# Allows this configuration to be modified at runtime. The file will be checked every 60 seconds.
monitorInterval=60
\# 滾動日志的配置
\# This affects logging for both user code and Flink
rootLogger.level = DEBUG
rootLogger.appenderRef.rolling.ref = RollingFileAppender
\# Uncomment this if you want to \_only\_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
logger.sobot.name = com.sobot
logger.sobot.level = INFO
logger.sobot.additivity = false
\# The following lines keep the log level of common libraries/connectors on
\# log level INFO. The root logger does not override this. You have to manually
\# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
\# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
#日志文件名
appender.rolling.fileName = ${sys:log.file}
#指定當發生文件滾動時,文件重命名規則
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
\# 輸出模板
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\# 指定記錄文件的保存策略,該策略主要是完成周期性的日志文件保存工作
appender.rolling.policies.type = Policies
\# 基于日志文件大小的觸發策略
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
\# 當日志文件大小大于size指定的值時,觸發滾動
appender.rolling.policies.size.size = 5M
\# 文件保存的覆蓋策略
appender.rolling.strategy.type = DefaultRolloverStrategy
\# 生成分割(保存)文件的個數,默認為5(-1,-2,-3,-4,-5)
appender.rolling.strategy.max = 10
\# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
## **2、yarn中需要占用多少資源是怎么決定的:**
給定參數,指定taskmanager擁有多少slot和內存,然后在根據任務的共享分組數和算子的并行度來分配資源,算子未設置分組和并行度就采用全局默認值。同一分組可以共用slot,算子并行度要小于等于分組內slot數量,否則重分配slot給該分組。任務鏈條的分配與前后并行度不同和shuffle操作有關。
## **3、Flink?UI中需要使用啟動幾個Slot是如何決定的:**
Slot數量與任務并行度和設置的共享組有關,同一任務在同一共享組可以共用slot,且該共享組solt數量與它運行的任務并行度相等,不同共享組需要不同solt,一個處理流程可能不同算子設置不同共享組,那么slot總數是共享組solt數之和。
## **4、任務提交(on yarn)流程:**

1.Client先上傳flink任務所需jar包至HDFS,便于JobManager和TaskManager共享HDFS的數據。
2.Client接著向Yarn ResourceManager 提交任務,ResouceManager接到請求后,先分配container資源,然后選取某一個NodeManager啟動ApplicationMaster。
3.ApplicationMaster啟動后加載flink的jar包和配置構建環境,然后啟動JobManager,然后JobManager會分析當前的作業圖,將它轉化成執行圖(包含了所有可以并發執行的任務),從而知道當前需要的具體資源。
4.JobManager會向ResourceManager申請資源,ResouceManager接到請求后,繼續分配container資源,然后通知ApplictaionMaster啟動更多的TaskManager(先分配好container資源,再到指定NodeManager啟動TaskManager)。NodeManager在啟動TaskManager時也會從HDFS加載數據。
5.最后,TaskManager啟動后,會向JobManager發送心跳包。JobManager向TaskManager分配任務。
# **5、狀態后端:**
## **State Backends 的作用**
有狀態的流計算是Flink的一大特點,狀態本質上是數據,數據是需要維護的,例如數據庫就是維護數據的一種解決方案。State Backends 的作用就是用來維護State的。一個 State Backend 主要負責兩件事:Local State Management(本地狀態管理) 和 Remote State Checkpointing(遠程狀態備份)。
## **Local State Management(本地狀態管理)**
State Management 的主要任務是確保狀態的更新和訪問。類似于數據庫系統對數據的管理,State Backends 的狀態管理就是提供對 State 的訪問或更新操作,從這一點上看,State Backends 與數據庫很相似。Flink 提供的 State Backends 主要有兩種形式的狀態管理:
·直接將 State 以對象的形式存儲到JVM的堆上面。
·將 State 對象序列化后存儲到 RocksDB 中(RocksDB會寫到本地的磁盤上)
以上兩種方式,第一種存儲到JVM堆中,因為是在內存中讀寫,延遲會很低,但State的大小受限于內存的大小;第二種方式存儲到State Backends上(本地磁盤上),讀寫較內存會慢一些,但不受內存大小的限制,同時因為state存儲在磁盤上,可以減少應用程序對內存的占用。根據使用經驗,對延遲不是特別敏感的應用,選擇第二種方式較好,尤其是State比較大的情況下。
## **Remote State Checkpointing(遠程狀態備份)**
Flink程序是分布式運行的,而State都是存儲到各個節點上的,一旦TaskManager節點出現問題,就會導致State的丟失。State Backend 提供了 State Checkpointing 的功能,將 TaskManager 本地的 State 的備份到遠程的存儲介質上,可以是分布式的存儲系統或者數據庫。不同的 State Backends 備份的方式不同,會有效率高低的區別。
## **FLink 目前提供了三種狀態后端:**
### **1、MemoryStateBackend**
對于狀態管理,MemoryStateBackend直接將State對象存儲到TaskManager的JVM堆上,如MapState會被存儲為一個HashMap對象。can suffer from garbage collection pauses because it puts many long-lived objects on the heap.
對于遠程備份,MemoryStateBackend會將State備份到JobManager的堆內存上,這種方式是非常不安全的,且受限于JobManager的內存大小。
### **2、FsStateBackend**
對于狀態管理,FsStateBackend與MemoryStateBackend一樣,將State存儲到TaskManager的JVM堆上。
對于遠程備份,FsStateBackend會將State寫入到遠程的文件系統,如HDFS中。
### **3、RocksDBStateBackend**
對于狀態管理,RocksDBStateBackend將state存儲到TaskManager節點上的RocksDB數據庫實例上。
對于遠程備份,RocksDBstateBackend會將State備份到遠程的存儲系統中。
# **6、Flink Checkpoint 深入理解**
## **1、flink中state(狀態)**
·state泛指:flink中有狀態函數和運算符在各個元素(element)/事件(event)的處理過程中存儲的數據(注意:狀態數據可以修改和查詢,可以自己維護,根據自己的業務場景,保存歷史數據或者中間結果到狀態(state)中);
使用狀態計算的例子:
·當應用程序搜索某些事件模式時,狀態將存儲到目前為止遇到的事件序列。
·在每分鐘/小時/天聚合事件時,狀態保存待處理的聚合。
·當在數據點流上訓練機器學習模型時,狀態保持模型參數的當前版本。
·當需要管理歷史數據時,狀態允許有效訪問過去發生的事件。
## **為什么需要state管理**
流式作業的特點是7\*24小時運行,數據不重復消費,不丟失,保證只計算一次,數據實時產出不延遲,但是當狀態很大,內存容量限制,或者實例運行奔潰,或需要擴展并發度等情況下,如何保證狀態正確的管理,在任務重新執行的時候能正確執行,狀態管理就顯得尤為重要。
## **2、什么是checkpoint**
·checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現故障時,能夠通過checkpoint將整個應用流圖的狀態恢復到故障之前的某一狀態,保證應用流圖狀態的一致性。Flink的checkpoint機制原理來自“Chandy-Lamport algorithm”算法。 (分布式快照算)
·每個需要checkpoint的應用在啟動時,Flink的JobManager為其創建一個 CheckpointCoordinator,CheckpointCoordinator全權負責本應用的快照制作。
## **3、checkpoint的過程**
1.JobManager端的 CheckPointCoordinator向 所有SourceTask發送CheckPointTrigger,Source Task會在數據流中安插CheckPoint barrier
2.當task收到所有的barrier后,向自己的下游繼續傳遞barrier,然后自身執行快照,并將自己的狀態異步寫入到持久化存儲中。增量CheckPoint只是把最新的一部分更新寫入到 外部存儲;為了下游盡快做CheckPoint,所以會先發送barrier到下游,自身再同步進行快照
3.當task完成備份后,會將備份數據的地址(state handle)通知給JobManager的CheckPointCoordinator;如果CheckPoint的持續時長超過 了CheckPoint設定的超時時間,CheckPointCoordinator 還沒有收集完所有的 State Handle,CheckPointCoordinator就會認為本次CheckPoint失敗,會把這次CheckPoint產生的所有 狀態數據全部刪除。
4.最后 CheckPoint Coordinator 會把整個 StateHandle 封裝成 completed CheckPoint Meta,寫入到hdfs或其他文件系統。
## **4、從checkpoint的恢復**
當我們任務執行過程中出現失敗后仍能從設定的checkpoint地址恢復任務失敗前狀態。默認情況下,如果設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,可以從最近的這個Checkpoint來進行恢復。如果我們希望保留多個Checkpoint,并能夠根據實際需要選擇其中一個進行恢復,這樣會更加靈活,比如,我們發現最近4個小時數據記錄處理有問題,希望將整個狀態還原到4小時之前。(但因有一定的時間間隔,每次checkpoint的間隙勢必出現重復消費處理數據的情況,設置極短的間隙則會產生大量的check文件和I/O開銷,增加執行任務負擔,故要合理設置時間和保留適量check文件)
## **開啟checkpoint例子:**
// 設置保存點的保存路徑,這里是保存在hdfs中
env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
// 任務流取消和故障應保留檢查點
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN\_ON\_CANCELLATION);
// 保存點模式:exactly\_once
config.setCheckpointingMode(CheckpointingMode.EXACTLY\_ONCE);
// 觸發保存點的時間間隔
config.setCheckpointInterval(60000);
## **為了確保作業在失敗后能自動恢復,我們可以設置重啟策略,例如失敗后最多重啟3次,每次重啟間隔10s:**
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
## **那如果重啟3次后仍失敗呢?**
## **這個時候因為設置了任務完全失敗后保留check文件**
// 任務流取消和故障應保留檢查點?config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN\_ON\_CANCELLATION);
我們可以找到對應的check文件
Shell模式下:bin/flink run**?\-s (指定文件地址如:hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/\_metadata)**
## **5、checkpoint與savepoint**
1、checkpoint的側重點是“容錯”,即Flink作業意外失敗并重啟之后,能夠直接從早先打下的checkpoint恢復運行,且不影響作業邏輯的準確性。而savepoint的側重點是“維護”,即Flink作業需要在人工干預下手動重啟、升級、遷移或A/B測試時,先將狀態整體寫入可靠存儲,維護完畢之后再從savepoint恢復現場。
2、savepoint是“通過checkpoint機制”創建的,所以savepoint本質上是特殊的checkpoint。
3、checkpoint面向Flink Runtime本身,由Flink的各個TaskManager定時觸發快照并自動清理,一般不需要用戶干預;savepoint面向用戶,完全根據用戶的需要觸發與清理。
4、checkpoint的頻率往往比較高(因為需要盡可能保證作業恢復的準確度),所以checkpoint的存儲格式非常輕量級,但作為trade-off犧牲了一切可移植(portable)的東西,比如不保證改變并行度和升級的兼容性。savepoint則以二進制形式存儲所有狀態數據和元數據,執行起來比較慢而且“貴”,但是能夠保證portability,如并行度改變或代碼升級之后,仍然能正常恢復。
5、checkpoint是支持增量的(通過RocksDB),特別是對于超大狀態的作業而言可以降低寫入成本。savepoint并不會連續自動觸發,所以savepoint沒有必要支持增量。
## **6、什么是barrier對齊與不對齊**

·一旦Operator從輸入流接收到CheckPoint barrier n,它就不能處理來自該流的任何數據記錄,直到它從其他所有輸入接收到barrier n為止。否則,它會混合屬于快照n的記錄和屬于快照n + 1的記錄;
·上圖中第2個圖,雖然數字流對應的barrier?n已經到達了,但是barrier?n之后的1、2、3這些數據只能放到buffer中,等待字母流的barrier?n到達;
·一旦最后所有輸入流都接收到barrier n,Operator就會把緩沖區中pending 的輸出數據發出去,然后把CheckPoint barrier n+1接著往下游發送。
·這里還會對自身進行快照;之后,Operator將繼續處理來自所有輸入流的記錄,在處理來自流的記錄之前先處理來自輸入緩沖區的記錄。
·上述圖2中,當還有其他輸入流的barrier還沒有到達時,會把已到達的barrier之后的數據1、2、3擱置在緩沖區,等待其他流的barrier到達后才能處理,這便是barrier對齊了。
·barrier不對齊就是指當還有其他流的barrier還沒到達時,為了不影響性能,也不用理會,直接處理barrier之后的數據。等到所有流的barrier的都到達后,就可以對該Operator做CheckPoint了;
## **為什么要進行barrier對齊?不對齊到底行不行?**
答:Exactly Once(精準一次)時必須barrier對齊,barrier不對齊就變成了At Least Once(至少一次);
**詳情來源:**[**https://blog.csdn.net/qq\_43081842/article/details/112161557**](https://blog.csdn.net/qq_43081842/article/details/112161557)
# 7、**算子相關**
### **Dataset與Dataframe很多算子相同,不過一個偏向批處理,一個則是流處理。**
**流處理是很難對全部數據進行全局操作的,除開窗或處理離線數據即批處理。**
## **Source 算子:**
**fromCollection**:從本地集合讀取數據,參數可以是可迭代的對象或能轉迭代的集合類。(流批處理)
**fromElements**: 直接按個的輸入任意類型數據,可以用作簡單示例。(流批處理)
**fromSource**: 指定數據來源,可以是數據庫,消息隊列,socket等(原:addSource)(流批處理)
**readTextFile**: ?從指定文件路徑讀取文件。可以是hdfs或本地等等。(流批處理)
**readFile**: ?直接讀取文件,需要提前獲取文件當參數。(流批處理)
## **Transform轉換算子:**
**Map**: map是最基本的數據轉換算子,可以用清洗數據與轉換。參數可以是自定義實現的MapFunction或匿名(Lamaba)函數。(流批處理)
**Filter**: ?filter是過濾篩選算子,可以用作篩選符合要求的數據。參數可以是自定義實現的FilterFunct或匿名(Lamaba)函數。(流批處理)
**FlatMap**: 與map算子類似,屬于轉換算子,用于進行數據扁平化(降維)處理。常作為多維數據轉一維且也能起到數據過濾或減少的作用。參數如以上......(流批處理)
**mapPartition**: ?同map功能,不同在于它按分區來組織數據放入迭代器傳給一個函數處理。map則是每個數據應用一次函數處理。優點是減少開辟函數,缺點是數據收集到迭代器需要很大的內存空間,容易OOM。(批處理
**Reduce**: ?可以對一個dataset或者一個group指定字段來進行聚合計算,最終同字段聚合成一個元素。可以用作數據統計或合并。(Key流批處理)
**Fold**: ?具有初始值的被Keys化數據流上的“滾動”折疊。將當前數據元與最后折疊的值組合并發出新值 (Key流處理)
**ReduceGroup**: ?同Reduce算子,不同點在于它會先在各計算(task)節點上進行分組reduce,在將所有數據做整體reduce。這樣做的好處就是可以減少網絡IO。(批處理)
**KeyBy**: ?邏輯上將流分區為不相交的分區。具有相同Keys的所有記錄都分配給同一分區。在內部,keyBy是使用散列分區實現的。指定鍵有不同的方法。(流批處理)
**SortPartition**: ?根據指定的字段值進行分區的排序,會將同key放至同分區。(批處理)
**minBy和maxBy**: ?選擇具有最小值或最大值的元素。(Key流批處理)
**Distinct**: ?去除重復的數據(批處理)
**First:** ?返回前n條數據(批處理)
**Join**: ?將兩個DataSet按照一定條件連接到一起,形成新的DataSet,類似表關聯。還有leftOuterJoin:左外連接,
**rightOuterJoin**:右外連接,fullOuterJoin:全連接 (批處理)
**Cross**: ?交叉操作,通過形成這個數據集和其他數據集的笛卡爾積,創建一個新的數據集和join類似,但是這種交叉操作會產生笛卡爾積,在數據比較大的時候,是非常消耗內存的操作。(批處理)
**Window**: ?可以在已經分區的KeyedStream上定義Windows。Windows根據某些特征(例如,在最后5秒內到達的數據)對每個Keys中的數據進行分窗。(Key流處理)
**WindowAll**: ?Windows可以在常規DataStream上定義。Windows根據某些特征(例如,在最后5秒內到達的數據)對所有流事件進行分窗。(流處理)
**Connect**: ?“連接”兩個保存其類型的數據流。連接允許兩個流之間的共享狀態(流批處理)
**Select**: 從可拆分流中選擇一個或多個流(流批處理)
**Union**: 聯合操作,創建包含來自該數據集和其他數據集的元素的新數據集,不會去重(流批處理)
**Rebalance**:??數據重分區,內部使用round robin方法將數據均勻打散。這對于數據傾斜時是很好的選擇。(流批處理)
## **Sink算子:**
**Collect**: ?將數據輸出到本地集合(批處理)
**writeAsText**: ?將數據輸出到文件,包括本地文件,hdfs文件等(流批處理)
**addSink**: ?添加sink接收數據,可以自定義接收器或使用指定sinkUtils。如JdbcSink...(流批處理)
**sinkTo**: ?給定一個已經構建好的sink。(流批處理)
# **8、Flink時間機制**
Flink在流處理程序支持不同的時間概念。分別為Event Time/Processing Time/Ingestion Time,也就是事件時間、處理時間、提取時間。
從時間序列角度來說,發生的先后順序是:
**事件時間(Event Time)----> 攝入時間(Ingestion Time)----> 處理時間(Processing Time)**
**處理時間**
是數據流入到具體某個算子時候相應的系統時間。
這個系統時間指的是執行相應操作的機器的系統時間。當一個流程序通過處理時間來運行時,所有基于時間的操作(如: 時間窗口)將使用各自操作所在的物理機的系統時間。
ProcessingTime 有最好的性能和最低的延遲。但在分布式計算環境或者異步環境中,ProcessingTime具有不確定性,相同數據流多次運行有可能產生不同的計算結果。因為它容易受到從記錄到達系統的速度(例如從消息隊列)到記錄在系統內的operator之間流動的速度的影響(停電,調度或其他)。
**提取時間**
IngestionTime是數據進入Apache Flink框架的時間,是在Source Operator中設置的。每個記錄將源的當前時間作為時間戳,并且后續基于時間的操作(如時間窗口)引用該時間戳。
提取時間在概念上位于事件時間和處理時間之間。與處理時間相比,它稍早一些。IngestionTime與ProcessingTime相比可以提供更可預測的結果,因為IngestionTime的時間戳比較穩定(在源處只記錄一次),所以同一數據在流經不同窗口操作時將使用相同的時間戳,而對于ProcessingTime同一數據在流經不同窗口算子會有不同的處理時間戳。
與事件時間相比,提取時間程序無法處理任何無序事件或后期數據,但程序不必指定如何生成水位線。
在內部,提取時間與事件時間非常相似,但具有自動時間戳分配和自動水位線生成功能。
**事件時間**
事件時間就是事件在真實世界的發生時間,即每個事件在產生它的設備上發生的時間(當地時間)。比如一個點擊事件的時間發生時間,是用戶點擊操作所在的手機或電腦的時間。
在進入Apache Flink框架之前EventTime通常要嵌入到記錄中,并且EventTime也可以從記錄中提取出來。在實際的網上購物訂單等業務場景中,大多會使用EventTime來進行數據計算。
基于事件時間處理的強大之處在于即使在亂序事件,延遲事件,歷史數據以及從備份或持久化日志中的重復數據也能獲得正確的結果。對于事件時間,時間的進度取決于數據,而不是任何時鐘。
事件時間程序必須指定如何生成事件時間的Watermarks,這是表示事件時間進度的機制。
現在假設我們正在創建一個排序的數據流。這意味著應用程序處理流中的亂序到達的事件,并生成同樣事件但按時間戳(事件時間)排序的新數據流。
比如:
有1~10個事件。
亂序到達的序列是:1,2,4,5,6,3,8,9,10,7
經過按 事件時間 處理后的序列是:1,2,3,4,5,6,7,8,9,10
為了處理事件時間,Flink需要知道事件的時間戳,這意味著流中的每條數據都需要分配其事件時間戳。這通常通過提取每條數據中的固定字段來完成時間戳的獲取。
# **9、Watermark機制**
**先看下flink是如何處理流式數據中的按時分段統計情況:**
**聚合類的處理:** Flink可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。所以Flink引入了窗口概念。
**窗口:** 窗口的作用為了周期性的獲取數據。就是把傳入的原始數據流切分成多個buckets,所有計算都在單一的buckets中進行。窗口(window)就是從 Streaming 到 Batch 的一個橋梁。
**帶來的問題:** 聚合類處理帶來了新的問題,比如亂序/延遲。其解決方案就是 Watermark / allowLateNess / sideOutPut 這一組合拳。
**三者的作用:**
**1、 Watermark?**的作用是防止 數據亂序 / 指定時間內獲取不到全部數據。
**2、 allowLateNess?**是將窗口關閉時間再延遲一段時間。
**3、 sideOutPut**是最后兜底操作,當指定窗口已經徹底關閉后,就會把所有過期延遲數據放到側輸出流,讓用戶決定如何處理。
**一句話: ?用Windows把流數據分塊處理,用Watermark確定什么時候不再等待更早的數據/觸發窗口進行計算,用allowLateNess 將窗口關閉時間再延遲一段時間。用sideOutPut 最后兜底把數據導出到其他地方。**
**三者的理解:**
**1、 Watermark** 標識的是當前流的時間段,可以通過在最大事件時間上設定延遲時間來改變窗口認為的當前流時間。**例如實際到達12.05的數據,設定延遲1分鐘,那么對應窗口則認為流還處在12.04的時間段。只有當12.06事件數據到了后才認為這個窗口接收最后12.05的數據并觸發窗口計算。**
**2、 allowLateNess?**設置的時間是真正允許遲到的時間,**Watermark**?設定的延遲時間只是假設流可能會遲到,但認為延遲時間后流一定能到。**allowLateNess?允許在窗口收到對應Watermark**?要觸發計算時仍保留一定時間的狀態繼續等待數據,超過等待時間后才開始真正計算。
**3、 sideOutPut ?**是最后兜底操作,當指定窗口已經徹底關閉后,就會把所有過期延遲數據放到側輸出流,讓用戶決定如何處理。
# **10、Flink遲到事件**
雖說水位線表明著早于它的事件不應該再出現,可以起到一定的數據亂序整理。但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預計,導致窗口在它們到達之前已經關閉。
遲到事件出現時窗口已經關閉并產出了計算結果,因此處理的方法有3種:
·重新激活已經關閉的窗口并重新計算以修正結果。
·將遲到事件收集起來另外處理。
·將遲到事件視為錯誤消息并丟棄。
Flink 默認的處理方式是第3種直接丟棄,其他兩種方式分別使用**Side Output**和**Allowed Lateness。**
Side Output機制可以將遲到事件單獨放入一個數據流分支,這會作為 window 計算結果的副產品,以便用戶獲取并對其進行特殊處理。
Allowed Lateness機制允許用戶設置一個允許的最大遲到時長。Flink 會在窗口關閉后一直保存窗口的狀態直至超過允許遲到時長,這期間的遲到事件不會被丟棄,而是默認會觸發窗口重新計算。因為保存窗口狀態需要額外內存,并且如果窗口計算使用了?ProcessWindowFunction?API 還可能使得每個遲到事件觸發一次窗口的全量計算,代價比較大,所以允許遲到時長不宜設得太長,遲到事件也不宜過多,否則應該考慮降低水位線提高的速度或者調整算法。
# **11、旁路輸出(sideOutPut)**
除了來自數據流算子的主流結果輸出之外,可以產生任意數量的流旁路輸出結果。旁路輸出結果數據類型與主流結果的數據類型以及其他旁路輸出結果數據類型可以是完全不同的。當你需要分割數據流時,這個算子非常有用。**通常需要復制流,然后從每個數據流中過濾掉不需要的數據。也常用來處理延遲亂序數據,通過截取延遲數據另外計算在合并到最終數據流并依照原有時間順序排序。**
可以通過以下函數發射數據到旁路輸出。
·[?ProcessFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html)
·CoProcessFunction
·[?ProcessWindowFunction](#processwindowfunction)
·ProcessAllWindowFunction
例子:
val input: DataStream\[Int\] = ...
val outputTag = OutputTag\[String\]("side-output")
val mainDataStream = input
??.process(new ProcessFunction\[Int, Int\] {
????override def processElement(
????????value: Int,
????????ctx: ProcessFunction\[Int, Int\]#Context,
????????out: Collector\[Int\]): Unit = {
??????// emit data to regular output
??????out.collect(value)
??????// emit data to side output
??????ctx.output(outputTag, "sideout-" + String.valueOf(value))
????}
??})
要讀取旁路輸出流,在數據流運算后使用**getSideOutput(OutputTag)**。此時將會獲得鍵入旁路輸出流的結果。
val sideOutputStream: DataStream\[String\] = mainDataStream.getSideOutput(outputTag)
# **12、Flink窗口機制**
窗口分類可以分成:翻滾窗口(Tumbling Window,無重疊),滑動窗口(Sliding Window,有重疊),和會話窗口,(Session Window,有間隙)
**滾動窗口**
滾動窗口分配器將每個元素分配給固定窗口大小的窗口。滾動窗口大小固定的并且不重疊。例如,如果指定大小為5分鐘的滾動窗口,則將執行當前窗口,并且每五分鐘將啟動一個新窗口。

**滑動窗口**
滑動窗口與滾動窗口的區別就是滑動窗口有重復的計算部分。
滑動窗口分配器將每個元素分配給固定窗口大小的窗口。類似于滾動窗口分配器,窗口的大小由窗口大小參數配置。另外一個窗口滑動參數控制滑動窗口的啟動頻率(how frequently a sliding window is started)。因此,如果滑動大小小于窗口大小,滑動窗可以重疊。在這種情況下,元素被分配到多個窗口。
例如,你可以使用窗口大小為10分鐘的窗口,滑動大小為5分鐘。這樣,每5分鐘會生成一個窗口,包含最后10分鐘內到達的事件。

**會話窗口**
會話窗口分配器通過活動會話分組元素。與滾動窗口和滑動窗口相比,會話窗口不會重疊,也沒有固定的開始和結束時間。相反,當會話窗口在一段時間內沒有接收到元素時會關閉。
例如,不活動的間隙時。會話窗口分配器配置會話間隙,定義所需的不活動時間長度(defines how long is the required period of inactivity)。當此時間段到期時,當前會話關閉,后續元素被分配到新的會話窗口。

**更多相關:**
[https://www.cnblogs.com/rossiXYZ/p/12286407.html、https://mp.weixin.qq.com/s/S-RmP5OWiGqwn-C\_TZNO5A](https://www.cnblogs.com/rossiXYZ/p/12286407.html、https:/mp.weixin.qq.com/s/S-RmP5OWiGqwn-C_TZNO5A)
# **13、Flink異步IO**
**將Flink用于流計算時,若涉及到和外部系統進行交互,如利用Flink從數據庫中讀取數據,這種需要獲取I/O的場景時,我們需要考慮交互所帶來的時延問題。**

若圖1虛線左側所示,請求a發送到database后,MapFunction等待回復后才進行下發送下一個請求b,期間,I/O處于空閑狀態,請求b又開始重復此過程,這樣在兩個來回的時間內(發送請求-收到結果為一個來回),只處理兩個請求。如圖1虛線右側所示,同樣是在兩個來回的時間內,以異步的形式進行交互,請求a發出去后,在等待回復時,請求b,c,d依次發出,這樣既可以處理4個請求了。(好比你煲飯時可以繼續炒菜)
**在某些場景下,為了提高系統的吞吐能力,可以僅通過增大MapFunction的并發度以達目的,但是隨之而來是資源的大量消耗且利用率仍然未提高。**
**注意點:**
1、**為了實現以異步I/O訪問數據庫或K/V存儲,數據庫等需要有能支持異步請求的client;若是沒有,可以通過創建多個同步的client并使用線程池處理同步call的方式實現類似并發的client,但是這方式沒有異步I/O的性能好。**
2、**AsyncFunction不是以多線程方式調用的,一個AsyncFunction實例按順序為每個獨立消息發送請求。只是不用一直等請求回應才能繼續操作而已,可以通過請求回應后回調函數取回結果。**
**異步結果:**
**由于請求響應的快慢可能不一樣,AsyncFunction的“并發”請求可能導致結果的亂序 。如圖1中虛線右側所示,若請求b發出之后,其結果在請求a的之前返回,這樣異步I/O算子前后的消息順序就不一致了。**
**為了控制結果的返回順序,Flink提供了兩種模式:**
1)**Unordered**:當異步的請求完成時,其結果立馬返回,不考慮結果順序即亂序模式。當以processing time作為時間屬性時,該模式可以獲得最小的延時和最小的開銷,使用方式:**AsyncDataStream.unorderedWait**(...);
2)**Ordered**:該模式下,消息在異步I/O算子前后的順序一致,先請求的先返回,即有序模式。為實現有序模式,算子將請求返回的結果放入緩存,直到該請求之前的結果全部返回或超時。該模式通常情況下回引入額外的時延以及在checkpoint過程中會帶來開銷,這是因為,和無序模式相比,消息和請求返回的結果都會在checkpoint的狀態中維持更長時間。使用方式:**AsyncDataStream.orderedWai**t(...);
在此,我們需要針對流任務和**event time**相結合的情況進行補充說明。為什么?是因為**watermark**和消息的整體相對位置是不會變的,什么意思了?發生在某個**watermark**之后的消息,只能在**watermark**被發出之后發出,其請求結果也是。換句話說,兩個**watermark**之間的消息整體與**watermark**是有序的。當然這個區間內消息之間是否有序這得根據使用的模式來分析。
1)對**Ordered**模式,因為消息本身是有序的,所以**watermark**和消息之間也是有序的,和**processing time**相比,其不需要引入額外的開銷;
2)對**Unordered**模式,其模式是先響應先返回,但在與**event time**結合的情況里,消息或結果都需在特定**watermark**發出之后才能發出,此時,就會引入延時和開銷,其開銷的大小取決于**watermark**的頻率
13、**FlinkCDC**
### **CDC (Change Data Capture)****:**簡稱 改變數據捕獲
flink自己實現的實時同步MySQL的binlog日志,從而監測數據的變化與獲取變化后的數據。能夠較快的響應數據的變更,對于要求實時性通知變更的場景非常適用。原有方案是通過中間件kafka不斷獲取新數據來變更通知,CDC則幫助這個場景減少了對Kafka的依賴。
**前提MySQL授予flink使用的用戶讀取binlog權限**
# **14、Flink自定義數據類型TypeInformation**


# **15、Flink廣播(變量)數據流**
創建廣播流需要指定MapStateDescriptor(它描述了用于存儲廣播流名稱與廣播流本身的map 存儲結構)

為了關聯一個非廣播流(keyed 或者 non-keyed)與一個廣播流(BroadcastStream),我們可以調用非廣播流的方法?connect(),并將?BroadcastStream?當做參數傳入。 這個方法的返回參數是?BroadcastConnectedStream,具有類型方法?process(),傳入一個特殊的?ProcessFunction?來書寫我們的模式識別邏輯。 具體傳入?process()?的是哪個類型取決于非廣播流的類型:
* 如果流是一個?keyed?流,那就是?KeyedBroadcastProcessFunction?類型;
* 如果流是一個?non-keyed?流,那就是?BroadcastProcessFunction?類型。
* 注冊一個定時器只能在?對應ProcessFunction?的?processElement()?方法中進行。 在?processBroadcastElement()?方法中不能注冊定時器,因為廣播的元素中并沒有關聯的 key。但廣播流狀態只能在processBroadcastElement()中進行改變,processElement()中只能讀取。


廣播的好處: 根據廣播流或廣播變量對主流數據進行相關更新等動態操作,并且廣播流自身也是動態流數據,可以實現不同狀態下的規則變更。
# **16、Flink廣播與累加器區別**
廣播變量只能在Driver端定義,不能在Executor端定義,在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值
累加器只能在Driver端定義賦予初始值,只能在在Driver端讀取,在 Excutor 端更新
Flink廣播變量,使用廣播變量的好處:每個節點的executor有一個副本,不是每個task有一個副本,可以優化資源提高性能。是將一個公用的小數據集通過廣播變量,發送到每個TaskManager中,作為公共只讀變量使用,供每個task共享使用,以減少復制變量到每個task的次數,降低資源開銷,從而提高性能。
累加器:累加器可以在各個executor之間共享,修改,需要注意,累加器只能在Driver端定義賦予初始值,只能在在Driver端讀取,在 Excutor 端更新。Flink現有的內置累加器為以下幾個:
IntCounter,LongCounter,DoubleCounter
# **17、**