<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 數據樣本 ~~~ i am jdxia i am xjd i am jdxia i am jelly ~~~ # jar包 ~~~ <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <!--<scope>provided</scope>--> <version>0.9.5</version> </dependency> ~~~ 安裝log4j # 數據獲取 BaseRichSpout類是ISpout接口和IComponent接口的一個簡便的實現 open方法中接收三個參數 * conf包含了storm配置信息的map. * TopologyContext對象提供了topology中組件的信息 * SpoutOutputCollector對象提供了發射tuple的方法 ~~~ package com.learnstorm; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import org.apache.commons.lang.StringUtils; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Map; //數據獲取 public class MyLocalFileSpout extends BaseRichSpout { //控制數據輸出 private SpoutOutputCollector collector; //讀取數據的 private BufferedReader bufferedReader; //初始化方法 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; try { //定義這個去讀取數據 this.bufferedReader = new BufferedReader(new FileReader(new File("/Users/jdxia/Desktop/MyFile/i.txt"))); } catch (FileNotFoundException e) { e.printStackTrace(); } } //storm流式計算的特征就是數據一條一條的處理 // while(true) { // this.nextTuple(); // } //這個方法會被循環調用 @Override public void nextTuple() { //每被調用一次就會發送一條數據出去 try { //讀取一行 String line = bufferedReader.readLine(); //如果不是空的話 if (StringUtils.isNotBlank(line)) { List<Object> arrayList = new ArrayList<Object>(); //把數據放到ArrayList中 arrayList.add(line); //把數據發出去 collector.emit(arrayList); } } catch (IOException e) { e.printStackTrace(); } } //定義下我的輸出 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("juzi")); } } ~~~ # 數據截取 BaseBasicBolt是IComponent和IBolt接口的一個簡便實現 BaseBasicBolt中還有個prepare()方法,是bolt初始化的時候調用的 ~~~ package com.learnstorm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; //相當于map-->world,1 //業務邏輯 //對句子進行切割 public class MySplitBolt extends BaseBasicBolt { //處理函數 @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //1.數據如何獲取,用tuple獲取,tuple是List數據結構,消息傳輸的基本單元 //強轉為string,juzi是上一步定義的 String juzi = (String) tuple.getValueByField("juzi"); //2.進行切割 String[] strings = juzi.split(" "); //3.發送數據 for (String word : strings) { //我們之前用ArrayList存儲,這邊怎么變為Values //可以看下Values的源碼,他是繼承了ArrayList,他存的時候用了一個循環 //values對象幫我們生成個list basicOutputCollector.emit(new Values(word, 1)); } } //定義下我的輸出 //單詞world和他的次數 @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "num")); } } ~~~ # 單詞統計 這里面用到了HashMap.這是可序列化的 如果spout或者bolt在序列化之前,(比如在構造函數中生成)實例化了任何無法序列化的實例變量,在進行序列化的時候會拋出NotSerialiableException 當topology發布時,所有的bolt和spout組件首先進行序列化,然后發布到網絡中. 通常情況下最好在構造函數中對基本數據類型和可序列化的對象進行賦值和實例化,在prepare()方法中對不可序列化的對象進行實例化 bolt中可以加入cleanup()方法,這個方法在IBolt中定義.storm在終止一個Bolt之前會調用這個方法.通常情況下cleanup會用來釋放bolt占用的資源 當在集群中運行的時候,cleanup是不可靠的,不能保證會執行 ~~~ package com.learnstorm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; import java.util.HashMap; import java.util.Map; //打印 public class MyWordCountAndPrintBolt extends BaseBasicBolt { private Map<String, Integer> wordCountMap = new HashMap<String, Integer>(); //處理函數 @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //根據之前定義的word和num //強轉為string String word = (String) tuple.getValueByField("word"); Integer num = (Integer) tuple.getValueByField("num"); //1.查看單詞對應的value是否存在 Integer integer = wordCountMap.get(word); if (integer == null || integer.intValue() == 0) { //如果不存在就直接放入新的 wordCountMap.put(word, num); } else { //如果之前已經有了,就把對應統計加上 wordCountMap.put(word, integer.intValue() + num); } System.out.println(wordCountMap); } //不需要定義輸出字段了 @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } //終止bolt會調用這個方法 @Override public void cleanup() { } } ~~~ # 任務描述 這邊寫的是本地提交到集群 ~~~ package com.learnstorm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; public class StormTopologyDriver { public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { //1. 描述任務 TopologyBuilder topologyBuilder = new TopologyBuilder(); //任務的名字自己定義 topologyBuilder.setSpout("mySpout", new MyLocalFileSpout()); //shuffleGrouping和前一個任務關聯.shuffleGrouping可以連接多個任務 topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("mySpout"); topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1"); //2. 任務提交 //提交給誰?提交什么內容? Config config = new Config(); //Config類實際上是繼承HashMap //設置在幾個work上運行,也就是在幾個jvm中運行,如果不指定,默認是在一個work中 // config.setNumWorkers(2); StormTopology stormTopology = topologyBuilder.createTopology(); //本地模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordCount", config, stormTopology); //這種是集群模式 // StormSubmitter.submitTopology("worldCount1", config, stormTopology); } } ~~~ # 提交到集群 如果是提交到集群上面,那么storm的storm-core的作用域就要改下 ~~~ <scope>provided</scope> ~~~ 表示集群上提供了這個jar包, 然后maven對項目打包,上傳到服務器上,執行 ~~~ storm jar 上傳的jar包 主類名稱 ~~~ 然后我們在ui界面上看 ![](https://box.kancloud.cn/ec4feb3f0674f1f5c70c99c9dfcb2d6f_285x249.png) 點進去看 ![](https://box.kancloud.cn/b72f894cd9257f6ffd5b815a4704d708_227x236.png) 統計信息打印在bolt2上 查看下bolt2在那臺機器上 ![](https://box.kancloud.cn/ff5141f32b55d6c040beda3a9c97ae50_318x99.png) hadoop03的6700端口上 到這臺機器上的storm的logs目錄 目錄下有個worker-6700.log查看下這個日志
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看