<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Storm Redis 集成 Storm/Trident 集成 [Redis](http://redis.io/) Storm-redis使用Jedis為Redis客戶端。 ## 用法 ### 如何使用它? 使用它作為一個maven依賴: ``` <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> </dependency> ``` ### 常用Bolt Storm-redis提供了基本的Bolt實現, `RedisLookupBolt` 和 `RedisStoreBolt`。 根據名稱可以知道其功能,`RedisLookupBolt`使用鍵從Redis中檢索值,而`RedisStoreBolt`將鍵/值存儲到Redis。 一個元組將匹配一個鍵/值對,您可以將匹配模式定義為 `TupleMapper`。 您還可以從`RedisDataTypeDescription`中選擇數據類型來使用。請參考 `RedisDataTypeDescription.RedisDataType`來查看支持哪些數據類型。在一些數據類型(散列和排序集)中,它需要一個額外的鍵,而從元組轉換得到的鍵則成為其中的元素。 這些接口與 `RedisLookupMapper` 和 `RedisStoreMapper`組合,分別適合 `RedisLookupBolt` 和`RedisStoreBolt`。 #### RedisLookupBolt示例 ``` class WordCountRedisLookupMapper implements RedisLookupMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountRedisLookupMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public List<Values> toTuple(ITuple input, Object value) { String member = getKeyFromTuple(input); List<Values> values = Lists.newArrayList(); values.add(new Values(member, value)); return values; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("wordName", "count")); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return null; } } ``` ``` JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper(); RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper); ``` #### RedisStoreBolt示例 ``` class WordCountStoreMapper implements RedisStoreMapper { private 
RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountStoreMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return tuple.getStringByField("count"); } } ``` ``` JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); ``` ### 非簡單的 Bolt 如果您的場景不適合 `RedisStoreBolt`和 `RedisLookupBolt`,Storm-redis還提供了 `AbstractRedisBolt`,讓您擴展和應用業務邏輯。 ``` public static class LookupWordTotalCountBolt extends AbstractRedisBolt { private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class); private static final Random RANDOM = new Random(); public LookupWordTotalCountBolt(JedisPoolConfig config) { super(config); } public LookupWordTotalCountBolt(JedisClusterConfig config) { super(config); } @Override public void execute(Tuple input) { JedisCommands jedisCommands = null; try { jedisCommands = getInstance(); String wordName = input.getStringByField("word"); String countStr = jedisCommands.get(wordName); if (countStr != null) { int count = Integer.parseInt(countStr); this.collector.emit(new Values(wordName, count)); // print lookup result with low probability if(RANDOM.nextInt(1000) > 995) { LOG.info("Lookup result - word : " + wordName + " / count : " + count); } } else { // skip LOG.warn("Word not found in Redis - word : " + wordName); } } finally { if (jedisCommands != null) { returnInstance(jedisCommands); } this.collector.ack(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // wordName, count 
declarer.declare(new Fields("wordName", "count")); } } ``` ### Trident State 用法 1. RedisState和RedisMapState,它們提供Jedis接口,僅用于單個 Redis 實例。 2. RedisClusterState和RedisClusterMapState,它們提供JedisCluster接口,僅用于redis集群。 RedisState ```java JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisState.Factory factory = new RedisState.Factory(poolConfig); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); stream.partitionPersist(factory, fields, new RedisStateUpdater(storeMapper).withExpire(86400000), new Fields()); TridentState state = topology.newStaticState(factory); stream = stream.stateQuery(state, new Fields("word"), new RedisStateQuerier(lookupMapper), new Fields("columnName","columnValue")); ``` RedisClusterState ```java Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>(); for (String hostPort : redisHostPort.split(",")) { String[] host_port = hostPort.split(":"); nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1]))); } JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes) .build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); stream.partitionPersist(factory, fields, new RedisClusterStateUpdater(storeMapper).withExpire(86400000), new Fields()); TridentState state = topology.newStaticState(factory); stream = stream.stateQuery(state, new Fields("word"), new RedisClusterStateQuerier(lookupMapper), new Fields("columnName","columnValue")); ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看