# 調度和觸發器
> 貢獻者:[@Ray](https://github.com/echo-ray)
Airflow 調度程序監視所有任務和所有 DAG,并觸發已滿足其依賴關系的任務實例。 在幕后,它監視并與其可能包含的所有 DAG 對象的文件夾保持同步,并定期(每分鐘左右)檢查活動任務以查看是否可以觸發它們。
Airflow 調度程序旨在作為 Airflow 生產環境中的持久服務運行。 要開始,您需要做的就是執行`airflow scheduler` 。 它將使用`airflow.cfg`指定的配置。
請注意,如果您在一天的`schedule_interval`上運行 DAG,則會在`2016-01-01T23:59`之后不久觸發標記為`2016-01-01`的運行。 換句話說,作業實例在其覆蓋的時間段結束后啟動。
請注意,如果您運行一個`schedule_interval`為 1 天的的`DAG`,`run`標記為`2016-01-01`,那么它會在`2016-01-01T23:59`之后馬上觸發。換句話說,一旦設定的時間周期結束后,工作實例將立馬開始。
**讓我們重復一遍**調度`schedule_interval`在開始日期之后,在句點結束時運行您的作業一個`schedule_interval` 。
調度程序啟動`airflow.cfg`指定的執行程序的實例。 如果碰巧是`LocalExecutor` ,任務將作為子`LocalExecutor`執行; 在`CeleryExecutor`和`MesosExecutor`的情況下,任務是遠程執行的。
要啟動調度程序,只需運行以下命令:
```py
airflow scheduler
```
## DAG 運行
DAG Run 是一個表示 DAG 實例化的對象。
每個 DAG 可能有也可能沒有時間表,通知如何創建`DAG Runs` 。 `schedule_interval`被定義為 DAG 參數,并且優選地接收作為`str`的[cron 表達式](https://en.wikipedia.org/wiki/Cron)或`datetime.timedelta`對象。 或者,您也可以使用其中一個 cron“預設”:
<colgroup><col width="15%"><col width="69%"><col width="16%"></colgroup>
| 預置 | 含義 | cron 的 |
| --- | --- | --- |
| `None` | 不要安排,專門用于“外部觸發”的 DAG | |
| `@once` | 安排一次,只安排一次 | |
| `@hourly` | 在小時開始時每小時運行一次 | `0 * * * *` |
| `@daily` | 午夜一天運行一次 | `0 0 * * *` |
| `@weekly` | 周日早上每周午夜運行一次 | `0 0 * * 0` |
| `@monthly` | 每個月的第一天午夜運行一次 | `0 0 1 * *` |
| `@yearly` | 每年 1 月 1 日午夜運行一次 | `0 0 1 1 *` |
您的 DAG 將針對每個計劃進行實例化,同時為每個計劃創建`DAG Run`條目。
DAG 運行具有與它們相關聯的狀態(運行,失敗,成功),并通知調度程序應該針對任務提交評估哪組調度。 如果沒有 DAG 運行級別的元數據,Airflow 調度程序將需要做更多的工作才能確定應該觸發哪些任務并進行爬行。 在更改 DAG 的形狀時,也可能會添加新任務,從而創建不需要的處理。
## 回填和追趕
具有`start_date` (可能是`end_date` )和`schedule_interval`的 Airflow DAG 定義了一系列間隔,調度程序將這些間隔轉換為單獨的 Dag 運行并執行。 Airflow 的一個關鍵功能是這些 DAG 運行是原子的冪等項,默認情況下,調度程序將檢查 DAG 的生命周期(從開始到結束/現在,一次一個間隔)并啟動 DAG 運行對于尚未運行(或已被清除)的任何間隔。 這個概念叫做 Catchup。
如果你的 DAG 被編寫來處理它自己的追趕(IE 不僅限于間隔,而是改為“現在”。),那么你將需要關閉追趕(在 DAG 本身上使用`dag.catchup = False` )或者默認情況下在配置文件級別使用`catchup_by_default = False` 。 這樣做,是指示調度程序僅為 DAG 間隔序列的最新實例創建 DAG 運行。
```py
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 12, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'schedule_interval': '@hourly',
}
dag = DAG('tutorial', catchup=False, default_args=default_args)
```
在上面的示例中,如果調度程序守護程序在 2016-01-02 上午 6 點(或從命令行)拾取 DAG,則將創建單個 DAG 運行,其`execution_date`為 2016-01-01 ,下一個將在 2016-01-03 上午午夜后創建,執行日期為 2016-01-02。
如果`dag.catchup`值為 True,則調度程序將為 2015-12-01 和 2016-01-02 之間的每個完成時間間隔創建一個 DAG Run(但不是 2016-01-02 中的一個,因為該時間間隔)尚未完成)并且調度程序將按順序執行它們。 對于可以輕松拆分為句點的原子數據集,此行為非常有用。 如果您的 DAG 運行在內部執行回填,則關閉追趕是很好的。
## 外部觸發器
請注意,在運行`airflow trigger_dag`命令時,也可以通過 CLI 手動創建`DAG Runs` ,您可以在其中定義特定的`run_id` 。 在調度程序外部創建的`DAG Runs`與觸發器的時間戳相關聯,并將與預定的`DAG runs`一起顯示在 UI 中。
此外,您還可以使用 Web UI 手動觸發`DAG Run` (選項卡“DAG” - >列“鏈接” - >按鈕“觸發器 Dag”)。
## 要牢記
* 第一個`DAG Run`是基于 DAG 中任務的最小`start_date`創建的。
* 后續`DAG Runs`由調度程序進程根據您的 DAG 的`schedule_interval`順序創建。
* 當清除一組任務的狀態以期讓它們重新運行時,重要的是要記住`DAG Run`的狀態,因為它定義了調度程序是否應該查看該運行的觸發任務。
以下是一些可以**取消阻止任務的方法** :
* 在 UI 中,您可以從任務實例對話框中**清除** (如刪除狀態)各個任務實例,同時定義是否要包括過去/未來和上游/下游依賴項。 請注意,接下來會出現一個確認窗口,您可以看到要清除的設置。 您還可以清除與 dag 關聯的所有任務實例。
* CLI 命令`airflow clear -h`在清除任務實例狀態時有很多選項,包括指定日期范圍,通過指定正則表達式定位 task_ids,包含上游和下游親屬的標志,以及特定狀態下的目標任務實例( `failed`或`success` )
* 清除任務實例將不再刪除任務實例記錄。 相反,它更新 max_tries 并將當前任務實例狀態設置為 None。
* 將任務實例標記為失敗可以通過 UI 完成。 這可用于停止運行任務實例。
* 將任務實例標記為成功可以通過 UI 完成。 這主要是為了修復漏報,或者例如在 Airflow 之外應用修復時。
* `airflow backfill` CLI 子命令具有`--mark_success`標志,允許選擇 DAG 的子部分以及指定日期范圍。