# 教程
> 貢獻者:[@ImPerat0R\_](https://github.com/tssujt)、[@ThinkingChen](https://github.com/cdmikechen)、[@Ray](https://github.com/echo-ray)、[@zhongjiajie](https://github.com/zhongjiajie)
本教程將向您介紹一些 Airflow 的基本概念、對象以及它們在編寫第一個 pipline(管道)時的用法。
## 定義 Pipeline(管道)的例子
以下是定義一個基本 pipline(管道)的示例。如果這看起來很復雜,請不要擔心,下面將逐行說明。
```py
"""
Airflow 教程代碼位于:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1、t2 和 t3 是通過實例化 Operators 創建的任務示例
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{ % for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % end for %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
```
## 這是一個 DAG 定義文件
有一件事需要考慮(一開始可能不是很直觀),這個 Airflow 的 Python 腳本實際上只是一個將 DAG 的結構指定為代碼的配置文件。此處定義的實際任務將在與此腳本定義的不同上下文中運行。不同的任務在不同的時間點運行在不同的 worker(工作節點)上,這意味著該腳本不能在任務之間交叉通信。請注意,為此,我們有一個名為`XCom`的更高級功能。
人們有時會將 DAG 定義文件視為可以進行實際數據處理的地方 - 但事實并非如此!該腳本的目的是定義 DAG 對象。它需要快速評估(秒,而不是幾分鐘),因為 scheduler(調度器)將定期執行它以反映更改(如果有的話)。
## 導入模塊
一個 Airflow 的 pipeline 就是一個 Python 腳本,這個腳本的作用是為了定義 Airflow 的 DAG 對象。讓我們首先導入我們需要的庫。
```py
# DAG 對象; 我們將需要它來實例化一個 DAG
from airflow import DAG
# Operators; 我們需要利用這個對象去執行流程!
from airflow.operators.bash_operator import BashOperator
```
## 默認參數
我們即將創建一個 DAG 和一些任務,我們可以選擇顯式地將一組參數傳遞給每個任務的構造函數(這可能變得多余),或者(最好地)我們可以定義一個默認參數的字典,這樣我們可以在創建任務時使用它。
```py
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
```
有關 BaseOperator 參數及其功能的更多信息,請參閱[airflow.models.BaseOperator](zh/31?id=baseoperator)文檔。
另外,請注意,您可以輕松定義可用于不同目的的不同參數集。一個典型的例子是在生產和開發環境之間進行不同的設置。
## 實例化一個 DAG
我們需要一個 DAG 對象來嵌入我們的任務。這里我們傳遞一個定義為`dag_id`的字符串,把它用作 DAG 的唯一標識符。我們還傳遞我們剛剛定義的默認參數字典,同時也為 DAG 定義`schedule_interval`,設置調度間隔為每天一次。
```py
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
```
## (Task)任務
在實例化 operator(執行器)時會生成任務。從一個 operator(執行器)實例化出來的對象的過程,被稱為一個構造方法。第一個參數`task_id`充當任務的唯一標識符。
```py
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
```
注意到我們傳遞了一個 BaseOperator 特有的參數(`bash_command`)和所有的 operator 構造函數中都會有的一個參數(`retries`)。這比為每個構造函數傳遞所有的參數要簡單很多。另請注意,在第二個任務中,我們使用`3`覆蓋了默認的`retries`參數值。
任務參數的優先規則如下:
1. 明確傳遞參數
2. `default_args`字典中存在的值
3. operator 的默認值(如果存在)
任務必須包含或繼承參數`task_id`和`owner`,否則 Airflow 將出現異常。
## 使用 Jinja 作為模版
Airflow 充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的強大功能,并為 pipline(管道)的作者提供了一組內置參數和 macros(宏)。Airflow 還為 pipline(管道)作者提供了自定義參數,macros(宏)和 templates(模板)的能力。
本教程幾乎沒有涉及在 Airflow 中使用模板進行操作的工作領域,但本節的目的是讓您知道此功能的存在,讓您熟悉`{{ }}`雙花括號的用途,并指出最常見的模板變量: `{{ ds }}` (今天的“日期戳”)。
```py
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
```
請注意,`templated_command`包含`{% %}`塊中的代碼邏輯,引用參數如`{{ ds }}`,調用函數方式如`{{ macros.ds_add(ds, 7)}}`,引用用戶定義的參數如`{{ params.my_param }}`。
在`BaseOperator`中的`params`hook 允許您將參數或對象的字典傳遞給您的模板。請花一些時間去了解`my_param`這個參數是如何在模板中被使用的。
文件也可以當做`bash_command`的參數進行傳遞,例如`bash_command='templated_command.sh'`,不過這個文件的位置要在 pipeline(管道)文件的目錄內(在本例中為`tutorial.py`)。這可能是出于多種原因,比如將腳本的邏輯和 pipeline 代碼分隔開,允許在使用不同語言編寫的文件中進行正確的代碼突出顯示,以及靈活地構建 pipeline(管道)。還可以定義您的`template_searchpath`,以指向 DAG 構造函數調用中的任何文件夾位置。
使用同樣的 DAG 構造函數調用,可以使用`user_defined_macros`來定義您自己的變量。例如,將`dict(foo='bar')`傳遞給此參數允許您在模板中使用`{{ foo }}` 。此外,允許您指定`user_defined_filters`來注冊自己的過濾器。例如,將`dict(hello=lambda name: 'Hello %s' % name)`傳遞給此參數可以允許您在你的模板中使用`{{ 'world' | hello }}`。有關自定義過濾器的更多信息,請查看[Jinja 文檔](http://jinja.pocoo.org/docs/dev/api/#writing-filters)
有關可以在模板中引用的變量和宏的更多信息,請務必閱讀[宏](zh/code.md)部分
## 設置依賴關系
我們有三個不相互依賴任務,分別是`t1`,`t2`,`t3`。以下是一些可以定義它們之間依賴關系的方法:
```py
t1.set_downstream(t2)
# 這意味著 t2 會在 t1 成功執行之后才會執行
# 與下面這種寫法相等
t2.set_upstream(t1)
# 位移運算符也可用于鏈式運算
# 用于鏈式關系 和上面達到一樣的效果
t1 >> t2
# 位移運算符用于上游關系中
t2 << t1
# 使用位移運算符能夠鏈接
# 多個依賴關系變得簡潔
t1 >> t2 >> t3
# 任務列表也可以設置為依賴項。
# 下面的這些操作都具有相同的效果:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
```
請注意,在執行腳本時,在 DAG 中如果存在循環或多次引用依賴項時,Airflow 會引發異常。
## 回顧
到此,我們有了一個非常基本的 DAG。此時,您的代碼應如下所示:
```py
"""
Airflow 教程代碼位于:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
```
## 測試
### 運行腳本
是時候進行一些測試了。首先讓我們確保 pipeline(管道)能夠被解析。讓我們保證已經將前面的幾個步驟的代碼保存在`tutorial.py`文件中,并將文件放置在`airflow.cfg`設置的 DAGs 文件夾中。DAGs 的默認位置是`~/airflow/dags`。
```bash
python ~/airflow/dags/tutorial.py
```
如果這個腳本沒有報錯,那就證明您的代碼和您的 Airflow 環境沒有特別大的問題。
### 命令行元數據驗證
讓我們運行一些命令來進一步驗證這個腳本。
```bash
# 打印出所有正在活躍狀態的 DAGs
airflow list_dags
# 打印出 'tutorial' DAG 中所有的任務
airflow list_tasks tutorial
# 打印出 'tutorial' DAG 的任務層次結構
airflow list_tasks tutorial --tree
```
### 測試實例
讓我們通過在特定日期運行實際任務實例來進行測試。通過`execution_date`這個上下文指定日期,它會模擬 scheduler 在特定的 日期 + 時間 運行您的任務或者 dag:
```bash
# 命令樣式: command subcommand dag_id task_id date
# 測試 print_date
airflow test tutorial print_date 2015-06-01
# 測試 sleep
airflow test tutorial sleep 2015-06-01
```
現在還記得我們早些時候利用模板都做了什么?讓我們通過執行這個命令看看模板會被渲染成什么樣子:
```bash
# 測試模版渲染
airflow test tutorial templated 2015-06-01
```
用過運行 bash 命令,應該會顯示詳細的事件日志并打印結果。
請注意,`airflow test`命令在本地運行任務實例時,會將其日志輸出到 stdout(在屏幕上),不會受依賴項影響,并且不向數據庫傳達狀態(運行,成功,失敗,...)。它只允許測試單個任務實例。
### Backfill(回填)
一切看起來都運行良好,所以此時讓我們運行 backfill(回填)。`backfill`將尊重您的依賴關系,將日志發送到文件并與數據庫通信以記錄狀態。如果您啟動了一個 web 服務,您可以跟蹤它的進度。`airflow webserver`將啟動 Web 服務器,如果您有興趣在 backfill(回填)過程中直觀地跟蹤進度。
請注意,如果使用`depends_on_past=True`,則單個任務實例的執行將取決于前面任務實例是否成功,除了以 start_date 作為開始時間的實例(即第一個運行的 DAG 實例),他的依賴性會被忽略。
此上下文中的日期范圍是`start_date`和可選的`end_date`,它們用于使用此 dag 中的任務實例填充運行計劃。
```bash
# 可選,在后臺以 debug 模式運行 web 服務器
# airflow webserver --debug &
# 在時間范圍內回填執行任務
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07
```
## 接下來做什么
就如上面這樣,您已經編寫,測試并 backfill(回填)了您的第一個 Airflow 的 pipeline(管道)。將您的代碼合并到一個有 scheduler(調度管理器)的代碼庫中,這樣可以啟動任務并在每天執行它。
以下是您可能想要做的一些事情:
* 深入了解用戶界面 - 點擊所有內容!
* 繼續閱讀文檔! 特別是以下部分:
* 命令行界面
* Operators(運營商)
* Macros(宏)
* 寫下你的第一個 pipline(管道)!