# 插件
Airflow內置了一個簡單的插件管理器,可以通過簡單地刪除`$AIRFLOW_HOME/plugins`文件夾中的文件,將外部功能集成到其核心。
`plugins`文件夾中的python模塊將被導入, **鉤子** , **操作符** , **傳感器** , **宏** , **執行器**和Web **視圖**將集成到Airflow的主要集合中,并可供使用。
## 做什么的?
Airflow提供了一個用于處理數據的通用工具箱。 不同的組織有不同的堆棧和不同的需求。 使用Airflow插件可以讓公司定制他們的Airflow安裝以反映他們的生態系統。
插件可以用作編寫,共享和激活新功能集的簡便方法。
還需要一組更復雜的應用程序來與不同風格的數據和元數據進行交互。
例子:
* 一組用于解析Hive日志和公開Hive元數據(CPU / IO /階段/傾斜/ ...)的工具
* 異常檢測框架,允許人們收集指標,設置閾值和警報
* 審計工具,幫助了解誰訪問了什么
* 配置驅動的SLA監控工具,允許您設置受監控的表以及應該在何時著陸,提醒人員并公開停機的可視化
* ...
## 為什么要建立在Airflow之上?
Airflow有許多組件可以在構建應用程序時重用:
* 可用于呈現視圖的Web服務器
* 用于存儲模型的元數據數據庫
* 訪問您的數據庫,以及如何連接到它們的知識
* 應用程序可以將工作負載推送到的一組工作者
* 部署了Airflow,您可以專注于部署物流
* 基本的圖表功能,底層庫和抽象
## 接口
要創建插件,您需要派生`airflow.plugins_manager.AirflowPlugin`類并引用要插入Airflow的對象。 以下是您需要派生的類看起來像:
```
class AirflowPlugin ( object ):
# The name of your plugin (str)
name = None
# A list of class(es) derived from BaseOperator
operators = []
# A list of class(es) derived from BaseSensorOperator
sensors = []
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of objects created from a class derived
# from flask_admin.BaseView
admin_views = []
# A list of Blueprint object created from flask.Blueprint
flask_blueprints = []
# A list of menu links (flask_admin.base.MenuLink)
menu_links = []
```
您可以通過繼承派生它(請參閱下面的示例)。 請注意,必須指定此類中的`name` 。
將插件導入Airflow后,您可以使用類似語句調用它
```
from airflow. { type , like "operators" , "sensors" } . { name specificed inside the plugin class } import *
```
當您編寫自己的插件時,請確保您理解它們。 每種類型的插件都有一些基本屬性。 例如,
* 對于`Operator`插件,必須使用`execute`方法。
* 對于`Sensor`插件,必須使用返回布爾值的`poke`方法。
## 例
下面的代碼定義了一個插件,它在Airflow中注入一組虛擬對象定義。
```
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
from flask_admin import BaseView , expose
from flask_admin.base import MenuLink
# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor
# Will show up under airflow.hooks.test_plugin.PluginHook
class PluginHook ( BaseHook ):
pass
# Will show up under airflow.operators.test_plugin.PluginOperator
class PluginOperator ( BaseOperator ):
pass
# Will show up under airflow.sensors.test_plugin.PluginSensorOperator
class PluginSensorOperator ( BaseSensorOperator ):
pass
# Will show up under airflow.executors.test_plugin.PluginExecutor
class PluginExecutor ( BaseExecutor ):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
def plugin_macro ():
pass
# Creating a flask admin BaseView
class TestView ( BaseView ):
@expose ( '/' )
def test ( self ):
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
return self . render ( "test_plugin/test.html" , content = "Hello galaxy!" )
v = TestView ( category = "Test Plugin" , name = "Test View" )
# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint (
"test_plugin" , __name__ ,
template_folder = 'templates' , # registers airflow/plugins/templates as a Jinja template folder
static_folder = 'static' ,
static_url_path = '/static/test_plugin' )
ml = MenuLink (
category = 'Test Plugin' ,
name = 'Test Menu Link' ,
url = 'https://airflow.incubator.apache.org/' )
# Defining the plugin class
class AirflowTestPlugin ( AirflowPlugin ):
name = "test_plugin"
operators = [ PluginOperator ]
sensors = [ PluginSensorOperator ]
hooks = [ PluginHook ]
executors = [ PluginExecutor ]
macros = [ plugin_macro ]
admin_views = [ v ]
flask_blueprints = [ bp ]
menu_links = [ ml ]
```