# 概念
Airflow Platform是用于描述,執行和監控工作流的工具。
## 核心理念
### DAG的
在Airflow中, `DAG` (或定向非循環圖)是您要運行的所有任務的集合,以反映其關系和依賴關系的方式進行組織。
例如,一個簡單的DAG可以包含三個任務:A,B和C.可以說A必須在B可以運行之前成功運行,但C可以隨時運行。 它可以說任務A在5分鐘后超時,并且B可以重新啟動最多5次以防它失敗。 它也可能會說工作流程將在每天晚上10點運行,但不應該在某個特定日期之前開始。
通過這種方式,DAG描述_了_您希望如何執行工作流程; 但請注意,我們還沒有說過我們真正想做的事情! A,B和C可以是任何東西。 當C發送電子郵件時,也許A準備B進行分析的數據。 或者也許A監控你的位置,這樣B可以打開你的車庫門,而C打開你的房子燈。 重要的是,發展議程集團并不關心其組成任務的作用; 它的工作是確保無論他們做什么在正確的時間,或正確的順序,或正確處理任何意外的問題。
DAG在標準Python文件中定義,這些文件放在Airflow的`DAG_FOLDER` 。 Airflow將執行每個文件中的代碼以動態構建`DAG`對象。 您可以擁有任意數量的DAG,每個DAG都描述任意數量的任務。 通常,每個應該對應于單個邏輯工作流。
注意
搜索DAG時,Airflow將僅考慮字符串“airflow”和“DAG”都出現在`.py`文件內容中的文件。
#### 范圍
Airflow將加載它可以從`DAG`導入的任何`DAG`對象。 重要的是,這意味著DAG必須出現在`globals()` 。 考慮以下兩個DAG。 只會加載`dag_1` ; 另一個只出現在本地范圍內。
```
dag_1 = DAG ( 'this_dag_will_be_discovered' )
def my_function ():
dag_2 = DAG ( 'but_this_dag_will_not' )
my_function ()
```
有時這可以很好地利用。 例如, `SubDagOperator`的常見模式是定義函數內的子標記,以便Airflow不會嘗試將其作為獨立的DAG加載。
#### 默認參數
如果將`default_args`字典傳遞給DAG,它將把它們應用于任何運算符。 這使得很容易將公共參數應用于許多運算符而無需多次鍵入。
```
default_args = {
'start_date' : datetime ( 2016 , 1 , 1 ),
'owner' : 'Airflow'
}
dag = DAG ( 'my_dag' , default_args = default_args )
op = DummyOperator ( task_id = 'dummy' , dag = dag )
print ( op . owner ) # Airflow
```
#### Context Manager
_在Airflow 1.8中添加_
DAG可用作上下文管理器,以自動將新運算符分配給該DAG。
```
with DAG ( 'my_dag' , start_date = datetime ( 2016 , 1 , 1 )) as dag :
op = DummyOperator ( 'op' )
op . dag is dag # True
```
### 運營商
雖然DAG描述了_如何_運行工作流,但`Operators`確定實際完成的工作。
操作員描述工作流中的單個任務。 運營商通常(但并非總是)是原子的,這意味著他們可以獨立運營,而不需要與任何其他運營商共享資源。 DAG將確保運營商以正確的順序運行; 除了這些依賴項之外,運營商通常獨立運行。 實際上,它們可能在兩臺完全不同的機器上運行。
這是一個微妙但非常重要的一點:通常,如果兩個運營商需要共享信息,如文件名或少量數據,您應該考慮將它們組合到一個運算符中。 如果絕對無法避免,Airflow確實具有操作員交叉通信的功能,稱為XCom,本文檔的其他部分對此進行了描述。
Airflow為許多常見任務提供操作員,包括:
* `BashOperator` - 執行bash命令
* `PythonOperator` - 調用任意Python函數
* `EmailOperator` - 發送電子郵件
* `SimpleHttpOperator` - 發送HTTP請求
* `MySqlOperator` , `SqliteOperator` , `PostgresOperator` , `MsSqlOperator` , `OracleOperator` , `JdbcOperator`等 - 執行SQL命令
* `Sensor` - 等待一定時間,文件,數據庫行,S3鍵等...
除了這些基本構建塊之外,還有許多特定的運算符: `DockerOperator` , `HiveOperator` , `S3FileTransformOperator` , `PrestoToMysqlOperator` , `SlackOperator` ......你明白了!
`airflow/contrib/`目錄包含更多由社區構建的運算符。 這些運算符并不總是像主發行版中那樣完整或經過良好測試,但允許用戶更輕松地向平臺添加新功能。
如果將操作員分配給DAG,則操作員僅由Airflow加載。
請參閱[使用運算符](howto/operator.html)了解如何使用Airflow運算符。
#### DAG分配
_在Airflow 1.8中添加_
操作員不必立即分配給DAG(之前的`dag`是必需參數)。 但是,一旦將運營商分配給DAG,就無法轉移或取消分配。 在創建運算符時,通過延遲賦值或甚至從其他運算符推斷,可以顯式地完成DAG分配。
```
dag = DAG ( 'my_dag' , start_date = datetime ( 2016 , 1 , 1 ))
# sets the DAG explicitly
explicit_op = DummyOperator ( task_id = 'op1' , dag = dag )
# deferred DAG assignment
deferred_op = DummyOperator ( task_id = 'op2' )
deferred_op . dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator ( task_id = 'op3' )
inferred_op . set_upstream ( deferred_op )
```
#### Bitshift成分
_在Airflow 1.8中添加_
傳統上,使用`set_upstream()`和`set_downstream()`方法設置運算符關系。 在Airflow 1.8中,這可以通過Python bitshift操作符`>>`和`<<`來完成。 以下四個語句在功能上都是等效的:
```
op1 >> op2
op1 . set_downstream ( op2 )
op2 << op1
op2 . set_upstream ( op1 )
```
當使用bitshift組合運算符時,關系設置在bitshift運算符指向的方向上。 例如, `op1 >> op2`表示`op1`先運行, `op2`運行第二。 可以組成多個運算符 - 請記住,鏈從左到右執行,并且始終返回最右邊的對象。 例如:
```
op1 >> op2 >> op3 << op4
```
相當于:
```
op1 . set_downstream ( op2 )
op2 . set_downstream ( op3 )
op3 . set_upstream ( op4 )
```
為方便起見,bitshift運算符也可以與DAG一起使用。 例如:
```
dag >> op1 >> op2
```
相當于:
```
op1 . dag = dag
op1 . set_downstream ( op2 )
```
我們可以把這一切放在一起構建一個簡單的管道:
```
with DAG ( 'my_dag' , start_date = datetime ( 2016 , 1 , 1 )) as dag :
(
DummyOperator ( task_id = 'dummy_1' )
>> BashOperator (
task_id = 'bash_1' ,
bash_command = 'echo "HELLO!"' )
>> PythonOperator (
task_id = 'python_1' ,
python_callable = lambda : print ( "GOODBYE!" ))
)
```
### 任務
一旦運算符被實例化,它就被稱為“任務”。 實例化在調用抽象運算符時定義特定值,參數化任務成為DAG中的節點。
### 任務實例
任務實例表示任務的特定運行,其特征在于dag,任務和時間點的組合。 任務實例也有一個指示狀態,可以是“運行”,“成功”,“失敗”,“跳過”,“重試”等。
### 工作流程
您現在熟悉Airflow的核心構建模塊。 有些概念可能聽起來非常相似,但詞匯表可以概念化如下:
* DAG:描述工作應該發生的順序
* 運算符:作為執行某些工作的模板的類
* 任務:運算符的參數化實例
* 任務實例:1)已分配給DAG的任務,2)具有與DAG的特定運行相關聯的狀態
通過組合`DAGs`和`Operators`來創建`TaskInstances` ,您可以構建復雜的工作流。
## 附加功能
除了核心Airflow對象之外,還有許多更復雜的功能可以實現限制同時訪問資源,交叉通信,條件執行等行為。
### 鉤
鉤子是外部平臺和數據庫的接口,如Hive,S3,MySQL,Postgres,HDFS和Pig。 Hooks盡可能實現通用接口,并充當運營商的構建塊。 他們還使用`airflow.models.Connection`模型來檢索主機名和身份驗證信息。 掛鉤將身份驗證代碼和信息保存在管道之外,集中在元數據數據庫中。
鉤子在Python腳本,Airflow airflow.operators.PythonOperator以及iPython或Jupyter Notebook等交互式環境中使用它們也非常有用。
### 池
當有太多進程同時攻擊它們時,某些系統可能會被淹沒。 氣流池可用于**限制**任意任務集上**的執行并行性** 。 通過為池命名并為其分配多個工作槽來在UI( `Menu -> Admin -> Pools` )中管理池列表。 然后,在創建任務時(即,實例化運算符),可以使用`pool`參數將任務與其中一個現有池相關聯。
```
aggregate_db_message_job = BashOperator (
task_id = 'aggregate_db_message_job' ,
execution_timeout = timedelta ( hours = 3 ),
pool = 'ep_data_pipeline_db_msg_agg' ,
bash_command = aggregate_db_message_job_cmd ,
dag = dag )
aggregate_db_message_job . set_upstream ( wait_for_empty_queue )
```
`pool`參數可以與`priority_weight`結合使用,以定義隊列中的優先級,以及在池中打開的槽時首先執行哪些任務。 默認的`priority_weight`是`1` ,可以碰到任何數字。 在對隊列進行排序以評估接下來應該執行哪個任務時,我們使用`priority_weight` ,與來自此任務下游任務的所有`priority_weight`值相加。 您可以使用它來執行特定的重要任務,并相應地優先處理該任務的整個路徑。
當插槽填滿時,任務將照常安排。 達到容量后,可運行的任務將排隊,其狀態將在UI中顯示。 當插槽空閑時,排隊的任務將根據`priority_weight` (任務及其后代)開始運行。
請注意,默認情況下,任務不會分配給任何池,并且它們的執行并行性僅限于執行程序的設置。
### 連接
外部系統的連接信息存儲在Airflow元數據數據庫中并在UI中進行管理( `Menu -> Admin -> Connections` )在那里定義了`conn_id` ,并附加了主機名/登錄/密碼/架構信息。 氣流管道可以簡單地引用集中管理的`conn_id`而無需在任何地方硬編碼任何此類信息。
可以定義具有相同`conn_id`許多連接,并且在這種情況下,并且當**掛鉤**使用來自`BaseHook`的`get_connection`方法時,Airflow將隨機選擇一個連接,允許在與重試一起使用時進行一些基本的負載平衡和容錯。
Airflow還能夠通過操作系統中的環境變量引用連接。 但它只支持URI格式。 如果您需要為連接指定`extra` ,請使用Web UI。
如果在Airflow元數據數據庫和環境變量中都定義了具有相同`conn_id`連接,則Airflow將僅引用環境變量中的連接(例如,給定`conn_id` `postgres_master` ,Airflow將`AIRFLOW_CONN_POSTGRES_MASTER`在環境變量中搜索`AIRFLOW_CONN_POSTGRES_MASTER`并直接引用它,如果發現,在開始搜索元數據數據庫之前)。
許多掛鉤都有一個默認的`conn_id` ,使用該掛鉤的運算符不需要提供顯式連接ID。 例如, [`PostgresHook`](code.html "airflow.hooks.postgres_hook.PostgresHook")的默認`conn_id`是`postgres_default` 。
請參閱[管理Connections](howto/manage-connections.html)以了解如何創建和管理連接。
### 隊列
使用CeleryExecutor時,可以指定發送任務的芹菜隊列。 `queue`是BaseOperator的一個屬性,因此任何任務都可以分配給任何隊列。 環境的默認隊列在`airflow.cfg`的`celery -> default_queue` 。 這定義了未指定任務時分配給的隊列,以及Airflow工作程序在啟動時偵聽的隊列。
工作人員可以收聽一個或多個任務隊列。 當工作程序啟動時(使用命令`airflow worker` ),可以指定一組逗號分隔的隊列名稱(例如, `airflow worker -q spark` )。 然后,該工作人員將僅接收連接到指定隊列的任務。
如果您需要專業工作人員,從資源角度來看(例如,一個工作人員可以毫無問題地執行數千個任務),或者從環境角度(您希望工作人員從Spark群集中運行),這可能非常有用本身,因為它需要一個非常具體的環境和安全權利)。
### XComs
XComs允許任務交換消息,允許更細微的控制形式和共享狀態。 該名稱是“交叉通信”的縮寫。 XComs主要由鍵,值和時間戳定義,但也跟蹤創建XCom的任務/ DAG以及何時應該可見的屬性。 任何可以被pickle的對象都可以用作XCom值,因此用戶應該確保使用適當大小的對象。
可以“推”(發送)或“拉”(接收)XComs。 當任務推送XCom時,它通常可用于其他任務。 任務可以通過調用`xcom_push()`方法隨時推送XComs。 此外,如果任務返回一個值(來自其Operator的`execute()`方法,或者來自PythonOperator的`python_callable`函數),則會自動推送包含該值的XCom。
任務調用`xcom_pull()`來檢索XComs,可選地根據`key` ,source `task_ids`和source `dag_id`等條件應用過濾器。 默認情況下, `xcom_pull()`過濾掉從執行函數返回時被自動賦予XCom的鍵(與手動推送的XCom相反)。
如果為`xcom_pull`傳遞`xcom_pull`單個字符串,則返回該任務的最新XCom值; 如果傳遞了task_ids列表,則返回相應的XCom值列表。
```
# inside a PythonOperator called 'pushing_task'
def push_function ():
return value
# inside another PythonOperator where provide_context=True
def pull_function ( ** context ):
value = context [ 'task_instance' ] . xcom_pull ( task_ids = 'pushing_task' )
```
也可以直接在模板中提取XCom,這是一個示例:
```
SELECT * FROM {{ task_instance . xcom_pull ( task_ids = 'foo' , key = 'table_name' ) }}
```
請注意,XCom與[變量](20)類似,但專門用于任務間通信而非全局設置。
### 變量
變量是將任意內容或設置存儲和檢索為Airflow中的簡單鍵值存儲的通用方法。 可以從UI( `Admin -> Variables` ),代碼或CLI列出,創建,更新和刪除`Admin -> Variables` 。 此外,json設置文件可以通過UI批量上傳。 雖然管道代碼定義和大多數常量和變量應該在代碼中定義并存儲在源代碼控制中,但是通過UI可以訪問和修改某些變量或配置項會很有用。
```
from airflow.models import Variable
foo = Variable . get ( "foo" )
bar = Variable . get ( "bar" , deserialize_json = True )
```
第二個調用假設`json`內容,并將反序列化為`bar` 。 請注意, `Variable`是sqlalchemy模型,可以這樣使用。
您可以使用jinja模板中的變量,其語法如下:
```
echo {{ var . value .< variable_name > }}
```
或者如果需要從變量反序列化json對象:
```
echo {{ var . json .< variable_name > }}
```
### 分枝
有時您需要一個工作流來分支,或者只根據任意條件走下某條路徑,這通常與上游任務中發生的事情有關。 一種方法是使用`BranchPythonOperator` 。
`BranchPythonOperator`與PythonOperator非常相似,只是它需要一個返回task_id的python_callable。 返回返回的task_id,并跳過所有其他路徑。 Python函數返回的task_id必須直接引用BranchPythonOperator任務下游的任務。
請注意,在`depends_on_past=True`中使用`depends_on_past=True`下游的任務在邏輯上是不合理的,因為`skipped`狀態將總是導致依賴于過去成功的塊任務。 `skipped`狀態在所有直接上游任務被`skipped`地方傳播。
如果你想跳過一些任務,請記住你不能有一個空路徑,如果是這樣,那就做一個虛擬任務。
像這樣,跳過虛擬任務“branch_false”

