<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # Storm MongoDB 集成 Storm/Trident集成[MongoDB](https://www.mongodb.org/)。該包中包括核心bolts和trident states,允許storm topology將storm tuples插入到數據庫集合中,或者針storm topology中的數據庫集合執行更新查詢。 ## Insert into Database 此包中包含用于將數據插入數據庫集合的bolt和trident state。 ### MongoMapper 使用MongoDB在集合中插入數據的主要API是 `org.apache.storm.mongodb.common.mapper.MongoMapper` 接口: ``` public interface MongoMapper extends Serializable { Document toDocument(ITuple tuple); } ``` ### SimpleMongoMapper `storm-mongodb`包括一個通用的`MongoMapper`實現,稱為`SimpleMongoMapper`,可以將Storm元組映射到一個數據庫文件。 `SimpleMongoMapper`假定storm tuple具有與您要寫入的數據庫集合中的文檔字段名稱相同的字段。 ``` public class SimpleMongoMapper implements MongoMapper { private String[] fields; @Override public Document toDocument(ITuple tuple) { Document document = new Document(); for(String field : fields){ document.append(field, tuple.getValueByField(field)); } return document; } public SimpleMongoMapper withFields(String... fields) { this.fields = fields; return this; } } ``` ### MongoInsertBolt 要使用`MongoInsertBolt`,您可以通過指定url,collectionName和將 storm tuple轉換為DB文檔的 `MongoMapper`實現來構造它的一個實例。 以下是標準的URI連接方案: `mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]` 有關Mongo URI的更多選項信息(例如:寫關注選項),您可以訪問 [https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options](https://docs.mongodb.org/manual/reference/connection-string/#connections-connection-options) ``` String url = "mongodb://127.0.0.1:27017/test"; String collectionName = "wordcount"; MongoMapper mapper = new SimpleMongoMapper() .withFields("word", "count"); MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper); ``` ### MongoTridentState 我們還支持在trident topologies中持久化trident state 。 要創建一個Mongo持久的trident state,您需要使用url,collectionName,“MongoMapper”實例初始化它。 見下面的例子: ``` MongoMapper mapper = new SimpleMongoMapper() .withFields("word", "count"); MongoState.Options options = new MongoState.Options() .withUrl(url) .withCollectionName(collectionName) .withMapper(mapper); StateFactory factory = new MongoStateFactory(options); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); stream.partitionPersist(factory, fields, new MongoStateUpdater(), new Fields()); ``` **NOTE**: > 如果沒有提供唯一的索引,在發生故障的情況下,trident state插入可能會導致重復的文檔。 ## Update from Database 包中包含用于從數據庫集合更新數據的bolt。 ### SimpleMongoUpdateMapper `storm-mongodb`包括一個通用的`MongoMapper`實現,稱為`SimpleMongoUpdateMapper`,可以將Storm元組映射到數據庫文檔。 `SimpleMongoUpdateMapper`假定風暴元組具有與您要寫入的數據庫集合中的文檔字段名稱相同的字段。 `SimpleMongoUpdateMapper`使用`$ set`運算符來設置文檔中字段的值。 有關更新操作的更多信息,可以訪問 [https://docs.mongodb.org/manual/reference/operator/update/](https://docs.mongodb.org/manual/reference/operator/update/) ``` public class SimpleMongoUpdateMapper implements MongoMapper { private String[] fields; @Override public Document toDocument(ITuple tuple) { Document document = new Document(); for(String field : fields){ document.append(field, tuple.getValueByField(field)); } return new Document("$set", document); } public SimpleMongoUpdateMapper withFields(String... fields) { this.fields = fields; return this; } } ``` ### QueryFilterCreator 用于創建MongoDB查詢過濾器的主要API是 `org.apache.storm.mongodb.common.QueryFilterCreator` 接口: ``` public interface QueryFilterCreator extends Serializable { Bson createFilter(ITuple tuple); } ``` ### SimpleQueryFilterCreator `storm-mongodb`包括一個通用的`QueryFilterCreator`實現,稱為`SimpleQueryFilterCreator`,可以通過給定的Tuple創建一個MongoDB查詢過濾器。 `QueryFilterCreator`使用`$ eq`運算符匹配等于指定值的值。 有關查詢運算符的更多信息,可以訪問 [https://docs.mongodb.org/manual/reference/operator/query/](https://docs.mongodb.org/manual/reference/operator/query/) ``` public class SimpleQueryFilterCreator implements QueryFilterCreator { private String field; @Override public Bson createFilter(ITuple tuple) { return Filters.eq(field, tuple.getValueByField(field)); } public SimpleQueryFilterCreator withField(String field) { this.field = field; return this; } } ``` ### MongoUpdateBolt 要使用`MongoUpdateBolt`,你可以通過指定Mongo url,collectionName,一個`QueryFilterCreator`實現和一個```MongoMapper`實現來將storm tuple轉換成DB文檔來構造一個實例。 ``` MongoMapper mapper = new SimpleMongoUpdateMapper() .withFields("word", "count"); QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator() .withField("word"); MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper); //if a new document should be inserted if there are no matches to the query filter //updateBolt.withUpsert(true); ``` 或者為 `QueryFilterCreator`使用匿名內部類實現: ``` MongoMapper mapper = new SimpleMongoUpdateMapper() .withFields("word", "count"); QueryFilterCreator updateQueryCreator = new QueryFilterCreator() { @Override public Bson createFilter(ITuple tuple) { return Filters.gt("count", 3); } }; MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper); //if a new document should be inserted if there are no matches to the query filter //updateBolt.withUpsert(true); ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看