<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC] # 數據采集 ~~~ package com.jdxia.ack; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import java.util.List; import java.util.Map; import java.util.UUID; public class AckSpout extends BaseRichSpout { private SpoutOutputCollector collector; //初始化方法 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //循環調用,每調用一次就發送一條消息 @Override public void nextTuple() { //隨機生產一條數據 String uuid = UUID.randomUUID().toString().replace("_", " "); collector.emit(new Values(uuid), new Values(uuid)); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } //定義發送的字段 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uuid")); } @Override public void ack(Object msgId) { System.out.println("消息處理成功" + msgId); } @Override public void fail(Object msgId) { System.out.println("消息處理失敗" + msgId); //重新發送消息 collector.emit((List) msgId, msgId); } } ~~~ # 數據處理 1. ~~~ package com.jdxia.ack; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class Bolt1 extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { //輸出數據 collector.emit(new Values(input.getString(0))); System.out.println("Bolt1輸出消息"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uuid")); } } ~~~ 2. ~~~ package com.jdxia.ack; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.FailedException; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class Bolt2 extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { //如果需要拋出異常,成功就不要拋出異常 throw new FailedException("異常"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } ~~~ # 任務編排 ~~~ package com.jdxia.ack; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; public class AckTopologyDriver { public static void main(String[] args) { //1. 準備任務信息 TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("mySpout", new AckSpout(), 1); topologyBuilder.setBolt("bolt1", new Bolt1(), 1).shuffleGrouping("mySpout"); topologyBuilder.setBolt("bolt2", new Bolt2(), 1).shuffleGrouping("bolt1"); //2. 任務提交 Config config = new Config(); StormTopology stormTopology = topologyBuilder.createTopology(); //本地模式 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("ack", config, stormTopology); } } ~~~
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看