不喜歡這樣,跳過連接任務

### SubDAGs
SubDAG非常適合重復模式。 在使用Airflow時,定義一個返回DAG對象的函數是一個很好的設計模式。
Airbnb在加載數據時使用_階段檢查交換_模式。 數據在臨時表中暫存,然后對該表執行數據質量檢查。 一旦檢查全部通過,分區就會移動到生產表中。
再舉一個例子,考慮以下DAG:

我們可以將所有并行`task-*`運算符組合到一個SubDAG中,以便生成的DAG類似于以下內容:

請注意,SubDAG運算符應包含返回DAG對象的工廠方法。 這將阻止SubDAG在主UI中被視為單獨的DAG。 例如:
```
#dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
# Dag is returned by a factory method
def sub_dag ( parent_dag_name , child_dag_name , start_date , schedule_interval ):
dag = DAG (
' %s . %s ' % ( parent_dag_name , child_dag_name ),
schedule_interval = schedule_interval ,
start_date = start_date ,
)
dummy_operator = DummyOperator (
task_id = 'dummy_task' ,
dag = dag ,
)
return dag
```
然后可以在主DAG文件中引用此SubDAG:
```
# main_dag.py
from datetime import datetime , timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag
PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'
main_dag = DAG (
dag_id = PARENT_DAG_NAME ,
schedule_interval = timedelta ( hours = 1 ),
start_date = datetime ( 2016 , 1 , 1 )
)
sub_dag = SubDagOperator (
subdag = sub_dag ( PARENT_DAG_NAME , CHILD_DAG_NAME , main_dag . start_date ,
main_dag . schedule_interval ),
task_id = CHILD_DAG_NAME ,
dag = main_dag ,
)
```
您可以從主DAG的圖形視圖放大SubDagOperator,以顯示SubDAG中包含的任務:

