<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                ## **數據流組** 設計一個拓撲時,你要做的最重要的事情之一就是定義如何在各組件之間交換數據(數據流是如何被*bolts*消費的)。一個*數據流組*指定了每個*bolt*會消費哪些數據流,以及如何消費它們。 **NOTE**:一個節點能夠發布一個以上的數據流,一個數據流組允許我們選擇接收哪個。 數據流組在定義拓撲時設置,就像我們在[第二章](http://ifeve.com/getting-started-with-storm-2/#more-10677)看到的: ~~~ ··· builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); ··· ~~~ 在前面的代碼塊里,一個*bolt*由**TopologyBuilder**對象設定, 然后使用隨機數據流組指定數據源。數據流組通常將數據源組件的ID作為參數,取決于數據流組的類型不同還有其它可選參數。 **NOTE:**每個**InputDeclarer**可以有一個以上的數據源,而且每個數據源可以分到不同的組。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E9%9A%8F%E6%9C%BA%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**隨機數據流組** 隨機流組是最常用的數據流組。它只有一個參數(數據源組件),并且數據源會向隨機選擇的*bolt*發送元組,保證每個消費者收到近似數量的元組。 隨機數據流組用于數學計算這樣的原子操作。然而,如果操作不能被隨機分配,就像[第二章](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter2/Getting%20Started.md)為單詞計數的例子,你就要考慮其它分組方式了。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E5%9F%9F%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**域數據流組** 域數據流組允許你基于元組的一個或多個域控制如何把元組發送給*bolts*。它保證擁有相同域組合的值集發送給同一個*bolt*。回到單詞計數器的例子,如果你用*word*域為數據流分組,**word-normalizer**?*bolt*將只會把相同單詞的元組發送給同一個**word-counter***bolt*實例。 ~~~ ··· builder.setBolt("word-counter", new WordCounter(),2) .fieldsGrouping("word-normalizer", new Fields("word")); ··· ~~~ **NOTE:**?在域數據流組中的所有域集合必須存在于數據源的域聲明中。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E5%85%A8%E9%83%A8%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**全部數據流組** 全部數據流組,為每個接收數據的實例復制一份元組副本。這種分組方式用于向*bolts*發送信號。比如,你要刷新緩存,你可以向所有的*bolts*發送一個*刷新緩存信號*。在單詞計數器的例子里,你可以使用一個全部數據流組,添加清除計數器緩存的功能(見[拓撲示例](https://github.com/storm-book/examples-ch03-topologies)) ~~~ public void execute(Tuple input) { String str = null; try{ if(input.getSourceStreamId().equals("signals")){ str = input.getStringByField("action"); if("refreshCache".equals(str)) counters.clear(); } }catch (IllegalArgumentException e){ //什么也不做 } ··· } ~~~ 我們添加了一個if分支,用來檢查源數據流。Storm允許我們聲明具名數據流(如果你不把元組發送到一個具名數據流,默認發送到名為”**default**“的數據流)。這是一個識別元組的極好的方式,就像這個例子中,我們想識別**signals**一樣。 在拓撲定義中,你要向**word-counter**?*bolt*添加第二個數據流,用來接收從**signals-spout**數據流發送到所有*bolt*實例的每一個元組。 ~~~ builder.setBolt("word-counter", new WordCounter(),2) .fieldsGroupint("word-normalizer",new Fields("word")) .allGrouping("signals-spout","signals"); ~~~ **signals-spout**的實現請參考[git倉庫](https://github.com/storm-book/examples-ch03-topologies)。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E8%87%AA%E5%AE%9A%E4%B9%89%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**自定義數據流組** 你可以通過實現**backtype.storm.grouping.CustormStreamGrouping**接口創建自定義數據流組,讓你自己決定哪些*bolt*接收哪些元組。 讓我們修改單詞計數器示例,使首字母相同的單詞由同一個*bolt*接收。 ~~~ public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } } ~~~ 這是一個**CustomStreamGrouping**的簡單實現,在這里我們采用單詞首字母字符的整數值與任務數的余數,決定接收元組的*bolt*。 按下述方式**word-normalizer**修改即可使用這個自定義數據流組。 ~~~ builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping()); ~~~ ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E7%9B%B4%E6%8E%A5%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**直接數據流組** 這是一個特殊的數據流組,數據源可以用它決定哪個組件接收元組。與前面的例子類似,數據源將根據單詞首字母決定由哪個*bolt*接收元組。要使用直接數據流組,在**WordNormalizer**?*bolt*中,使用**emitDirect**方法代替**emit**。 ~~~ public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //對元組做出應答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } } ~~~ 在**prepare**方法中計算任務數 ~~~ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); } ~~~ 在拓撲定義中指定數據流將被直接分組: ~~~ builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer"); ~~~ ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E5%85%A8%E5%B1%80%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**全局數據流組** 全局數據流組把所有數據源創建的元組發送給單一目標實例(即擁有最低ID的任務)。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E4%B8%8D%E5%88%86%E7%BB%84)**不分組** 寫作本書時(Stom0.7.1版),這個數據流組相當于隨機數據流組。也就是說,使用這個數據流組時,并不關心數據流是如何分組的。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#localcluster-vs-stormsubmitter)**LocalCluster VS StormSubmitter** 到目前為止,你已經用一個叫做**LocalCluster**的工具在你的本地機器上運行了一個拓撲。Storm的基礎工具,使你能夠在自己的計算機上方便的運行和調試不同的拓撲。但是你怎么把自己的拓撲提交給運行中的Storm集群呢?Storm有一個有趣的功能,在一個真實的集群上運行自己的拓撲是很容易的事情。要實現這一點,你需要把**LocalCluster**換成**StormSubmitter**并實現**submitTopology**方法, 它負責把拓撲發送給集群。 下面是修改后的代碼: ~~~ //LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, //builder.createTopology()); StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf, builder.createTopology()); //Thread.sleep(1000); //cluster.shutdown(); ~~~ **NOTE:**?當你使用**StormSubmitter**時,你就不能像使用**LocalCluster**時一樣通過代碼控制集群了。 接下來,把源碼壓縮成一個jar包,運行Storm客戶端命令,把拓撲提交給集群。如果你已經使用了Maven, 你只需要在命令行進入源碼目錄運行:**mvn package**。 現在你生成了一個jar包,使用**storm jar**命令提交拓撲(關于如何安裝Storm客戶端請參考[附錄A](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/appendix/A.md))。命令格式:**storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3**。 對于這個例子,在拓撲工程目錄下面運行: ~~~ storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt ~~~ 通過這些命令,你就把拓撲發布集群上了。 如果想停止或殺死它,運行: ~~~ storm kill Count-Word-Topology-With-Refresh-Cache ~~~ **NOTE:**拓撲名稱必須保證惟一性。 **NOTE:**如何安裝Storm客戶端,參考[附錄A](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/appendix/A.md) ### **DRPC拓撲** 有一種特殊的拓撲類型叫做分布式遠程過程調用(DRPC),它利用Storm的分布式特性執行遠程過程調用(RPC)(見下圖)。Storm提供了一些用來實現DRPC的工具。第一個是DRPC服務器,它就像是客戶端和Storm拓撲之間的連接器,作為拓撲的*spout*的數據源。它接收一個待執行的函數和函數參數,然后對于函數操作的每一個數據塊,這個服務器都會通過拓撲分配一個請求ID用來識別RPC請求。拓撲執行最后的*bolt*時,它必須分配RPC請求ID和結果,使DRPC服務器把結果返回正確的客戶端。 [![](https://box.kancloud.cn/2015-09-21_55ffed6591b64.png)](http://ifeve.com/getting-started-with-storm-3/figure3-1-drpc-topology-schema/) **NOTE:**單實例DRPC服務器能夠執行許多函數。每個函數由一個惟一的名稱標識。 Storm提供的第二個工具(已在例子中用過)是**LineDRPCTopologyBuilder**,一個輔助構建DRPC拓撲的抽象概念。生成的拓撲創建**DRPCSpouts**——它連接到DRPC服務器并向拓撲的其它部分分發數據——并包裝*bolts*,使結果從最后一個*bolt*返回。依次執行所有添加到**LinearDRPCTopologyBuilder**對象的*bolts*。 作為這種類型的拓撲的一個例子,我們創建了一個執行加法運算的進程。雖然這是一個簡單的例子,但是這個概念可以擴展到復雜的分布式計算。 *bolt*按下面的方式聲明輸出: ~~~ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","result")); } ~~~ 因為這是拓撲中惟一的*bolt*,它必須發布RPC ID和結果。**execute**方法負責執行加法運算。 ~~~ public void execute(Tuple input) { String[] numbers = input.getString(1).split("\\+"); Integer added = 0; if(numbers.length<2){ throw new InvalidParameterException("Should be at least 2 numbers"); } for(String num : numbers){ added += Integer.parseInt(num); } collector.emit(new Values(input.getValue(0),added)); } ~~~ 包含加法*bolt*的拓撲定義如下: ~~~ public static void main(String[] args) { LocalDRPC drpc = new LocalDRPC(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add"); builder.addBolt(AdderBolt(),2); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpcder-topology", conf, builder.createLocalTopology(drpc)); String result = drpc.execute("add", "1+-1"); checkResult(result,0); result = drpc.execute("add", "1+1+5+10"); checkResult(result,17); cluster.shutdown(); drpc.shutdown(); } ~~~ 創建一個**LocalDRPC**對象在本地運行DRPC服務器。接下來,創建一個拓撲構建器(譯者注:LineDRpctopologyBuilder對象),把*bolt*添加到拓撲。運行DRPC對象(LocalDRPC對象)的**execute**方法測試拓撲。 **NOTE:**使用**DRPCClient**類連接遠程DRPC服務器。DRPC服務器暴露了[Thrift API](http://thrift.apache.org/),因此可以跨語言編程;并且不論是在本地還是在遠程運行DRPC服務器,它們的API都是相同的。 對于采用Storm配置的DRPC配置參數的Storm集群,調用構建器對象的**createRemoteTopology**向Storm集群提交一個拓撲,而不是調用**createLocalTopology**。 **原創文章,轉載請注明:**?轉載自[并發編程網 – ifeve.com](http://ifeve.com/) **本文鏈接地址:**?[Storm入門之第三章拓撲](http://ifeve.com/getting-started-with-storm-3/)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看