# 集成
> 貢獻者:[@morefreeze](https://github.com/morefreeze) [@zhongjiajie](https://github.com/zhongjiajie)
* [反向代理](#反向代理)
* [Azure:Microsoft Azure](#azuremicrosoft-azure)
* [AWS:亞馬遜網絡服務](#aws亞馬遜網絡服務)
* [Databricks](#databricks)
* [GCP:Google云端平臺](#gcpgoogle云端平臺)
## 反向代理
Airflow 可以通過設置反向代理,使其可以靈活設置訪問地址。
例如,您可以這樣配置反向代理:
```
https://lab.mycompany.com/myorg/airflow/
```
為此,您需要在`airflow.cfg中`設置:
```config
base_url = http://my_host/myorg/airflow
```
此外,如果您使用 Celery Executor,您可以配置`myorg/flower`的地址:
```config
flower_url_prefix = /myorg/flower
```
您的反向代理(例如:nginx)應配置如下:
* 傳遞 url 和 http 頭給 Airflow 服務器,不需要重寫,例如:
```
server {
listen 80;
server_name lab.mycompany.com;
location /myorg/airflow/ {
proxy_pass http://localhost:8080;
proxy_set_header Host $host;
proxy_redirect off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
```
* 重寫 flower 的地址:
```py
server {
listen 80;
server_name lab.mycompany.com;
location /myorg/flower/ {
rewrite ^/myorg/flower/(.*)$ /$1 break; # remove prefix from http header
proxy_pass http://localhost:5555;
proxy_set_header Host $host;
proxy_redirect off;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
```
## Azure:Microsoft Azure
Airflow 對 Microsoft Azure 的支持有限:僅支持 Azure Blob Storage 和 Azure Data Lake 的接口。 對 Blob Storage 的 Hook, Sensor 和 Operator 以及 Azure Data Lake 的 Hook 都在 contrib 部分。
### Azure Blob Storage
所有類都通過 Window Azure Storage Blob 協議進行通信。 確保 Airflow 的 wasb 連接存在。 可以通過在額外字段中指定登錄用戶名(=Storage account name)和密碼(=KEY),或用 SAS 令牌來完成授權(參看 wasb_default 連接例子)。
* [WasbBlobSensor](#WasbBlobSensor) :檢查一個 blob 是否在 Azure Blob Storage 上。
* [WasbPrefixSensor](#WasbPrefixSensor) :檢查滿足前綴匹配的 blob 是否在 Azure Blob Storage 上。
* [FileToWasbOperator](#FileToWasbOperator) :將本地文件作為blob 上傳到容器。
* [WasbHook](#WasbHook) :與 Azure Blob Storage 的接口。
#### WasbBlobSensor
```py
class airflow.contrib.sensors.wasb_sensor.WasbBlobSensor(container_name, blob_name, wasb_conn_id='wasb_default', check_options=None, *args, **kwargs)
```
基類: `airflow.sensors.base_sensor_operator.BaseSensorOperator`
等待 blob 到達 Azure Blob Storage。
參數:
* `container_name(str)` - 容器的名稱。
* `blob_name(str)` - blob 的名稱。
* `wasb_conn_id(str)` - 對 wasb 連接的引用。
* `check_options(dict)` - 傳給`WasbHook.check_for_blob()`的可選關鍵字參數。
```py
poke(context)
```
Operator 在繼承此類時應該覆蓋以上函數。
#### WasbPrefixSensor
```py
class airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor(container_name, prefix, wasb_conn_id='wasb_default', check_options=None, *args, **kwargs)
```
基類: `airflow.sensors.base_sensor_operator.BaseSensorOperator`
等待前綴匹配的 blob 到達 Azure Blob Storage。
參數:
* `container_name(str)` - 容器的名稱。
* `prefix(str)` - blob 的前綴。
* `wasb_conn_id(str)` - 對 wasb 連接的引用。
* `check_options(dict)` - 傳給`WasbHook.check_for_prefix()`的可選關鍵字參數。
```py
poke(context)
```
Operator 在繼承此類時應該覆蓋以上函數。
#### FileToWasbOperator
```py
class airflow.contrib.operators.file_to_wasb.FileToWasbOperator(file_path, container_name, blob_name, wasb_conn_id='wasb_default', load_options=None, *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
將文件上傳到 Azure Blob Storage。
參數:
* `file_path(str)` - 要加載的文件的路徑。(模板渲染后)
* `container_name(str)` - 容器的名稱。(模板渲染后)
* `blob_name(str)` - blob 的名稱。(模板渲染后)
* `wasb_conn_id(str)` - 對 wasb 連接的引用。
* `load_options(dict)` - 傳給`WasbHook.load_file()`的可選關鍵字參數。
```py
execute(context)
```
將文件上傳到 Azure Blob Storage。
#### WasbHook
```py
class airflow.contrib.hooks.wasb_hook.WasbHook(wasb_conn_id='wasb_default')
```
基類: `airflow.hooks.base_hook.BaseHook`
通過 wasb:// 協議與 Azure Blob Storage進行交互。
在連接的"extra"字段中傳遞的其他參數將傳遞給`BlockBlockService()`構造函數。 例如,通過添加{"sas_token":"YOUR_TOKEN"}使用 SAS 令牌進行身份驗證。
參數:`wasb_conn_id(str)` - 對 wasb 連接的引用。
```py
check_for_blob(container_name, blob_name, **kwargs)
```
檢查 Azure Blob Storage 上是否存在 Blob。
參數:
* `container_name(str)` - 容器的名稱。
* `blob_name(str)` - blob 的名稱。
* `kwargs(object)` - 傳給`BlockBlobService.exists()`的可選關鍵字參數。
返回:如果 blob 存在則為 True,否則為 False。
返回類型:bool
```py
check_for_prefix(container_name, prefix, **kwargs)
```
檢查 Azure Blob Storage 上是否存在前綴匹配的 blob。
參數:
* `container_name(str)` - 容器的名稱。
* `prefix(str)` - blob 的前綴。
* `kwargs(object)` - 傳給`BlockBlobService.list_blobs()`的可選關鍵字參數。
返回:如果存在與前綴匹配的 blob,則為 True,否則為 False。
返回類型:bool
```py
get_conn()
```
返回 BlockBlobService 對象。
```py
get_file(file_path, container_name, blob_name, **kwargs)
```
從 Azure Blob Storage 下載文件。
參數:
* `file_path(str)` - 要下載的文件的路徑。
* `container_name(str)` - 容器的名稱。
* `blob_name(str)` - blob 的名稱。
* `kwargs(object)` - 傳給`BlockBlobService.create_blob_from_path()`的可選關鍵字參數。
```py
load_file(file_path, container_name, blob_name, **kwargs)
```
將文件上傳到 Azure Blob Storage。
參數:
* `file_path(str)` - 要加載的文件的路徑。
* `container_name(str)` - 容器的名稱。
* `blob_name(str)` - blob 的名稱。
* `kwargs(object)` - 傳給`BlockBlobService.create_blob_from_path()`的可選關鍵字參數。
```py
load_string(string_data, container_name, blob_name, **kwargs)
```
將字符串上傳到 Azure Blob Storage。
參數:
* `string_data(str)` - 要上傳的字符串。
* `container_name(str)` - 容器的名稱。
* `blob_name(str)` - blob 的名稱。
* `kwargs(object)` - 傳給`BlockBlobService.create_blob_from_text()`的可選關鍵字參數。
```py
read_file(container_name, blob_name, **kwargs)
```
從 Azure Blob Storage 讀取文件并以字符串形式返回。
參數:
* `container_name(str)` - 容器的名稱。
* `blob_name(str)` - blob 的名稱。
* `kwargs(object)` - 傳給`BlockBlobService.create_blob_from_path()`的可選關鍵字參數。
### Azure File Share
SMB 文件共享的云變體。 確保 Airflow 存在類型為`wasb`的連接。 可以通過在額外字段中提供登錄(=Storage 帳戶名稱)和密碼(=Storage 帳戶密鑰)或通過 SAS 令牌來完成授權(請參閱連接`wasb_default`示例)。
#### AzureFileShareHook
```py
class airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook(wasb_conn_id='wasb_default')
```
基類: `airflow.hooks.base_hook.BaseHook`
與 Azure FileShare Storage 交互。
在連接的"extra"字段中傳遞的參數將傳遞給`FileService()`構造函數。
參數:`wasb_conn_id(str)` - 對 wasb 連接的引用。
```py
check_for_directory(share_name, directory_name, **kwargs)
```
檢查 Azure File Share 上是否存在目錄。
參數:
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `kwargs(object)` - 傳給`FileService.exists()`的可選關鍵字參數。
返回:如果文件存在則為 True,否則為 False。
返回類型:bool
```py
check_for_file(share_name, directory_name, file_name, **kwargs)
```
檢查 Azure File Share 上是否存在文件。
參數:
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `file_name(str)` - 文件名。
* `kwargs(object)` - 傳給`FileService.exists()`的可選關鍵字參數。
返回:如果文件存在則為 True,否則為 False。
返回類型:bool
```py
create_directory(share_name, directory_name, **kwargs)
```
在 Azure File Share 上創建目錄。
參數:
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `kwargs(object)` - 傳給`FileService.create_directory()`的可選關鍵字參數。
返回:文件和目錄列表
返回類型:list
```py
get_conn()
```
返回 FileService 對象。
```py
get_file(file_path, share_name, directory_name, file_name, **kwargs)
```
從 Azure File Share 下載文件。
參數:
* `file_path(str)` - 存儲文件的位置。
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `file_name(str)` - 文件名。
* `kwargs(object)` - 傳給`FileService.get_file_to_path()`的可選關鍵字參數。
```py
get_file_to_stream(stream, share_name, directory_name, file_name, **kwargs)
```
以流的形式從 Azure File Share 下載文件。
參數:
* `stream(類文件對象)` - 用于存儲文件的文件句柄。
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `file_name(str)` - 文件名。
* `kwargs(object)` - 傳給`FileService.get_file_to_stream()`的可選關鍵字參數。
```py
list_directories_and_files(share_name, directory_name=None, **kwargs)
```
返回存儲在 Azure File Share 中的目錄和文件列表。
參數:
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `kwargs(object)` - 傳給`FileService.list_directories_and_files()`的可選關鍵字參數。
返回:文件和目錄列表
返回類型:list
```py
load_file(file_path, share_name, directory_name, file_name, **kwargs)
```
將文件上傳到 Azure File Share。
參數:
* `file_path(str)` - 要上傳的文件路徑。
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `file_name(str)` - 文件名。
* `kwargs(object)` - 傳給`FileService.create_file_from_path()`的可選關鍵字參數。
```py
load_stream(stream, share_name, directory_name, file_name, count, **kwargs)
```
將流上傳到 Azure File Share。
參數:
* `stream(類文件對象)` - 打開的文件/流作為文件內容上傳。
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `file_name(str)` - 文件名。
* `count(int)` - 流的大小(以字節為單位)
* `kwargs(object)` - 傳給`FileService.create_file_from_stream()`的可選關鍵字參數。
```py
load_string(string_data, share_name, directory_name, file_name, **kwargs)
```
將字符串上傳到 Azure File Share。
參數:
* `string_data(str)` - 要加載的字符串。
* `share_name(str)` - 共享的名稱。
* `directory_name(str)` - 目錄的名稱。
* `file_name(str)` - 文件名。
* `kwargs(object)` - 傳給`FileService.create_file_from_text()`的可選關鍵字參數。
### Logging
可以將 Airflow 配置為在 Azure Blob Storage 中讀取和寫入任務日志。 請參閱[將日志寫入 Azure Blob Storage](zh/howto/write-logs.md#將日志寫入azure-blob-storage) 。
### Azure Data Lake
AzureDataLakeHook 通過與 WebHDFS 兼容的 REST API 進行通信。 確保 Airflow 存在`azure_data_lake`類型的連接。 可以通過提供用戶名(=客戶端 ID),密碼(=客戶端密鑰),額外字段可以提供租戶和帳戶名稱來完成授權。
> (請參閱鏈接`azure_data_lake_default`示例)。
* [AzureDataLakeHook](#AzureDataLakeHook):與 Azure Data Lake 的接口。
#### AzureDataLakeHook
```py
class airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook(azure_data_lake_conn_id='azure_data_lake_default')
```
基類: `airflow.hooks.base_hook.BaseHook`
與 Azure Data Lake 進行交互。
客戶端 ID 和客戶端密鑰應該在用戶和密碼參數中。 租戶和帳戶名稱應為額外字段,如 {"tenant": "<TENANT>", "account_name": "ACCOUNT_NAME"}
參數:`azure_data_lake_conn_id(str)` - 對 Azure Data Lake 連接的引用。
```py
check_for_file(file_path)
```
檢查 Azure Data Lake 上是否存在文件。
參數:`file_path(str)` - 文件的路徑和名稱。
返回:如果文件存在則為 True,否則為 False。
返回類型:bool
```py
download_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)
```
從 Azure Blob Storage 下載文件。
參數:
* `local_path(str)` - 本地路徑。 如果下載單個文件,將寫入此文件,如果它是現有目錄,將在其中創建文件。 如果下載多個文件,這是要寫入的根目錄。將根據需要創建目錄。
* `remote_path(str)` - 用于查找遠程文件的遠程路徑,可以使用通配符。 不支持使用`**`的遞歸 glob 模式。
* `nthreads(int)` - 要使用的線程數。 如果為 None,則使用核心數。
* `overwrite(bool)` - 是否強制覆蓋現有文件/目錄。 如果 False 并且遠程路徑是目錄,則無論是否覆蓋任何文件都將退出。 如果為 True,則實際僅覆蓋匹配的文件名。
* `buffersize(int)` - 支持最大 2 ** 22 內部緩沖區的字節數。 block 不能大于 trunk。
* `blocksize(int)` - 一個 block 支持最大 2 ** 22 字節數。 在每個 trunk 中,我們為每個 API 調用寫一個較小的block。 這個 block 不能大于 trunk。
```py
get_conn()
```
返回 AzureDLFileSystem 對象。
```py
upload_file(local_path, remote_path, nthreads=64, overwrite=True, buffersize=4194304, blocksize=4194304)
```
將文件上傳到 Azure Data Lake。
參數:
* `local_path(str)` - 本地路徑。 可以是單個文件,目錄(在這種情況下,遞歸上傳)或 glob 模式。 不支持使用`**`的遞歸 glob 模式。
* `remote_path(str)` - 要上傳的遠程路徑; 如果有多個文件,這就是要寫入的根目錄。
* `nthreads(int)` - 要使用的線程數。 如果為 None,則使用核心數。
* `overwrite(bool)` - 是否強制覆蓋現有文件/目錄。 如果 False 并且遠程路徑是目錄,則無論是否覆蓋任何文件都將退出。 如果為 True,則實際僅覆蓋匹配的文件名。
* `buffersize(int)` - 支持最大 2 ** 22 內部緩沖區的字節數。 block 不能大于 trunk。
* `blocksize(int)` - 一個 block 支持最大 2 ** 22 字節數。 在每個 trunk 中,我們為每個 API 調用寫一個較小的block。 這個 block 不能大于 trunk。
## AWS: Amazon Web Services
Airflow 大量支持 Amazon Web Services。 但請注意,Hook,Sensors 和 Operators 都在 contrib 部分。
### AWS EMR
* [EmrAddStepsOperator](#EmrAddStepsOperator) :向現有 EMR JobFlow 添加步驟。
* [EmrCreateJobFlowOperator](#EmrCreateJobFlowOperator) :創建 EMR JobFlow,從 EMR 連接讀取配置。
* [EmrTerminateJobFlowOperator](#EmrTerminateJobFlowOperator) :終止 EMR JobFlow。
* [EmrHook](#EmrHook) :與 AWS EMR 互動。
#### EmrAddStepsOperator
```py
class airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator(job_flow_id, aws_conn_id='s3_default', steps=None, *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
向現有 EMR job_flow 添加步驟的 operator。
參數:
* `job_flow_id` - 要添加步驟的 JobFlow 的 ID。(模板渲染后)
* `aws_conn_id(str)` - 與使用的 aws 連接
* `steps(list)` - 要添加到作業流的 boto3 步驟。 (模板渲染后)
#### EmrCreateJobFlowOperator
```py
class airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator(aws_conn_id='s3_default', emr_conn_id='emr_default', job_flow_overrides=None, *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
創建 EMR JobFlow,從 EMR 連接讀取配置。 可以向 JobFlow 傳遞參數以覆蓋連接中的配置。
參數:
* `aws_conn_id(str)` - 要使用的 aws 連接
* `emr_conn_id(str)` - 要使用的 emr 連接
* `job_flow_overrides` - 用于覆蓋 emr_connection extra 的 boto3 式參數。 (模板渲染后)
#### EmrTerminateJobFlowOperator
```py
class airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator(job_flow_id, aws_conn_id='s3_default', *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
終止 EMR JobFlows 的 operator。
參數:
* `job_flow_id` - 要終止的 JobFlow 的 id。(模板渲染后)
* `aws_conn_id(str)` - 要使用的 aws 連接
#### EmrHook
```py
class airflow.contrib.hooks.emr_hook.EmrHook(emr_conn_id=None, *args, **kwargs)
```
基類: `airflow.contrib.hooks.aws_hook.AwsHook`
與 AWS EMR 交互。 emr_conn_id 是使用 create_job_flow 方法唯一必需的。
```py
create_job_flow(job_flow_overrides)
```
使用 EMR 連接中的配置創建作業流。 json_flow_overrrides 是傳給 run_job_flow 方法的參數。
### AWS S3
* [S3Hook](#S3Hook) :與 AWS S3 交互。
* [S3FileTransformOperator](#S3FileTransformOperator) :將數據從 S3 源位置復制到本地文件系統上的臨時位置。
* [S3ListOperator](#S3ListOperator) :列出與 S3 位置的鍵前綴匹配的文件。
* [S3ToGoogleCloudStorageOperator](#S3ToGoogleCloudStorageOperator) :將 S3 位置與 Google 云端存儲[分區](28)同步。
* [S3ToHiveTransfer](#S3ToHiveTransfer) :將數據從 S3 移動到 Hive。 operator 從 S3 下載文件,在將文件加載到 Hive 表之前將其存儲在本地。
#### S3Hook
```py
class airflow.hooks.S3_hook.S3Hook(aws_conn_id='aws_default')
```
基類: `airflow.contrib.hooks.aws_hook.AwsHook`
使用 boto3 庫與 AWS S3 交互。
```py
check_for_bucket(bucket_name)
```
檢查 bucket_name 是否存在。
參數:`bucket_name(str)` - 存儲桶的名稱
```py
check_for_key(key, bucket_name=None)
```
檢查存儲桶中是否存在密鑰
參數:
* `key(str)` - 指向文件的 S3 的 key
* `bucket_name(str)` - 存儲桶的名稱
```py
check_for_prefix(bucket_name, prefix, delimiter)
```
檢查存儲桶中是否存在前綴
```py
check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')
```
檢查桶中是否存在與通配符表達式匹配的密鑰
```py
get_bucket(bucket_name)
```
返回 boto3.S3.Bucket 對象
參數:`bucket_name(str)` - 存儲桶的名稱
```py
get_key(key, bucket_name=None)
```
返回 boto3.s3.Object
參數:
* `key(str)` - 密鑰的路徑
* `bucket_name(str)` - 存儲桶的名稱
```py
get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')
```
返回與通配符表達式匹配的 boto3.s3.Object 對象
參數:
* `wildcard_key(str)` - 密鑰的路徑
* `bucket_name(str)` - 存儲桶的名稱
```py
list_keys(bucket_name, prefix='', delimiter='', page_size=None, max_items=None)
```
列出前綴下的存儲桶中的密鑰,但不包含分隔符
參數:
* `bucket_name(str)` - 存儲桶的名稱
* `prefix(str)` - 一個密鑰前綴
* `delimiter(str)` - 分隔符標記鍵層次結構。
* `page_size(int)` - 分頁大小
* `max_items(int)` - 要返回的最大項目數
```py
list_prefixes(bucket_name, prefix='', delimiter='', page_size=None, max_items=None)
```
列出前綴下的存儲桶中的前綴
參數:
* `bucket_name(str)` - 存儲桶的名稱
* `prefix(str)` - 一個密鑰前綴
* `delimiter(str)` - 分隔符標記鍵層次結構。
* `page_size(int)` - 分頁大小
* `max_items(int)` - 要返回的最大項目數
```py
load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False)
```
將字節加載到 S3
這是為了方便在 S3 中刪除字符串。 它使用 boto 基礎結構將文件發送到 s3。
參數:
* `bytes_data(bytes)` - 設置為密鑰內容的字節。
* `key(str)` - 指向文件的 S3 鍵
* `bucket_name(str)` - 存儲桶的名稱
* `replace(bool)` - 一個標志,用于決定是否覆蓋密鑰(如果已存在)
* `encrypt(bool)` - 如果為 True,則文件將在服務器端由 S3 加密,并在 S3 中靜止時以加密形式存儲。
```py
load_file(filename, key, bucket_name=None, replace=False, encrypt=False)
```
將本地文件加載到 S3
參數:
* `filename(str)` - 要加載的文件的名稱。
* `key(str)` - 指向文件的 S3 鍵
* `bucket_name(str)` - 存儲桶的名稱
* `replace(bool)` - 一個標志,用于決定是否覆蓋密鑰(如果已存在)。 如果 replace 為 False 且密鑰存在,則會引發錯誤。
* `encrypt(bool)` - 如果為 True,則文件將在服務器端由 S3 加密,并在 S3 中靜止時以加密形式存儲。
```py
load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding='utf-8')
```
將字符串加載到 S3
這是為了方便在 S3 中刪除字符串。 它使用 boto 基礎結構將文件發送到 s3。
參數:
* `string_data(str)` - 要設置為鍵的內容的字符串。
* `key(str)` - 指向文件的 S3 鍵
* `bucket_name(str)` - 存儲桶的名稱
* `replace(bool)` - 一個標志,用于決定是否覆蓋密鑰(如果已存在)
* `encrypt(bool)` - 如果為 True,則文件將在服務器端由 S3 加密,并在 S3 中靜止時以加密形式存儲。
```py
read_key(key, bucket_name=None)
```
從 S3 讀取密鑰
參數:
* `key(str)` - 指向文件的 S3 鍵
* `bucket_name(str)` - 存儲桶的名稱
```py
select_key(key, bucket_name=None, expression='SELECT * FROM S3Object', expression_type='SQL', input_serialization={'CSV': {}}, output_serialization={'CSV': {}})
```
使用 S3 Select 讀取密鑰。
參數:
* `key(str)` - 指向文件的 S3 鍵
* `bucket_name(str)` - 存儲桶的名稱
* `expression(str)` - S3 選擇表達式
* `expression_type(str)` - S3 選擇表達式類型
* `input_serialization(dict)` - S3 選擇輸入數據序列化格式
* `output_serialization(dict)` - S3 選擇輸出數據序列化格式
返回:通過 S3 Select 檢索原始數據的子集
返回類型:str
也可以看看
有關 S3 Select 參數的更多詳細信息: [http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.select_object_content](http://boto3.readthedocs.io/en/latest/reference/services/s3.html#S3.Client.select_object_content)
#### S3FileTransformOperator
```py
class airflow.operators.s3_file_transform_operator.S3FileTransformOperator(source_s3_key, dest_s3_key, transform_script=None, select_expression=None, source_aws_conn_id='aws_default', dest_aws_conn_id='aws_default', replace=False, *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
將數據從源 S3 位置復制到本地文件系統上的臨時位置。 用轉換腳本對此文件運行轉換,并將輸出上傳到目標 S3 位置。
本地文件系統中的源文件和目標文件的位置作為轉換腳本的第一個和第二個參數提供。 轉換腳本應該從源讀取數據,轉換它并將輸出寫入本地目標文件。 然后,operator 將本地目標文件上傳到 S3。
S3 Select 也可用于過濾源內容。 如果指定了 S3 Select 表達式,則用戶可以省略轉換腳本。
參數:
* `source_s3_key(str)` - 源 S3 的密鑰。(模板渲染后)
* `source_aws_conn_id(str)` - 源 s3 連接
* `dest_s3_key(str)` - 目的 S3 的密鑰。(模板渲染后)
* `dest_aws_conn_id(str)` - 目標 s3 連接
* `replace(bool)` - 替換 dest S3 密鑰(如果已存在)
* `transform_script(str)` - 可執行轉換腳本的位置
* `select_expression(str)` - S3 選擇表達式
#### S3ListOperator
```py
class airflow.contrib.operators.s3listoperator.S3ListOperator(bucket, prefix='', delimiter='', aws_conn_id='aws_default', *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
列出桶中具有給定前綴的所有對象。
此 operator 返回一個 python 列表,其中包含可由`xcom`在下游任務中使用的對象名稱。
參數:
* `bucket(str)` - S3 存儲桶在哪里找到對象。(模板渲染后)
* `prefix(str)` - 用于過濾名稱以此字符串為前綴的對象。(模板渲染后)
* `delimiter(str)` - 分隔符標記鍵層次結構。(模板渲染后)
* `aws_conn_id(str)` - 連接到 S3 存儲時使用的連接 ID。
以下 operator 將列出`data`存儲區中 S3 `customers/2018/04/` key 的所有文件(不包括子文件夾)。
```py
s3_file = S3ListOperator (
task_id = 'list_3s_files' ,
bucket = 'data' ,
prefix = 'customers/2018/04/' ,
delimiter = '/' ,
aws_conn_id = 'aws_customers_conn'
)
```
#### S3ToGoogleCloudStorageOperator
```py
class airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator(bucket, prefix='', delimiter='', aws_conn_id='aws_default', dest_gcs_conn_id=None, dest_gcs=None, delegate_to=None, replace=False, *args, **kwargs)
```
基類: `airflow.contrib.operators.s3listoperator.S3ListOperator`
將 S3 密鑰(可能是前綴)與 Google 云端存儲目標路徑同步。
參數:
* `bucket(str)` - S3 存儲桶在哪里找到對象。(模板渲染后)
* `prefix(str)` - 用于過濾名稱以此字符串為前綴的對象。(模板渲染后)
* `delimiter(str)` - 分隔符標記鍵層次結構。(模板渲染后)
* `aws_conn_id(str)` - 源 S3 連接
* `dest_gcs_conn_id(str)` - 連接到 Google 云端存儲時要使用的目標連接 ID。
* `dest_gcs(str)` - 要存儲文件的目標 Google 云端存儲**分區**和前綴。(模板渲染后)
* `delegate_to(str)` - 代理的帳戶(如果有)。 為此,發出請求的服務帳戶必須啟用域范圍委派。
* `replace(bool)` - 是否要替換現有目標文件。
例子
```py
s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
task_id ='s3_to_gcs_example',bucket ='my-s3-bucket',prefix ='data / customers-201804',dest_gcs_conn_id ='google_cloud_default',dest_gcs ='gs://my.gcs.bucket/some/customers/' ,replace = False,dag = my-dag)
```
請注意, `bucket` , `prefix` , `delimiter`和`dest_gcs`是模板化的,因此如果您愿意,可以在其中使用變量。
#### S3ToHiveTransfer
```py
class airflow.operators.s3_to_hive_operator.S3ToHiveTransfer(s3_key, field_dict, hive_table, delimiter=', ', create=True, recreate=False, partition=None, headers=False, check_headers=False, wildcard_match=False, aws_conn_id='aws_default', hive_cli_conn_id='hive_cli_default', input_compressed=False, tblproperties=None, select_expression=None, *args, **kwargs)
```
基類: `airflow.models.BaseOperator`
將數據從 S3 移動到 Hive。 operator 從 S3 下載文件,在將文件加載到 Hive 表之前將其存儲在本地。 如果`create`或`recreate`參數設置為`True` ,則會生成`CREATE TABLE`和`DROP TABLE`語句。 Hive 數據類型是從游標的元數據中推斷出來的。
請注意,Hive 中生成的表使用`STORED AS textfile` ,這不是最有效的序列化格式。 如果加載了大量數據和/或表格被大量查詢,您可能只想使用此 operator 將數據暫存到臨時表中,然后使用`HiveOperator`將其加載到最終目標表中。
參數:
* `s3_key(str)` - 從 S3 檢索的密鑰。(模板渲染后)
* `field_dict(dict)` - 字段的字典在文件中命名為鍵,其 Hive 類型為值
* `hive_table(str)` - 目標 Hive 表,使用點表示法來定位特定數據庫。(模板渲染后)
* `create(bool)` - 是否創建表,如果它不存在
* `recreate(bool)` - 是否在每次執行時刪除并重新創建表
* `partition(dict)` - 將目標分區作為分區列和值的字典。(模板渲染后)
* `headers(bool)` - 文件是否包含第一行的列名
* `check_headers(bool)` - 是否應該根據 field_dict 的鍵檢查第一行的列名
* `wildcard_match(bool)` - 是否應將 s3_key 解釋為 Unix 通配符模式
* `delimiter(str)` - 文件中的字段分隔符
* `aws_conn_id(str)` - 源 s3 連接
* `hive_cli_conn_id(str)` - 目標配置單元連接
* `input_compressed(bool)` - 布爾值,用于確定是否需要文件解壓縮來處理標頭
* `tblproperties(dict)` - 正在創建的 hive 表的 TBLPROPERTIES
* `select_expression(str)` - S3 選擇表達式
### AWS EC2 容器服務
* [ECSOperator](#ECSOperator) :在 AWS EC2 容器服務上執行任務。
#### ECSOperator
```py
class airflow.contrib.operators.ecs_operator.ECSOperator(task_definition, cluster, overrides, aws_conn_id=None, region_name=None, launch_type='EC2', **kwargs)
```
基類: `airflow.models.BaseOperator`
在 AWS EC2 Container Service 上執行任務
參數:
* `task_definition(str)` - EC2 容器服務上的任務定義名稱
* `cluster(str)` - EC2 Container Service 上的集群名稱
* `aws_conn_id(str)` - AWS 的連接 ID。 如果為 None,將使用 boto3 憑證( [http://boto3.readthedocs.io/en/latest/guide/configuration.html](http://boto3.readthedocs.io/en/latest/guide/configuration.html) )。
* `region_name` - 要在 AWS Hook 中使用的 region 名稱。 覆蓋連接中的 region_name(如果提供)
* `launch_type` - 運行任務的啟動類型('EC2'或'FARGATE')
參數:boto3 將接收的相同參數(模板化): [http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task](http://boto3.readthedocs.org/en/latest/reference/services/ecs.html#ECS.Client.run_task)
類型:dict
類型:launch_type:str
### AWS Batch Service
* [AWSBatchOperator](#AWSBatchOperator) :在 AWS Batch Service 上執行任務。
#### AWSBatchOperator
```py
class airflow.contrib.operators.awsbatch_operator.AWSBatchOperator(job_name, job_definition, job_queue, overrides, max_retries=4200, aws_conn_id=None, region_name=None, **kwargs)
```
基類: `airflow.models.BaseOperator`
在 AWS Batch Service 上執行作業
參數:
* `job_name(str)` - 將在 AWS Batch 上運行的作業的名稱
* `job_definition(str)` - AWS Batch 上的作業定義名稱
* `job_queue(str)` - AWS Batch 上的隊列名稱
* `max_retries(int)` - 服務器未合并時的指數退避重試,4200 = 48 小時
* `aws_conn_id(str)` - AWS 的連接 ID。 如果為 None,將使用憑證 boto3 策略( [http://boto3.readthedocs.io/en/latest/guide/configuration.html](http://boto3.readthedocs.io/en/latest/guide/configuration.html) )。
* **region_name** - 要在 AWS Hook 中使用的區域名稱。 覆蓋連接中的 region_name(如果提供)
參數:boto3 將在 containerOverrides 上接收的相同參數(模板化): [http://boto3.readthedocs.io/en/latest/reference/services/batch.html#submit_job](http://boto3.readthedocs.io/en/latest/reference/services/batch.html)
類型:dict
### AWS RedShift
* [AwsRedshiftClusterSensor](#AwsRedshiftClusterSensor) :等待 Redshift 集群達到特定狀態。
* [RedshiftHook](#RedshiftHook) :使用 boto3 庫與 AWS Redshift 交互。
* [RedshiftToS3Transfer](#RedshiftToS3Transfer) :對帶有或不帶標頭的 CSV 執行卸載命令。
* [S3ToRedshiftTransfer](#S3ToRedshiftTransfer) :從 S3 執行復制命令為 CSV,帶或不帶標題。
#### AwsRedshiftClusterSensor
```py
class airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor(cluster_identifier, target_status='available', aws_conn_id='aws_default', *args, **kwargs)
```
基類: [`airflow.sensors.base_sensor_operator.BaseSensorOperator`]
等待 Redshift 集群達到特定狀態。
參數:
* `cluster_identifier(str)` - 要 ping 的集群的標識符。
* `target_status(str)` - 所需的集群狀態。
```py
poke(context)
```
Operator 在繼承此類時應該覆蓋以上函數。
#### RedshiftHook
```py
class airflow.contrib.hooks.redshift_hook.RedshiftHook(aws_conn_id='aws_default')
```
基類: [`airflow.contrib.hooks.aws_hook.AwsHook`]
使用 boto3 庫與 AWS Redshift 交互
```py
cluster_status(cluster_identifier)
```
返回集群的狀態
參數:`cluster_identifier(str)` - 集群的唯一標識符
```py
create_cluster_snapshot(snapshot_identifier, cluster_identifier)
```
創建集群的快照
參數:
* `snapshot_identifier(str)` - 集群快照的唯一標識符
* `cluster_identifier(str)` - 集群的唯一標識符
```py
delete_cluster(cluster_identifier, skip_final_cluster_snapshot=True, final_cluster_snapshot_identifier='')
```
刪除集群并可選擇創建快照
參數:
* `cluster_identifier(str)` - 集群的唯一標識符
* `skip_final_cluster_snapshot(bool)` - 確定集群快照創建
* `final_cluster_snapshot_identifier(str)` - 最終集群快照的名稱
```py
describe_cluster_snapshots(cluster_identifier)
```
獲取集群的快照列表
參數:`cluster_identifier(str)` - 集群的唯一標識符
```py
restore_from_cluster_snapshot(cluster_identifier, snapshot_identifier)
```
從其快照還原集群
參數:
* `cluster_identifier(str)` - 集群的唯一標識符
* `snapshot_identifier(str)` - 集群快照的唯一標識符
#### RedshiftToS3Transfer
```py
class airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer(schema,table,s3_bucket,s3_key,redshift_conn_id ='redshift_default',aws_conn_id ='aws_default',unload_options =(),autocommit = False,parameters = None,include_header = False,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
執行 UNLOAD 命令,將 s3 作為帶標題的 CSV
參數:
* `schema(str)` - 對 redshift 數據庫中特定模式的引用
* `table(str)` - 對 redshift 數據庫中特定表的引用
* `s3_bucket(str)` - 對特定 S3 存儲桶的引用
* `s3_key(str)` - 對特定 S3 密鑰的引用
* `redshift_conn_id(str)` - 對特定 redshift 數據庫的引用
* `aws_conn_id(str)` - 對特定 S3 連接的引用
* `unload_options(list)` - 對 UNLOAD 選項列表的引用
#### S3ToRedshiftTransfer
```py
class airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer(schema,table,s3_bucket,s3_key,redshift_conn_id ='redshift_default',aws_conn_id ='aws_default',copy_options =(),autocommit = False,parameters = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
執行 COPY 命令將文件從 s3 加載到 Redshift
參數:
* `schema(str)` - 對 redshift 數據庫中特定模式的引用
* `table(str)` - 對 redshift 數據庫中特定表的引用
* `s3_bucket(str)` - 對特定 S3 存儲桶的引用
* `s3_key(str)` - 對特定 S3 密鑰的引用
* `redshift_conn_id(str)` - 對特定 redshift 數據庫的引用
* `aws_conn_id(str)` - 對特定 S3 連接的引用
* `copy_options(list)` - 對 COPY 選項列表的引用
## Databricks
[Databricks](https://databricks.com/)貢獻了一個 Airflow operator,可以將運行提交到 Databricks 平臺。在運營商內部與`api/2.0/jobs/runs/submit` [端點進行通信](https://docs.databricks.com/api/latest/jobs.html)。
### DatabricksSubmitRunOperator
```py
class airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator(json = None,spark_jar_task = None,notebook_task = None,new_cluster = None,existing_cluster_id = None,libraries = None,run_name = None,timeout_seconds = None,databricks_conn_id ='databricks_default',polling_period_seconds = 30,databricks_retry_limit = 3,do_xcom_push = False,**kwargs)
```
基類: `airflow.models.BaseOperator`
使用[api / 2.0 / jobs / runs / submit](https://docs.databricks.com/api/latest/jobs.html) API 端點向 Databricks 提交 Spark 作業運行。
有兩種方法可以實例化此 operator。
在第一種方式,你可以把你通常用它來調用的 JSON 有效載荷`api/2.0/jobs/runs/submit`端點并將其直接傳遞到我們`DatabricksSubmitRunOperator`通過`json`參數。例如
```py
json = {
'new_cluster' : {
'spark_version' : '2.1.0-db3-scala2.11' ,
'num_workers' : 2
},
'notebook_task' : {
'notebook_path' : '/Users/airflow@example.com/PrepareData' ,
},
}
notebook_run = DatabricksSubmitRunOperator ( task_id = 'notebook_run' , json = json )
```
另一種完成同樣事情的方法是直接使用命名參數`DatabricksSubmitRunOperator`。請注意,`runs/submit`端點中的每個頂級參數都只有一個命名參數。在此方法中,您的代碼如下所示:
```py
new_cluster = {
'spark_version' : '2.1.0-db3-scala2.11' ,
'num_workers' : 2
}
notebook_task = {
'notebook_path' : '/Users/airflow@example.com/PrepareData' ,
}
notebook_run = DatabricksSubmitRunOperator (
task_id = 'notebook_run' ,
new_cluster = new_cluster ,
notebook_task = notebook_task )
```
在提供 json 參數**和**命名參數的情況下,它們將合并在一起。如果在合并期間存在沖突,則命名參數將優先并覆蓋頂級`json`鍵。
```py
目前 DatabricksSubmitRunOperator 支持的命名參數是
```
* `spark_jar_task`
* `notebook_task`
* `new_cluster`
* `existing_cluster_id`
* `libraries`
* `run_name`
* `timeout_seconds`
參數:
* `json(dict)` -
包含 API 參數的 JSON 對象,將直接傳遞給`api/2.0/jobs/runs/submit`端點。其他命名參數(即`spark_jar_task`,`notebook_task`..)到該運營商將與此 JSON 字典合并如果提供他們。如果在合并期間存在沖突,則命名參數將優先并覆蓋頂級 json 鍵。(模板渲染后)
也可以看看
有關模板的更多信息,請參閱[Jinja 模板](concepts.html)。[https://docs.databricks.com/api/latest/jobs.html#runs-submit](https://docs.databricks.com/api/latest/jobs.html)
* `spark_jar_task(dict)` -
JAR 任務的主要類和參數。請注意,實際的 JAR 在`libraries`。中指定。_ 無論是 _ `spark_jar_task` _ 或 _ `notebook_task`應符合規定。該字段將被模板化。
也可以看看
[https://docs.databricks.com/api/latest/jobs.html#jobssparkjartask](https://docs.databricks.com/api/latest/jobs.html)
* `notebook_task(dict)` -
筆記本任務的筆記本路徑和參數。_ 無論是 _ `spark_jar_task` _ 或 _ `notebook_task`應符合規定。該字段將被模板化。
也可以看看
[https://docs.databricks.com/api/latest/jobs.html#jobsnotebooktask](https://docs.databricks.com/api/latest/jobs.html)
* `new_cluster(dict)` -
將在其上運行此任務的新集群的規范。_ 無論是 _ `new_cluster` _ 或 _ `existing_cluster_id`應符合規定。該字段將被模板化。
也可以看看
[https://docs.databricks.com/api/latest/jobs.html#jobsclusterspecnewcluster](https://docs.databricks.com/api/latest/jobs.html)
* `existing_cluster_id(str)` - 要運行此任務的現有集群的 ID。_ 無論是 _ `new_cluster` _ 或 _ `existing_cluster_id`應符合規定。該字段將被模板化。
* `libraries(list 或 dict)` -
這個運行的庫將使用。該字段將被模板化。
也可以看看
[https://docs.databricks.com/api/latest/libraries.html#managedlibrarieslibrary](https://docs.databricks.com/api/latest/libraries.html)
* `run_name(str)` - 用于此任務的運行名稱。默認情況下,這將設置為 Airflow `task_id`。這`task_id`是超類`BaseOperator`的必需參數。該字段將被模板化。
* `timeout_seconds(int32)` - 此次運行的超時。默認情況下,使用值 0 表示沒有超時。該字段將被模板化。
* `databricks_conn_id(str)` - 要使用的 Airflow 連接的名稱。默認情況下,在常見情況下,這將是`databricks_default`。要使用基于令牌的身份驗證,請在連接的額外字段`token`中提供密鑰。
* `polling_period_seconds(int)` - 控制我們輪詢此運行結果的速率。默認情況下,operator 每 30 秒輪詢一次。
* `databricks_retry_limit(int)` - 如果 Databricks 后端無法訪問,則重試的次數。其值必須大于或等于 1。
* `do_xcom_push(bool)` - 我們是否應該將 run_id 和 run_page_url 推送到 xcom。
## GCP:Google云端平臺
Airflow 廣泛支持 Google Cloud Platform。但請注意,大多數 Hooks 和 Operators 都在 contrib 部分。這意味著他們具有 _beta_ 狀態,這意味著他們可以在次要版本之間進行重大更改。
請參閱[GCP 連接類型](howto/manage-connections.html)文檔以配置與 GCP 的連接。
### 記錄
可以將 Airflow 配置為在 Google 云端存儲中讀取和寫入任務日志。請參閱[將日志寫入 Google 云端存儲](howto/write-logs.html)。
### BigQuery 的
#### BigQuery Operator
* [BigQueryCheckOperator](#BigQueryCheckOperator):對 SQL 查詢執行檢查,該查詢將返回具有不同值的單行。
* [BigQueryValueCheckOperator](#BigQueryValueCheckOperator):使用 SQL 代碼執行簡單的值檢查。
* [BigQueryIntervalCheckOperator](#BigQueryIntervalCheckOperator):檢查作為 SQL 表達式給出的度量值是否在 days_back 之前的某個容差范圍內。
* [BigQueryCreateEmptyTableOperator](#BigQueryCreateEmptyTableOperator):在指定的 BigQuery 數據集中創建一個新的空表,可選擇使用模式。
* [BigQueryCreateExternalTableOperator](#BigQueryCreateExternalTableOperator):使用 Google Cloud Storage 中的數據在數據集中創建新的外部表。
* [BigQueryDeleteDatasetOperator](#BigQueryDeleteDatasetOperator):刪除現有的 BigQuery 數據集。
* [BigQueryOperator](#BigQueryOperator):在特定的 BigQuery 數據庫中執行 BigQuery SQL 查詢。
* [BigQueryToBigQueryOperator](#BigQueryToBigQueryOperator):將 BigQuery 表復制到另一個 BigQuery 表。
* [BigQueryToCloudStorageOperator](#BigQueryToCloudStorageOperator):將 BigQuery 表傳輸到 Google Cloud Storage 存儲桶
##### BigQueryCheckOperator
```py
class airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator(sql,bigquery_conn_id ='bigquery_default',*args,**kwargs)
```
基類: [`airflow.operators.check_operator.CheckOperator`]
對 BigQuery 執行檢查。該`BigQueryCheckOperator`預期的 SQL 查詢將返回一行。使用 python `bool`強制轉換第一行的每個值。如果任何值返回,`False`則檢查失敗并輸出錯誤。
請注意,Python bool 強制轉換如下`False`:
* `False`
* `0`
* 空字符串(`""`)
* 空列表(`[]`)
* 空字典或集(`{}`)
給定一個查詢,`SELECT COUNT(*) FROM foo` 當且僅當`== 0`時會失敗。您可以制作更復雜的查詢,例如,可以檢查表與上游源表的行數相同,或者今天的分區計數大于昨天的分區,或者一組指標是否更少 7 天平均值超過 3 個標準差。
此 Operator 可用作管道中的數據質量檢查,并且根據您在 DAG 中的位置,您可以選擇在關鍵路徑停止,防止發布可疑數據,或者接收電子郵件報警而不阻止 DAG 的繼續。
參數:
* `sql(str)` - 要執行的 sql
* `bigquery_conn_id(str)` - 對 BigQuery 數據庫的引用
##### BigQueryValueCheckOperator
```py
class airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator(sql,pass_value,tolerance = None,bigquery_conn_id ='bigquery_default',*args,**kwargs)
```
基類: [`airflow.operators.check_operator.ValueCheckOperator`]
使用 sql 代碼執行簡單的值檢查。
參數:`sql(str)` - 要執行的 sql
##### BigQueryIntervalCheckOperator
```py
class airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator(table,metrics_thresholds,date_filter_column ='ds',days_back = -7,bigquery_conn_id ='bigquery_default',*args,**kwargs)
```
基類: [`airflow.operators.check_operator.IntervalCheckOperator`]
檢查作為 SQL 表達式給出的度量值是否在 days_back 之前的某個容差范圍內。
此方法構造一個類似的查詢
```py
SELECT { metrics_thresholddictkey } FROM { table }
WHERE { date_filter_column } =< date >
```
參數:
* `table(str)` - 表名
* `days_back(int)` - ds 與我們要檢查的 ds 之間的天數。默認為 7 天
* `metrics_threshold(dict)` - 由指標索引的比率字典,例如'COUNT(*)':1.5 將需要當前日和之前的 days_back 之間 50%或更小的差異。
##### BigQueryGetDataOperator
```py
class airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator(dataset_id,table_id,max_results ='100',selected_fields = None,bigquery_conn_id ='bigquery_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
從 BigQuery 表中獲取數據(或者為所選列獲取數據)并在 python 列表中返回數據。返回列表中的元素數將等于獲取的行數。列表中的每個元素將是一個列表,其中元素將表示該行的列值。
**結果示例**:`[['Tony', '10'], ['Mike', '20'], ['Steve', '15']]`
注意
如果傳遞的字段`selected_fields`的順序與 BQ 表中已有的列的順序不同,則數據仍將按 BQ 表的順序排列。例如,如果 BQ 表有 3 列,`[A,B,C]`并且您傳遞'B, A',`selected_fields` 仍然是表格'A,B'。
**示例** :
```py
get_data = BigQueryGetDataOperator (
task_id = 'get_data_from_bq' ,
dataset_id = 'test_dataset' ,
table_id = 'Transaction_partitions' ,
max_results = '100' ,
selected_fields = 'DATE' ,
bigquery_conn_id = 'airflow-service-account'
)
```
參數:
* `dataset_id` - 請求的表的數據集 ID。(模板渲染后)
* `table_id(str)` - 請求表的表 ID。(模板渲染后)
* `max_results(str)` - 從表中獲取的最大記錄數(行數)。(模板渲染后)
* `selected_fields(str)` - 要返回的字段列表(逗號分隔)。如果未指定,則返回所有字段。
* `bigquery_conn_id(str)` - 對特定 BigQuery 的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### BigQueryCreateEmptyTableOperator
```py
class airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator(dataset_id,table_id,project_id = None,schema_fields = None,gcs_schema_object = None,time_partitioning = {},bigquery_conn_id ='bigquery_default',google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,*args ,**kwargs)
```
基類: `airflow.models.BaseOperator`
在指定的 BigQuery 數據集中創建一個新的空表,可選擇使用模式。
可以用兩種方法之一指定用于 BigQuery 表的模式。您可以直接傳遞架構字段,也可以將運營商指向 Google 云存儲對象名稱。Google 云存儲中的對象必須是包含架構字段的 JSON 文件。您還可以創建沒有架構的表。
參數:
* `project_id(str)` - 將表創建的項目。(模板渲染后)
* `dataset_id(str)` - 用于創建表的數據集。(模板渲染后)
* `table_id(str)` - 要創建的表的名稱。(模板渲染后)
* `schema_fields(list)` -
如果設置,則此處定義的架構字段列表:[https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema)
**示例** :
```py
schema_fields = [{ "name" : "emp_name" , "type" : "STRING" , "mode" : "REQUIRED" },
{ "name" : "salary" , "type" : "INTEGER" , "mode" : "NULLABLE" }]
```
* `gcs_schema_object(str)` - 包含模式(模板化)的 JSON 文件的完整路徑。例如:`gs://test-bucket/dir1/dir2/employee_schema.json`
* `time_partitioning(dict)` -
配置可選的時間分區字段,即按 API 規范按字段,類型和到期分區。
也可以看看
[https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning)
* `bigquery_conn_id(str)` - 對特定 BigQuery 的引用。
* `google_cloud_storage_conn_id(str)` - 對特定 Google 云存儲的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
**示例(在 GCS 中使用 JSON schema)**:
```py
CreateTable = BigQueryCreateEmptyTableOperator (
task_id = 'BigQueryCreateEmptyTableOperator_task' ,
dataset_id = 'ODS' ,
table_id = 'Employees' ,
project_id = 'internal-gcp-project' ,
gcs_schema_object = 'gs://schema-bucket/employee_schema.json' ,
bigquery_conn_id = 'airflow-service-account' ,
google_cloud_storage_conn_id = 'airflow-service-account'
)
```
**對應的 Schema 文件**(`employee_schema.json`):
```py
[
{
"mode" : "NULLABLE" ,
"name" : "emp_name" ,
"type" : "STRING"
},
{
"mode" : "REQUIRED" ,
"name" : "salary" ,
"type" : "INTEGER"
}
]
```
**示例(在 DAG 中使用 schema)**:
```py
CreateTable = BigQueryCreateEmptyTableOperator (
task_id = 'BigQueryCreateEmptyTableOperator_task' ,
dataset_id = 'ODS' ,
table_id = 'Employees' ,
project_id = 'internal-gcp-project' ,
schema_fields = [{ "name" : "emp_name" , "type" : "STRING" , "mode" : "REQUIRED" },
{ "name" : "salary" , "type" : "INTEGER" , "mode" : "NULLABLE" }],
bigquery_conn_id = 'airflow-service-account' ,
google_cloud_storage_conn_id = 'airflow-service-account'
)
```
##### BigQueryCreateExternalTableOperator
```py
class airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator(bucket,source_objects,destination_project_dataset_table,schema_fields = None,schema_object = None,source_format ='CSV',compression ='NONE',skip_leading_rows = 0,field_delimiter =',',max_bad_records = 0 ,quote_character = None,allow_quoted_newlines = False,allow_jagged_rows = False,bigquery_conn_id ='bigquery_default',google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,src_fmt_configs = {},*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
使用 Google 云端存儲中的數據在數據集中創建新的外部表。
可以用兩種方法之一指定用于 BigQuery 表的模式。您可以直接傳遞架構字段,也可以將運營商指向 Google 云存儲對象名稱。Google 云存儲中的對象必須是包含架構字段的 JSON 文件。
參數:
* `bucket(str)` - 指向外部表的存儲桶。(模板渲染后)
* **source_objects** - 指向表格的 Google 云存儲 URI 列表。(模板化)如果 source_format 是'DATASTORE_BACKUP',則列表必須只包含一個 URI。
* `destination_project_dataset_table(str)` - 用于將數據加載到(模板化)的表\1\<project>。)<dataset>。<table> BigQuery 表。如果未包\1\<project>,則項目將是連接 json 中定義的項目。
* `schema_fields(list)` -
如果設置,則此處定義的架構字段列表:[https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema)
**示例** :
```py
schema_fields = [{ "name" : "emp_name" , "type" : "STRING" , "mode" : "REQUIRED" },
{ "name" : "salary" , "type" : "INTEGER" , "mode" : "NULLABLE" }]
```
當 source_format 為'DATASTORE_BACKUP'時,不應設置。
* **schema_object** - 如果設置,則指向包含表的架構的.json 文件的 GCS 對象路徑。(模板渲染后)
* **schema_object** - 字符串
* `source_format(str)` - 數據的文件格式。
* `compression(str)` - [可選]數據源的壓縮類型。可能的值包括 GZIP 和 NONE。默認值為 NONE。Google Cloud Bigtable,Google Cloud Datastore 備份和 Avro 格式會忽略此設置。
* `skip_leading_rows(int)` - 從 CSV 加載時要跳過的行數。
* `field_delimiter(str)` - 用于 CSV 的分隔符。
* `max_bad_records(int)` - BigQuery 在運行作業時可以忽略的最大錯誤記錄數。
* `quote_character(str)` - 用于引用 CSV 文件中數據部分的值。
* `allow_quoted_newlines(bool)` - 是否允許引用的換行符(true)或不允許(false)。
* `allow_jagged_rows(bool)` - 接受缺少尾隨可選列的行。缺失值被視為空值。如果為 false,則缺少尾隨列的記錄將被視為錯誤記錄,如果錯誤記錄太多,則會在作業結果中返回無效錯誤。僅適用于 CSV,忽略其他格式。
* `bigquery_conn_id(str)` - 對特定 BigQuery 掛鉤的引用。
* `google_cloud_storage_conn_id(str)` - 對特定 Google 云存儲掛鉤的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `src_fmt_configs(dict)` - 配置特定于源格式的可選字段
##### BigQueryDeleteDatasetOperator
##### BigQueryOperator
```py
class airflow.contrib.operators.bigquery_operator.BigQueryOperator(bql = None,sql = None,destination_dataset_table = False,write_disposition ='WRITE_EMPTY',allow_large_results = False,flatten_results = False,bigquery_conn_id ='bigquery_default',delegate_to = None,udf_config = False ,use_legacy_sql = True,maximum_billing_tier = None,maximumbytesbilled = None,create_disposition ='CREATE_IF_NEEDED',schema_update_options =(),query_params = None,priority ='INTERACTIVE',time_partitioning = {},*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在特定的 BigQuery 數據庫中執行 BigQuery SQL 查詢
參數:
* `BQL(可接收表示 SQL 語句中的海峽,海峽列表(SQL 語句),或參照模板文件模板引用在“.SQL”結束海峽認可。)` - (不推薦使用。`SQL`參數代替)要執行的 sql 代碼(模板化)
* `SQL(可接收表示 SQL 語句中的海峽,海峽列表(SQL 語句),或參照模板文件模板引用在“.SQL”結束海峽認可。)` - SQL 代碼被執行(模板渲染后)
* `destination_dataset_table(str)` - 目的數據集表(\<project>|\<project>:)\<dataset>.\<table>,如果設置,將存儲查詢結果。(模板渲染后)
* `write_disposition(str)` - 指定目標表已存在時發生的操作。(默認:'WRITE_EMPTY')
* `create_disposition(str)` - 指定是否允許作業創建新表。(默認值:'CREATE_IF_NEEDED')
* `allow_large_results(bool)` - 是否允許大結果。
* `flatten_results(bool)` - 如果為 true 且查詢使用舊版 SQL 方言,則展平查詢結果中的所有嵌套和重復字段。`allow_large_results`必須是`true`如果設置為`false`。對于標準 SQL 查詢,將忽略此標志,并且結果永遠不會展平。
* `bigquery_conn_id(str)` - 對特定 BigQuery 鉤子的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `udf_config(list)` - 查詢的用戶定義函數配置。有關詳細信息,請參閱[https://cloud.google.com/bigquery/user-defined-functions](https://cloud.google.com/bigquery/user-defined-functions)。
* `use_legacy_sql(bool)` - 是使用舊 SQL(true)還是標準 SQL(false)。
* `maximum_billing_tier(int)` - 用作基本價格乘數的正整數。默認為 None,在這種情況下,它使用項目中設置的值。
* `maximumbytesbilled(float)` - 限制為此作業計費的字節數。超出此限制的字節數的查詢將失敗(不會產生費用)。如果未指定,則將其設置為項目默認值。
* `schema_update_options(tuple)` - 允許更新目標表的模式作為加載作業的副作用。
* `query_params(dict)` - 包含查詢參數類型和值的字典,傳遞給 BigQuery。
* `priority(str)` - 指定查詢的優先級。可能的值包括 INTERACTIVE 和 BATCH。默認值為 INTERACTIVE。
* `time_partitioning(dict)` - 配置可選的時間分區字段,即按 API 規范按字段,類型和到期分區。請注意,'field'不能與 dataset.table $ partition 一起使用。
##### BigQueryTableDeleteOperator
```py
class airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator(deletion_dataset_table,bigquery_conn_id ='bigquery_default',delegate_to = None,ignore_if_missing = False,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
刪除 BigQuery 表
參數:
* `deletion_dataset_table(str)` - 刪除的數據集表(\<project>|\<project>:)\<dataset>.\<table>,指示將刪除哪個表。(模板渲染后)
* `bigquery_conn_id(str)` - 對特定 BigQuery 鉤子的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `ignore_if_missing(bool)` - 如果為 True,則即使請求的表不存在也返回成功。
##### BigQueryToBigQueryOperator
```py
class airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator(source_project_dataset_tables,destination_project_dataset_table,write_disposition ='WRITE_EMPTY',create_disposition ='CREATE_IF_NEEDED',bigquery_conn_id ='bigquery_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
將數據從一個 BigQuery 表復制到另一個。
也可以看看
有關這些參數的詳細信息,請訪問:[https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.copy)
參數:
* `source_project_dataset_tables(list|str)` - 一個或多個點(project:|project.)\<dataset>.\<table>用作源數據的 BigQuery 表。如果未包含\<project>,則 project 應當定義在連接 json 中。如果有多個源表,請使用列表。(模板渲染后)
* `destination_project_dataset_table(str)` - 目標 BigQuery 表。格式為:(`project:`|`project`)<dataset>.<table>(模板化)
* `write_disposition(str)` - 表已存在時的處理。
* `create_disposition(str)` - 如果表不存在,則創建處理。
* `bigquery_conn_id(str)` - 對特定 BigQuery 的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### BigQueryToCloudStorageOperator
```py
class airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator(source_project_dataset_table,destination_cloud_storage_uris,compression ='NONE',export_format ='CSV',field_delimiter =',',print_header = True,bigquery_conn_id ='bigquery_default',delegate_to = None,*args, **kwargs)
```
基類: `airflow.models.BaseOperator`
將 BigQuery 表傳輸到 Google Cloud Storage 存儲桶。
也可以看看
有關這些參數的詳細信息,請訪問:[https://cloud.google.com/bigquery/docs/reference/v2/jobs](https://cloud.google.com/bigquery/docs/reference/v2/jobs)
參數:
* `source_project_dataset_table(str)` - 用作源數據的(\<project>.|<project>:)\<dataset>.\<table> BigQuery 表。如果未包含\<project>,則 project 應當定義在連接 json 中。(模板渲染后)
* `destination_cloud_storage_uris(list)` - 目標 Google 云端存儲 URI(例如 gs://some-bucket/some-file.txt)。(模板化)遵循此處定義的慣例:https://cloud.google.com/bigquery/exporting-data-from-bigquery#exportingmultiple
* `compression(str)` - 要使用的壓縮類型。
* `export_format` - 要導出的文件格式。
* `field_delimiter(str)` - 提取到 CSV 時使用的分隔符。
* `print_header(bool)` - 是否打印 CSV 文件頭。
* `bigquery_conn_id(str)` - 對特定 BigQuery 的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
#### BigQueryHook
```py
class airflow.contrib.hooks.bigquery_hook.BigQueryHook(bigquery_conn_id ='bigquery_default',delegate_to = None,use_legacy_sql = True)
```
基類:[`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`],[`airflow.hooks.dbapi_hook.DbApiHook`],`airflow.utils.log.logging_mixin.LoggingMixin`
與 BigQuery 交互。此掛鉤使用 Google Cloud Platform 連接。
```py
get_conn()
```
返回 BigQuery PEP 249 連接對象。
```py
get_pandas_df(sql,parameters = None,dialect = None)
```
返回 BigQuery 查詢生成的結果的 Pandas DataFrame。必須重寫 DbApiHook 方法,因為 Pandas 不支持 PEP 249 連接,但 SQLite 除外。參看:
[https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447 ](https://github.com/pydata/pandas/blob/master/pandas/io/sql.py#L447) [https://github.com/pydata/pandas/issues/6900](https://github.com/pydata/pandas/issues/6900)
參數:
* `sql(str)` - 要執行的 BigQuery SQL。
* `parameters(map 或 iterable)` - 用于呈現 SQL 查詢的參數(未使用,請保留覆蓋超類方法)
* `dialect({'legacy', 'standard'})` - BigQuery SQL 的方言 - 遺留 SQL 或標準 SQL 默認使用`self.use_legacy_sql(`如果未指定)
```py
get_service()
```
返回一個 BigQuery 服務對象。
```py
insert_rows(table,rows,target_fields = None,commit_every = 1000)
```
目前不支持插入。從理論上講,您可以使用 BigQuery 的流 API 將行插入表中,但這尚未實現。
```py
table_exists(project_id,dataset_id,table_id)
```
檢查 Google BigQuery 中是否存在表格。
參數:
* `project_id(str)` - 要在其中查找表的 Google 云項目。連接必須有對指定項目的訪問權限。
* `dataset_id(str)` - 要在其中查找表的數據集的名稱。
* `table_id(str)` - 要檢查的表的名稱。
### 云 DataFlow
#### DataFlow Operator
* [DataFlowJavaOperator](#DataFlowJavaOperator):啟動用 Java 編寫的 Cloud Dataflow 作業。
* [DataflowTemplateOperator](#DataflowTemplateOperator):啟動模板化的 Cloud DataFlow 批處理作業。
* [DataFlowPythonOperator](#DataFlowPythonOperator):啟動用 python 編寫的 Cloud Dataflow 作業。
##### DataFlowJavaOperator
```py
class airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator(jar,dataflow_default_options = None,options = None,gcp_conn_id ='google_cloud_default',delegate_to = None,poll_sleep = 10,job_class = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
啟動 Java Cloud DataFlow 批處理作業。操作的參數將傳遞給作業。
在 dag 的 default_args 中定義 dataflow_*參數是一個很好的做法,例如項目,區域和分段位置。
```py
default_args = {
'dataflow_default_options' : {
'project' : 'my-gcp-project' ,
'zone' : 'europe-west1-d' ,
'stagingLocation' : 'gs://my-staging-bucket/staging/'
}
}
```
您需要使用`jar`參數將路徑作為文件引用傳遞給數據流,jar 需要是一個自動執行的 jar(請參閱以下文檔:[https://beam.apache.org/documentation/runners/dataflow/](https://beam.apache.org/documentation/runners/dataflow/)。使用`options`傳遞你的參數。
```py
t1 = DataFlowOperation (
task_id = 'datapflow_example' ,
jar = '{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar' ,
options = {
'autoscalingAlgorithm' : 'BASIC' ,
'maxNumWorkers' : '50' ,
'start' : '{{ds}}' ,
'partitionType' : 'DAY' ,
'labels' : { 'foo' : 'bar' }
},
gcp_conn_id = 'gcp-airflow-service-account' ,
dag = my - dag )
```
這兩個`jar`和`options`模板化,所以你可以在其中使用變量。
```py
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' :
( 2016 , 8 , 1 ),
'email' : [ 'alex@vanboxel.be' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta ( minutes = 30 ),
'dataflow_default_options' : {
'project' : 'my-gcp-project' ,
'zone' : 'us-central1-f' ,
'stagingLocation' : 'gs://bucket/tmp/dataflow/staging/' ,
}
}
dag = DAG ( 'test-dag' , default_args = default_args )
task = DataFlowJavaOperator (
gcp_conn_id = 'gcp_default' ,
task_id = 'normalize-cal' ,
jar = '{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar' ,
options = {
'autoscalingAlgorithm' : 'BASIC' ,
'maxNumWorkers' : '50' ,
'start' : '{{ds}}' ,
'partitionType' : 'DAY'
},
dag = dag )
```
##### DataflowTemplateOperator
```py
class airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator(template,dataflow_default_options = None,parameters = None,gcp_conn_id ='google_cloud_default',delegate_to = None,poll_sleep = 10,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
啟動模板化云 DataFlow 批處理作業。操作的參數將傳遞給作業。在 dag 的 default_args 中定義 dataflow_ *參數是一個很好的做法,例如項目,區域和分段位置。
也可以看看
[https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters ](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters)[https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/RuntimeEnvironment)
```py
default_args = {
'dataflow_default_options' : {
'project' : 'my-gcp-project'
'zone' : 'europe-west1-d' ,
'tempLocation' : 'gs://my-staging-bucket/staging/'
}
}
}
```
您需要將路徑作為帶`template`參數的文件引用傳遞給數據流模板。使用`parameters`來傳遞參數給你的工作。使用`environment`對運行環境變量傳遞給你的工作。
```py
t1 = DataflowTemplateOperator (
task_id = 'datapflow_example' ,
template = '{{var.value.gcp_dataflow_base}}' ,
parameters = {
'inputFile' : "gs://bucket/input/my_input.txt" ,
'outputFile' : "gs://bucket/output/my_output.txt"
},
gcp_conn_id = 'gcp-airflow-service-account' ,
dag = my - dag )
```
`template`,`dataflow_default_options`并且`parameters`是模板化的,因此您可以在其中使用變量。
##### DataFlowPythonOperator
```py
class airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator(py_file,py_options = None,dataflow_default_options = None,options = None,gcp_conn_id ='google_cloud_default',delegate_to = None,poll_sleep = 10,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
```py
execute(context)
```
執行 python 數據流作業。
#### DataFlowHook
```py
class airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook(gcp_conn_id ='google_cloud_default',delegate_to = None,poll_sleep = 10)
```
基類: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`]
```py
get_conn()
```
返回 Google 云端存儲服務對象。
### Cloud DataProc
#### DataProc Operator
* [DataprocClusterCreateOperator](#DataprocClusterCreateOperator):在 Google Cloud Dataproc 上創建新集群。
* [DataprocClusterDeleteOperator](#DataprocClusterDeleteOperator):刪除 Google Cloud Dataproc 上的集群。
* [DataprocClusterScaleOperator](#DataprocClusterScaleOperator):在 Google Cloud Dataproc 上向上或向下擴展集群。
* [DataProcPigOperator](#DataProcPigOperator):在 Cloud DataProc 集群上啟動 Pig 查詢作業。
* [DataProcHiveOperator](#DataProcHiveOperator):在 Cloud DataProc 集群上啟動 Hive 查詢作業。
* [DataProcSparkSqlOperator](#DataProcSparkSqlOperator):在 Cloud DataProc 集群上啟動 Spark SQL 查詢作業。
* [DataProcSparkOperator](#DataProcSparkOperator):在 Cloud DataProc 集群上啟動 Spark 作業。
* [DataProcHadoopOperator](#DataProcHadoopOperator):在 Cloud DataProc 集群上啟動 Hadoop 作業。
* [DataProcPySparkOperator](#DataProcPySparkOperator):在 Cloud DataProc 集群上啟動 PySpark 作業。
* [DataprocWorkflowTemplateInstantiateOperator](#DataprocWorkflowTemplateInstantiateOperator):在 Google Cloud Dataproc 上實例化 WorkflowTemplate。
* [DataprocWorkflowTemplateInstantiateInlineOperator](#DataprocWorkflowTemplateInstantiateInlineOperator):在 Google Cloud Dataproc 上實例化 WorkflowTemplate 內聯。
##### DataprocClusterCreateOperator
```py
class airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator(cluster_name,project_id,num_workers,zone,network_uri = None,subnetwork_uri = None,internal_ip_only = None,tags = None,storage_bucket = None,init_actions_uris = None,init_action_timeout ='10m',metadata = None,image_version = None,屬性= None,master_machine_type ='n1-standard-4',master_disk_size = 500,worker_machine_type ='n1-standard-4',worker_disk_size = 500,num_preemptible_workers = 0,labels = None,region =' global',gcp_conn_id ='google_cloud_default',delegate_to = None,service_account = None,service_account_scopes = None,idle_delete_ttl = None,auto_delete_time = None,auto_delete_ttl = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Google Cloud Dataproc 上創建新集群。operator 將等待創建成功或創建過程中發生錯誤。
參數允許配置集群。請參閱
[https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters)
有關不同參數的詳細說明。鏈接中詳述的大多數配置參數都可作為此 operator 的參數。
參數:
* `cluster_name(str)` - 要創建的 DataProc 集群的名稱。(模板渲染后)
* `project_id(str)` - 用于創建集群的 Google 云項目的 ID。(模板渲染后)
* `num_workers(int)` - 工人數量
* `storage_bucket(str)` - 要使用的存儲桶,設置為 None 允許 dataproc 為您生成自定義存儲桶
* `init_actions_uris(list[str])` - 包含數據空間初始化腳本的 GCS uri 列表
* `init_action_timeout(str)` - init_actions_uris 中可執行腳本限定的完成時間
* `metadata(dict)` - 要添加到所有實例的鍵值 google 計算引擎元數據條目的字典
* `image_version(str)` - Dataproc 集群內的軟件版本
* `properties(dict)` - 配置的屬性字典(如spark-defaults.conf),見[https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#SoftwareConfig)
* `master_machine_type(str)` - 計算要用于主節點的引擎機器類型
* `master_disk_size(int)` - 主節點的磁盤大小
* `worker_machine_type(str)` - 計算要用于工作節點的引擎計算機類型
* `worker_disk_size(int)` - 工作節點的磁盤大小
* `num_preemptible_workers(int)` - 可搶占的工作節點數
* `labels(dict)` - 要添加到集群的標簽的字典
* `zone(str)` - 集群所在的區域。(模板渲染后)
* `network_uri(str)` - 用于機器通信的網絡 uri,不能用 subnetwork_uri 指定
* `subnetwork_uri(str)` - 無法使用 network_uri 指定要用于機器通信的子網 uri
* `internal_ip_only(bool)` - 如果為 true,則集群中的所有實例將只具有內部 IP 地址。這只能為啟用子網的網絡啟用
* `tags(list[str])` - 要添加到所有實例的 GCE 標記
* `region` - 默認為'global',可能在未來變得相關。(模板渲染后)
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `service_account(str)` - dataproc 實例的服務帳戶。
* `service_account_scopes(list[str])` - 要包含的服務帳戶范圍的 URI。
* `idle_delete_ttl(int)` - 集群在保持空閑狀態時保持活動狀態的最長持續時間。通過此閾值將導致集群被自動刪除。持續時間(秒)。
* `auto_delete_time(datetime.datetime)` - 自動刪除集群的時間。
* `auto_delete_ttl(int)` - 集群的生命周期,集群將在此持續時間結束時自動刪除,持續時間(秒)。(如果設置了 auto_delete_time,則將忽略此參數)
##### DataprocClusterScaleOperator
```py
class airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator(cluster_name,project_id,region ='global',gcp_conn_id ='google_cloud_default',delegate_to = None,num_workers = 2,num_preemptible_workers = 0,graceful_decommission_timeout = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Google Cloud Dataproc 上進行擴展,向上或向下擴展。operator 將等待,直到重新調整集群。
**示例** :
```py
t1 = DataprocClusterScaleOperator(
task_id ='dataproc_scale',project_id ='my-project',cluster_name ='cluster-1',num_workers = 10,num_preemptible_workers = 10,graceful_decommission_timeout ='1h'dag = dag)
```
也可以看看
有關擴展集群的更多詳細信息,請參閱以下參考:[https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/scaling-clusters)
參數:
* `cluster_name(str)` - 要擴展的集群的名稱。(模板渲染后)
* `project_id(str)` - 集群運行的 Google 云項目的 ID。(模板渲染后)
* `region(str)` - 數據通路簇的區域。(模板渲染后)
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `num_workers(int)` - 工人數量
* `num_preemptible_workers(int)` - 新的可搶占工人數量
* `graceful_decommission_timeout(str)` - 優雅的 YARN decomissioning 超時。最大值為 1d
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### DataprocClusterDeleteOperator
```py
class airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator(cluster_name,project_id,region ='global',gcp_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
刪除 Google Cloud Dataproc 上的集群。operator 將等待,直到集群被銷毀。
參數:
* `cluster_name(str)` - 要創建的集群的名稱。(模板渲染后)
* `project_id(str)` - 集群運行的 Google 云項目的 ID。(模板渲染后)
* `region(str)` - 保留為“全局”,將來可能會變得相關。(模板渲染后)
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### DataProcPigOperator
```py
class airflow.contrib.operators.dataproc_operator.DataProcPigOperator(query = None,query_uri = None,variables = None,job_name ='{{task.task_id}}_{{ds_nodash}}',cluster_name ='cluster-1',dataproc_pig_properties = None,dataproc_pig_jars = None,gcp_conn_id ='google_cloud_default',delegate_to = None,region ='global',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Cloud DataProc 集群上啟動 Pig 查詢作業。操作的參數將傳遞給集群。
在 dag 的 default_args 中定義 dataproc_*參數是一種很好的做法,比如集群名稱和 UDF。
```py
default_args = {
'cluster_name' : 'cluster-1' ,
'dataproc_pig_jars' : [
'gs://example/udf/jar/datafu/1.2.0/datafu.jar' ,
'gs://example/udf/jar/gpig/1.2/gpig.jar'
]
}
```
您可以將 pig 腳本作為字符串或文件引用傳遞。使用變量傳遞要在集群上解析的 pig 腳本的變量,或者使用要在腳本中解析的參數作為模板參數。
**示例** :
```py
t1 = DataProcPigOperator (
task_id = 'dataproc_pig' ,
query = 'a_pig_script.pig' ,
variables = { 'out' : 'gs://example/output/{{ds}}' },
dag = dag )
```
也可以看看
有關工作提交的更多詳細信息,請參閱以下參考:[https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs](https://cloud.google.com/dataproc/reference/rest/v1/projects.regions.jobs)
參數:
* `query(str)` - 對查詢文件的查詢或引用(pg 或 pig 擴展)。(模板渲染后)
* `query_uri(str)` - 云存儲上的豬腳本的 uri。
* `variables(dict)` - 查詢的命名參數的映射。(模板渲染后)
* `job_name(str)` - DataProc 集群中使用的作業名稱。默認情況下,此名稱是附加執行數據的 task_id,但可以進行模板化。該名稱將始終附加一個隨機數,以避免名稱沖突。(模板渲染后)
* `cluster_name(str)` - DataProc 集群的名稱。(模板渲染后)
* `dataproc_pig_properties(dict)` - Pig 屬性的映射。非常適合放入默認參數
* `dataproc_pig_jars(list)` - 在云存儲中配置的 jars 的 URI(例如:用于 UDF 和 lib),非常適合放入默認參數。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `region(str)` - 創建數據加載集群的指定區域。
##### DataProcHiveOperator
```py
class airflow.contrib.operators.dataproc_operator.DataProcHiveOperator(query = None,query_uri = None,variables = None,job_name ='{{task.task_id}}_{{ds_nodash}}',cluster_name ='cluster-1',dataproc_hive_properties = None,dataproc_hive_jars = None,gcp_conn_id ='google_cloud_default',delegate_to = None,region ='global',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Cloud DataProc 集群上啟動 Hive 查詢作業。
參數:
* `query(str)` - 查詢或對查詢文件的引用(q 擴展名)。
* `query_uri(str)` - 云存儲上的 hive 腳本的 uri。
* `variables(dict)` - 查詢的命名參數的映射。
* `job_name(str)` - DataProc 集群中使用的作業名稱。默認情況下,此名稱是附加執行數據的 task_id,但可以進行模板化。該名稱將始終附加一個隨機數,以避免名稱沖突。
* `cluster_name(str)` - DataProc 集群的名稱。
* `dataproc_hive_properties(dict)` - Pig 屬性的映射。非常適合放入默認參數
* `dataproc_hive_jars(list)` - 在云存儲中配置的 jars 的 URI(例如:用于 UDF 和 lib),非常適合放入默認參數。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `region(str)` - 創建數據加載集群的指定區域。
##### DataProcSparkSqlOperator
```py
class airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator(query = None,query_uri = None,variables = None,job_name ='{{task.task_id}}_{{ds_nodash}}',cluster_name ='cluster-1',dataproc_spark_properties = None,dataproc_spark_jars = None,gcp_conn_id ='google_cloud_default',delegate_to = None,region ='global',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Cloud DataProc 集群上啟動 Spark SQL 查詢作業。
參數:
* `query(str)` - 查詢或對查詢文件的引用(q 擴展名)。(模板渲染后)
* `query_uri(str)` - 云存儲上的一個 spark sql 腳本的 uri。
* `variables(dict)` - 查詢的命名參數的映射。(模板渲染后)
* `job_name(str)` - DataProc 集群中使用的作業名稱。默認情況下,此名稱是附加執行數據的 task_id,但可以進行模板化。該名稱將始終附加一個隨機數,以避免名稱沖突。(模板渲染后)
* `cluster_name(str)` - DataProc 集群的名稱。(模板渲染后)
* `dataproc_spark_properties(dict)` - Pig 屬性的映射。非常適合放入默認參數
* `dataproc_spark_jars(list)` - 在云存儲中配置的 jars 的 URI(例如:用于 UDF 和 lib),非常適合放入默認參數。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `region(str)` - 創建數據加載集群的指定區域。
##### DataProcSparkOperator
```py
class airflow.contrib.operators.dataproc_operator.DataProcSparkOperator(main_jar = None,main_class = None,arguments = None,archives = None,files = None,job_name ='{{task.task_id}}_{{ds_nodash}}',cluster_name ='cluster-1',dataproc_spark_properties = None,dataproc_spark_jars = None,gcp_conn_id ='google_cloud_default',delegate_to = None,region ='global',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Cloud DataProc 集群上啟動 Spark 作業。
參數:
* `main_jar(str)` - 在云存儲上配置的作業 jar 的 URI。(使用 this 或 main_class,而不是兩者一起)。
* `main_class(str)` - 作業類的名稱。(使用 this 或 main_jar,而不是兩者一起)。
* `arguments(list)` - 作業的參數。(模板渲染后)
* `archives(list)` - 將在工作目錄中解壓縮的已歸檔文件列表。應存儲在云存儲中。
* `files(list)` - 要復制到工作目錄的文件列表
* `job_name(str)` - DataProc 集群中使用的作業名稱。默認情況下,此名稱是附加執行數據的 task_id,但可以進行模板化。該名稱將始終附加一個隨機數,以避免名稱沖突。(模板渲染后)
* `cluster_name(str)` - DataProc 集群的名稱。(模板渲染后)
* `dataproc_spark_properties(dict)` - Pig 屬性的映射。非常適合放入默認參數
* `dataproc_spark_jars(list)` - 在云存儲中配置的 jars 的 URI(例如:用于 UDF 和 lib),非常適合放入默認參數。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `region(str)` - 創建數據加載集群的指定區域。
##### DataProcHadoopOperator
```py
class airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator(main_jar = None,main_class = None,arguments = None,archives = None,files = None,job_name ='{{task.task_id}}_{{ds_nodash}}',cluster_name ='cluster-1',dataproc_hadoop_properties = None,dataproc_hadoop_jars = None,gcp_conn_id ='google_cloud_default',delegate_to = None,region ='global',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Cloud DataProc 集群上啟動 Hadoop 作業。
參數:
* `main_jar(str)` - 在云存儲上配置的作業 jar 的 URI。(使用 this 或 main_class,而不是兩者一起)。
* `main_class(str)` - 作業類的名稱。(使用 this 或 main_jar,而不是兩者一起)。
* `arguments(list)` - 作業的參數。(模板渲染后)
* `archives(list)` - 將在工作目錄中解壓縮的已歸檔文件列表。應存儲在云存儲中。
* `files(list)` - 要復制到工作目錄的文件列表
* `job_name(str)` - DataProc 集群中使用的作業名稱。默認情況下,此名稱是附加執行數據的 task_id,但可以進行模板化。該名稱將始終附加一個隨機數,以避免名稱沖突。(模板渲染后)
* `cluster_name(str)` - DataProc 集群的名稱。(模板渲染后)
* `dataproc_hadoop_properties(dict)` - Pig 屬性的映射。非常適合放入默認參數
* `dataproc_hadoop_jars(list)` - 在云存儲中配置的 jars 的 URI(例如:用于 UDF 和 lib),非常適合放入默認參數。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `region(str)` - 創建數據加載集群的指定區域。
##### DataProcPySparkOperator
```py
class airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator(main,arguments = None,archives = None,pyfiles = None,files = None,job_name ='{{task.task_id}}_{{ds_nodash}}',cluster_name =' cluster-1',dataproc_pyspark_properties = None,dataproc_pyspark_jars = None,gcp_conn_id ='google_cloud_default',delegate_to = None,region ='global',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
在 Cloud DataProc 集群上啟動 PySpark 作業。
參數:
* `main(str)` - [必需]用作驅動程序的主 Python 文件的 Hadoop 兼容文件系統(HCFS)URI。必須是.py 文件。
* `arguments(list)` - 作業的參數。(模板渲染后)
* `archives(list)` - 將在工作目錄中解壓縮的已歸檔文件列表。應存儲在云存儲中。
* `files(list)` - 要復制到工作目錄的文件列表
* `pyfiles(list)` - 要傳遞給 PySpark 框架的 Python 文件列表。支持的文件類型:.py,.egg 和.zip
* `job_name(str)` - DataProc 集群中使用的作業名稱。默認情況下,此名稱是附加執行數據的 task_id,但可以進行模板化。該名稱將始終附加一個隨機數,以避免名稱沖突。(模板渲染后)
* `cluster_name(str)` - DataProc 集群的名稱。
* `dataproc_pyspark_properties(dict)` - Pig 屬性的映射。非常適合放入默認參數
* `dataproc_pyspark_jars(list)` - 在云存儲中配置的 jars 的 URI(例如:用于 UDF 和 lib),非常適合放入默認參數。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `region(str)` - 創建數據加載集群的指定區域。
##### DataprocWorkflowTemplateInstantiateOperator
```py
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(template_id,*args,**kwargs)
```
基類: [`airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator`]
在 Google Cloud Dataproc 上實例化 WorkflowTemplate。operator 將等待 WorkflowTemplate 完成執行。
也可以看看
請參閱:[https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiate)
參數:
* `template_id(str)` - 模板的 id。(模板渲染后)
* `project_id(str)` - 模板運行所在的 Google 云項目的 ID
* `region(str)` - 保留為“全局”,將來可能會變得相關
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### DataprocWorkflowTemplateInstantiateInlineOperator
```py
class airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator(template,*args,**kwargs)
```
基類: [`airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateBaseOperator`]
在 Google Cloud Dataproc 上實例化 WorkflowTemplate 內聯。operator 將等待 WorkflowTemplate 完成執行。
也可以看看
請參閱:[https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline](https://cloud.google.com/dataproc/docs/reference/rest/v1beta2/projects.regions.workflowTemplates/instantiateInline)
參數:
* `template(map)` - 模板內容。(模板渲染后)
* `project_id(str)` - 模板運行所在的 Google 云項目的 ID
* `region(str)` - 保留為“全局”,將來可能會變得相關
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
### 云數據存儲區
#### 數據存儲區運營商
* [DatastoreExportOperator](#DatastoreExportOperator):將實體從 Google Cloud Datastore 導出到云存儲。
* [DatastoreImportOperator](#DatastoreImportOperator):將實體從云存儲導入 Google Cloud Datastore。
##### DatastoreExportOperator
```py
class airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator(bucket,namespace = None,datastore_conn_id ='google_cloud_default',cloud_storage_conn_id ='google_cloud_default',delegate_to = None,entity_filter = None,labels = None,polling_interval_in_seconds = 10,overwrite_existing = False,xcom_push = False,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
將實體從 Google Cloud Datastore 導出到云存儲
參數:
* `bucket(str)` - 要備份數據的云存儲桶的名稱
* `namespace(str)` - 指定云存儲桶中用于備份數據的可選命名空間路徑。如果 GCS 中不存在此命名空間,則將創建該命名空間。
* `datastore_conn_id(str)` - 要使用的數據存儲區連接 ID 的名稱
* `cloud_storage_conn_id(str)` - 強制寫入備份的云存儲連接 ID 的名稱
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `entity_filter(dict)` - 導出中包含項目中哪些數據的說明,請參閱[https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter](https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter)
* `labels(dict)` - 客戶端分配的云存儲標簽
* `polling_interval_in_seconds(int)` - 再次輪詢執行狀態之前等待的秒數
* `overwrite_existing(bool)` - 如果存儲桶+命名空間不為空,則在導出之前將清空它。這樣可以覆蓋現有備份。
* `xcom_push(bool)` - 將操作名稱推送到 xcom 以供參考
##### DatastoreImportOperator
```py
class airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator(bucket,file,namespace = None,entity_filter = None,labels = None,datastore_conn_id ='google_cloud_default',delegate_to = None,polling_interval_in_seconds = 10,xcom_push = False,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
將實體從云存儲導入 Google Cloud Datastore
參數:
* `bucket(str)` - 云存儲中用于存儲數據的容器
* `file(str)` - 指定云存儲桶中備份元數據文件的路徑。它應該具有擴展名.overall_export_metadata
* `namespace(str)` - 指定云存儲桶中備份元數據文件的可選命名空間。
* `entity_filter(dict)` - 導出中包含項目中哪些數據的說明,請參閱[https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter](https://cloud.google.com/datastore/docs/reference/rest/Shared.Types/EntityFilter)
* `labels(dict)` - 客戶端分配的云存儲標簽
* `datastore_conn_id(str)` - 要使用的連接 ID 的名稱
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `polling_interval_in_seconds(int)` - 再次輪詢執行狀態之前等待的秒數
* `xcom_push(bool)` - 將操作名稱推送到 xcom 以供參考
#### DatastoreHook
```py
class airflow.contrib.hooks.datastore_hook.DatastoreHook(datastore_conn_id ='google_cloud_datastore_default',delegate_to = None)
```
基類: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`]
與 Google Cloud Datastore 互動。此掛鉤使用 Google Cloud Platform 連接。
此對象不是線程安全的。如果要同時發出多個請求,則需要為每個線程創建一個鉤子。
```py
allocate_ids(partialKeys)
```
為不完整的密鑰分配 ID。請參閱[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/allocateIds)
參數:**partialKeys** - 部分鍵列表
返回:完整密鑰列表。
```py
begin_transaction()
```
獲取新的事務處理
> 也可以看看
>
> [https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/beginTransaction)
返回:交易句柄
```py
commit(body)
```
提交事務,可選地創建,刪除或修改某些實體。
也可以看看
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/commit)
參數:**body** - 提交請求的主體
返回:提交請求的響應主體
```py
delete_operation(名稱)
```
刪除長時間運行的操作
參數:**name** - 操作資源的名稱
```py
export_to_storage_bucket(bucket,namespace = None,entity_filter = None,labels = None)
```
將實體從 Cloud Datastore 導出到 Cloud Storage 進行備份
```py
get_conn(version= 'V1')
```
返回 Google 云端存儲服務對象。
```py
GET_OPERATION(name)
```
獲取長時間運行的最新狀態
參數:**name** - 操作資源的名稱
```py
import_from_storage_bucket(bucket,file,namespace = None,entity_filter = None,labels = None)
```
將備份從云存儲導入云數據存儲
```py
lookup(keys,read_consistency = None,transaction = None)
```
按鍵查找一些實體
也可以看看
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/lookup](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/lookup)
參數:
* **keys** - 要查找的鍵
* **read_consistency** - 要使用的讀取一致性。默認,強或最終。不能與事務一起使用。
* **transaction** - 要使用的事務,如果有的話。
返回:查找請求的響應主體。
```py
poll_operation_until_done(name,polling_interval_in_seconds)
```
輪詢備份操作狀態直到完成
```py
rollback(transaction)
```
回滾事務
也可以看看
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/rollback)
參數:**transaction** - 要回滾的事務
```py
run_query(體)
```
運行實體查詢。
也可以看看
[https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery](https://cloud.google.com/datastore/docs/reference/rest/v1/projects/runQuery)
參數:**body** - 查詢請求的主體
返回:批量查詢結果。
### 云 ML 引擎
#### 云 ML 引擎運營商
* [MLEngineBatchPredictionOperator](#MLEngineBatchPredictionOperator):啟動 Cloud ML Engine 批量預測作業。
* [MLEngineModelOperator](#MLEngineModelOperator):管理 Cloud ML Engine 模型。
* [MLEngineTrainingOperator](#MLEngineTrainingOperator):啟動 Cloud ML Engine 培訓工作。
* [MLEngineVersionOperator](#MLEngineVersionOperator):管理 Cloud ML Engine 模型版本。
##### MLEngineBatchPredictionOperator
```py
class airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperator(project_id,job_id,region,data_format,input_paths,output_path,model_name = None,version_name = None,uri = None,max_worker_count = None,runtime_version = None,gcp_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
啟動 Google Cloud ML Engine 預測作業。
注意:對于模型原點,用戶應該考慮以下三個選項中的一個:1。僅填充“uri”字段,該字段應該是指向 tensorflow savedModel 目錄的 GCS 位置。2.僅填充'model_name'字段,該字段引用現有模型,并將使用模型的默認版本。3.填充“model_name”和“version_name”字段,這些字段指特定模型的特定版本。
在選項 2 和 3 中,模型和版本名稱都應包含最小標識符。例如,打電話
```py
MLEngineBatchPredictionOperator (
... ,
model_name = 'my_model' ,
version_name = 'my_version' ,
... )
```
如果所需的型號版本是“projects / my_project / models / my_model / versions / my_version”。
有關參數的更多文檔,請參閱[https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs](https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs)。
參數:
* `project_id(str)` - 提交預測作業的 Google Cloud 項目名稱。(模板渲染后)
* `job_id(str)` - Google Cloud ML Engine 上預測作業的唯一 ID。(模板渲染后)
* `data_format(str)` - 輸入數據的格式。如果未提供或者不是[“TEXT”,“TF_RECORD”,“TF_RECORD_GZIP”]之一,它將默認為“DATA_FORMAT_UNSPECIFIED”。
* `input_paths(list[str])` - 批量預測的輸入數據的 GCS 路徑列表。接受通配符 operator [*](28),但僅限于結尾處。(模板渲染后)
* `output_path(str)` - 寫入預測結果的 GCS 路徑。(模板渲染后)
* `region(str)` - 用于運行預測作業的 Google Compute Engine 區域。(模板化)
* `model_name(str)` - 用于預測的 Google Cloud ML Engine 模型。如果未提供 version_name,則將使用此模型的默認版本。如果提供了 version_name,則不應為 None。如果提供 uri,則應為 None。(模板渲染后)
* `version_name(str)` - 用于預測的 Google Cloud ML Engine 模型版本。如果提供 uri,則應為 None。(模板渲染后)
* `uri(str)` - 用于預測的已保存模型的 GCS 路徑。如果提供了 model_name,則應為 None。它應該是指向張量流 SavedModel 的 GCS 路徑。(模板渲染后)
* `max_worker_count(int)` - 用于并行處理的最大 worker 數。如果未指定,則默認為 10。
* `runtime_version(str)` - 用于批量預測的 Google Cloud ML Engine 運行時版本。
* `gcp_conn_id(str)` - 用于連接到 Google Cloud Platform 的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用 doamin 范圍的委派。
```py
Raises:
`ValueError` :如果無法確定唯一的模型/版本來源。
```
##### MLEngineModelOperator
```py
class airflow.contrib.operators.mlengine_operator.MLEngineModelOperator(project_id,model,operation ='create',gcp_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
管理 Google Cloud ML Engine 模型的運營商。
參數:
* `project_id(str)` - MLEngine 模型所屬的 Google Cloud 項目名稱。(模板渲染后)
* `model(dict)` -
包含有關模型信息的字典。如果`操作`是`create`,則`model`參數應包含有關此模型的所有信息,例如`name`。
如果`操作`是`get`,則`model`參數應包含`模型`的`名稱`。
* `操作` -
執行的操作。可用的操作是:
* `create`:創建`model`參數提供的新模型。
* `get`:獲取在模型中指定名稱的特定`模型`。
* `gcp_conn_id(str)` - 獲取連接信息時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### MLEngineTrainingOperator
```py
class airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator(project_id,job_id,package_uris,training_python_module,training_args,region,scale_tier = None,runtime_version = None,python_version = None,job_dir = None,gcp_conn_id ='google_cloud_default',delegate_to = None,mode ='PRODUCTION',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
啟動 MLEngine 培訓工作的operator 。
參數:
* `project_id(str)` - 應在其中運行 MLEngine 培訓作業的 Google Cloud 項目名稱(模板化)。
* `job_id(str)` - 提交的 Google MLEngine 培訓作業的唯一模板化 ID。(模板渲染后)
* `package_uris(str)` - MLEngine 培訓作業的包位置列表,其中應包括主要培訓計劃+任何其他依賴項。(模板渲染后)
* `training_python_module(str)` - 安裝'package_uris'軟件包后,在 MLEngine 培訓作業中運行的 Python 模塊名稱。(模板渲染后)
* `training_args(str)` - 傳遞給 MLEngine 訓練程序的模板化命令行參數列表。(模板渲染后)
* `region(str)` - 用于運行 MLEngine 培訓作業的 Google Compute Engine 區域(模板化)。
* `scale_tier(str)` - MLEngine 培訓作業的資源層。(模板渲染后)
* `runtime_version(str)` - 用于培訓的 Google Cloud ML 運行時版本。(模板渲染后)
* `python_version(str)` - 訓練中使用的 Python 版本。(模板渲染后)
* `job_dir(str)` - 用于存儲培訓輸出和培訓所需的其他數據的 Google 云端存儲路徑。(模板渲染后)
* `gcp_conn_id(str)` - 獲取連接信息時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `mode(str)` - 可以是'DRY_RUN'/'CLOUD'之一。在“DRY_RUN”模式下,不會啟動真正的培訓作業,但會打印出 MLEngine 培訓作業請求。在“CLOUD”模式下,將發出真正的 MLEngine 培訓作業創建請求。
##### MLEngineVersionOperator
```py
class airflow.contrib.operators.mlengine_operator.MLEngineVersionOperator(project_id,model_name,version_name = None,version = None,operation ='create',gcp_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
管理 Google Cloud ML Engine 版本的運營商。
參數:
* `project_id(str)` - MLEngine 模型所屬的 Google Cloud 項目名稱。
* `model_name(str)` - 版本所屬的 Google Cloud ML Engine 模型的名稱。(模板渲染后)
* `version_name(str)` - 用于正在操作的版本的名稱。如果沒有人及`版本`的說法是沒有或不具備的值`名稱`鍵,那么這將是有效載荷中用于填充`名稱`鍵。(模板渲染后)
* `version(dict)` - 包含版本信息的字典。如果`操作`是`create`,則`version`應包含有關此版本的所有信息,例如 name 和 deploymentUrl。如果`操作`是`get`或`delete`,則`version`參數應包含`版本`的`名稱`。如果是 None,則唯一可能的`操作`是`list`。(模板渲染后)
* `operation(str)` -
執行的操作。可用的操作是:
* `create`:在`model_name`指定的`模型中`創建新版本,在這種情況下,`version`參數應包含創建該版本的所有信息(例如`name`,`deploymentUrl`)。
* `get`:獲取`model_name`指定的`模型中`特定版本的完整信息。應在`version`參數中指定版本的名稱。
* `list`:列出`model_name`指定的`模型的`所有可用版本。
* `delete`:從`model_name`指定的`模型中`刪除`version`參數中指定的`版本`。應在`version`參數中指定版本的名稱。
* `gcp_conn_id(str)` - 獲取連接信息時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
#### Cloud ML Engine Hook
##### MLEngineHook
```py
class airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook(gcp_conn_id ='google_cloud_default',delegate_to = None)
```
基類: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`]
```py
create_job(project_id,job,use_existing_job_fn = None)
```
啟動 MLEngine 作業并等待它達到終端狀態。
參數:
* `project_id(str)` - 將在其中啟動 MLEngine 作業的 Google Cloud 項目 ID。
* `job(dict)` -
應該提供給 MLEngine API 的 MLEngine Job 對象,例如:
```py
{
'jobId' : 'my_job_id' ,
'trainingInput' : {
'scaleTier' : 'STANDARD_1' ,
...
}
}
```
* `use_existing_job_fn(function)` - 如果已存在具有相同 job_id 的 MLEngine 作業,則此方法(如果提供)將決定是否應使用此現有作業,繼續等待它完成并返回作業對象。它應該接受 MLEngine 作業對象,并返回一個布爾值,指示是否可以重用現有作業。如果未提供“use_existing_job_fn”,我們默認重用現有的 MLEngine 作業。
返回:如果作業成功到達終端狀態(可能是 FAILED 或 CANCELED 狀態),則為 MLEngine 作業對象。
返回類型:字典
```py
create_model(project_id,model)
```
創建一個模型。阻止直到完成。
```py
create_version(project_id,model_name,version_spec)
```
在 Google Cloud ML Engine 上創建版本。
如果版本創建成功則返回操作,否則引發錯誤。
```py
delete_version(project_id,model_name,version_name)
```
刪除給定版本的模型。阻止直到完成。
```py
get_conn()
```
返回 Google MLEngine 服務對象。
```py
get_model(project_id,model_name)
```
獲取一個模型。阻止直到完成。
```py
list_versions(project_id,model_name)
```
列出模型的所有可用版本。阻止直到完成。
```py
set_default_version(project_id,model_name,version_name)
```
將版本設置為默認值。阻止直到完成。
### 云儲存
#### 存儲運營商
* [FileToGoogleCloudStorageOperator](#FileToGoogleCloudStorageOperator):將文件上傳到 Google 云端存儲。
* [GoogleCloudStorageCreateBucketOperator](#GoogleCloudStorageCreateBucketOperator):創建新的云存儲桶。
* [GoogleCloudStorageListOperator](#GoogleCloudStorageListOperator):列出存儲桶中的所有對象,并在名稱中添加字符串前綴和分隔符。
* [GoogleCloudStorageDownloadOperator](#GoogleCloudStorageDownloadOperator):從 Google 云端存儲中下載文件。
* [GoogleCloudStorageToBigQueryOperator](#GoogleCloudStorageToBigQueryOperator):將 Google 云存儲中的文件加載到 BigQuery 中。
* [GoogleCloudStorageToGoogleCloudStorageOperator](#GoogleCloudStorageToGoogleCloudStorageOperator):將對象從存儲桶復制到另一個存儲桶,并在需要時重命名。
##### FileToGoogleCloudStorageOperator
```py
class airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator(src,dst,bucket,google_cloud_storage_conn_id ='google_cloud_default',mime_type ='application / octet-stream',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
將文件上傳到 Google 云端存儲
參數:
* `src(str)` - 本地文件的路徑。(模板渲染后)
* `dst(str)` - 指定存儲桶中的目標路徑。(模板渲染后)
* `bucket(str)` - 要上傳的存儲桶。(模板渲染后)
* `google_cloud_storage_conn_id(str)` - 要上傳的 Airflow 連接 ID
* `mime_type(str)` - mime 類型字符串
* `delegate_to(str)` - 代理的帳戶(如果有)
```py
execute(context)
```
將文件上傳到 Google 云端存儲
##### GoogleCloudStorageCreateBucketOperator
```py
class airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator(bucket_name,storage_class ='MULTI_REGIONAL',location ='US',project_id = None,labels = None,google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
創建一個新存儲桶。Google 云端存儲使用平面命名空間,因此您無法創建名稱已在使用中的存儲桶。
> 也可以看看
>
> 有關詳細信息,請參閱存儲桶命名指南:[https://cloud.google.com/storage/docs/bucketnaming.html#requirements](https://cloud.google.com/storage/docs/bucketnaming.html#requirements)
參數:
* `bucket_name(str)` - 存儲桶的名稱。(模板渲染后)
* `storage_class(str)` -
這定義了存儲桶中對象的存儲方式,并確定了 SLA 和存儲成本(模板化)。價值包括
* `MULTI_REGIONAL`
* `REGIONAL`
* `STANDARD`
* `NEARLINE`
* `COLDLINE`
如果在創建存儲桶時未指定此值,則默認為 STANDARD。
* `location(str)` -
水桶的位置。(模板化)存儲桶中對象的對象數據駐留在此區域內的物理存儲中。默認為美國。
也可以看看
[https://developers.google.com/storage/docs/bucket-locations](https://developers.google.com/storage/docs/bucket-locations)
* `project_id(str)` - GCP 項目的 ID。(模板渲染后)
* `labels(dict)` - 用戶提供的鍵/值對標簽。
* `google_cloud_storage_conn_id(str)` - 連接到 Google 云端存儲時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
示例:
以下 operator 將在區域中創建`test-bucket`具有`MULTI_REGIONAL`存儲類的新存儲桶`EU`
```py
CreateBucket = GoogleCloudStorageCreateBucketOperator (
task_id = 'CreateNewBucket' ,
bucket_name = 'test-bucket' ,
storage_class = 'MULTI_REGIONAL' ,
location = 'EU' ,
labels = { 'env' : 'dev' , 'team' : 'airflow' },
google_cloud_storage_conn_id = 'airflow-service-account'
)
```
##### GoogleCloudStorageDownloadOperator
```py
class airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator(bucket,object,filename = None,store_to_xcom_key = None,google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
從 Google 云端存儲下載文件。
參數:
* `bucket(str)` - 對象所在的 Google 云存儲桶。(模板渲染后)
* `object(str)` - 要在 Google 云存儲桶中下載的對象的名稱。(模板渲染后)
* `filename(str)` - 應將文件下載到的本地文件系統(正在執行操作符的位置)上的文件路徑。(模板化)如果未傳遞文件名,則下載的數據將不會存儲在本地文件系統中。
* `store_to_xcom_key(str)` - 如果設置了此參數,operator 將使用此參數中設置的鍵將下載文件的內容推送到 XCom。如果未設置,則下載的數據不會被推送到 XCom。(模板渲染后)
* `google_cloud_storage_conn_id(str)` - 連接到 Google 云端存儲時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
##### GoogleCloudStorageListOperator
```py
class airflow.contrib.operators.gcslistoperator.GoogleCloudStorageListOperator(bucket,prefix = None,delimiter = None,google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
使用名稱中的給定字符串前綴和分隔符列出存儲桶中的所有對象。
```
此 operator 返回一個 python 列表,其中包含可供其使用的對象的名稱
```
`xcom`在下游任務中。
參數:
* `bucket(str)` - 用于查找對象的 Google 云存儲桶。(模板渲染后)
* `prefix(str)` - 前綴字符串,用于過濾名稱以此前綴開頭的對象。(模板渲染后)
* `delimiter(str)` - 要過濾對象的分隔符。(模板化)例如,要列出 GCS 目錄中的 CSV 文件,您可以使用 delimiter ='。csv'。
* `google_cloud_storage_conn_id(str)` - 連接到 Google 云端存儲時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
示例:
以下 operator 將列出存儲桶中文件`sales/sales-2017`夾中的所有 Avro 文件`data`。
```py
GCS_Files = GoogleCloudStorageListOperator (
task_id = 'GCS_Files' ,
bucket = 'data' ,
prefix = 'sales/sales-2017/' ,
delimiter = '.avro' ,
google_cloud_storage_conn_id = google_cloud_conn_id
)
```
##### GoogleCloudStorageToBigQueryOperator
```py
class airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator(bucket,source_objects,destination_project_dataset_table,schema_fields = None,schema_object = None,source_format ='CSV',compression ='NONE',create_disposition ='CREATE_IF_NEEDED',skip_leading_rows = 0,write_disposition =' WRITE_EMPTY',field_delimiter =',',max_bad_records = 0,quote_character = None,ignore_unknown_values = False,allow_quoted_newlines = False,allow_jagged_rows = False,max_id_key = None,bigquery_conn_id ='bigquery_default',google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,schema_update_options =(),src_fmt_configs = {},external_table = False,time_partitioning = {},*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
將文件從 Google 云存儲加載到 BigQuery 中。
可以用兩種方法之一指定用于 BigQuery 表的模式。您可以直接傳遞架構字段,也可以將運營商指向 Google 云存儲對象名稱。Google 云存儲中的對象必須是包含架構字段的 JSON 文件。
參數:
* `bucket(str)` - 要加載的桶。(模板渲染后)
* `source_objects` - 要加載的 Google 云存儲 URI 列表(模板化)。如果 source_format 是'DATASTORE_BACKUP',則列表必須只包含一個 URI。
* `destination_project_dataset_table(str)` - 用于加載數據的表(\<project>.)\<dataset>.\<table> BigQuery 表。如果未包\<project>,則項目將是連接 json 中定義的項目。(模板渲染后)
* `schema_fields(list)` - 如果設置,則此處定義的架構字段列表:[https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load)當 source_format 為'DATASTORE_BACKUP'時,不應設置。
* `schema_object` - 如果設置,則指向包含表的架構的.json 文件的 GCS 對象路徑。(模板渲染后)
* `schema_object` - 字符串
* `source_format(str)` - 要導出的文件格式。
* `compression(str)` - [可選]數據源的壓縮類型。可能的值包括 GZIP 和 NONE。默認值為 NONE。Google Cloud Bigtable,Google Cloud Datastore 備份和 Avro 格式會忽略此設置。
* `create_disposition(str)` - 如果表不存在,則創建處置。
* `skip_leading_rows(int)` - 從 CSV 加載時要跳過的行數。
* `write_disposition(str)` - 表已存在時的寫處置。
* `field_delimiter(str)` - 從 CSV 加載時使用的分隔符。
* `max_bad_records(int)` - BigQuery 在運行作業時可以忽略的最大錯誤記錄數。
* `quote_character(str)` - 用于引用 CSV 文件中數據部分的值。
* `ignore_unknown_values(bool)` - [可選]指示 BigQuery 是否應允許表模式中未表示的額外值。如果為 true,則忽略額外值。如果為 false,則將具有額外列的記錄視為錯誤記錄,如果錯誤記錄太多,則在作業結果中返回無效錯誤。
* `allow_quoted_newlines(bool)` - 是否允許引用的換行符(true)或不允許(false)。
* `allow_jagged_rows(bool)` - 接受缺少尾隨可選列的行。缺失值被視為空值。如果為 false,則缺少尾隨列的記錄將被視為錯誤記錄,如果錯誤記錄太多,則會在作業結果中返回無效錯誤。僅適用于 CSV,忽略其他格式。
* `max_id_key(str)` - 如果設置,則是 BigQuery 表中要加載的列的名稱。在加載發生后,Thsi 將用于從 BigQuery 中選擇 MAX 值。結果將由 execute()命令返回,該命令又存儲在 XCom 中供將來的operator 使用。這對增量加載很有幫助 - 在將來的執行過程中,您可以從最大 ID 中獲取。
* `bigquery_conn_id(str)` - 對特定 BigQuery 的引用。
* `google_cloud_storage_conn_id(str)` - 對特定 Google 云存儲掛鉤的引用。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
* `schema_update_options(list)` - 允許更新目標表的模式作為加載作業的副作用。
* `src_fmt_configs(dict)` - 配置特定于源格式的可選字段
* `external_table(bool)` - 用于指定目標表是否應為 BigQuery 外部表的標志。默認值為 False。
* `time_partitioning(dict)` - 配置可選的時間分區字段,即按 API 規范按字段,類型和到期分區。請注意,“field”在 dataset.table $ partition 的并發中不可用。
##### GoogleCloudStorageToGoogleCloudStorageOperator
```py
class airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(source_bucket,source_object,destination_bucket = None,destination_object = None,move_object = False,google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None,*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
將對象從存儲桶復制到另一個存儲桶,并在需要時重命名。
參數:
* `source_bucket(str)` - 對象所在的源 Google 云存儲桶。(模板渲染后)
* `source_object(str)` -
要在 Google 云存儲分區中復制的對象的源名稱。(模板化)如果在此參數中使用通配符:
> 您只能在存儲桶中使用一個通配符作為對象(文件名)。通配符可以出現在對象名稱內或對象名稱的末尾。不支持在存儲桶名稱中附加通配符。
* `destination_bucket` - 目標 Google 云端存儲分區
對象應該在哪里。(模板化):type destination_bucket:string:param destination_object:對象的目標名稱
> 目標 Google 云存儲桶。(模板化)如果在 source_object 參數中提供了通配符,則這是將添加到最終目標對象路徑的前綴。請注意,將刪除通配符之前的源路徑部分; 如果需要保留,則應將其附加到 destination_object。例如,使用 prefix `foo/*`和 destination_object'blah `/``,文件`foo/baz`將被復制到`blah/baz`; 保留前綴寫入 destination_object,例如`blah/foo`,在這種情況下,復制的文件將被命名`blah/foo/baz`。
參數:**move_object** - 當移動對象為 True 時,移動對象
```py
復制到新位置。
```
這相當于 mv 命令而不是 cp 命令。
參數:
* `google_cloud_storage_conn_id(str)` - 連接到 Google 云端存儲時使用的連接 ID。
* `delegate_to(str)` - 代理的帳戶(如果有)。為此,發出請求的服務帳戶必須啟用域范圍委派。
```py
Examples:
```
下面的操作將命名一個文件復制`sales/sales-2017/january.avro`在`data`桶的文件和名為斗`copied_sales/2017/january-backup.avro` in the ``data_backup`
```py
copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator (
task_id = 'copy_single_file' ,
source_bucket = 'data' ,
source_object = 'sales/sales-2017/january.avro' ,
destination_bucket = 'data_backup' ,
destination_object = 'copied_sales/2017/january-backup.avro' ,
google_cloud_storage_conn_id = google_cloud_conn_id
)
```
以下 operator 會將文件`sales/sales-2017`夾中的所有 Avro 文件(即名稱以該前綴開頭)復制到存儲`data`桶中的`copied_sales/2017`文件夾中`data_backup`。
```py
copy_files = GoogleCloudStorageToGoogleCloudStorageOperator (
task_id = 'copy_files' ,
source_bucket = 'data' ,
source_object = 'sales/sales-2017/*.avro' ,
destination_bucket = 'data_backup' ,
destination_object = 'copied_sales/2017/' ,
google_cloud_storage_conn_id = google_cloud_conn_id
)
```
以下 operator 會將文件`sales/sales-2017`夾中的所有 Avro 文件(即名稱以該前綴開頭)移動到`data`存儲桶中的同一文件夾`data_backup`,刪除過程中的原始文件。
```py
move_files = GoogleCloudStorageToGoogleCloudStorageOperator (
task_id = 'move_files' ,
source_bucket = 'data' ,
source_object = 'sales/sales-2017/*.avro' ,
destination_bucket = 'data_backup' ,
move_object = True ,
google_cloud_storage_conn_id = google_cloud_conn_id
)
```
#### GoogleCloudStorageHook
```py
class airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook(google_cloud_storage_conn_id ='google_cloud_default',delegate_to = None)
```
基類: [`airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook`]
與 Google 云端存儲互動。此掛鉤使用 Google Cloud Platform 連接。
```py
copy(source_bucket,source_object,destination_bucket = None,destination_object = None)
```
將對象從存儲桶復制到另一個存儲桶,并在需要時重命名。
destination_bucket 或 destination_object 可以省略,在這種情況下使用源桶/對象,但不能同時使用兩者。
參數:
* `source_bucket(str)` - 要從中復制的對象的存儲桶。
* `source_object(str)` - 要復制的對象。
* `destination_bucket(str)` - 要復制到的對象的目標。可以省略; 然后使用相同的桶。
* `destination_object` - 給定對象的(重命名)路徑。可以省略; 然后使用相同的名稱。
```py
create_bucket(bucket_name,storage_class ='MULTI_REGIONAL',location ='US',project_id = None,labels = None)
```
創建一個新存儲桶。Google 云端存儲使用平面命名空間,因此您無法創建名稱已在使用中的存儲桶。
也可以看看
有關詳細信息,請參閱存儲桶命名指南:[https://cloud.google.com/storage/docs/bucketnaming.html#requirements](https://cloud.google.com/storage/docs/bucketnaming.html#requirements)
參數:
* `bucket_name(str)` - 存儲桶的名稱。
* `storage_class(str)` -
這定義了存儲桶中對象的存儲方式,并確定了 SLA 和存儲成本。價值包括
* `MULTI_REGIONAL`
* `REGIONAL`
* `STANDARD`
* `NEARLINE`
* `COLDLINE` 。
如果在創建存儲桶時未指定此值,則默認為 STANDARD。
* `location(str)` -
桶的位置。存儲桶中對象的對象數據駐留在此區域內的物理存儲中。默認為美國。
也可以看看
[https://developers.google.com/storage/docs/bucket-locations](https://developers.google.com/storage/docs/bucket-locations)
* `project_id(str)` - GCP 項目的 ID。
* `labels(dict)` - 用戶提供的鍵/值對標簽。
返回:如果成功,則返回`id`桶的內容。
```py
delete(bucket,object,generation=None)
```
如果未對存儲桶啟用版本控制,或者使用了生成參數,則刪除對象。
參數:
* `bucket(str)` - 對象所在的存儲桶的名稱
* `object(str)` - 要刪除的對象的名稱
* `generation(str)` - 如果存在,則永久刪除該代的對象
返回:如果成功則為真
```py
下載(bucket,object,filename = None)
```
從 Google 云端存儲中獲取文件。
參數:
* `bucket(str)` - 要獲取的存儲桶。
* `object(str)` - 要獲取的對象。
* `filename(str)` - 如果設置,則應寫入文件的本地文件路徑。
```py
exists(budket, object)
```
檢查 Google 云端存儲中是否存在文件。
參數:
* `bucket(str)` - 對象所在的 Google 云存儲桶。
* `object(str)` - 要在 Google 云存儲分區中檢查的對象的名稱。
```py
get_conn()
```
返回 Google 云端存儲服務對象。
```py
get_crc32c(bucket,object)
```
獲取 Google Cloud Storage 中對象的 CRC32c 校驗和。
參數:
* `bucket(str)` - 對象所在的 Google 云存儲桶。
* `object(str)` - 要在 Google 云存儲分區中檢查的對象的名稱。
```py
get_md5hash(bucket,object)
```
獲取 Google 云端存儲中對象的 MD5 哈希值。
參數:
* `bucket(str)` - 對象所在的 Google 云存儲桶。
* `object(str)` - 要在 Google 云存儲分區中檢查的對象的名稱。
```py
get_size(bucket,object)
```
獲取 Google 云端存儲中文件的大小。
參數:
* `bucket(str)` - 對象所在的 Google 云存儲桶。
* `object(str)` - 要在 Google 云存儲分區中檢查的對象的名稱。
```py
is_updated_after(bucket,object,ts)
```
檢查 Google Cloud Storage 中是否更新了對象。
參數:
* `bucket(str)` - 對象所在的 Google 云存儲桶。
* `object(str)` - 要在 Google 云存儲分區中檢查的對象的名稱。
* `ts(datetime)` - 要檢查的時間戳。
```py
list(bucket,versions = None,maxResults = None,prefix = None,delimiter = None)
```
使用名稱中的給定字符串前綴列出存儲桶中的所有對象
參數:
* `bucket(str)` - 存儲桶名稱
* `versions(bool)` - 如果為 true,則列出對象的所有版本
* `maxResults(int)` - 在單個響應頁面中返回的最大項目數
* `prefix(str)` - 前綴字符串,用于過濾名稱以此前綴開頭的對象
* `delimiter(str)` - 根據分隔符過濾對象(例如'.csv')
返回:與過濾條件匹配的對象名稱流
```py
rewrite(source_bucket,source_object,destination_bucket,destination_object = None)
```
具有與復制相同的功能,除了可以處理超過 5 TB 的文件,以及在位置和/或存儲類之間復制時。
destination_object 可以省略,在這種情況下使用 source_object。
參數:
* `source_bucket(str)` - 要從中復制的對象的存儲桶。
* `source_object(str)` - 要復制的對象。
* `destination_bucket(str)` - 要復制到的對象的目標。
* `destination_object` - 給定對象的(重命名)路徑。可以省略; 然后使用相同的名稱。
```py
upload(bucket,object,filename,mime_type ='application / octet-stream')
```
將本地文件上傳到 Google 云端存儲。
參數:
* `bucket(str)` - 要上傳的存儲桶。
* `object(str)` - 上傳本地文件時要設置的對象名稱。
* `filename(str)` - 要上傳的文件的本地文件路徑。
* `mime_type(str)` - 上傳文件時要設置的 MIME 類型。
### Google Kubernetes 引擎
#### Google Kubernetes 引擎集群 Operators
* [GKEClusterDeleteOperator](#GKEClusterDeleteOperator):在 Google Cloud Platform 中創建 Kubernetes 集群
* [GKEPodOperator](#GKEPodOperator):刪除 Google Cloud Platform 中的 Kubernetes 集群
##### GKEClusterCreateOperator
```py
class airflow.contrib.operators.gcp_container_operator.GKEClusterCreateOperator(project_id,location,body = {},gcp_conn_id ='google_cloud_default',api_version ='v2',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
##### GKEClusterDeleteOperator
```py
class airflow.contrib.operators.gcp_container_operator.GKEClusterDeleteOperator(project_id,name,location,gcp_conn_id ='google_cloud_default',api_version ='v2',*args,**kwargs)
```
基類: `airflow.models.BaseOperator`
##### GKEPodOperator
#### Google Kubernetes Engine Hook
```py
class airflow.contrib.hooks.gcp_container_hook.GKEClusterHook(project_id,location)
```
基類: `airflow.hooks.base_hook.BaseHook`
```py
create_cluster(cluster,retry = <object object>,timeout = <object object>)
```
創建一個集群,由指定數量和類型的 Google Compute Engine 實例組成。
參數:
* `cluster(dict 或 google.cloud.container_v1.types.Cluster)` - 集群 protobuf 或 dict。如果提供了 dict,它必須與 protobuf 消息的格式相同 google.cloud.container_v1.types.Cluster
* `重試(google.api_core.retry.Retry)` - 用于重試請求的重試對象(google.api_core.retry.Retry)。如果指定 None,則不會重試請求。
* `timeout(float)` - 等待請求完成的時間(以秒為單位)。請注意,如果指定了重試,則超時適用于每次單獨嘗試。
返回:新集群或現有集群的完整 URL
raise
ParseError:在嘗試轉換 dict 時出現 JSON 解析問題 AirflowException:cluster 不是 dict 類型也不是 Cluster proto 類型
```py
delete_cluster(name,retry = <object object>,timeout = <object object>)
```
刪除集群,包括 Kubernetes 端點和所有工作節點。在集群創建期間配置的防火墻和路由也將被刪除。集群可能正在使用的其他 Google Compute Engine 資源(例如,負載均衡器資源)如果在初始創建時不存在,則不會被刪除。
參數:
* `name(str)` - 要刪除的集群的名稱
* `retry(google.api_core.retry.Retry)` - 重試用于確定何時/是否重試請求的對象。如果指定 None,則不會重試請求。
* `timeout(float)` - 等待請求完成的時間(以秒為單位)。請注意,如果指定了重試,則超時適用于每次單獨嘗試。
返回:如果成功則刪除操作的完整 URL,否則為 None
```py
get_cluster(name,retry = <object object>,timeout = <object object>)
```
獲取指定集群的詳細信息:param name:要檢索的集群的名稱:type name:str:param retry:用于重試請求的重試對象。如果指定了 None,
> 請求不會被重試。
參數:`timeout(float)` - 等待請求完成的時間(以秒為單位)。請注意,如果指定了重試,則超時適用于每次單獨嘗試。
返回:一個 google.cloud.container_v1.types.Cluster 實例
```py
get_operation(operation_name)
```
從 Google Cloud 獲取操作:param operation_name:要獲取的操作的名稱:type operation_name:str:return:來自 Google Cloud 的新的更新操作
```py
wait_for_operation(operation)
```
給定操作,持續從 Google Cloud 獲取狀態,直到完成或發生錯誤:param 操作:等待的操作:鍵入操作:google.cloud.container_V1.gapic.enums.Operator:return:a new,updated 從 Google Cloud 獲取的操作