使用SubDAG時的一些其他提示:
* 按照慣例,SubDAG的`dag_id`應以其父級和點為前綴。 和在`parent.child`
* 通過將參數傳遞給SubDAG運算符來共享主DAG和SubDAG之間的參數(如上所示)
* SubDAG必須有一個計劃并啟用。 如果SubDAG的時間表設置為`None`或`@once` ,SubDAG將成功完成而不做任何事情
* 清除SubDagOperator也會清除其中的任務狀態
* 在SubDagOperator上標記成功不會影響其中的任務狀態
* 避免在`depends_on_past=True`中的任務中使用`depends_on_past=True` ,因為這可能會造成混淆
* 可以為SubDAG指定執行程序。 如果要在進程中運行SubDAG并有效地將其并行性限制為1,則通常使用SequentialExecutor。 使用LocalExecutor可能會有問題,因為它可能會過度訂閱您的工作人員,在單個插槽中運行多個任務
有關演示,請參閱`airflow/example_dags` 。
### 服務水平協議
服務級別協議或任務或DAG應該成功的時間可以在任務級別設置為`timedelta` 。 如果此時一個或多個實例未成功,則會發送警報電子郵件,詳細說明錯過其SLA的任務列表。 該事件也記錄在數據庫中,并在`Browse->Missed SLAs`下的Web UI中可用,其中可以分析和記錄事件。
### 觸發規則
雖然正常的工作流行為是在所有直接上游任務都成功時觸發任務,但Airflow允許更復雜的依賴項設置。
所有運算符都有一個`trigger_rule`參數,該參數定義生成的任務被觸發的規則。 `trigger_rule`的默認值是`all_success` ,可以定義為“當所有直接上游任務都成功時觸發此任務”。 此處描述的所有其他規則都基于直接父任務,并且是在創建任務時可以傳遞給任何操作員的值:
* `all_success` :(默認)所有父母都成功了
* `all_failed` :所有父`all_failed`都處于`failed`或`upstream_failed`狀態
* `all_done` :所有父母都完成了他們的執行
* `one_failed` :一旦至少一個父母失敗就會觸發,它不會等待所有父母完成
* `one_success` :一旦至少一個父成功就觸發,它不會等待所有父母完成
* `dummy` :依賴項僅用于show,隨意觸發
請注意,這些可以與`depends_on_past` (boolean)結合使用,當設置為`True` ,如果任務的先前計劃未成功,則不會觸發任務。
### 最新的運行
標準工作流行為涉及為特定日期/時間范圍運行一系列任務。 但是,某些工作流執行的任務與運行時無關,但需要按計劃運行,就像標準的cron作業一樣。 在這些情況下,暫停期間錯過的回填或運行作業會浪費CPU周期。
對于這種情況,您可以使用`LatestOnlyOperator`跳過在DAG的最近計劃運行期間未運行的任務。 如果現在的時間不在其`execution_time`和下一個計劃的`execution_time`之間,則`LatestOnlyOperator`跳過所有直接下游任務及其自身。
必須意識到跳過的任務和觸發器規則之間的相互作用。 跳過的任務將通過觸發器規則`all_success`和`all_failed`級聯,但不是`all_done` , `one_failed` , `one_success`和`dummy` 。 如果您希望將`LatestOnlyOperator`與不級聯跳過的觸發器規則一起使用,則需要確保`LatestOnlyOperator` **直接**位于您要跳過的任務的上游。
通過使用觸發器規則來混合應該在典型的日期/時間依賴模式下運行的任務和使用`LatestOnlyOperator`任務是可能的。
例如,考慮以下dag:
```
#dags/latest_only_with_trigger.py
import datetime as dt
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
dag = DAG (
dag_id = 'latest_only_with_trigger' ,
schedule_interval = dt . timedelta ( hours = 4 ),
start_date = dt . datetime ( 2016 , 9 , 20 ),
)
latest_only = LatestOnlyOperator ( task_id = 'latest_only' , dag = dag )
task1 = DummyOperator ( task_id = 'task1' , dag = dag )
task1 . set_upstream ( latest_only )
task2 = DummyOperator ( task_id = 'task2' , dag = dag )
task3 = DummyOperator ( task_id = 'task3' , dag = dag )
task3 . set_upstream ([ task1 , task2 ])
task4 = DummyOperator ( task_id = 'task4' , dag = dag ,
trigger_rule = TriggerRule . ALL_DONE )
task4 . set_upstream ([ task1 , task2 ])
```
在這個dag的情況下,對于除最新運行之外的所有運行, `latest_only`任務將顯示為跳過。 `latest_only`直接位于`latest_only`下游,并且除了最新的之外還將跳過所有運行。 `task2`完全獨立于`latest_only` ,將在所有計劃的時間段內運行。 `task3`是`task3`和`task2`下游,由于默認的`trigger_rule`是`all_success`將從`all_success`接收級聯跳過。 `task4`是`task4`和`task2`下游,但由于其`trigger_rule`設置為`all_done`因此一旦跳過`all_done` (有效的完成狀態)并且`task2`成功,它將立即觸發。

