# 調度和觸發器
Airflow調度程序監視所有任務和所有DAG,并觸發已滿足其依賴關系的任務實例。 在幕后,它監視并與其可能包含的所有DAG對象的文件夾保持同步,并定期(每分鐘左右)檢查活動任務以查看是否可以觸發它們。
Airflow調度程序旨在作為Airflow生產環境中的持久服務運行。 要開始,您需要做的就是執行`airflow scheduler` 。 它將使用`airflow.cfg`指定的配置。
請注意,如果您在一天的`schedule_interval`上運行DAG,則會在`2016-01-01T23:59`之后不久觸發標記為`2016-01-01`的運行。 換句話說,作業實例在其覆蓋的時間段結束后啟動。
**讓我們重復一遍**調度`schedule_interval`在開始日期之后,在句點結束時運行您的作業一個`schedule_interval` 。
調度程序啟動`airflow.cfg`指定的執行程序的實例。 如果碰巧是`LocalExecutor` ,任務將作為子`LocalExecutor`執行; 在`CeleryExecutor`和`MesosExecutor`的情況下,任務是遠程執行的。
要啟動調度程序,只需運行以下命令:
```
airflow scheduler
```
## DAG運行
DAG Run是一個表示DAG實例化的對象。
每個DAG可能有也可能沒有時間表,通知如何創建`DAG Runs` 。 `schedule_interval`被定義為DAG參數,并且優選地接收作為`str`的[cron表達式](https://en.wikipedia.org/wiki/Cron)或`datetime.timedelta`對象。 或者,您也可以使用其中一個cron“預設”:
<colgroup><col width="15%"><col width="69%"><col width="16%"></colgroup>
| 預置 | 含義 | cron的 |
| --- | --- | --- |
| `None` | 不要安排,專門用于“外部觸發”的DAG | |
| `@once` | 安排一次,只安排一次 | |
| `@hourly` | 在小時開始時每小時運行一次 | `0 * * * *` |
| `@daily` | 午夜一天運行一次 | `0 0 * * *` |
| `@weekly` | 周日早上每周午夜運行一次 | `0 0 * * 0` |
| `@monthly` | 每個月的第一天午夜運行一次 | `0 0 1 * *` |
| `@yearly` | 每年1月1日午夜運行一次 | `0 0 1 1 *` |
您的DAG將針對每個計劃進行實例化,同時為每個計劃創建`DAG Run`條目。
DAG運行具有與它們相關聯的狀態(運行,失敗,成功),并通知調度程序應該針對任務提交評估哪組調度。 如果沒有DAG運行級別的元數據,Airflow調度程序將需要做更多的工作才能確定應該觸發哪些任務并進行爬行。 在更改DAG的形狀時,也可能會添加新任務,從而創建不需要的處理。
## 回填和追趕
具有`start_date` (可能是`end_date` )和`schedule_interval`的Airflow DAG定義了一系列間隔,調度程序將這些間隔轉換為單獨的Dag運行并執行。 Airflow的一個關鍵功能是這些DAG運行是原子的冪等項,默認情況下,調度程序將檢查DAG的生命周期(從開始到結束/現在,一次一個間隔)并啟動DAG運行對于尚未運行(或已被清除)的任何間隔。 這個概念叫做Catchup。
如果你的DAG被編寫來處理它自己的追趕(IE不僅限于間隔,而是改為“現在”。),那么你將需要關閉追趕(在DAG本身上使用`dag.catchup = False` )或者默認情況下在配置文件級別使用`catchup_by_default = False` 。 這樣做,是指示調度程序僅為DAG間隔序列的最新實例創建DAG運行。
```
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 12 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
'schedule_interval' : '@hourly' ,
}
dag = DAG ( 'tutorial' , catchup = False , default_args = default_args )
```
在上面的示例中,如果調度程序守護程序在2016-01-02上午6點(或從命令行)拾取DAG,則將創建單個DAG運行,其`execution_date`為2016-01-01 ,下一個將在2016-01-03上午午夜后創建,執行日期為2016-01-02。
如果`dag.catchup`值為True,則調度程序將為2015-12-01和2016-01-02之間的每個完成時間間隔創建一個DAG Run(但不是2016-01-02中的一個,因為該時間間隔)尚未完成)并且調度程序將按順序執行它們。 對于可以輕松拆分為句點的原子數據集,此行為非常有用。 如果您的DAG運行在內部執行回填,則關閉追趕是很好的。
## 外部觸發器
請注意,在運行`airflow trigger_dag`命令時,也可以通過CLI手動創建`DAG Runs` ,您可以在其中定義特定的`run_id` 。 在調度程序外部創建的`DAG Runs`與觸發器的時間戳相關聯,并將與預定的`DAG runs`一起顯示在UI中。
此外,您還可以使用Web UI手動觸發`DAG Run` (選項卡“DAG” - >列“鏈接” - >按鈕“觸發器Dag”)。
## 要牢記
* 第一個`DAG Run`是基于DAG中任務的最小`start_date`創建的。
* 后續`DAG Runs`由調度程序進程根據您的DAG的`schedule_interval`順序創建。
* 當清除一組任務的狀態以期讓它們重新運行時,重要的是要記住`DAG Run`的狀態,因為它定義了調度程序是否應該查看該運行的觸發任務。
以下是一些可以**取消阻止任務的方法** :
* 在UI中,您可以從任務實例對話框中**清除** (如刪除狀態)各個任務實例,同時定義是否要包括過去/未來和上游/下游依賴項。 請注意,接下來會出現一個確認窗口,您可以看到要清除的設置。 您還可以清除與dag關聯的所有任務實例。
* CLI命令`airflow clear -h`在清除任務實例狀態時有很多選項,包括指定日期范圍,通過指定正則表達式定位task_ids,包含上游和下游親屬的標志,以及特定狀態下的目標任務實例( `failed`或`success` )
* 清除任務實例將不再刪除任務實例記錄。 相反,它更新max_tries并將當前任務實例狀態設置為None。
* 將任務實例標記為失敗可以通過UI完成。 這可用于停止運行任務實例。
* 將任務實例標記為成功可以通過UI完成。 這主要是為了修復漏報,或者例如在Airflow之外應用修復時。
* `airflow backfill` CLI子命令具有`--mark_success`標志,允許選擇DAG的子部分以及指定日期范圍。