# Using Operators
An operator represents a single task, which ideally is idempotent. Operators determine what actually executes when your DAG runs.
See the [Operators Concepts](https://apachecn.github.io/airflow-doc-zh/concepts.html) documentation and the [Operators API Reference](https://apachecn.github.io/airflow-doc-zh/code.html) for more information.
* BashOperator
  * Templating
  * Troubleshooting
    * Jinja template not found
* PythonOperator
  * Passing in arguments
  * Templating
* Google Cloud Platform Operators
  * GoogleCloudStorageToBigQueryOperator
## BashOperator
Use the [`BashOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.bash_operator.BashOperator") to execute commands in a [Bash](https://www.gnu.org/software/bash/) shell.
```
run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag)
```
### Templating
You can use [Jinja templates](https://apachecn.github.io/airflow-doc-zh/concepts.html) to parameterize the `bash_command` argument.
```
task = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
```
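To see what templating does to `bash_command`, here is a minimal sketch of the substitution, with plain string replacement standing in for the real Jinja engine; the `run_id` and `dag_run` values are made up for illustration:

```python
bash_command = 'echo "run_id={{ run_id }} | dag_run={{ dag_run }}"'

# Hypothetical context values; at runtime Airflow supplies the real ones.
context = {'run_id': 'manual__2019-01-01T00:00:00', 'dag_run': '<DagRun example>'}

# Stand-in for Jinja rendering, for illustration only.
rendered = bash_command
for key, value in context.items():
    rendered = rendered.replace('{{ ' + key + ' }}', value)

print(rendered)
# echo "run_id=manual__2019-01-01T00:00:00 | dag_run=<DagRun example>"
```

The rendered string is what the Bash shell finally receives.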
### Troubleshooting
#### Jinja template not found
Add a space after the script name when directly calling a Bash script with the `bash_command` argument. This is because Airflow tries to apply a Jinja template to it, which will fail.
```
t2 = BashOperator(
    task_id='bash_example',
    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)
```
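The trailing space works because Airflow only treats a templated string as a template *file* when it ends with one of the operator's registered template extensions (`.sh` and `.bash` for `BashOperator`). A rough sketch of that rule, simplified from Airflow's actual behavior:

```python
TEMPLATE_EXTS = ('.sh', '.bash')  # BashOperator's template_ext

def is_template_file(bash_command):
    # Simplified sketch of Airflow's check: a templated value ending in a
    # registered extension is loaded from disk and rendered as a Jinja file.
    return bash_command.endswith(TEMPLATE_EXTS)

is_template_file("/home/batcher/test.sh")   # True: looked up as a file
is_template_file("/home/batcher/test.sh ")  # False: passed to bash verbatim
```

With the trailing space the value no longer ends in `.sh`, so Airflow never tries to open it as a template file.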
## PythonOperator
Use the [`PythonOperator`](https://apachecn.github.io/airflow-doc-zh/code.html "airflow.operators.python_operator.PythonOperator") to execute Python callables.
```
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
```
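Conceptually, `provide_context=True` means the callable is invoked with the task's context dictionary unpacked as keyword arguments. A minimal sketch with made-up context values (the real context carries many more keys, such as `execution_date` and `task_instance`):

```python
from pprint import pprint

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

# Hypothetical context dict; at runtime Airflow builds the real one.
context = {'ds': '2019-01-01', 'run_id': 'manual__2019-01-01T00:00:00'}

# Roughly what the operator does at execute time (simplified):
result = print_context(**context)
# ds is bound to '2019-01-01'; the remaining keys land in kwargs.
```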
### Passing in arguments
Use the `op_args` and `op_kwargs` arguments to pass additional arguments to the Python callable.
```
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)
```
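Under the hood, the operator simply calls the callable with those arguments unpacked. A simplified sketch of the dispatch, not Airflow's actual implementation:

```python
import time

def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

# Roughly what PythonOperator does at execute time:
op_args = []
op_kwargs = {'random_base': 0.1}
result = my_sleeping_function(*op_args, **op_kwargs)  # sleeps ~0.1 s
```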
### Templating
When you set the `provide_context` argument to `True`, Airflow passes in an additional set of keyword arguments: one for each of the [Jinja template variables](https://apachecn.github.io/airflow-doc-zh/code.html) and a `templates_dict` argument.
The `templates_dict` argument is templated, so each value in the dictionary is evaluated as a [Jinja template](https://apachecn.github.io/airflow-doc-zh/concepts.html).
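A minimal sketch of that behavior, with plain string replacement standing in for Jinja and illustrative key names:

```python
def render(value, context):
    # Stand-in for Jinja rendering, for illustration only.
    for key, val in context.items():
        value = value.replace('{{ ' + key + ' }}', val)
    return value

templates_dict = {'greeting': 'Run date is {{ ds }}'}
context = {'ds': '2019-01-01'}

# Each value is rendered against the context, then the whole dict is
# handed to the callable as kwargs['templates_dict'].
rendered = {k: render(v, context) for k, v in templates_dict.items()}
print(rendered['greeting'])  # Run date is 2019-01-01
```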
## Google Cloud Platform Operators
### GoogleCloudStorageToBigQueryOperator
Use the [`GoogleCloudStorageToBigQueryOperator`](https://apachecn.github.io/airflow-doc-zh/integration.html "airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator") to execute a BigQuery load job.
```
load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='cloud-samples-data',
    source_objects=['bigquery/us-states/us-states.csv'],
    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag)
```
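The `schema_fields` entries follow BigQuery's JSON schema convention (`name`, `type`, `mode`). When all columns share a type, a small hypothetical helper keeps the list readable; `string_columns` is not part of Airflow, just an illustration:

```python
def string_columns(*names):
    # Hypothetical helper: build NULLABLE STRING entries in
    # BigQuery's JSON schema format.
    return [{'name': n, 'type': 'STRING', 'mode': 'NULLABLE'} for n in names]

schema_fields = string_columns('name', 'post_abbr')
```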