<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## 一、簡介 Storm-Redis 提供了 Storm 與 Redis 的集成支持,你只需要引入對應的依賴即可使用: ~~~ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> </dependency> ~~~ Storm-Redis 使用 Jedis 為 Redis 客戶端,并提供了如下三個基本的 Bolt 實現: * **RedisLookupBolt**:從 Redis 中查詢數據; * **RedisStoreBolt**:存儲數據到 Redis; * **RedisFilterBolt** : 查詢符合條件的數據; `RedisLookupBolt`、`RedisStoreBolt`、`RedisFilterBolt` 均繼承自 `AbstractRedisBolt` 抽象類。我們可以通過繼承該抽象類,實現自定義 RedisBolt,進行功能的拓展。 ## 二、集成案例 ### 2.1 項目結構 這里首先給出一個集成案例:進行詞頻統計并將最后的結果存儲到 Redis。項目結構如下: ![](https://img.kancloud.cn/3b/c3/3bc3b3c74306a37b98ac90d92c502596_931x265.png) > 用例源碼下載地址:[storm-redis-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-redis-integration) ### 2.2 項目依賴 項目主要依賴如下: ~~~ <properties> <storm.version>1.2.2</storm.version> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> </dependency> </dependencies> ~~~ ### 2.3 DataSourceSpout ~~~ /** * 產生詞頻樣本的數據源 */ public class DataSourceSpout extends BaseRichSpout { private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } @Override public void nextTuple() { // 模擬產生數據 String lineData = productData(); spoutOutputCollector.emit(new Values(lineData)); Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("line")); } /** * 模擬數據 */ private String productData() { Collections.shuffle(list); Random random = new Random(); int 
endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); } } ~~~ 產生的模擬數據格式如下: ~~~ Spark HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm ~~~ ### 2.4 SplitBolt ~~~ /** * 將每行數據按照指定分隔符進行拆分 */ public class SplitBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split("\t"); for (String word : words) { collector.emit(new Values(word, String.valueOf(1))); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } ~~~ ### 2.5 CountBolt ~~~ /** * 進行詞頻統計 */ public class CountBolt extends BaseRichBolt { private Map<String, Integer> counts = new HashMap<>(); private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); // 輸出 collector.emit(new Values(word, String.valueOf(count))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } ~~~ ### 2.6 WordCountStoreMapper 實現 RedisStoreMapper 接口,定義 tuple 與 Redis 中數據的映射關系:即需要指定 tuple 中的哪個字段為 key,哪個字段為 value,并且存儲到 Redis 的何種數據結構中。 ~~~ /** * 定義 tuple 與 Redis 中數據的映射關系 */ public class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountStoreMapper() { description = new 
RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return tuple.getStringByField("count"); } } ~~~ ### 2.7 WordCountToRedisApp ~~~ /** * 進行詞頻統計 并將統計結果存儲到 Redis 中 */ public class WordCountToRedisApp { private static final String DATA_SOURCE_SPOUT = "dataSourceSpout"; private static final String SPLIT_BOLT = "splitBolt"; private static final String COUNT_BOLT = "countBolt"; private static final String STORE_BOLT = "storeBolt"; //在實際開發中這些參數可以將通過外部傳入 使得程序更加靈活 private static final String REDIS_HOST = "192.168.200.226"; private static final int REDIS_PORT = 6379; public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout()); // split builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT); // count builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT); // save to redis JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT); // 如果外部傳參 cluster 則代表線上環境啟動否則代表本地啟動 if (args.length > 0 && args[0].equals("cluster")) { try { StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology()); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); } } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountToRedisApp", new Config(), builder.createTopology()); } } } ~~~ ### 2.8 啟動測試 
可以直接使用本地模式運行,也可以打包后提交到服務器集群運行。本倉庫提供的源碼默認采用 `maven-shade-plugin` 進行打包,打包命令如下: ~~~ # mvn clean package -D maven.test.skip=true ~~~ 啟動后,查看 Redis 中的數據: ![](https://img.kancloud.cn/43/f7/43f7dbf101de079e1e62fa8c5193585e_932x295.png) ## 三、storm-redis 實現原理 ### 3.1 AbstractRedisBolt `RedisLookupBolt`、`RedisStoreBolt`、`RedisFilterBolt` 均繼承自 `AbstractRedisBolt` 抽象類,和我們自定義實現 Bolt 一樣,`AbstractRedisBolt` 間接繼承自 `BaseRichBolt`。 ![](https://img.kancloud.cn/aa/0a/aa0a079c5b6f0861706a16546bb133b5_613x489.png) `AbstractRedisBolt` 中比較重要的是 prepare 方法,在該方法中通過外部傳入的 jedis 連接池配置 ( jedisPoolConfig/jedisClusterConfig) 創建用于管理 Jedis 實例的容器 `JedisCommandsInstanceContainer`。 ~~~ public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt { protected OutputCollector collector; private transient JedisCommandsInstanceContainer container; private JedisPoolConfig jedisPoolConfig; private JedisClusterConfig jedisClusterConfig; ...... @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { // FIXME: stores map (stormConf), topologyContext and expose these to derived classes this.collector = collector; if (jedisPoolConfig != null) { this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig); } else if (jedisClusterConfig != null) { this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig); } else { throw new IllegalArgumentException("Jedis configuration not found"); } } ....... 
} ~~~ `JedisCommandsContainerBuilder` 的 `build()` 方法如下,實際上就是創建 JedisPool 或 JedisCluster 并傳入 `JedisCommandsInstanceContainer` 容器中。 ~~~ public static JedisCommandsInstanceContainer build(JedisPoolConfig config) { JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase()); return new JedisContainer(jedisPool); } public static JedisCommandsInstanceContainer build(JedisClusterConfig config) { JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG); return new JedisClusterContainer(jedisCluster); } ~~~ ### 3.2 RedisStoreBolt和RedisLookupBolt `RedisStoreBolt` 中比較重要的是 process 方法,該方法主要從 storeMapper 中獲取傳入 key/value 的值,并按照其存儲類型 `dataType` 調用 jedisCommand 的對應方法進行存儲。 RedisLookupBolt 的實現基本類似,從 lookupMapper 中獲取傳入的 key 值,并進行查詢操作。 ~~~ public class RedisStoreBolt extends AbstractRedisBolt { private final RedisStoreMapper storeMapper; private final RedisDataTypeDescription.RedisDataType dataType; private final String additionalKey; public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { super(config); this.storeMapper = storeMapper; RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); this.additionalKey = dataTypeDescription.getAdditionalKey(); } public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) { super(config); this.storeMapper = storeMapper; RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); this.additionalKey = dataTypeDescription.getAdditionalKey(); } @Override public void process(Tuple input) { String key = storeMapper.getKeyFromTuple(input); String value = storeMapper.getValueFromTuple(input); JedisCommands jedisCommand = null; try { jedisCommand = getInstance(); switch 
(dataType) { case STRING: jedisCommand.set(key, value); break; case LIST: jedisCommand.rpush(key, value); break; case HASH: jedisCommand.hset(additionalKey, key, value); break; case SET: jedisCommand.sadd(key, value); break; case SORTED_SET: jedisCommand.zadd(additionalKey, Double.valueOf(value), key); break; case HYPER_LOG_LOG: jedisCommand.pfadd(key, value); break; case GEO: String[] array = value.split(":"); if (array.length != 2) { throw new IllegalArgumentException("value structure should be longitude:latitude"); } double longitude = Double.valueOf(array[0]); double latitude = Double.valueOf(array[1]); jedisCommand.geoadd(additionalKey, longitude, latitude, key); break; default: throw new IllegalArgumentException("Cannot process such data type: " + dataType); } collector.ack(input); } catch (Exception e) { this.collector.reportError(e); this.collector.fail(input); } finally { returnInstance(jedisCommand); } } ......... } ~~~ ### 3.3 JedisCommands JedisCommands 接口中定義了所有的 Redis 客戶端命令,它有以下三個實現類,分別是 Jedis、JedisCluster、ShardedJedis。Storm 中主要使用前兩種實現類,具體調用哪一個實現類來執行命令,由傳入的是 jedisPoolConfig 還是 jedisClusterConfig 來決定。 ![](https://img.kancloud.cn/d9/62/d962c17c556dbbb6ae850aa3adf47254_737x221.png) ### 3.4 RedisMapper 和 TupleMapper RedisMapper 和 TupleMapper 定義了 tuple 和 Redis 中的數據如何進行映射轉換。 ![](https://img.kancloud.cn/8c/10/8c10782d5ce840854fdc00b443b5ab18_883x233.png) #### 1\. TupleMapper TupleMapper 主要定義了兩個方法: * getKeyFromTuple(ITuple tuple): 從 tuple 中獲取哪個字段作為 Key; * getValueFromTuple(ITuple tuple):從 tuple 中獲取哪個字段作為 Value; #### 2\. RedisMapper 定義了獲取數據類型的方法 `getDataTypeDescription()`,RedisDataTypeDescription 中 RedisDataType 枚舉類定義了所有可用的 Redis 數據類型: ~~~ public class RedisDataTypeDescription implements Serializable { public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO } ...... } ~~~ #### 3\. RedisStoreMapper RedisStoreMapper 繼承 TupleMapper 和 RedisMapper 接口,用于數據存儲時,沒有定義額外方法。 #### 4\. 
RedisLookupMapper RedisLookupMapper 繼承 TupleMapper 和 RedisMapper 接口: * 定義了 declareOutputFields 方法,聲明輸出的字段。 * 定義了 toTuple 方法,將查詢結果組裝為 Storm 的 Values 的集合,并用于發送。 下面的例子表示從輸入 `Tuple` 中獲取 `word` 字段作為 key,使用 `RedisLookupBolt` 進行查詢后,將 key 和查詢結果 value 組裝為 values 并發送到下一個處理單元。 ~~~ class WordCountRedisLookupMapper implements RedisLookupMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountRedisLookupMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public List<Values> toTuple(ITuple input, Object value) { String member = getKeyFromTuple(input); List<Values> values = Lists.newArrayList(); values.add(new Values(member, value)); return values; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("wordName", "count")); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { return null; } } ~~~ #### 5\. 
RedisFilterMapper RedisFilterMapper 繼承 TupleMapper 和 RedisMapper 接口,用于查詢數據時,定義了 declareOutputFields 方法,聲明輸出的字段。如下面的實現: ~~~ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("wordName", "count")); } ~~~ ## 四、自定義RedisBolt實現詞頻統計 ### 4.1 實現原理 自定義 RedisBolt:主要利用 Redis 中哈希結構的 `hincrby key field` 命令進行詞頻統計。在 Redis 中 `hincrby` 的執行效果如下。hincrby 可以將字段按照指定的值進行遞增,如果該字段不存在的話,還會新建該字段,并賦值為 0。通過這個命令可以非常輕松的實現詞頻統計功能。 ~~~ redis> HSET myhash field 5 (integer) 1 redis> HINCRBY myhash field 1 (integer) 6 redis> HINCRBY myhash field -1 (integer) 5 redis> HINCRBY myhash field -10 (integer) -5 redis> ~~~ ### 4.2 項目結構 ![](https://img.kancloud.cn/5c/d8/5cd8e61ac5c9241e849aebabb91563d2_716x258.png) ### 4.3 自定義RedisBolt的代碼實現 ~~~ /** * 自定義 RedisBolt 利用 Redis 的哈希數據結構的 hincrby key field 命令進行詞頻統計 */ public class RedisCountStoreBolt extends AbstractRedisBolt { private final RedisStoreMapper storeMapper; private final RedisDataTypeDescription.RedisDataType dataType; private final String additionalKey; public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) { super(config); this.storeMapper = storeMapper; RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription(); this.dataType = dataTypeDescription.getDataType(); this.additionalKey = dataTypeDescription.getAdditionalKey(); } @Override protected void process(Tuple tuple) { String key = storeMapper.getKeyFromTuple(tuple); String value = storeMapper.getValueFromTuple(tuple); JedisCommands jedisCommand = null; try { jedisCommand = getInstance(); if (dataType == RedisDataTypeDescription.RedisDataType.HASH) { jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value)); } else { throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType); } collector.ack(tuple); } catch (Exception e) { this.collector.reportError(e); this.collector.fail(tuple); } finally { returnInstance(jedisCommand); } } @Override public void 
declareOutputFields(OutputFieldsDeclarer declarer) { } } ~~~ ### 4.4 CustomRedisCountApp ~~~ /** * 利用自定義的 RedisBolt 實現詞頻統計 */ public class CustomRedisCountApp { private static final String DATA_SOURCE_SPOUT = "dataSourceSpout"; private static final String SPLIT_BOLT = "splitBolt"; private static final String STORE_BOLT = "storeBolt"; private static final String REDIS_HOST = "192.168.200.226"; private static final int REDIS_PORT = 6379; public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout()); // split builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT); // save to redis and count JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(REDIS_HOST).setPort(REDIS_PORT).build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper); builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT); // 如果外部傳參 cluster 則代表線上環境啟動,否則代表本地啟動 if (args.length > 0 && args[0].equals("cluster")) { try { StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology()); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); } } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalCustomRedisCountApp", new Config(), builder.createTopology()); } } } ~~~ 作者:heibaiying 鏈接:https://juejin.cn/post/6844903950039121934 來源:掘金 著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看