# Storm Elasticsearch 集成
## Storm Elasticsearch Bolt & Trident State
EdIndexBolt,EsPercolateBolt和Estate允許用戶將storm中的數據直接傳輸到Elasticsearch。 詳細說明請參考以下內容。
## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
EsIndexBolt將tuples直接流入Elasticsearch索。 Tuples以指定的索引和類型組合進行索引。 用戶應確保`EsTupleMapper`可以從輸入元組中提取“source”,“index”,“type”和“id”,“index”和“type”用于識別目標索引和類型。“source” 一個JSON格式的文檔,將在Elasticsearch中編入索引。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
```
## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
EsPercolateBolt將tuples直接流入Elasticsearch。 tuples用于發送滲透請求到指定的索引和類型組合。 用戶應該確保`EsTupleMapper` 可以從輸入元組中提取“source”,“index”,“type”,“index”和“type”用于識別目標索引和類型,“source”是一個文檔 在JSON格式的字符串將發送到滲透請求到彈性搜索。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
```
如果存在非空的滲漏響應,EsPercolateBolt將會為PercolateResponse中每個Percolate.Match發出具有原始源和Percolate.Match的tuple。
## EsState (org.apache.storm.elasticsearch.trident.EsState)
Elasticsearch Trident state也與EsBolts類似。 它將EsConfig和EsTupleMapper作為參數。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```
## EsLookupBolt (org.apache.storm.elasticsearch.bolt.EsLookupBolt)
EsLookupBolt對Elasticsearch執行獲取請求。 為了做到這一點,需要滿足三個依賴。 除了通常的EsConfig,還必須提供其他兩個依賴關系: ElasticsearchGetRequest用于將傳入的元組轉換為將針對Elasticsearch執行的GetRequest。 EsLookupResultOutput用于聲明輸出字段,并將GetResponse轉換為由bolt發出的值。
傳入的tuple被傳遞給提供的GetRequest創建者,該執行的結果被傳遞給Elasticsearch客戶端。 然后,bolt使用提供程序輸出適配器(EsLookupResultOutput)將GetResponse轉換為值以發送。 輸出字段也由bolt的用戶通過輸出適配器(EsLookupResultOutput)指定。
```
EsConfig esConfig = createEsConfig();
ElasticsearchGetRequest getRequestAdapter = createElasticsearchGetRequest();
EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
```
## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
=提供的組件(Bolt,State)以EsConfig作為構造函數arg。
```
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
```
or
```
Map<String, String> additionalParameters = new HashMap<>();
additionalParameters.put("client.transport.sniff", "true");
EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"}, additionalParameters);
```
### EsConfig params
| Arg | Description | Type |
| --- | --- | --- |
| clusterName | Elasticsearch cluster name | String (required) |
| nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
| additionalParameters | Additional Elasticsearch Transport Client configuration parameters | Map <string, string="">(optional)</string,> |
## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
對于存儲在Elasticsearch中的tuple或者從Elasticsearch搜索到的tuple,我們需要定義使用哪些字段。 用戶需要通過實現`EsTupleMapper`定義你自己的。 Storm-elasticsearch提供了默認的mapper`org.apache.storm.elasticsearch.common.DefaultEsTupleMapper`,它從相同的字段中提取其源,索引,類型,id值。 您可以參考DefaultEsTupleMapper的實現來看看如何實現自己的。
## Committer Sponsors
* Sriharsha Chintalapani ([@harshach](https://github.com/harshach))
* Jungtaek Lim ([@HeartSaVioR](https://github.com/HeartSaVioR))
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度