# 教程
本教程將向您介紹一些基本的Airflow概念,對象及其在編寫第一個管道時的用法。
## 示例管道定義
以下是基本管道定義的示例。 如果這看起來很復雜,請不要擔心,下面將逐行說明。
```
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG ( 'tutorial' , default_args = default_args )
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator (
task_id = 'templated' ,
bash_command = templated_command ,
params = { 'my_param' : 'Parameter I passed in' },
dag = dag )
t2 . set_upstream ( t1 )
t3 . set_upstream ( t1 )
```
## 這是一個DAG定義文件
包圍你的一件事(對于每個人來說可能不是很直觀)是這個Airflow Python腳本實際上只是一個配置文件,將DAG的結構指定為代碼。 此處定義的實際任務將在與此腳本的上下文不同的上下文中運行。 不同的任務在不同的時間點運行在不同的工作者上,這意味著該腳本不能用于在任務之間交叉通信。 請注意,為此,我們有一個名為`XCom`的更高級功能。
人們有時會將DAG定義文件視為可以進行實際數據處理的地方 - 事實并非如此! 該腳本的目的是定義DAG對象。 它需要快速評估(秒,而不是幾分鐘),因為調度程序將定期執行它以反映更改(如果有的話)。
## 導入模塊
Airflow管道只是一個Python腳本,恰好定義了Airflow DAG對象。 讓我們首先導入我們需要的庫。
```
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
```
## 默認參數
我們即將創建一個DAG和一些任務,我們可以選擇顯式地將一組參數傳遞給每個任務的構造函數(這將變得多余),或者(更好!)我們可以定義一個默認參數的字典,我們可以可以在創建任務時使用。
```
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
```
有關BaseOperator參數及其功能的更多信息,請參閱[`airflow.models.BaseOperator`](code.html "airflow.models.BaseOperator")文檔。
另外,請注意,您可以輕松定義可用于不同目的的不同參數集。 一個例子是在生產和開發環境之間進行不同的設置。
## 實例化DAG
我們需要一個DAG對象來嵌入我們的任務。 這里我們傳遞一個定義`dag_id`的字符串,它用作DAG的唯一標識符。 我們還傳遞我們剛剛定義的默認參數字典,并為DAG定義1天的`schedule_interval` 。
```
dag = DAG (
'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 ))
```
## 任務
在實例化操作員對象時生成任務。 從運算符實例化的對象稱為構造函數。 第一個參數`task_id`充當任務的唯一標識符。
```
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
```
請注意我們如何將從BaseOperator繼承的所有運算符( `retries` )共同的運算符特定參數( `bash_command` )和通用參數傳遞給運算符的構造函數。 這比為每個構造函數調用傳遞每個參數更簡單。 另請注意,在第二個任務中,我們使用`3`覆蓋`retries`參數。
任務的優先規則如下:
1. 明確傳遞參數
2. `default_args`字典中存在的值
3. 運算符的默認值(如果存在)
任務必須包含或繼承參數`task_id`和`owner` ,否則Airflow將引發異常。
## 與金賈一起模仿
Airflow充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的強大功能,并為管道作者提供了一組內置參數和宏。 Airflow還為管道作者提供了定義自己的參數,宏和模板的鉤子。
本教程幾乎沒有涉及在Airflow中使用模板進行操作的表面,但本節的目的是讓您知道此功能的存在,讓您熟悉雙花括號,并指向最常見的模板變量: `{{ ds }}` (今天的“日期戳”)。
```
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator (
task_id = 'templated' ,
bash_command = templated_command ,
params = { 'my_param' : 'Parameter I passed in' },
dag = dag )
```
請注意, `templated_command`包含`{% %}`塊中的代碼邏輯,引用參數如`{{ ds }}` ,調用`{{ macros.ds_add(ds, 7)}}`的函數,并在`{{ macros.ds_add(ds, 7)}}`引用用戶定義的參數`{{ params.my_param }}` 。
`BaseOperator`的`params`鉤子允許您將參數和/或對象的字典傳遞給模板。 請花點時間了解參數`my_param`如何通過模板。
文件也可以傳遞給`bash_command`參數,例如`bash_command='templated_command.sh'` ,其中文件位置相對于包含管道文件的目錄(在本例中為`tutorial.py` )。 這可能是出于許多原因,例如分離腳本的邏輯和管道代碼,允許在使用不同語言編寫的文件中進行正確的代碼突出顯示,以及構造管道的一般靈活性。 也可以將`template_searchpath`定義為指向DAG構造函數調用中的任何文件夾位置。
使用相同的DAG構造函數調用,可以定義`user_defined_macros` ,它允許您指定自己的變量。 例如,將`dict(foo='bar')`傳遞給此參數允許您在模板中使用`{{ foo }}` 。 此外,指定`user_defined_filters`允許您注冊自己的過濾器。 例如,將`dict(hello=lambda name: 'Hello %s' % name)`傳遞給此參數允許您使用`{{ 'world' | hello }}` 你的模板中的`{{ 'world' | hello }}` 有關自定義過濾器的更多信息,請查看[Jinja文檔](http://jinja.pocoo.org/docs/dev/api/)
有關可以在模板中引用的變量和宏的更多信息,請務必閱讀[宏](code.html)部分
## 設置依賴關系
我們有兩個不相互依賴的簡單任務。 以下是一些可以定義它們之間依賴關系的方法:
```
t2 . set_upstream ( t1 )
# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)
t3 . set_upstream ( t1 )
# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
```
請注意,在執行腳本時,Airflow會在DAG中找到循環或多次引用依賴項時引發異常。
## 概括
好吧,所以我們有一個非常基本的DAG。 此時,您的代碼應如下所示:
```
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime , timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG (
'tutorial' , default_args = default_args , schedule_interval = timedelta ( 1 ))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
templated_command = """
{ % f or i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{ % e ndfor %}
"""
t3 = BashOperator (
task_id = 'templated' ,
bash_command = templated_command ,
params = { 'my_param' : 'Parameter I passed in' },
dag = dag )
t2 . set_upstream ( t1 )
t3 . set_upstream ( t1 )
```
## 測試
### 運行腳本
是時候進行一些測試了。 首先讓我們確保管道解析。 假設我們正在保存`airflow.cfg`引用的`airflow.cfg`文件夾中`tutorial.py`中上一步的代碼。 DAG的默認位置是`~/airflow/dags` 。
```
python ~/airflow/dags/tutorial.py
```
如果腳本沒有引發異常,則意味著您沒有做任何可怕的錯誤,并且您的Airflow環境有點健全。
### 命令行元數據驗證
讓我們運行一些命令來進一步驗證這個腳本。
```
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
```
### 測試
讓我們通過在特定日期運行實際任務實例來進行測試。 在此上下文中指定的日期是`execution_date` ,它模擬在特定日期+時間運行任務或dag的調度程序:
```
# command layout: command subcommand dag_id task_id date
# testing print_date
airflow test tutorial print_date 2015 -06-01
# testing sleep
airflow test tutorial sleep 2015 -06-01
```
現在還記得我們之前用模板做過的事嗎? 通過運行此命令,了解如何呈現和執行此模板:
```
# testing templated
airflow test tutorial templated 2015 -06-01
```
這應該導致顯示詳細的事件日志并最終運行bash命令并打印結果。
請注意, `airflow test`命令在本地運行任務實例,將其日志輸出到stdout(在屏幕上),不依賴于依賴項,并且不向數據庫傳達狀態(運行,成功,失敗,...)。 它只允許測試單個任務實例。
### 回填
一切看起來都運行良好所以讓我們運行回填。 `backfill`將尊重您的依賴關系,將日志發送到文件并與數據庫通信以記錄狀態。 如果您有網絡服務器,您將能夠跟蹤進度。 如果您有興趣在回填過程中直觀地跟蹤進度, `airflow webserver`將啟動Web服務器。
請注意,如果使用`depends_on_past=True` ,則單個任務實例將取決于前面任務實例的成功,除了指定了自身的start_date,此依賴關系將被忽略。
此上下文中的日期范圍是`start_date`和可選的`end_date` ,它們用于使用此dag中的任務實例填充運行計劃。
```
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow backfill tutorial -s 2015 -06-01 -e 2015 -06-07
```
## 下一步是什么?
就是這樣,你已經編寫,測試并回填了你的第一個Airflow管道。 將代碼合并到具有針對它運行的主調度程序的代碼存儲庫中應該讓它每天都被觸發并運行。
以下是您可能想要做的一些事情:
* 深入了解用戶界面 - 點擊所有內容!
* 繼續閱讀文檔! 特別是以下部分:
> * 命令行界面
> * 運營商
> * 宏
* 寫下你的第一個管道!