### 僵尸與亡靈
任務實例一直在死,通常是正常生命周期的一部分,但有時會出乎意料。
僵尸任務的特點是沒有心跳(由工作定期發出)和數據庫中的`running`狀態。 當工作節點無法訪問數據庫,Airflow進程在外部被終止或者節點重新啟動時,它們可能會發生。 僵尸查殺由調度程序的進程定期執行。
Undead進程的特點是存在進程和匹配的心跳,但Airflow不知道此任務在數據庫中`running` 。 這種不匹配通常在數據庫狀態發生變化時發生,最有可能是通過刪除UI中“任務實例”視圖中的行。 指示任務驗證其作為心跳例程的一部分的狀態,并在確定它們處于這種“不死”狀態時終止自身。
### 群集策略
您的本地氣流設置文件可以定義一個`policy`功能,該功能可以根據其他任務或DAG屬性改變任務屬性。 它接收單個參數作為對任務對象的引用,并期望改變其屬性。
例如,此函數可以在使用特定運算符時應用特定隊列屬性,或強制執行任務超時策略,確保任何任務運行超過48小時。 以下是`airflow_settings.py` :
```
def policy ( task ):
if task . __class__ . __name__ == 'HivePartitionSensor' :
task . queue = "sensor_queue"
if task . timeout > timedelta ( hours = 48 ):
task . timeout = timedelta ( hours = 48 )
```
### 文檔和注釋
可以在Web界面中顯示的dag和任務對象中添加文檔或注釋(dag為“Graph View”,任務為“Task Details”)。 如果定義了一組特殊任務屬性,它們將被呈現為豐富內容:
<colgroup><col width="38%"><col width="62%"></colgroup>
| 屬性 | 渲染到 |
| --- | --- |
| DOC | 等寬 |
| doc_json | JSON |
| doc_yaml | YAML |
| doc_md | 降價 |
| doc_rst | reStructuredText的 |
請注意,對于dags,doc_md是解釋的唯一屬性。
如果您的任務是從配置文件動態構建的,則此功能特別有用,它允許您公開導致Airflow中相關任務的配置。
```
"""
### My great DAG
"""
dag = DAG ( 'my_dag' , default_args = default_args )
dag . doc_md = __doc__
t = BashOperator ( "foo" , dag = dag )
t . doc_md = """ \
#Title"
Here's a [url](www.airbnb.com)
"""
```
此內容將分別在“圖表視圖”和“任務詳細信息”頁面中呈現為降價。
### 金賈模板
Airflow充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的強大功能,這可以成為與宏結合使用的強大工具(參見[宏](code.html)部分)。
例如,假設您希望使用`BashOperator`將執行日期作為環境變量傳遞給Bash腳本。
```
# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator (
task_id = 'test_env' ,
bash_command = '/tmp/test.sh ' ,
dag = dag ,
env = { 'EXECUTION_DATE' : date })
```
這里, `{{ ds }}`是一個宏,并且由于`BashOperator`的`env`參數是使用Jinja模板化的,因此執行日期將作為Bash腳本中名為`EXECUTION_DATE`的環境變量提供。
您可以將Jinja模板與文檔中標記為“模板化”的每個參數一起使用。 模板替換發生在調用運算符的pre_execute函數之前。
## 打包的dags
雖然通常會在單個`.py`文件中指定dags,但有時可能需要將dag及其依賴項組合在一起。 例如,您可能希望將多個dag組合在一起以將它們一起版本,或者您可能希望將它們一起管理,或者您可能需要一個額外的模塊,默認情況下在您運行airflow的系統上不可用。 為此,您可以創建一個zip文件,其中包含zip文件根目錄中的dag,并在目錄中解壓縮額外的模塊。
例如,您可以創建一個如下所示的zip文件:
```
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
```
Airflow將掃描zip文件并嘗試加載`my_dag1.py`和`my_dag2.py` 。 它不會進入子目錄,因為它們被認為是潛在的包。
如果您想將模塊依賴項添加到DAG,您基本上也會這樣做,但是更多的是使用virtualenv和pip。
```
virtualenv zip_dag
source zip_dag/bin/activate
mkdir zip_dag_contents
cd zip_dag_contents
pip install --install-option = "--install-lib= $PWD " my_useful_package
cp ~/my_dag.py .
zip -r zip_dag.zip *
```
注意
zip文件將插入模塊搜索列表(sys.path)的開頭,因此它將可用于駐留在同一解釋器中的任何其他代碼。
注意
包裝的dags不能與打開酸洗一起使用。
注意
打包的dag不能包含動態庫(例如libz.so),如果模塊需要這些庫,則需要在系統上使用這些庫。 換句話說,只能打包純python模塊。