# Lineage
注意
Lineage 支持是非常實驗性的,可能會發生變化。
Airflow可以幫助跟蹤數據的來源,發生的事情以及數據隨時間的變化。 這有助于實現審計跟蹤和數據治理,還可以調試數據流。
氣流通過任務的入口和出口跟蹤數據。 讓我們從一個例子開始,看看它是如何工作的。
```
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta
FILE_CATEGORIES = [ "CAT1" , "CAT2" , "CAT3" ]
args = {
'owner' : 'airflow' ,
'start_date' : airflow . utils . dates . days_ago ( 2 )
}
dag = DAG (
dag_id = 'example_lineage' , default_args = args ,
schedule_interval = '0 0 * * *' ,
dagrun_timeout = timedelta ( minutes = 60 ))
f_final = File ( "/tmp/final" )
run_this_last = DummyOperator ( task_id = 'run_this_last' , dag = dag ,
inlets = { "auto" : True },
outlets = { "datasets" : [ f_final ,]})
f_in = File ( "/tmp/whole_directory/" )
outlets = []
for file in FILE_CATEGORIES :
f_out = File ( "/tmp/ {} /{{{{ execution_date }}}}" . format ( file ))
outlets . append ( f_out )
run_this = BashOperator (
task_id = 'run_me_first' , bash_command = 'echo 1' , dag = dag ,
inlets = { "datasets" : [ f_in ,]},
outlets = { "datasets" : outlets }
)
run_this . set_downstream ( run_this_last )
```
任務采用參數<cite>入口</cite>和<cite>出口</cite> 。 入口可以由數據集列表<cite>{“數據集”:[dataset1,dataset2]}</cite>手動定義,也可以配置為從上游任務中查找出口<cite>{“task_ids”:[“task_id1”,“task_id2”]}</cite>或者可以配置為從直接上游任務<cite>{“auto”:True}</cite>或它們的組合中獲取出口。 出口被定義為數據集列表<cite>{“數據集”:[dataset1,dataset2]}</cite> 。 在執行任務時,數據集的任何字段都使用上下文進行模板化。
注意
如果操作員支持,操作員可以自動添加入口和出口。
在示例DAG任務中, <cite>run_me_first</cite>是一個BashOperator,它接收從列表生成的3個入口: <cite>CAT1</cite> , <cite>CAT2</cite> , <cite>CAT3</cite> 。 請注意, <cite>execution_date</cite>是一個模板化字段,將在任務運行時呈現。
注意
在幕后,Airflow將沿襲元數據作為任務的<cite>pre_execute</cite>方法的一部分進行準備。 當任務完成執行<cite>時,</cite>將調用<cite>post_execute</cite>并將lineage元數據推送到XCOM中。 因此,如果您要創建自己的覆蓋此方法的運算符,請確保分別使用<cite>prepare_lineage</cite>和<cite>apply_lineage</cite>修飾您的方法。
## Apache Atlas
Airflow可以將其沿襲元數據發送到Apache Atlas。 您需要啟用<cite>atlas</cite>后端并正確配置它,例如在<cite>airflow.cfg中</cite> :
```
[ lineage ]
backend = airflow . lineage . backend . atlas
[ atlas ]
username = my_username
password = my_password
host = host
port = 21000
```
請確保安裝了<cite>atlasclient</cite>軟件包。