# Storm MongoDB 集成
Storm/Trident集成[MongoDB](https://www.mongodb.org/)。該包中包括核心bolts和trident states,允許storm topology將storm tuples插入到數據庫集合中,或者針storm topology中的數據庫集合執行更新查詢。
## Insert into Database
此包中包含用于將數據插入數據庫集合的bolt和trident state。
### MongoMapper
使用MongoDB在集合中插入數據的主要API是 `org.apache.storm.mongodb.common.mapper.MongoMapper` 接口:
```
public interface MongoMapper extends Serializable {
Document toDocument(ITuple tuple);
}
```
### SimpleMongoMapper
`storm-mongodb`包括一個通用的`MongoMapper`實現,稱為`SimpleMongoMapper`,可以將Storm元組映射到一個數據庫文件。 `SimpleMongoMapper`假定storm tuple具有與您要寫入的數據庫集合中的文檔字段名稱相同的字段。
```
public class SimpleMongoMapper implements MongoMapper {
private String[] fields;
@Override
public Document toDocument(ITuple tuple) {
Document document = new Document();
for(String field : fields){
document.append(field, tuple.getValueByField(field));
}
return document;
}
public SimpleMongoMapper withFields(String... fields) {
this.fields = fields;
return this;
}
}
```
### MongoInsertBolt
要使用`MongoInsertBolt`,您可以通過指定url,collectionName和將 storm tuple轉換為DB文檔的 `MongoMapper`實現來構造它的一個實例。 以下是標準的URI連接方案: `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]`
有關Mongo URI的更多選項信息(例如:寫關注選項),您可以訪問 [https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options](https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options)
```
String url = "mongodb://127.0.0.1:27017/test";
String collectionName = "wordcount";
MongoMapper mapper = new SimpleMongoMapper()
.withFields("word", "count");
MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
```
### MongoTridentState
我們還支持在trident topologies中持久化trident state 。 要創建一個Mongo持久的trident state,您需要使用url,collectionName,“MongoMapper”實例初始化它。 見下面的例子:
```
MongoMapper mapper = new SimpleMongoMapper()
.withFields("word", "count");
MongoState.Options options = new MongoState.Options()
.withUrl(url)
.withCollectionName(collectionName)
.withMapper(mapper);
StateFactory factory = new MongoStateFactory(options);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields());
```
**NOTE**:
> 如果沒有提供唯一的索引,在發生故障的情況下,trident state插入可能會導致重復的文檔。
## Update from Database
包中包含用于從數據庫集合更新數據的bolt。
### SimpleMongoUpdateMapper
`storm-mongodb`包括一個通用的`MongoMapper`實現,稱為`SimpleMongoUpdateMapper`,可以將Storm元組映射到數據庫文檔。 `SimpleMongoUpdateMapper`假定風暴元組具有與您要寫入的數據庫集合中的文檔字段名稱相同的字段。 `SimpleMongoUpdateMapper`使用`$ set`運算符來設置文檔中字段的值。 有關更新操作的更多信息,可以訪問 [https://docs.mongodb.org/manual/reference/operator/update/](https://docs.mongodb.org/manual/reference/operator/update/)
```
public class SimpleMongoUpdateMapper implements MongoMapper {
private String[] fields;
@Override
public Document toDocument(ITuple tuple) {
Document document = new Document();
for(String field : fields){
document.append(field, tuple.getValueByField(field));
}
return new Document("$set", document);
}
public SimpleMongoUpdateMapper withFields(String... fields) {
this.fields = fields;
return this;
}
}
```
### QueryFilterCreator
用于創建MongoDB查詢過濾器的主要API是 `org.apache.storm.mongodb.common.QueryFilterCreator` 接口:
```
public interface QueryFilterCreator extends Serializable {
Bson createFilter(ITuple tuple);
}
```
### SimpleQueryFilterCreator
`storm-mongodb`包括一個通用的`QueryFilterCreator`實現,稱為`SimpleQueryFilterCreator`,可以通過給定的Tuple創建一個MongoDB查詢過濾器。 `QueryFilterCreator`使用`$ eq`運算符匹配等于指定值的值。 有關查詢運算符的更多信息,可以訪問 [https://docs.mongodb.org/manual/reference/operator/query/](https://docs.mongodb.org/manual/reference/operator/query/)
```
public class SimpleQueryFilterCreator implements QueryFilterCreator {
private String field;
@Override
public Bson createFilter(ITuple tuple) {
return Filters.eq(field, tuple.getValueByField(field));
}
public SimpleQueryFilterCreator withField(String field) {
this.field = field;
return this;
}
}
```
### MongoUpdateBolt
要使用`MongoUpdateBolt`,你可以通過指定Mongo url,collectionName,一個`QueryFilterCreator`實現和一個```MongoMapper`實現來將storm tuple轉換成DB文檔來構造一個實例。
```
MongoMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
.withField("word");
MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
```
或者為 `QueryFilterCreator`使用匿名內部類實現:
```
MongoMapper mapper = new SimpleMongoUpdateMapper()
.withFields("word", "count");
QueryFilterCreator updateQueryCreator = new QueryFilterCreator() {
@Override
public Bson createFilter(ITuple tuple) {
return Filters.gt("count", 3);
}
};
MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
//if a new document should be inserted if there are no matches to the query filter
//updateBolt.withUpsert(true);
```
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度