## 1. Storm Integration with HDFS

### 1.1 Project Structure

![](https://img.kancloud.cn/4d/47/4d477c581db864125400a7c5d98c6b4a_590x229.png)

> Source code for this example: [storm-hdfs-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hdfs-integration)

### 1.2 Main Dependencies

The main project dependencies are listed below. Two points deserve attention:

* Since the Hadoop installed on my server is the CDH distribution, the dependencies imported here are also the CDH versions, so a `<repository>` tag is needed to point to the CDH repository;
* `hadoop-common`, `hadoop-client`, and `hadoop-hdfs` all need to exclude the `slf4j-log4j12` dependency, because `storm-core` already ships it and not excluding it risks JAR conflicts.

~~~
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Storm integration with HDFS -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
~~~

### 1.3 DataSourceSpout

~~~
/**
 * Spout that produces the sample word data
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // simulate the production of data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generate mock data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
~~~

The generated mock data looks like this:

~~~
Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm
~~~

### 1.4 Storing the Data in HDFS

The HDFS address and the storage path are both hard-coded here; in real development they can be passed in as external parameters, which makes the program more flexible (a sketch of that approach follows the code below).

~~~
public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {

        // Specify the Hadoop user name. If it is not set, creating directories on HDFS
        // may throw a permission exception (RemoteException: Permission denied).
        System.setProperty("HADOOP_USER_NAME", "root");

        // Delimiter between output fields
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // Sync policy: flush the buffered data to HDFS every 100 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // Rotation policy: cap each file at 1 MB; once the limit is exceeded,
        // create a new file and keep writing
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

        // Output path
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");

        // Define the HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // save to HDFS
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);

        // Passing "cluster" as an external argument means running on the cluster;
        // otherwise the topology is started locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp", new Config(), builder.createTopology());
        }
    }
}
~~~
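The external-parameter idea mentioned in section 1.4 could look roughly like the following fragment inside `main` (a minimal sketch; the argument positions and fallback defaults are illustrative assumptions, not part of the original project):

~~~
// Hypothetical variant: read the HDFS URL and the output path from the command line,
// falling back to the hard-coded values used above (argument order is an assumption).
String hdfsUrl   = args.length > 1 ? args[1] : "hdfs://hadoop001:8020";
String outputDir = args.length > 2 ? args[2] : "/storm-hdfs/";

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath(outputDir);

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl(hdfsUrl)
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
~~~

This keeps `args[0]` free for the existing `cluster` switch while letting the deployment environment decide where the files land.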
### 1.5 Running the Test

The topology can be run directly in local mode, or packaged and submitted to the cluster. The source code in this repository is packaged with `maven-shade-plugin` by default; the packaging command is:

~~~
# mvn clean package -D maven.test.skip=true
~~~

After it runs, the data is stored under the `/storm-hdfs` directory in HDFS. The following commands can be used to inspect it:

~~~
# list the directory contents
hadoop fs -ls /storm-hdfs

# watch a file for new content
hadoop fs -tail -f /storm-hdfs/<file name>
~~~

![](https://img.kancloud.cn/cb/98/cb9870f3f80e8097f43e767e3cc3b84a_953x445.png)

## 2. Storm Integration with HBase

### 2.1 Project Structure

Integration example: perform a word count and store the final result in HBase. The main project structure is as follows:

![](https://img.kancloud.cn/e9/7e/e97eafb858d54fb15c8990c31147a83f_935x264.png)

> Source code for this example: [storm-hbase-integration](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-hbase-integration)

### 2.2 Main Dependencies

~~~
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Storm integration with HBase -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>
~~~

### 2.3 DataSourceSpout

~~~
/**
 * Spout that produces the sample word data
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // simulate the production of data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generate mock data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
~~~

The generated mock data looks like this:

~~~
Spark	HBase
Hive	Flink	Storm	Hadoop	HBase	Spark
Flink
HBase	Storm
HBase	Hadoop	Hive	Flink
HBase	Flink	Hive	Storm
Hive	Flink	Hadoop
HBase	Hive
Hadoop	Spark	HBase	Storm
~~~

### 2.4 SplitBolt

~~~
/**
 * Split each line into words on the given delimiter
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(tuple(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
~~~

### 2.5 CountBolt

~~~
/**
 * Word count bolt
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // emit the running total
        collector.emit(new Values(word, String.valueOf(count)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
~~~

### 2.6 WordCountToHBaseApp

~~~
/**
 * Word count topology that stores the results in HBase
 */
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {

        // Storm configuration
        Config config = new Config();

        // HBase configuration
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");

        // pass the HBase configuration into the Storm configuration
        config.put("hbase.conf", hbConf);

        // mapping between the stream fields and the HBase row
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word", "count"))
                .withColumnFamily("info");

        /*
         * Give the HBaseBolt the table name, the field mapping, and the HBase configuration key.
         * The table must be created in advance: create 'WordCount','info'
         */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(), 1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt(), 1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);

        // Passing "cluster" as an external argument means running on the cluster;
        // otherwise the topology is started locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp", config, builder.createTopology());
        }
    }
}
~~~
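One detail about the topology above is worth calling out: `CountBolt` accumulates counts in a per-task `HashMap`, so its totals are only correct while its parallelism stays at 1, as configured here. If the count stage ever needed to scale out, the usual Storm approach would be a fields grouping on `word`, so that every occurrence of a given word reaches the same task. A minimal sketch of that change (illustrative, not part of the original project):

~~~
// Illustrative: with more than one CountBolt task, group the stream by "word"
// so each word is always counted by the same task.
builder.setBolt(COUNT_BOLT, new CountBolt(), 2)
       .fieldsGrouping(SPLIT_BOLT, new Fields("word"));
~~~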
### 2.7 Running the Test

The topology can be run directly in local mode, or packaged and submitted to the cluster. The source code in this repository is packaged with `maven-shade-plugin` by default; the packaging command is:

~~~
# mvn clean package -D maven.test.skip=true
~~~

After it runs, the results are stored in the HBase table `WordCount`. The table contents can be inspected with:

~~~
hbase > scan 'WordCount'
~~~

![](https://img.kancloud.cn/1c/fa/1cfa94aff17f2890d3e6a277e9df1356_867x259.png)

### 2.8 withCounterFields

In the example above we implemented the word count by hand and then stored the final result in HBase. Alternatively, you can call `withCounterFields` when building the `SimpleHBaseMapper` to designate the count field; the designated fields are incremented automatically, which also yields a word count. Note that the fields passed to `withCounterFields` must be of type `Long`, not `String` (a sketch of a bolt that emits such a value follows the snippet below).

~~~
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");
~~~
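Since `withCounterFields` requires a `Long` counter, the bolt feeding the `HBaseBolt` would also have to emit a long increment instead of the `String` running total produced by `CountBolt` above. A minimal sketch of such a bolt, assuming the rest of the topology stays the same (the class name `WordToCounterBolt` is hypothetical, not part of the original project):

~~~
/**
 * Hypothetical bolt for the withCounterFields approach: instead of keeping counts
 * in memory, it emits each word with a Long increment of 1 and lets the HBase
 * counter column do the accumulation.
 */
public class WordToCounterBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        // withCounterFields expects a Long increment, not a String
        collector.emit(new Values(word, 1L));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
~~~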
## References

1. [Apache HDFS Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hdfs.html)
2. [Apache HBase Integration](http://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-hbase.html)

Author: heibaiying. Original post: https://juejin.cn/post/6844903950039138318 (Juejin). Copyright belongs to the author; contact the author for authorization before commercial reproduction, and credit the source for non-commercial reproduction.