<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                [TOC] ## 一、簡介 下圖為 Strom 的運行流程圖,在開發 Storm 流處理程序時,我們需要采用內置或自定義實現 `spout`(數據源) 和 `bolt`(處理單元),并通過 `TopologyBuilder` 將它們之間進行關聯,形成 `Topology`。 ![](https://img.kancloud.cn/7d/6c/7d6c576087dbeba5d2d981967c04e0cf_771x322.png) ## 二、IComponent接口 `IComponent` 接口定義了 Topology 中所有組件 (spout/bolt) 的公共方法,自定義的 spout 或 bolt 必須直接或間接實現這個接口。 ~~~ public interface IComponent extends Serializable { /** * 聲明此拓撲的所有流的輸出模式。 * @param declarer 這用于聲明輸出流 id,輸出字段以及每個輸出流是否是直接流(direct stream) */ void declareOutputFields(OutputFieldsDeclarer declarer); /** * 聲明此組件的配置。 * */ Map<String, Object> getComponentConfiguration(); } 復制代碼 ~~~ ## 三、Spout ### 3.1 ISpout接口 自定義的 spout 需要實現 `ISpout` 接口,它定義了 spout 的所有可用方法: ~~~ public interface ISpout extends Serializable { /** * 組件初始化時候被調用 * * @param conf ISpout 的配置 * @param context 應用上下文,可以通過其獲取任務 ID 和組件 ID,輸入和輸出信息等。 * @param collector 用來發送 spout 中的 tuples,它是線程安全的,建議保存為此 spout 對象的實例變量 */ void open(Map conf, TopologyContext context, SpoutOutputCollector collector); /** * ISpout 將要被關閉的時候調用。但是其不一定會被執行,如果在集群環境中通過 kill -9 殺死進程時其就無法被執行。 */ void close(); /** * 當 ISpout 從停用狀態激活時被調用 */ void activate(); /** * 當 ISpout 停用時候被調用 */ void deactivate(); /** * 這是一個核心方法,主要通過在此方法中調用 collector 將 tuples 發送給下一個接收器,這個方法必須是非阻塞的。 * nextTuple/ack/fail/是在同一個線程中執行的,所以不用考慮線程安全方面。當沒有 tuples 發出時應該讓 * nextTuple 休眠 (sleep) 一下,以免浪費 CPU。 */ void nextTuple(); /** * 通過 msgId 進行 tuples 處理成功的確認,被確認后的 tuples 不會再次被發送 */ void ack(Object msgId); /** * 通過 msgId 進行 tuples 處理失敗的確認,被確認后的 tuples 會再次被發送進行處理 */ void fail(Object msgId); } 復制代碼 ~~~ ### 3.2 BaseRichSpout抽象類 **通常情況下,我們實現自定義的 Spout 時不會直接去實現 `ISpout` 接口,而是繼承 `BaseRichSpout`。**`BaseRichSpout` 繼承自 `BaseCompont`,同時實現了 `IRichSpout` 接口。 ![](https://img.kancloud.cn/62/94/62949958827568397fdafe9a1fbfa0fc_440x319.png) `IRichSpout` 接口繼承自 `ISpout` 和 `IComponent`,自身并沒有定義任何方法: ~~~ public interface IRichSpout extends ISpout, IComponent { } ~~~ `BaseComponent` 抽象類空實現了 `IComponent` 中 `getComponentConfiguration` 方法: ~~~ public abstract class BaseComponent implements IComponent { @Override public Map<String, Object> getComponentConfiguration() { return null; } } ~~~ `BaseRichSpout` 繼承自 `BaseCompont` 類并實現了 `IRichSpout` 接口,并且空實現了其中部分方法: ~~~ public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { @Override public void close() {} @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} } ~~~ 通過這樣的設計,我們在繼承 `BaseRichSpout` 實現自定義 spout 時,就只有三個方法必須實現: * **open** : 來源于 ISpout,可以通過此方法獲取用來發送 tuples 的 `SpoutOutputCollector`; * **nextTuple** :來源于 ISpout,必須在此方法內部發送 tuples; * **declareOutputFields** :來源于 IComponent,聲明發送的 tuples 的名稱,這樣下一個組件才能知道如何接受。 ## 四、Bolt bolt 接口的設計與 spout 的類似: ### 4.1 IBolt 接口 ~~~ /** * 在客戶端計算機上創建的 IBolt 對象。會被被序列化到 topology 中(使用 Java 序列化),并提交給集群的主機(Nimbus)。 * Nimbus 啟動 workers 反序列化對象,調用 prepare,然后開始處理 tuples。 */ public interface IBolt extends Serializable { /** * 組件初始化時候被調用 * * @param conf storm 中定義的此 bolt 的配置 * @param context 應用上下文,可以通過其獲取任務 ID 和組件 ID,輸入和輸出信息等。 * @param collector 用來發送 spout 中的 tuples,它是線程安全的,建議保存為此 spout 對象的實例變量 */ void prepare(Map stormConf, TopologyContext context, OutputCollector collector); /** * 處理單個 tuple 輸入。 * * @param Tuple 對象包含關于它的元數據(如來自哪個組件/流/任務) */ void execute(Tuple input); /** * IBolt 將要被關閉的時候調用。但是其不一定會被執行,如果在集群環境中通過 kill -9 殺死進程時其就無法被執行。 */ void cleanup(); ~~~ ### 4.2 BaseRichBolt抽象類 同樣的,在實現自定義 bolt 時,通常是繼承 `BaseRichBolt` 抽象類來實現。`BaseRichBolt` 繼承自 `BaseComponent` 抽象類并實現了 `IRichBolt` 接口。 ![](https://img.kancloud.cn/e0/be/e0be0523d8cca8d1f4df91cb863827eb_469x361.png) `IRichBolt` 接口繼承自 `IBolt` 和 `IComponent`,自身并沒有定義任何方法: ~~~ public interface IRichBolt extends IBolt, IComponent { } ~~~ 通過這樣的設計,在繼承 `BaseRichBolt` 實現自定義 bolt 時,就只需要實現三個必須的方法: * **prepare**: 來源于 IBolt,可以通過此方法獲取用來發送 tuples 的 `OutputCollector`; * **execute**:來源于 IBolt,處理 tuples 和發送處理完成的 tuples; * **declareOutputFields** :來源于 IComponent,聲明發送的 tuples 的名稱,這樣下一個組件才能知道如何接收。 ## 五、詞頻統計案例 ### 5.1 案例簡介 這里我們使用自定義的 `DataSourceSpout` 產生詞頻數據,然后使用自定義的 `SplitBolt` 和 `CountBolt` 來進行詞頻統計。 ![](https://img.kancloud.cn/43/8e/438ee67dfa720eb94533e3c4cf7c892f_794x262.png) > 案例源碼下載地址:[storm-word-count](https://github.com/heibaiying/BigData-Notes/tree/master/code/Storm/storm-word-count) ### 5.2 代碼實現 #### 1\. 項目依賴 ~~~ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.2.2</version> </dependency> ~~~ #### 2\. DataSourceSpout ~~~ public class DataSourceSpout extends BaseRichSpout { private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive"); private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } @Override public void nextTuple() { // 模擬產生數據 String lineData = productData(); spoutOutputCollector.emit(new Values(lineData)); Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("line")); } /** * 模擬數據 */ private String productData() { Collections.shuffle(list); Random random = new Random(); int endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); } } ~~~ 上面類使用 `productData` 方法來產生模擬數據,產生數據的格式如下: ~~~ Spark HBase Hive Flink Storm Hadoop HBase Spark Flink HBase Storm HBase Hadoop Hive Flink HBase Flink Hive Storm Hive Flink Hadoop HBase Hive Hadoop Spark HBase Storm ~~~ #### 3\. SplitBolt ~~~ public class SplitBolt extends BaseRichBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } @Override public void execute(Tuple input) { String line = input.getStringByField("line"); String[] words = line.split("\t"); for (String word : words) { collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } ~~~ #### 4\. CountBolt ~~~ public class CountBolt extends BaseRichBolt { private Map<String, Integer> counts = new HashMap<>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple input) { String word = input.getStringByField("word"); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); // 輸出 System.out.print("當前實時統計結果:"); counts.forEach((key, value) -> System.out.print(key + ":" + value + "; ")); System.out.println(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } ~~~ #### 5\. LocalWordCountApp 通過 TopologyBuilder 將上面定義好的組件進行串聯形成 Topology,并提交到本地集群(LocalCluster)運行。通常在開發中,可先用本地模式進行測試,測試完成后再提交到服務器集群運行。 ~~~ public class LocalWordCountApp { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); // 指明將 DataSourceSpout 的數據發送到 SplitBolt 中處理 builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); // 指明將 SplitBolt 的數據發送到 CountBolt 中 處理 builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); // 創建本地集群用于測試 這種模式不需要本機安裝 storm,直接運行該 Main 方法即可 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalWordCountApp", new Config(), builder.createTopology()); } } ~~~ #### 6\. 運行結果 啟動 `WordCountApp` 的 main 方法即可運行,采用本地模式 Storm 會自動在本地搭建一個集群,所以啟動的過程會稍慢一點,啟動成功后即可看到輸出日志。 ![](https://img.kancloud.cn/f3/89/f389763d89557d51e138579ee4afed49_927x360.png) ## 六、提交到服務器集群運行 ### 6.1 代碼更改 提交到服務器的代碼和本地代碼略有不同,提交到服務器集群時需要使用 `StormSubmitter` 進行提交。主要代碼如下: > 為了結構清晰,這里新建 ClusterWordCountApp 類來演示集群模式的提交。實際開發中可以將兩種模式的代碼寫在同一個類中,通過外部傳參來決定啟動何種模式。 ~~~ public class ClusterWordCountApp { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout", new DataSourceSpout()); // 指明將 DataSourceSpout 的數據發送到 SplitBolt 中處理 builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout"); // 指明將 SplitBolt 的數據發送到 CountBolt 中 處理 builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt"); // 使用 StormSubmitter 提交 Topology 到服務器集群 try { StormSubmitter.submitTopology("ClusterWordCountApp", new Config(), builder.createTopology()); } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) { e.printStackTrace(); } } } ~~~ ### 6.2 打包上傳 打包后上傳到服務器任意位置,這里我打包后的名稱為 `storm-word-count-1.0.jar` ~~~ # mvn clean package -Dmaven.test.skip=true ~~~ ### 6.3 提交Topology 使用以下命令提交 Topology 到集群: ~~~ # 命令格式: storm jar jar包位置 主類的全路徑 ...可選傳參 storm jar /usr/appjar/storm-word-count-1.0.jar com.heibaiying.wordcount.ClusterWordCountApp ~~~ 出現 `successfully` 則代表提交成功: ![](https://img.kancloud.cn/a3/60/a3602e1fe295555fe64376fe850e2f90_1056x352.png) ### 6.4 查看Topology與停止Topology(命令行方式) ~~~ # 查看所有Topology storm list # 停止 storm kill topology-name [-w wait-time-secs] storm kill ClusterWordCountApp -w 3 ~~~ ![](https://img.kancloud.cn/62/ec/62ecea5797dbbb8446382d51a5c8fd43_1044x390.png) ### 6.5 查看Topology與停止Topology(界面方式) 使用 UI 界面同樣也可進行停止操作,進入 WEB UI 界面(8080 端口),在 `Topology Summary` 中點擊對應 Topology 即可進入詳情頁面進行操作。 ![](https://img.kancloud.cn/fd/03/fd035e1d02b76c1dfa172b58c3d40caf_1342x351.png) ## 七、關于項目打包的擴展說明 ### mvn package的局限性 在上面的步驟中,我們沒有在 POM 中配置任何插件,就直接使用 `mvn package` 進行項目打包,這對于沒有使用外部依賴包的項目是可行的。但如果項目中使用了第三方 JAR 包,就會出現問題,因為 `package` 打包后的 JAR 中是不含有依賴包的,如果此時你提交到服務器上運行,就會出現找不到第三方依賴的異常。 這時候可能大家會有疑惑,在我們的項目中不是使用了 `storm-core` 這個依賴嗎?其實上面之所以我們能運行成功,是因為在 Storm 的集群環境中提供了這個 JAR 包,在安裝目錄的 lib 目錄下: ![](https://img.kancloud.cn/5c/36/5c366b19942e8f71a856ed4490dda903_630x369.png) 為了說明這個問題我在 Maven 中引入了一個第三方的 JAR 包,并修改產生數據的方法: ~~~ <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency> ~~~ `StringUtils.join()` 這個方法在 `commons.lang3` 和 `storm-core` 中都有,原來的代碼無需任何更改,只需要在 `import` 時指明使用 `commons.lang3`。 ~~~ import org.apache.commons.lang3.StringUtils; private String productData() { Collections.shuffle(list); Random random = new Random(); int endIndex = random.nextInt(list.size()) % (list.size()) + 1; return StringUtils.join(list.toArray(), "\t", 0, endIndex); } ~~~ 此時直接使用 `mvn clean package` 打包運行,就會拋出下圖的異常。因此這種直接打包的方式并不適用于實際的開發,因為實際開發中通常都是需要第三方的 JAR 包。 ![](https://img.kancloud.cn/23/d1/23d1e49dae5a65cd797e8ead1849ce1b_1035x412.png) 想把依賴包一并打入最后的 JAR 中,maven 提供了兩個插件來實現,分別是 `maven-assembly-plugin` 和 `maven-shade-plugin`。鑒于本篇文章篇幅已經比較長,且關于 Storm 打包還有很多需要說明的地方,所以關于 Storm 的打包方式單獨整理至下一篇文章 作者:heibaiying 鏈接:https://juejin.cn/post/6844903950034960391 來源:掘金 著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看