# 使用 Operators(執行器)
> 貢獻者:[@ImPerat0R\_](https://github.com/tssujt)、[@ThinkingChen](https://github.com/cdmikechen) [@zhongjiajie](https://github.com/zhongjiajie)
operator(執行器)代表一個理想情況下是冪等的任務。operator(執行器)決定了 DAG 運行時實際執行的內容。
有關更多信息,請參閱[Operators Concepts](zh/concepts.md)文檔和[Operators API Reference](zh/code.md) 。
* [BashOperator](zh/howto/operator.md)
* [模板](zh/howto/operator.md)
* [故障排除](zh/howto/operator.md)
* [找不到 Jinja 模板](zh/howto/operator.md)
* [PythonOperator](zh/howto/operator.md)
* [傳遞參數](zh/howto/operator.md)
* [模板](zh/howto/operator.md)
* [Google Cloud Platform Operators](zh/howto/operator.md)
* [GoogleCloudStorageToBigQueryOperator](zh/howto/operator.md)
* [GceInstanceStartOperator](zh/howto/operator.md)
* [GceInstanceStopOperator](zh/howto/operator.md)
* [GceSetMachineTypeOperator](zh/howto/operator.md)
* [GcfFunctionDeleteOperator](zh/howto/operator.md)
* [故障排除](zh/howto/operator.md)
* [GcfFunctionDeployOperator](zh/howto/operator.md)
* [故障排除](zh/howto/operator.md)
* [CloudSqlInstanceDatabaseCreateOperator](zh/howto/operator.md)
* [參數](zh/howto/operator.md)
* [使用執行器](zh/howto/operator.md)
* [模版](zh/howto/operator.md)
* [更多信息](zh/howto/operator.md)
* [CloudSqlInstanceDatabaseDeleteOperator](zh/howto/operator.md)
* [參數](zh/howto/operator.md)
* [使用執行器](zh/howto/operator.md)
* [模版](zh/howto/operator.md)
* [更多信息](zh/howto/operator.md)
* [CloudSqlInstanceDatabasePatchOperator](zh/howto/operator.md)
* [參數](zh/howto/operator.md)
* [使用執行器](zh/howto/operator.md)
* [模版](zh/howto/operator.md)
* [更多信息](zh/howto/operator.md)
* [CloudSqlInstanceDeleteOperator](zh/howto/operator.md)
* [參數](zh/howto/operator.md)
* [使用執行器](zh/howto/operator.md)
* [模版](zh/howto/operator.md)
* [更多信息](zh/howto/operator.md)
* [CloudSqlInstanceCreateOperator](zh/howto/operator.md)
* [參數](zh/howto/operator.md)
* [使用執行器](zh/howto/operator.md)
* [模版](zh/howto/operator.md)
* [更多信息](zh/howto/operator.md)
* [CloudSqlInstancePatchOperator](zh/howto/operator.md)
* [參數](zh/howto/operator.md)
* [使用執行器](zh/howto/operator.md)
* [模版](zh/howto/operator.md)
* [更多信息](zh/howto/operator.md)
## BashOperator
使用[`BashOperator`](zh/code.md)在[Bash](https://www.gnu.org/software/bash/) shell 中執行命令。
```py
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag)
```
### 模板
您可以使用[Jinja 模板](zh/concepts.md)來參數化`bash_command`參數。
```py
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
```
### 故障排除
#### 找不到 Jinja 模板
在使用`bash_command`參數直接調用 Bash 腳本時,需要在腳本名稱后添加空格。這是因為 Airflow 嘗試將 Jinja 模板應用于一個失敗的腳本。
```py
t2 = BashOperator(
task_id='bash_example',
# 這將會出現`Jinja template not found`的錯誤
# bash_command="/home/batcher/test.sh",
# 在加了空格之后,這會正常工作
bash_command="/home/batcher/test.sh ",
dag=dag)
```
## PythonOperator
使用[`PythonOperator`](zh/code.md)執行 Python 回調。
```py
def print_context ( ds , ** kwargs ):
pprint ( kwargs )
print ( ds )
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator (
task_id = 'print_the_context' ,
provide_context = True ,
python_callable = print_context ,
dag = dag )
```
### 傳遞參數
使用`op_args`和`op_kwargs`參數將額外參數傳遞給 Python 的回調函數。
```py
def my_sleeping_function(random_base):
"""這是一個將在 DAG 執行體中運行的函數"""
time.sleep(random_base)
# Generate 10 sleeping tasks, sleeping from 0 to 4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag,
)
run_this >> task
```
### 模板
當您將`provide_context`參數設置為`True`,Airflow 會傳入一組額外的關鍵字參數:一個用于每個[Jinja 模板變量](zh/code.md)和一個`templates_dict`參數。
`templates_dict`參數是模板化的,因此字典中的每個值都被評估為[Jinja 模板](zh/howto/operator.md)。
## Google 云平臺 Operators(執行器)
### GoogleCloudStorageToBigQueryOperator
使用[`GoogleCloudStorageToBigQueryOperator`](zh/integration.md)執行 BigQuery 加載作業。
### GceInstanceStartOperator
允許啟動一個已存在的 Google Compute Engine 實例。
在此示例中,參數值從 Airflow 變量中提取。此外,`default_args`字典用于將公共參數傳遞給單個 DAG 中的所有 operator(執行器)。
```py
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
INSTANCE = models.Variable.get('INSTANCE', '')
SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
SET_MACHINE_TYPE_BODY = {
'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
}
default_args = {
'start_date': airflow.utils.dates.days_ago(1)
}
```
通過將所需的參數傳遞給構造函數來定義`GceInstanceStartOperator`。
```py
gce_instance_start = GceInstanceStartOperator(
project_id=PROJECT_ID,
zone=LOCATION,
resource_id=INSTANCE,
task_id='gcp_compute_start_task'
)
```
### GceInstanceStopOperator
允許停止一個已存在的 Google Compute Engine 實例。
參數定義請參閱上面的`GceInstanceStartOperator`。
通過將所需的參數傳遞給構造函數來定義`GceInstanceStopOperator`。
```py
gce_instance_stop = GceInstanceStopOperator(
project_id=PROJECT_ID,
zone=LOCATION,
resource_id=INSTANCE,
task_id='gcp_compute_stop_task'
)
```
### GceSetMachineTypeOperator
允許把一個已停止實例的機器類型改變至特定的類型。
參數定義請參閱上面的`GceInstanceStartOperator`。
通過將所需的參數傳遞給構造函數來定義`GceSetMachineTypeOperator`。
```py
gce_set_machine_type = GceSetMachineTypeOperator(
project_id=PROJECT_ID,
zone=LOCATION,
resource_id=INSTANCE,
body=SET_MACHINE_TYPE_BODY,
task_id='gcp_compute_set_machine_type'
)
```
### GcfFunctionDeleteOperator
使用`default_args`字典來傳遞參數給 operator(執行器)。
```py
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
ENTRYPOINT)
default_args = {
'start_date': airflow.utils.dates.days_ago(1)
}
```
使用`GcfFunctionDeleteOperator`來從 Google Cloud Functions 刪除一個函數。
```py
t1 = GcfFunctionDeleteOperator(
task_id="gcf_delete_task",
name=FUNCTION_NAME
)
```
#### 故障排除
如果你想要使用服務賬號來運行或部署一個 operator(執行器),但得到了一個 403 禁止的錯誤,這意味著你的服務賬號沒有正確的 Cloud IAM 權限。
1. 指定該服務賬號為 Cloud Functions Developer 角色。
2. 授權 Cloud Functions 的運行賬戶為 Cloud IAM Service Account User 角色。
使用 gcloud 分配 Cloud IAM 權限的典型方法如下所示。只需將您的 Google Cloud Platform 項目 ID 替換為 PROJECT_ID,將 SERVICE_ACCOUNT_EMAIL 替換為您的服務帳戶的電子郵件 ID 即可。
```py
gcloud iam service-accounts add-iam-policy-binding \
PROJECT_ID@appspot.gserviceaccount.com \
--member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
--role="roles/iam.serviceAccountUser"
```
細節請參閱[Adding the IAM service agent user role to the runtime service](https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account)
### GcfFunctionDeployOperator
使用`GcfFunctionDeployOperator`來從 Google Cloud Functions 部署一個函數。
以下 Airflow 變量示例顯示了您可以使用的 default_args 的各種變體和組合。變量定義如下:
```py
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
ENTRYPOINT)
RUNTIME = 'nodejs6'
VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
```
使用這些變量,您可以定義請求的主體:
```py
body = {
"name": FUNCTION_NAME,
"entryPoint": ENTRYPOINT,
"runtime": RUNTIME,
"httpsTrigger": {}
}
```
創建 DAG 時,default_args 字典可用于傳遞正文和其他參數:
```py
default_args = {
'start_date': dates.days_ago(1),
'project_id': PROJECT_ID,
'location': LOCATION,
'body': body,
'validate_body': VALIDATE_BODY
}
```
請注意,在上面的示例中,body 和 default_args 都是不完整的。根據設置的變量,如何傳遞源代碼相關字段可能有不同的變體。目前,您可以傳遞 sourceArchiveUrl,sourceRepository 或 sourceUploadUrl,[CloudFunction API 規范](https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction)中所述。此外,default_args 可能包含 zip_path 參數,以在部署源代碼之前運行上載源代碼的額外步驟。在最后一種情況下,您還需要在正文中提供一個空的 sourceUploadUrl 參數。
基于上面定義的變量,此處顯示了設置源代碼相關字段的示例邏輯:
```py
if SOURCE_ARCHIVE_URL:
body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
body['sourceRepository'] = {
'url': SOURCE_REPOSITORY
}
elif ZIP_PATH:
body['sourceUploadUrl'] = ''
default_args['zip_path'] = ZIP_PATH
elif SOURCE_UPLOAD_URL:
body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
else:
raise Exception("Please provide one of the source_code parameters")
```
創建 operator(執行器)的代碼如下:
```py
deploy_task = GcfFunctionDeployOperator(
task_id="gcf_deploy_task",
name=FUNCTION_NAME
)
```
#### Troubleshooting
如果你想要使用服務賬號來運行或部署一個 operator(執行器),但得到了一個 403 禁止的錯誤,這意味著你的服務賬號沒有正確的 Cloud IAM 權限。
1. 指定該服務賬號為 Cloud Functions Developer 角色。
2. 授權 Cloud Functions 的運行賬戶為 Cloud IAM Service Account User 角色。
使用 gcloud 分配 Cloud IAM 權限的典型方法如下所示。只需將您的 Google Cloud Platform 項目 ID 替換為 PROJECT_ID,將 SERVICE_ACCOUNT_EMAIL 的替換為您的服務帳戶的電子郵件 ID 即可。
```py
gcloud iam service-accounts add-iam-policy-binding \
PROJECT_ID@appspot.gserviceaccount.com \
--member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
--role="roles/iam.serviceAccountUser"
```
細節請參閱[Adding the IAM service agent user role to the runtime service](https://cloud.google.com/functions/docs/reference/iam/roles#adding_the_iam_service_agent_user_role_to_the_runtime_service_account)
如果您的函數的源代碼位于 Google Source Repository 中,請確保您的服務帳戶具有 Source Repository Viewer 角色,以便在必要時可以下載源代碼。
### CloudSqlInstanceDatabaseCreateOperator
在 Cloud SQL 實例中創建新數據庫。
有關參數定義,請參閱上面的`GceInstanceStartOperator`。
通過將所需的參數傳遞給構造函數來定義`CloudSqlInstanceDatabaseCreateOperator`。
#### 參數
示例 DAG 中的一些參數取自環境變量:
```py
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
```
#### 使用 operator(執行器)
```py
sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
project_id=PROJECT_ID,
body=db_create_body,
instance=INSTANCE_NAME,
task_id='sql_db_create_task'
)
```
示例請求體:
```py
db_create_body = {
"instance": INSTANCE_NAME,
"name": DB_NAME,
"project": PROJECT_ID
}
```
#### 模版
```py
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
```
#### 更多信息
有關數據庫插入,請參閱[Google Cloud SQL API 文檔](https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert)。
### CloudSqlInstanceDatabaseDeleteOperator
在 Cloud SQL 實例中刪除數據庫。
有關參數定義,請參閱`CloudSqlInstanceDatabaseDeleteOperator`。
#### 參數
示例 DAG 中的一些參數取自環境變量:
```py
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
```
#### 使用 operator(執行器)
```py
sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
project_id=PROJECT_ID,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_delete_task'
)
```
#### 模版
```py
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
'api_version')
```
#### 更多信息
有關數據庫刪除,請參閱[Google Cloud SQL API 文檔](https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/delete)。
### CloudSqlInstanceDatabasePatchOperator
使用修補程序語義更新包含有關 Cloud SQL 實例內數據庫的信息的資源。請參閱: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
有關參數定義,請參閱`CloudSqlInstanceDatabasePatchOperator`。
#### 參數
示例 DAG 中的一些參數取自環境變量:
```py
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
```
#### 使用 operator(執行器)
```py
sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
project_id=PROJECT_ID,
body=db_patch_body,
instance=INSTANCE_NAME,
database=DB_NAME,
task_id='sql_db_patch_task'
)
```
示例請求體:
```py
db_patch_body = {
"charset": "utf16",
"collation": "utf16_general_ci"
}
```
#### 模版
```py
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
'api_version')
```
#### 更多信息
有關數據庫修改,請參閱[Google Cloud SQL API 文檔](https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch)。
### CloudSqlInstanceDeleteOperator
示例 DAG 中的一些參數取自環境變量:
```py
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
```
#### 使用 operator(執行器)
```py
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
project_id=PROJECT_ID,
instance=INSTANCE_NAME,
task_id='sql_instance_delete_task'
)
```
#### 模版
```py
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
```
#### 更多信息
有關刪除,請參閱[Google Cloud SQL API 文檔](https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete)。
### CloudSqlInstanceCreateOperator
在 Google Cloud Platform 中創建新的 Cloud SQL 實例。
有關參數定義,請參閱`CloudSqlInstanceCreateOperator`。
如果存在具有相同名稱的實例,則不會執行任何操作,并且 operator(執行器)將成功執行。
#### 參數
示例 DAG 中的一些參數取自環境變量:
```py
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
```
定義實例的示例:
```py
body = {
"name": INSTANCE_NAME,
"settings": {
"tier": "db-n1-standard-1",
"backupConfiguration": {
"binaryLogEnabled": True,
"enabled": True,
"startTime": "05:00"
},
"activationPolicy": "ALWAYS",
"dataDiskSizeGb": 30,
"dataDiskType": "PD_SSD",
"databaseFlags": [],
"ipConfiguration": {
"ipv4Enabled": True,
"requireSsl": True,
},
"locationPreference": {
"zone": "europe-west4-a"
},
"maintenanceWindow": {
"hour": 5,
"day": 7,
"updateTrack": "canary"
},
"pricingPlan": "PER_USE",
"replicationType": "ASYNCHRONOUS",
"storageAutoResize": False,
"storageAutoResizeLimit": 0,
"userLabels": {
"my-key": "my-value"
}
},
"databaseVersion": "MYSQL_5_7",
"region": "europe-west4",
}
```
#### 使用 operator(執行器)
```py
sql_instance_create_task = CloudSqlInstanceCreateOperator(
project_id=PROJECT_ID,
body=body,
instance=INSTANCE_NAME,
task_id='sql_instance_create_task'
)
```
#### 模版
```py
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
```
#### 更多信息
有關插入,請參閱[Google Cloud SQL API 文檔](https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert)。
### CloudSqlInstancePatchOperator
更新 Google Cloud Platform 中的 Cloud SQL 實例的設置(部分更新)。
有關參數定義,請參閱`CloudSqlInstancePatchOperator`。
這是部分更新,因此僅設置/更新正文中指定的設置的值。現有實例的其余部分將保持不變。
#### 參數
示例 DAG 中的一些參數取自環境變量:
```py
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
```
定義實例的示例:
```py
patch_body = {
"name": INSTANCE_NAME,
"settings": {
"dataDiskSizeGb": 35,
"maintenanceWindow": {
"hour": 3,
"day": 6,
"updateTrack": "canary"
},
"userLabels": {
"my-key-patch": "my-value-patch"
}
}
}
```
#### 使用 operator(執行器)
```py
sql_instance_patch_task = CloudSqlInstancePatchOperator(
project_id=PROJECT_ID,
body=patch_body,
instance=INSTANCE_NAME,
task_id='sql_instance_patch_task'
)
```
#### 模版
```py
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
```
#### 更多信息
有關部分更新,請參閱[Google Cloud SQL API 文檔](https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch)。