<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業級AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 數據獲取 ~~~ package com.jdxia.storm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class MySplitBolt extends BaseBasicBolt { //處理函數 @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //1. 數據用tuple獲取 //和kafka接入,這邊的名字就變為bytes了 byte[] juzi = (byte[]) tuple.getValueByField("bytes"); //2. 進行切割 String[] strings = new String(juzi).split(" "); //3. 發送數據 for (String string : strings) { basicOutputCollector.emit(new Values(string, 1)); } } //定義下我的輸出 @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word", "num")); } } ~~~ # 數據計算 ~~~ package com.jdxia.storm; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; import redis.clients.jedis.Jedis; import java.util.HashMap; import java.util.Map; public class MyWordCountAndPrintBolt extends BaseBasicBolt { private Map<String, String> wordCountMap = new HashMap<String, String>(); private Jedis jedis; //初始化連接redis @Override public void prepare(Map stormConf, TopologyContext context) { //建立redis連接 jedis = new Jedis("0.0.0.0", 6379); jedis.auth("root"); //調用本來的方法 super.prepare(stormConf, context); } //處理函數 @Override public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { //根據之前定義的word和num //強轉為string String word = (String) tuple.getValueByField("word"); Integer num = (Integer) tuple.getValueByField("num"); //1.查看單詞對應的value是否存在 Integer integer = wordCountMap.get(word) == null ? 
0 : Integer.parseInt(wordCountMap.get(word)); if (integer == 0) { //如果不存在就直接放入新的 wordCountMap.put(word, num + ""); } else { //如果之前已經有了,就把對應統計加上 wordCountMap.put(word, (integer + num) + ""); } //保存數據到redis // redis key wordCount:->Map jedis.hmset("wordCount",wordCountMap); } //不需要定義輸出字段了 @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } ~~~ # 任務提交 ~~~ package com.jdxia.storm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.ZkHosts; public class StormTopologyDriver { public static void main(String[] args) { //1. 描述任務 TopologyBuilder topologyBuilder = new TopologyBuilder(); //任務的名字自己定義 //kafka中第一個參數寫broker對應的zk,第二個寫topic,第三個寫zk的節點,第四個寫id //參數3:zkRoot將offset值存放在zk的哪里 //參數4:zk的子目錄,防止被覆蓋和其他人沖突 topologyBuilder.setSpout("kafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc"))); //shuffleGrouping和前一個任務關聯.shuffleGrouping可以連接多個任務 topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("kafkaSpout"); topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1"); //2. 任務提交 //提交給誰?提交什么內容? Config config = new Config(); //Config類實際上是繼承HashMap //設置在幾個work上運行,也就是在幾個jvm中運行,如果不指定,默認是在一個work中 // config.setNumWorkers(2); StormTopology stormTopology = topologyBuilder.createTopology(); //本地模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordCount", config, stormTopology); //這種是集群模式 // StormSubmitter.submitTopology("worldCount1", config, stormTopology); } } ~~~ # 測試 我們創建對應的topic,然后往topic寫入數據,數據用空格分開
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看