<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # 教程 > 貢獻者:[@ImPerat0R\_](https://github.com/tssujt)、[@ThinkingChen](https://github.com/cdmikechen)、[@Ray](https://github.com/echo-ray)、[@zhongjiajie](https://github.com/zhongjiajie) 本教程將向您介紹一些 Airflow 的基本概念、對象以及它們在編寫第一個 pipline(管道)時的用法。 ## 定義 Pipeline(管道)的例子 以下是定義一個基本 pipline(管道)的示例。如果這看起來很復雜,請不要擔心,下面將逐行說明。 ```py """ Airflow 教程代碼位于: https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) # t1、t2 和 t3 是通過實例化 Operators 創建的任務示例 t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ { % for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" { % end for %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1) ``` ## 這是一個 DAG 定義文件 有一件事需要考慮(一開始可能不是很直觀),這個 Airflow 的 Python 腳本實際上只是一個將 DAG 的結構指定為代碼的配置文件。此處定義的實際任務將在與此腳本定義的不同上下文中運行。不同的任務在不同的時間點運行在不同的 worker(工作節點)上,這意味著該腳本不能在任務之間交叉通信。請注意,為此,我們有一個名為`XCom`的更高級功能。 人們有時會將 DAG 定義文件視為可以進行實際數據處理的地方 - 但事實并非如此!該腳本的目的是定義 DAG 對象。它需要快速評估(秒,而不是幾分鐘),因為 scheduler(調度器)將定期執行它以反映更改(如果有的話)。 ## 導入模塊 一個 Airflow 的 pipeline 就是一個 Python 腳本,這個腳本的作用是為了定義 Airflow 的 DAG 對象。讓我們首先導入我們需要的庫。 ```py # DAG 對象; 我們將需要它來實例化一個 DAG from airflow import DAG # Operators; 我們需要利用這個對象去執行流程! from airflow.operators.bash_operator import BashOperator ``` ## 默認參數 我們即將創建一個 DAG 和一些任務,我們可以選擇顯式地將一組參數傳遞給每個任務的構造函數(這可能變得多余),或者(最好地)我們可以定義一個默認參數的字典,這樣我們可以在創建任務時使用它。 ```py from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } ``` 有關 BaseOperator 參數及其功能的更多信息,請參閱[airflow.models.BaseOperator](zh/31?id=baseoperator)文檔。 另外,請注意,您可以輕松定義可用于不同目的的不同參數集。一個典型的例子是在生產和開發環境之間進行不同的設置。 ## 實例化一個 DAG 我們需要一個 DAG 對象來嵌入我們的任務。這里我們傳遞一個定義為`dag_id`的字符串,把它用作 DAG 的唯一標識符。我們還傳遞我們剛剛定義的默認參數字典,同時也為 DAG 定義`schedule_interval`,設置調度間隔為每天一次。 ```py dag = DAG( 'tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) ``` ## (Task)任務 在實例化 operator(執行器)時會生成任務。從一個 operator(執行器)實例化出來的對象的過程,被稱為一個構造方法。第一個參數`task_id`充當任務的唯一標識符。 ```py t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) ``` 注意到我們傳遞了一個 BaseOperator 特有的參數(`bash_command`)和所有的 operator 構造函數中都會有的一個參數(`retries`)。這比為每個構造函數傳遞所有的參數要簡單很多。另請注意,在第二個任務中,我們使用`3`覆蓋了默認的`retries`參數值。 任務參數的優先規則如下: 1. 明確傳遞參數 2. `default_args`字典中存在的值 3. operator 的默認值(如果存在) 任務必須包含或繼承參數`task_id`和`owner`,否則 Airflow 將出現異常。 ## 使用 Jinja 作為模版 Airflow 充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的強大功能,并為 pipline(管道)的作者提供了一組內置參數和 macros(宏)。Airflow 還為 pipline(管道)作者提供了自定義參數,macros(宏)和 templates(模板)的能力。 本教程幾乎沒有涉及在 Airflow 中使用模板進行操作的工作領域,但本節的目的是讓您知道此功能的存在,讓您熟悉`{{ }}`雙花括號的用途,并指出最常見的模板變量: `{{ ds }}` (今天的“日期戳”)。 ```py templated_command = """ { % f or i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7) }}" echo "{{ params.my_param }}" { % e ndfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) ``` 請注意,`templated_command`包含`{% %}`塊中的代碼邏輯,引用參數如`{{ ds }}`,調用函數方式如`{{ macros.ds_add(ds, 7)}}`,引用用戶定義的參數如`{{ params.my_param }}`。 在`BaseOperator`中的`params`hook 允許您將參數或對象的字典傳遞給您的模板。請花一些時間去了解`my_param`這個參數是如何在模板中被使用的。 文件也可以當做`bash_command`的參數進行傳遞,例如`bash_command='templated_command.sh'`,不過這個文件的位置要在 pipeline(管道)文件的目錄內(在本例中為`tutorial.py`)。這可能是出于多種原因,比如將腳本的邏輯和 pipeline 代碼分隔開,允許在使用不同語言編寫的文件中進行正確的代碼突出顯示,以及靈活地構建 pipeline(管道)。還可以定義您的`template_searchpath`,以指向 DAG 構造函數調用中的任何文件夾位置。 使用同樣的 DAG 構造函數調用,可以使用`user_defined_macros`來定義您自己的變量。例如,將`dict(foo='bar')`傳遞給此參數允許您在模板中使用`{{ foo }}` 。此外,允許您指定`user_defined_filters`來注冊自己的過濾器。例如,將`dict(hello=lambda name: 'Hello %s' % name)`傳遞給此參數可以允許您在你的模板中使用`{{ 'world' | hello }}`。有關自定義過濾器的更多信息,請查看[Jinja 文檔](http://jinja.pocoo.org/docs/dev/api/#writing-filters) 有關可以在模板中引用的變量和宏的更多信息,請務必閱讀[宏](zh/code.md)部分 ## 設置依賴關系 我們有三個不相互依賴任務,分別是`t1`,`t2`,`t3`。以下是一些可以定義它們之間依賴關系的方法: ```py t1.set_downstream(t2) # 這意味著 t2 會在 t1 成功執行之后才會執行 # 與下面這種寫法相等 t2.set_upstream(t1) # 位移運算符也可用于鏈式運算 # 用于鏈式關系 和上面達到一樣的效果 t1 >> t2 # 位移運算符用于上游關系中 t2 << t1 # 使用位移運算符能夠鏈接 # 多個依賴關系變得簡潔 t1 >> t2 >> t3 # 任務列表也可以設置為依賴項。 # 下面的這些操作都具有相同的效果: t1.set_downstream([t2, t3]) t1 >> [t2, t3] [t2, t3] << t1 ``` 請注意,在執行腳本時,在 DAG 中如果存在循環或多次引用依賴項時,Airflow 會引發異常。 ## 回顧 到此,我們有了一個非常基本的 DAG。此時,您的代碼應如下所示: ```py """ Airflow 教程代碼位于: https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG( 'tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ { % f or i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" { % e ndfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1) ``` ## 測試 ### 運行腳本 是時候進行一些測試了。首先讓我們確保 pipeline(管道)能夠被解析。讓我們保證已經將前面的幾個步驟的代碼保存在`tutorial.py`文件中,并將文件放置在`airflow.cfg`設置的 DAGs 文件夾中。DAGs 的默認位置是`~/airflow/dags`。 ```bash python ~/airflow/dags/tutorial.py ``` 如果這個腳本沒有報錯,那就證明您的代碼和您的 Airflow 環境沒有特別大的問題。 ### 命令行元數據驗證 讓我們運行一些命令來進一步驗證這個腳本。 ```bash # 打印出所有正在活躍狀態的 DAGs airflow list_dags # 打印出 'tutorial' DAG 中所有的任務 airflow list_tasks tutorial # 打印出 'tutorial' DAG 的任務層次結構 airflow list_tasks tutorial --tree ``` ### 測試實例 讓我們通過在特定日期運行實際任務實例來進行測試。通過`execution_date`這個上下文指定日期,它會模擬 scheduler 在特定的 日期 + 時間 運行您的任務或者 dag: ```bash # 命令樣式: command subcommand dag_id task_id date # 測試 print_date airflow test tutorial print_date 2015-06-01 # 測試 sleep airflow test tutorial sleep 2015-06-01 ``` 現在還記得我們早些時候利用模板都做了什么?讓我們通過執行這個命令看看模板會被渲染成什么樣子: ```bash # 測試模版渲染 airflow test tutorial templated 2015-06-01 ``` 用過運行 bash 命令,應該會顯示詳細的事件日志并打印結果。 請注意,`airflow test`命令在本地運行任務實例時,會將其日志輸出到 stdout(在屏幕上),不會受依賴項影響,并且不向數據庫傳達狀態(運行,成功,失敗,...)。它只允許測試單個任務實例。 ### Backfill(回填) 一切看起來都運行良好,所以此時讓我們運行 backfill(回填)。`backfill`將尊重您的依賴關系,將日志發送到文件并與數據庫通信以記錄狀態。如果您啟動了一個 web 服務,您可以跟蹤它的進度。`airflow webserver`將啟動 Web 服務器,如果您有興趣在 backfill(回填)過程中直觀地跟蹤進度。 請注意,如果使用`depends_on_past=True`,則單個任務實例的執行將取決于前面任務實例是否成功,除了以 start_date 作為開始時間的實例(即第一個運行的 DAG 實例),他的依賴性會被忽略。 此上下文中的日期范圍是`start_date`和可選的`end_date`,它們用于使用此 dag 中的任務實例填充運行計劃。 ```bash # 可選,在后臺以 debug 模式運行 web 服務器 # airflow webserver --debug & # 在時間范圍內回填執行任務 airflow backfill tutorial -s 2015-06-01 -e 2015-06-07 ``` ## 接下來做什么 就如上面這樣,您已經編寫,測試并 backfill(回填)了您的第一個 Airflow 的 pipeline(管道)。將您的代碼合并到一個有 scheduler(調度管理器)的代碼庫中,這樣可以啟動任務并在每天執行它。 以下是您可能想要做的一些事情: * 深入了解用戶界面 - 點擊所有內容! * 繼續閱讀文檔! 特別是以下部分: * 命令行界面 * Operators(運營商) * Macros(宏) * 寫下你的第一個 pipline(管道)!
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看