<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # 數據血緣(Lineage) > 貢獻者:[@morefreeze](https://github.com/morefreeze) > 注意: > Lineage 支持是實驗性的,可能隨時會發生變化。 Airflow 可以幫助跟蹤數據的來源,以及數據發生了什么變化。 這有助于實現審計跟蹤和數據治理,還可以調試數據流。 Airflow 通過任務的 inlets 和 outlets 跟蹤數據。 讓我們通過一個例子看看它是如何工作的。 ```py from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.lineage.datasets import File from airflow.models import DAG from datetime import timedelta FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"] args = { 'owner': 'airflow' , 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( dag_id='example_lineage', default_args=args, schedule_interval='0 0 * * *', dagrun_timeout=timedelta(minutes=60)) f_final = File("/tmp/final") run_this_last = DummyOperator(task_id='run_this_last', dag=dag, inlets={"auto": True}, outlets={"datasets": [f_final,]}) f_in = File("/tmp/whole_directory/") outlets = [] for file in FILE_CATEGORIES: f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file)) outlets.append(f_out) run_this = BashOperator( task_id='run_me_first', bash_command='echo 1', dag=dag, inlets={"datasets": [f_in,]}, outlets={"datasets": outlets} ) run_this.set_downstream(run_this_last) ``` 任務定義了參數`inlets`和`outlets`。 `inlets`可以是一個數據集列表`{"datesets":[dataset1,dataset2]}`,也可以是指定的上游任務`outlets`像這樣`{"task_ids":["task_id1","task_id2"]}`,或者不想指定直接用`{"auto":True}`也可以,甚至是前面幾種的組合。 `outlets` 也是一個數據集列表`{"datesets":[dataset1,dataset2]}`。 在運行任務時,數據集的字段會被模板渲染。 > 注意: > 只要 Operator 支持,它會自動地加上 inlets 和 outlets。 在示例 DAG 任務中, `run_me_first`是一個 BashOperator,它接收`CAT1`, `CAT2`, `CAT3`作為 inlets(譯注:根據代碼,應為“輸出 outlets”)。 其中的`execution_date`會在任務運行時被渲染成執行時間。 > 注意: > 在底層,Airflow 會在`pre_execute`方法中準備 lineage 元數據。 當任務運行結束時,會調用`post_execute`將 lineage 元數據推送到 XCOM 中。 因此,如果您要創建自己的 Operator,并且需要覆寫這些方法,確保分別用`prepare_lineage`和`apply_lineage`裝飾這些方法。 ## Apache Atlas Airflow 可以將 lineage 元數據發送到 Apache Atlas。 您需要在`airflow.cfg`中配置`atlas`: ```config [lineage] backend = airflow.lineage.backend.atlas [atlas] username = my_username password = my_password host = host port = 21000 ``` 請確保已經安裝了`atlasclient`。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看