<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                # 概念 > 貢獻者:[@ImPerat0R\_](https://github.com/tssujt) [@zhongjiajie](https://github.com/zhongjiajie) Airflow Platform 是用于描述,執行和監控工作流的工具。 ## 核心理念 ### DAGs 在 Airflow 中,`DAG`(或者叫有向無環圖)是您要運行的所有任務的集合,以反映其關系和依賴關系的方式進行組織。 例如,一個簡單的 DAG 可以包含三個任務:A,B 和 C.可以說 A 必須在 B 可以運行之前成功運行,但 C 可以隨時運行。它可以說任務 A 在 5 分鐘后超時,并且 B 可以重新啟動最多 5 次以防它失敗。它也可能會說工作流程將在每天晚上 10 點運行,但不應該在某個特定日期之前開始。 通過這種方式,DAG 描述了您希望如何執行工作流程; 但請注意,我們還沒有說過我們真正想做的事情!A,B 和 C 可以是任何東西。當 C 發送電子郵件時,也許 A 準備 B 進行分析的數據。或者也許 A 監控你的位置,這樣 B 可以打開你的車庫門,而 C 打開你的房子燈。 重要的是,DAG 并不關心其組成任務的作用;它的工作是確保無論他們做什么在正確的時間,或正確的順序,或正確處理任何意外的問題。 DAG 是在標準 Python 文件中定義的,這些文件放在 Airflow 的`DAG_FOLDER`中。Airflow 將執行每個文件中的代碼以動態構建`DAG`對象。 您可以擁有任意數量的 DAG,每個 DAG 都可以描述任意數量的任務。通常,每個應該對應于單個邏輯工作流。 > 注意 > > 搜索 DAG 時,Airflow 將僅考慮字符串“airflow”和“DAG”都出現在`.py`文件內容中的文件。 #### 范圍 Airflow 將加載它可以從`DAG`文件導入的任何`DAG`對象。重要的是,這意味著 DAG 必須出現在`globals()`。考慮以下兩個 DAG。只會加載`dag_1` ; 另一個只出現在本地范圍內。 ```py dag_1 = DAG('this_dag_will_be_discovered') def my_function(): dag_2 = DAG('but_this_dag_will_not') my_function() ``` 有時這可以很好地利用。例如,`SubDagOperator`的常見模式是定義函數內的子標記,以便 Airflow 不會嘗試將其作為獨立的 DAG 加載。 #### 默認參數 如果將`default_args`字典傳遞給 DAG,它將把它們應用于任何運算符。這使得很容易將公共參數應用于許多運算符而無需多次鍵入。 ```py default_args = { 'start_date': datetime(2016, 1, 1), 'owner': 'Airflow' } dag = DAG('my_dag', default_args=default_args) op = DummyOperator(task_id='dummy', dag=dag) print(op.owner) # Airflow ``` #### 上下文管理器 _ 在 Airflow 1.8 中添加 _ DAG 可用作上下文管理器,以自動將新 Operator 分配給該 DAG。 ```py with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: op = DummyOperator('op') op.dag is dag # True ``` ### 運營商 雖然 DAG 描述了如何運行工作流,但`Operators`確定實際完成的工作。 Operator 描述工作流中的單個任務。Operator 通常(但并非總是)是原子的,這意味著他們可以獨立運營,而不需要與任何其他 Operator 共享資源。DAG 將確保 Operator 以正確的順序運行; 除了這些依賴項之外, Operator 通常獨立運行。實際上,它們可以在兩臺完全不同的機器上運行。 這是一個微妙但非常重要的一點:通常,如果兩個 Operator 需要共享信息,如文件名或少量數據,您應該考慮將它們組合到一個 Operator 中。如果絕對無法避免,Airflow 確實具有操作員交叉通信的功能,稱為 XCom,本文檔的其他部分對此進行了描述。 Airflow 為許多常見任務提供 Operator ,包括: * `BashOperator` - 執行 bash 命令 * `PythonOperator` - 調用任意 Python 函數 * `EmailOperator` - 發送電子郵件 * `SimpleHttpOperator` - 發送 HTTP 請求 * `MySqlOperator`,`SqliteOperator`,`PostgresOperator`,`MsSqlOperator`,`OracleOperator`,`JdbcOperator`等 - 執行 SQL 命令 * `Sensor` - 等待一定時間,文件,數據庫行,S3 鍵等... 除了這些基本構建塊之外,還有許多特定的 Operator : `DockerOperator`,`HiveOperator`,`S3FileTransformOperator`,`PrestoToMysqlOperator`,`SlackOperator`......你會明白的! `airflow/contrib/`目錄包含更多由社區構建的 Operator 。這些運算符并不總是像主發行版中那樣完整或經過良好測試,但允許用戶更輕松地向平臺添加新功能。 如果將 Operator 分配給 DAG,則 Operator 僅由 Airflow 加載。 請參閱[使用Operators](zh/howto/operator.md)了解如何使用 Airflow Operator 。 #### DAG 分配 _ 在 Airflow 1.8 中添加 _ Operator 不必立即分配給 DAG(之前的`dag`是必需參數)。但是,一旦將 Operator 分配給 DAG,就無法轉移或取消分配。在創建運算符時,通過延遲賦值或甚至從其他運算符推斷,可以顯式地完成 DAG 分配。 ```py dag = DAG('my_dag', start_date=datetime(2016, 1, 1)) # sets the DAG explicitly explicit_op = DummyOperator(task_id='op1', dag=dag) # deferred DAG assignment deferred_op = DummyOperator(task_id='op2') deferred_op.dag = dag # inferred DAG assignment (linked operators must be in the same DAG) inferred_op = DummyOperator(task_id='op3') inferred_op.set_upstream(deferred_op) ``` #### 位運算符 _ 在 Airflow 1.8 中添加 _ 傳統上,使用`set_upstream()`和`set_downstream()`方法設置運算符關系。在 Airflow 1.8 中,這可以通過 Python 位運算符`>>`和`<<`來完成。以下四個語句在功能上都是等效的: ```py op1 >> op2 op1.set_downstream(op2) op2 << op1 op2.set_upstream(op1) ``` 當使用位運算符時,關系設置在位運算符指向的方向上。例如,`op1 >> op2`表示`op1`先運行,`op2`第二運行。可以組成多個運算符 - 請記住,鏈從左到右執行,并且始終返回最右邊的對象。 例如: ```py op1 >> op2 >> op3 << op4 ``` 相當于: ```py op1.set_downstream(op2) op2.set_downstream(op3) op3.set_upstream(op4) ``` 為方便起見,位運算符也可以與 DAG 一起使用。 例如: ```py dag >> op1 >> op2 ``` 相當于: ```py op1.dag = dag op1.set_downstream(op2) ``` 我們可以把這一切放在一起構建一個簡單的管道: ```py with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: ( DummyOperator(task_id='dummy_1') >> BashOperator( task_id='bash_1', bash_command='echo "HELLO!"') >> PythonOperator( task_id='python_1', python_callable=lambda: print("GOODBYE!")) ) ``` ### 任務 一旦 Operator 被實例化,它就被稱為“任務”。實例化在調用抽象 Operator 時定義特定值,參數化任務成為 DAG 中的節點。 ### 任務實例 任務實例表示任務的特定運行,其特征在于 dag,任務和時間點的組合。 任務實例也有一個指示狀態,可以是“運行”,“成功”,“失敗”,“跳過”,“重試”等。 ### 工作流程 您現在熟悉 Airflow 的核心構建模塊。 有些概念可能聽起來非常相似,但詞匯表可以概念化如下: * DAG:描述工作應該發生的順序 * Operator:作為執行某些工作的模板的類 * 任務: Operator 參數化實例 * 任務實例:1)已分配給 DAG 的任務,2)具有與 DAG 的特定運行相關聯的狀態 通過組合`DAGs`和`Operators`來創建`TaskInstances`,您可以構建復雜的工作流。 ## 附加功能 除了核心 Airflow 對象之外,還有許多更復雜的功能可以實現限制同時訪問資源,交叉通信,條件執行等行為。 ### 鉤 鉤子是外部平臺和數據庫的接口,如 Hive,S3,MySQL,Postgres,HDFS 和 Pig。Hooks 盡可能實現通用接口,并充當 Operator 的構建塊。他們還使用`airflow.models.Connection`模型來檢索主機名和身份驗證信息。鉤子將身份驗證代碼和信息保存在管道之外,集中在元數據數據庫中。 鉤子在 Python 腳本,Airflow airflow.operators.PythonOperator 以及 iPython 或 Jupyter Notebook 等交互式環境中使用它們也非常有用。 ### 池 當有太多進程同時運行時,某些系統可能會被淹沒。Airflow 池可用于**限制**任意任務集上**的執行并行性** 。通過為池命名并為其分配多個工作槽來在 UI( `Menu -> Admin -> Pools` )中管理池列表。然后,在創建任務時(即,實例化運算符),可以使用`pool`參數將任務與其中一個現有池相關聯。 ```py aggregate_db_message_job = BashOperator( task_id='aggregate_db_message_job', execution_timeout=timedelta(hours=3), pool='ep_data_pipeline_db_msg_agg', bash_command=aggregate_db_message_job_cmd, dag=dag) aggregate_db_message_job.set_upstream(wait_for_empty_queue) ``` `pool`參數可以與`priority_weight`結合使用,以定義隊列中的優先級,以及在池中打開的槽時首先執行哪些任務。默認的`priority_weight`是`1`,可以碰到任何數字。 在對隊列進行排序以評估接下來應該執行哪個任務時,我們使用`priority_weight`與來自此任務下游任務的所有`priority_weight`值相加。您可以使用它來執行特定的重要任務,并相應地優先處理該任務的整個路徑。 當插槽填滿時,任務將照常安排。達到容量后,可運行的任務將排隊,其狀態將在 UI 中顯示。當插槽空閑時,排隊的任務將根據`priority_weight` (任務及其后代)開始運行。 請注意,默認情況下,任務不會分配給任何池,并且它們的執行并行性僅限于執行程序的設置。 ### 連接 外部系統的連接信息存儲在 Airflow 元數據數據庫中,并在 UI 中進行管理(`Menu -> Admin -> Connections`)。在那里定義了`conn_id` ,并附加了主機名/登錄/密碼/結構信息。 Airflow 管道可以簡單地引用集中管理的`conn_id`而無需在任何地方硬編碼任何此類信息。 可以定義具有相同`conn_id`許多連接,并且在這種情況下,并且當**掛鉤**使用來自`BaseHook`的`get_connection`方法時,Airflow 將隨機選擇一個連接,允許在與重試一起使用時進行一些基本的負載平衡和容錯。 Airflow 還能夠通過操作系統中的環境變量引用連接。但它只支持 URI 格式。如果您需要為連接指定`extra`信息,請使用 Web UI。 如果在 Airflow 元數據數據庫和環境變量中都定義了具有相同`conn_id`連接,則 Airflow 將僅引用環境變量中的連接(例如,給定`conn_id` `postgres_master`,在開始搜索元數據數據庫之前,Airflow 將優先在環境變量中搜索`AIRFLOW_CONN_POSTGRES_MASTER`并直接引用它)。 許多鉤子都有一個默認的`conn_id`,使用該掛鉤的 Operator 不需要提供顯式連接 ID。 例如,[`PostgresHook`](zh/code.md)的默認`conn_id`是`postgres_default` 。 請參閱[管理連接](zh/howto/manage-connections.md)以了解如何創建和管理連接。 ### 隊列 使用 CeleryExecutor 時,可以指定發送任務到 Celery 隊列。`queue`是 BaseOperator 的一個屬性,因此任何任務都可以分配給任何隊列。 環境的默認隊列配置在`airflow.cfg`的`celery -> default_queue`。這定義了未指定任務時分配給的隊列,以及 Airflow 工作程序在啟動時偵聽的隊列。 Workers 可以收聽一個或多個任務隊列。當工作程序啟動時(使用命令`airflow worker`),可以指定一組逗號分隔的隊列名稱(例如, `airflow worker -q spark`)。然后,該 worker 將僅接收連接到指定隊列的任務。 如果您需要特定的 workers,從資源角度來看(例如,一個工作人員可以毫無問題地執行數千個任務),或者從環境角度(您希望工作人員從 Spark 群集中運行),這可能非常有用本身,因為它需要一個非常具體的環境和安全的權利)。 ### XComs XComs 允許任務交換消息,允許更細微的控制形式和共享狀態。該名稱是“交叉通信”的縮寫。XComs 主要由鍵,值和時間戳定義,但也跟蹤創建 XCom 的任務/DAG 以及何時應該可見的屬性。任何可以被 pickle 的對象都可以用作 XCom 值,因此用戶應該確保使用適當大小的對象。 可以“推”(發送)或“拉”(接收)XComs。當任務推送 XCom 時,它通常可用于其他任務。任務可以通過調用`xcom_push()`方法隨時推送 XComs。 此外,如果任務返回一個值(來自其 Operator 的`execute()`方法,或者來自 PythonOperator 的`python_callable`函數),則會自動推送包含該值的 XCom。 任務調用`xcom_pull()`來檢索 XComs,可選地根據`key`,source `task_ids`和 source `dag_id`等條件應用過濾器。默認情況下, `xcom_pull()`過濾掉從執行函數返回時被自動賦予 XCom 的鍵(與手動推送的 XCom 相反)。 如果為`task_ids`傳遞`xcom_pull`單個字符串,則返回該任務的最新 XCom 值; 如果傳遞了 task_ids 列表,則返回相應的 XCom 值列表。 ```py # inside a PythonOperator called 'pushing_task' def push_function(): return value # inside another PythonOperator where provide_context=True def pull_function(**context): value = context['task_instance'].xcom_pull(task_ids='pushing_task') ``` 也可以直接在模板中提取 XCom,這是一個示例: ```py SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }} ``` 請注意,XCom 與[變量](zh/concepts.md)類似,但專門用于任務間通信而非全局設置。 ### 變量 變量是將任意內容或設置存儲和檢索為 Airflow 中的簡單鍵值存儲的通用方法。可以從 UI(`Admin -> Variables`),代碼或 CLI 列出,創建,更新和刪除變量。此外,json 設置文件可以通過 UI 批量上傳。雖然管道代碼定義和大多數常量和變量應該在代碼中定義并存儲在源代碼控制中,但是通過 UI 可以訪問和修改某些變量或配置項會很有用。 ```py from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True) ``` 第二個調用假設`json`內容,并將反序列化為`bar`。請注意, `Variable`是 sqlalchemy 模型,可以這樣使用。 您可以使用 jinja 模板中的變量,其語法如下: ```py echo {{ var.value.<variable_name> }} ``` 或者如果需要從變量反序列化 json 對象: ```py echo {{ var.json.<variable_name> }} ``` ### 分枝 有時您需要一個工作流來分支,或者只根據任意條件走下某條路徑,這通常與上游任務中發生的事情有關。 一種方法是使用`BranchPythonOperator` 。 `BranchPythonOperator`與 PythonOperator 非常相似,只是它需要一個返回 task_id 的 python_callable。返回返回的 task_id,并跳過所有其他路徑。Python 函數返回的 task_id 必須直接引用 BranchPythonOperator 任務下游的任務。 請注意,使用`depends_on_past=True`下游的任務在邏輯上是不合理的,因為`skipped`狀態將總是導致依賴于過去成功的塊任務。 `skipped`狀態在所有直接上游任務被`skipped`地方傳播。 如果你想跳過一些任務,請記住你不能有一個空路徑,如果是這樣,那就做一個虛擬任務。 像這樣,跳過虛擬任務“branch_false” ![https://airflow.apache.org/_images/branch_good.png](https://box.kancloud.cn/d834daec97e445000e7d9e5058c84bf4_573x130.jpg) 不喜歡這樣,跳過連接任務 ![https://airflow.apache.org/_images/branch_bad.png](https://box.kancloud.cn/3a8500b9b995b624440e0a9298566145_545x101.jpg) ### SubDAGs SubDAG 非常適合重復模式。在使用 Airflow 時,定義一個返回 DAG 對象的函數是一個很好的設計模式。 Airbnb 在加載數據時使用階段檢查交換模式。數據在臨時表中暫存,然后對該表執行數據質量檢查。 一旦檢查全部通過,分區就會移動到生產表中。 再舉一個例子,考慮以下 DAG: ![https://airflow.apache.org/_images/subdag_before.png](https://box.kancloud.cn/ff08647dba87b73d409e1a5b05326243_490x243.jpg) 我們可以將所有并行`task-*`運算符組合到一個 SubDAG 中,以便生成的 DAG 類似于以下內容: ![https://airflow.apache.org/_images/subdag_after.png](https://box.kancloud.cn/1513a94effe885d258965d133574b060_550x63.jpg) 請注意,SubDAG 運算符應包含返回 DAG 對象的工廠方法。 這將阻止 SubDAG 在主 UI 中被視為單獨的 DAG。 例如: ```py #dags/subdag.py from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator # Dag is returned by a factory method def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval): dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), schedule_interval=schedule_interval, start_date=start_date, ) dummy_operator = DummyOperator( task_id='dummy_task', dag=dag, ) return dag ``` 然后可以在主 DAG 文件中引用此 SubDAG: ```py # main_dag.py from datetime import datetime, timedelta from airflow.models import DAG from airflow.operators.subdag_operator import SubDagOperator from dags.subdag import sub_dag PARENT_DAG_NAME = 'parent_dag' CHILD_DAG_NAME = 'child_dag' main_dag = DAG( dag_id=PARENT_DAG_NAME, schedule_interval=timedelta(hours=1), start_date=datetime(2016, 1, 1) ) sub_dag = SubDagOperator( subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval), task_id=CHILD_DAG_NAME, dag=main_dag, ) ``` 您可以從主 DAG 的圖形視圖放大 SubDagOperator,以顯示 SubDAG 中包含的任務: ![https://airflow.apache.org/_images/subdag_zoom.png](https://box.kancloud.cn/b3a26c0a0c05d648f7c9e31de260aa64_1000x550.jpg) 使用 SubDAG 時的一些其他提示: * 按照慣例,SubDAG 的`dag_id`應以其父級和點為前綴。 和在`parent.child` * 通過將參數傳遞給 SubDAG 運算符來共享主 DAG 和 SubDAG 之間的參數(如上所示) * SubDAG 必須有一個計劃并啟用。如果 SubDAG 的時間表設置為`None`或`@once` ,SubDAG 將成功完成而不做任何事情 * 清除 SubDagOperator 也會清除其中的任務狀態 * 在 SubDagOperator 上標記成功不會影響其中的任務狀態 * 避免在任務中使用`depends_on_past=True`,因為這可能會造成混淆 * 可以為 SubDAG 指定執行程序。 如果要在進程中運行 SubDAG 并有效地將其并行性限制為 1,則通常使用 SequentialExecutor。使用 LocalExecutor 可能會有問題,因為它可能會過度消耗您的 workers,在單個插槽中運行多個任務 有關演示,請參閱`airflow/example_dags` 。 ### SLAs 服務級別協議或者叫任務或 DAG 應該成功的時間,可以在任務級別設置為`timedelta`。如果此時一個或多個實例未成功,則會發送警報電子郵件,詳細說明錯過其 SLA 的任務列表。該事件也記錄在數據庫中,并在`Browse->Missed SLAs`下的 Web UI 中顯示,其中可以分析和記錄事件。 ### 觸發規則 雖然正常的工作流行為是在所有直接上游任務都成功時觸發任務,但 Airflow 允許更復雜的依賴項設置。 所有運算符都有一個`trigger_rule`參數,該參數定義生成的任務被觸發的規則。`trigger_rule`的默認值是`all_success`,可以定義為“當所有直接上游任務都成功時觸發此任務”。此處描述的所有其他規則都基于直接父任務,并且是在創建任務時可以傳遞給任何操作員的值: * `all_success` :(默認)所有父任務都成功了 * `all_failed` :所有父`all_failed`都處于`failed`或`upstream_failed`狀態 * `all_done` :所有父任務都完成了他們的執行 * `one_failed` :一旦至少一個父任務就會觸發,它不會等待所有父任務完成 * `one_success` :一旦至少一個父任務成功就觸發,它不會等待所有父母完成 * `dummy` :依賴項僅用于顯示,隨意觸發 請注意,這些可以與`depends_on_past`(boolean)結合使用,當設置為`True`,如果任務的先前計劃未成功,則不會觸發任務。 ### 只運行最新的 標準工作流行為涉及為特定日期/時間范圍運行一系列任務。但是,某些工作流執行的任務與運行時無關,但需要按計劃運行,就像標準的 cron 作業一樣。在這些情況下,暫停期間錯過的回填或運行作業會浪費 CPU 周期。 對于這種情況,您可以使用`LatestOnlyOperator`跳過在 DAG 的最近計劃運行期間未運行的任務。如果現在的時間不在其`execution_time`和下一個計劃的`execution_time`之間,則`LatestOnlyOperator`跳過所有直接下游任務及其自身。 必須意識到跳過的任務和觸發器規則之間的相互作用。跳過的任務將通過觸發器規則`all_success`和`all_failed`級聯,但不是`all_done` ,`one_failed` ,`one_success`和`dummy`。如果您希望將`LatestOnlyOperator`與不級聯跳過的觸發器規則一起使用,則需要確保`LatestOnlyOperator`**直接**位于您要跳過的任務的上游。 通過使用觸發器規則來混合應該在典型的日期/時間依賴模式下運行的任務和使用`LatestOnlyOperator`任務是可能的。 例如,考慮以下 dag: ```py #dags/latest_only_with_trigger.py import datetime as dt from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule dag = DAG( dag_id='latest_only_with_trigger', schedule_interval=dt.timedelta(hours=4), start_date=dt.datetime(2016, 9, 20), ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) task1 = DummyOperator(task_id='task1', dag=dag) task1.set_upstream(latest_only) task2 = DummyOperator(task_id='task2', dag=dag) task3 = DummyOperator(task_id='task3', dag=dag) task3.set_upstream([task1, task2]) task4 = DummyOperator(task_id='task4', dag=dag, trigger_rule=TriggerRule.ALL_DONE) task4.set_upstream([task1, task2]) ``` 在這個 dag 的情況下,對于除最新運行之外的所有運行,`latest_only`任務將顯示為跳過。`task1`直接位于`latest_only`下游,并且除了最新的之外還將跳過所有運行。`task2`完全獨立于`latest_only`,將在所有計劃的時間段內運行。`task3`是`task1`和`task2`下游,由于默認的`trigger_rule`是`all_success`將從`all_success`接收級聯跳過。`task4`是`task1`和`task2`下游,但由于其`trigger_rule`設置為`all_done`因此一旦跳過`all_done`(有效的完成狀態)并且`task2`成功,它將立即觸發。 ![https://airflow.apache.org/_images/latest_only_with_trigger.png](https://box.kancloud.cn/3d4109b81c2b5fa963e6c4f33961616f_1378x526.jpg) ### 僵尸與不死 任務實例一直在死,通常是正常生命周期的一部分,但有時會出乎人到意料。 僵尸任務的特點是沒有心跳(由工作定期發出)和數據庫中的`running`狀態。當工作節點無法訪問數據庫,Airflow 進程在外部被終止或者節點重新啟動時,它們可能會發生。僵尸任務查殺是由調度程序的進程定期執行。 不死進程的特點是存在進程和匹配的心跳,但 Airflow 不知道此任務在數據庫中`running`。這種不匹配通常在數據庫狀態發生變化時發生,最有可能是通過刪除 UI 中“任務實例”視圖中的行。指示任務驗證其作為心跳例程的一部分的狀態,并在確定它們處于這種“不死”狀態時終止自身。 ### 集群策略 您的本地 Airflow 設置文件可以定義一個`policy`功能,該功能可以根據其他任務或 DAG 屬性改變任務屬性。它接收單個參數作為對任務對象的引用,并期望改變其屬性。 例如,此函數可以在使用特定運算符時應用特定隊列屬性,或強制執行任務超時策略,確保沒有任務運行超過 48 小時。 以下是`airflow_settings.py` : ```py def policy(task): if task.__class__.__name__ == 'HivePartitionSensor': task.queue = "sensor_queue" if task.timeout > timedelta(hours=48): task.timeout = timedelta(hours=48) ``` ### 文檔和注釋 可以在 Web 界面中顯示的 dag 和任務對象中添加文檔或注釋(dag 為“Graph View”,任務為“Task Details”)。如果定義了一組特殊任務屬性,它們將被呈現為豐富內容: | 屬性 | 渲染到 | | --- | --- | | doc | monospace | | doc_json | JSON | | doc_yaml | YAML | | doc_md | markdown | | doc_rst | reStructuredText | 請注意,對于 dags,doc_md 是解釋的唯一屬性。 如果您的任務是從配置文件動態構建的,則此功能特別有用,它允許您公開 Airflow 中相關任務的配置。 ```py """ ### My great DAG """ dag = DAG('my_dag', default_args=default_args) dag.doc_md = __doc__ t = BashOperator("foo", dag=dag) t.doc_md = """\ #Title" Here's a [url](www.airbnb.com) """ ``` 此內容將分別在“圖表視圖”和“任務詳細信息”頁面中呈現為 markdown。 ### Jinja 模板 Airflow 充分利用了[Jinja Templating](http://jinja.pocoo.org/docs/dev/)的強大功能,這可以成為與宏結合使用的強大工具(參見[宏](zh/code.md)部分)。 例如,假設您希望使用`BashOperator`將執行日期作為環境變量傳遞給 Bash 腳本。 ```py # The execution date as YYYY-MM-DD date = "{{ ds }}" t = BashOperator( task_id='test_env', bash_command='/tmp/test.sh ', dag=dag, env={'EXECUTION_DATE': date}) ``` 這里, `{{ ds }}`是一個宏,并且由于`BashOperator`的`env`參數是使用 Jinja 模板化的,因此執行日期將作為 Bash 腳本中名為`EXECUTION_DATE`的環境變量提供。 您可以將 Jinja 模板與文檔中標記為“模板化”的每個參數一起使用。模板替換發生在調用運算符的 pre_execute 函數之前。 ## 打包的 dags 雖然通常會在單個`.py`文件中指定 dags,但有時可能需要將 dag 及其依賴項組合在一起。 例如,您可能希望將多個 dag 組合在一起以將它們一起版本,或者您可能希望將它們一起管理,或者您可能需要一個額外的模塊,默認情況下在您運行 airflow 的系統上不可用。 為此,您可以創建一個 zip 文件,其中包含 zip 文件根目錄中的 dag,并在目錄中解壓縮額外的模塊。 例如,您可以創建一個如下所示的 zip 文件: ```py my_dag1.py my_dag2.py package1/__init__.py package1/functions.py ``` Airflow 將掃描 zip 文件并嘗試加載`my_dag1.py`和`my_dag2.py` 。 它不會進入子目錄,因為它們被認為是潛在的包。 如果您想將模塊依賴項添加到 DAG,您基本上也可以這樣做,但是更多的是使用 virtualenv 和 pip。 ```py virtualenv zip_dag source zip_dag/bin/activate mkdir zip_dag_contents cd zip_dag_contents pip install --install-option = "--install-lib= $PWD " my_useful_package cp ~/my_dag.py . zip -r zip_dag.zip * ``` > 注意 > > zip 文件將插入模塊搜索列表(sys.path)的開頭,因此它將可用于駐留在同一解釋器中的任何其他代碼。 > 注意 > > 打包的 dags 不能與打開 pickling 時一起使用。 > 注意 > > 打包的 dag 不能包含動態庫(例如 libz.so),如果模塊需要這些庫,則需要在系統上使用這些庫。換句話說,只能打包純 python 模塊。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看