<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                [TOC=3] ## 準備開始 在本章,我們要創建一個Storm工程和我們的第一個Storm拓撲結構。 > **NOTE**: 下面假設你的JRE版本在1.6以上。我們推薦Oracle提供的JRE。你可以到[http://www.java.com/downloads/](http://www.java.com/downloads/)下載。 ## 操作模式 開始之前,有必要了解一下Storm的操作模式。有下面兩種方式。 ### 本地模式 在本地模式下,Storm拓撲結構運行在本地計算機的單一JVM進程上。這個模式用于開發、測試以及調試,因為這是觀察所有組件如何協同工作的最簡單方法。在這種模式下,我們可以調整參數,觀察我們的拓撲結構如何在不同的Storm配置環境下運行。要在本地模式下運行,我們要下載Storm開發依賴,以便用來開發并測試我們的拓撲結構。我們創建了第一個Storm工程以后,很快就會明白如何使用本地模式了。 >[info] **NOTE**: 在本地模式下,跟在集群環境運行很像。不過很有必要確認一下所有組件都是線程安全的,因為當把它們部署到遠程模式時它們可能會運行在不同的JVM進程甚至不同的物理機上,這個時候它們之間沒有直接的通訊或共享內存。 我們要在本地模式運行本章的所有例子。 ### 遠程模式 在遠程模式下,我們向Storm集群提交拓撲,它通常由許多運行在不同機器上的流程組成。遠程模式不會出現調試信息, 因此它也稱作生產模式。不過在單一開發機上建立一個Storm集群是一個好主意,可以在部署到生產環境之前,用來確認拓撲在集群環境下沒有任何問題。 你將在[第六章](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter6/A%20RealLife%20Example.md)學到更多關于遠程模式的內容,并在[附錄B](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/appendix/B.md)學到如何安裝一個Storm集群。 ### Hello World 我們在這個工程里創建一個簡單的拓撲,數單詞數量。我們可以把這個看作Storm的“Hello World”。不過,這是一個非常強大的拓撲,因為它能夠擴展到幾乎無限大的規模,而且只需要做一些小修改,就能用它構建一個統計系統。舉個例子,我們可以修改一下工程用來找出Twitter上的熱點話題。 要創建這個拓撲,我們要用一個*spout*讀取文本,第一個*bolt*用來標準化單詞,第二個*bolt*為單詞計數,如圖2-1所示。 [![](https://box.kancloud.cn/2015-09-20_55feacde7265d.png)](http://ifeve.com/storm%e5%85%a5%e9%97%a8-%e7%ac%ac%e4%ba%8c%e7%ab%a0%e5%87%86%e5%a4%87%e5%bc%80%e5%a7%8b/figure-2-1-getting-started-topology/) 你可以從這個網址下載源碼壓縮包,[?https://github.com/storm-book/examples-ch02-getting_started/zipball/master](https://github.com/storm-book/examples-ch02-getting_started/zipball/master)。 **NOTE**: 如果你使用[git](http://git-scm.com/)(一個分布式版本控制與源碼管理工具),你可以執行git clone?[git@github.com](mailto:git@github.com):storm-book/examples-ch02-getting_started.git,把源碼檢出到你指定的目錄。 ### Java安裝檢查 構建Storm運行環境的第一步是檢查你安裝的Java版本。打開一個控制臺窗口并執行命令:java -version。控制臺應該會顯示出類似如下的內容: ~~~ java -version java version "1.6.0_26" Java(TM) SE Runtime Enviroment (build 1.6.0_26-b03) Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode) ~~~ 如果不是上述內容,檢查你的Java安裝情況。(參考[http://www.java.com/download/](http://www.java.com/download/)) ### 創建工程 開始之前,先為這個應用建一個目錄(就像你平常為Java應用做的那樣)。這個目錄用來存放工程源碼。 接下來我們要下載Storm依賴包,這是一些jar包,我們要把它們添加到應用類路徑中。你可以采用如下兩種方式之一完成這一步: * 下載所有依賴,解壓縮它們,把它 們添加到類路徑 * 使用*[Apache Maven](http://maven.apache.org/)* **NOTE**: Maven是一個軟件項目管理的綜合工具。它可以用來管理項目的開發周期的許多方面,從包依賴到版本發布過程。在這本書中,我們將廣泛使用它。如果要檢查是否已經安裝了maven,在命令行運行mvn。如果沒有安裝你可以從[http://maven.apache.org/download.html](http://maven.apache.org/download.html)下載。 沒有必要先成為一個Maven專家才能使用Storm,不過了解一下關于Maven工作方式的基礎知識仍然會對你有所幫助。你可以在Apache Maven的網站上找到更多的信息([http://maven.apache.org/](http://maven.apache.org/))。 **NOTE:**?Storm的Maven依賴引用了運行Storm本地模式的所有庫。 要運行我們的拓撲,我們可以編寫一個包含基本組件的pom.xml文件。 ~~~ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.book</groupId> <artifactId>Getting-Started</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.6</source> <target>1.6</target> <compilerVersion>1.6</compilerVersion> </configuration> </plugin> </plugins> </build> <repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <!-- Storm Dependency --> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.6.0</version> </dependency> </dependencies> </project> ~~~ 開頭幾行指定了工程名稱和版本號。然后我們添加了一個編譯器插件,告知Maven我們的代碼要用Java1.6編譯。接下來我們定義了Maven倉庫(Maven支持為同一個工程指定多個倉庫)。clojars是存放Storm依賴的倉庫。Maven會為運行本地模式自動下載必要的所有子包依賴。 一個典型的Maven Java工程會擁有如下結構: ~~~ 我們的應用目錄/ ├── pom.xml └── src └── main └── java | ├── spouts | └── bolts └── resources ~~~ java目錄下的子目錄包含我們的代碼,我們把要統計單詞數的文件保存在resource目錄下。 **NOTE**:命令mkdir -p 會創建所有需要的父目錄。 ### 創建我們的第一個Topology 我們將為運行單詞計數創建所有必要的類。可能這個例子中的某些部分,現在無法講的很清楚,不過我們會在隨后的章節做進一步的講解。 ### Spout *pout*?WordReader類實現了IRichSpout接口。我們將在[第四章](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter4/Spouts.md)看到更多細節。WordReader負責從文件按行讀取文本,并把文本行提供給第一個*bolt*。 **NOTE:**?一個*spout*發布一個定義域列表。這個架構允許你使用不同的*bolts*從同一個*spout*流讀取數據,它們的輸出也可作為其它*bolts*的定義域,以此類推。 例2-1包含WordRead類的完整代碼(我們將會分析下述代碼的每一部分)。 ~~~ /** * 例2-1.src/main/java/spouts/WordReader.java */ package spouts; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class WordReader implements IRichSpout { private SpoutOutputCollector collector; private FileReader fileReader; private boolean completed = false; private TopologyContext context; public boolean isDistributed() {return false;} public void ack(Object msgId) { System.out.println("OK:"+msgId); } public void close() {} public void fail(Object msgId) { System.out.println("FAIL:"+msgId); } /** * 這個方法做的惟一一件事情就是分發文件中的文本行 */ public void nextTuple() { /** * 這個方法會不斷的被調用,直到整個文件都讀完了,我們將等待并返回。 */ if(completed){ try { Thread.sleep(1000); } catch (InterruptedException e) { //什么也不做 } return; } String str; //創建reader BufferedReader reader = new BufferedReader(fileReader); try{ //讀所有文本行 while((str = reader.readLine()) != null){ /** * 按行發布一個新值 */ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } } /** * 我們將創建一個文件并維持一個collector對象 */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.collector = collector; } /** * 聲明輸入域"word" */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } } ~~~ 第一個被調用的*spout*方法都是**public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)**。它接收如下參數:配置對象,在定義topology對象是創建;TopologyContext對象,包含所有拓撲數據;還有SpoutOutputCollector對象,它能讓我們發布交給*bolts*處理的數據。下面的代碼主是這個方法的實現。 ~~~ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.collector = collector; } ~~~ 我們在這個方法里創建了一個FileReader對象,用來讀取文件。接下來我們要實現**public void nextTuple()**,我們要通過它向*bolts*發布待處理的數據。在這個例子里,這個方法要讀取文件并逐行發布數據。 ~~~ public void nextTuple() { if(completed){ try { Thread.sleep(1); } catch (InterruptedException e) { //什么也不做 } return; } String str; BufferedReader reader = new BufferedReader(fileReader); try{ while((str = reader.readLine()) != null){ this.collector.emit(new Values(str)); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } } ~~~ **NOTE:**?Values是一個ArrarList實現,它的元素就是傳入構造器的參數。 **nextTuple()**會在同一個循環內被**ack()**和**fail()**周期性的調用。沒有任務時它必須釋放對線程的控制,其它方法才有機會得以執行。因此nextTuple的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負載,會在返回前休眠一毫秒。如果任務完成了,文件中的每一行都已被讀出并分發了。 **NOTE:**元組(tuple)是一個具名值列表,它可以是任意java對象(只要它是可序列化的)。默認情況,Storm會序列化字符串、字節數組、ArrayList、HashMap和HashSet等類型。 ### Bolts 現在我們有了一個*spout*,用來按行讀取文件并每行發布一個*元組*,還要創建兩個*bolts*,用來處理它們(看圖2-1)。*bolts*實現了接口**backtype.storm.topology.IRichBolt**。 *bolt*最重要的方法是**void execute(Tuple input)**,每次接收到元組時都會被調用一次,還會再發布若干個元組。 **NOTE:**?只要必要,*bolt*或*spout*會發布若干元組。當調用**nextTuple**或**execute**方法時,它們可能會發布0個、1個或許多個元組。你將在[第五章](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter5/Bolts.md)學習更多這方面的內容。 第一個*bolt*,**WordNormalizer**,負責得到并標準化每行文本。它把文本行切分成單詞,大寫轉化成小寫,去掉頭尾空白符。 首先我們要聲明*bolt*的出參: ~~~ public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("word")); } ~~~ 這里我們聲明*bolt*將發布一個名為“word”的域。 下一步我們實現**public void execute(Tuple input)**,處理傳入的元組: ~~~ public void execute(Tuple input){ String sentence=input.getString(0); String[] words=sentence.split(" "); for(String word : words){ word=word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //發布這個單詞 collector.emit(new Values(word)); } } //對元組做出應答 collector.ack(input); } ~~~ 第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理并用collector對象發布。最后,每次都調用collector對象的**ack()**方法確認已成功處理了一個元組。 例2-2是這個類的完整代碼。 ~~~ //例2-2 src/main/java/bolts/WordNormalizer.java package bolts; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer implements IRichBolt{ private OutputCollector collector; public void cleanup(){} /** * *bolt*從單詞文件接收到文本行,并標準化它。 * 文本行會全部轉化成小寫,并切分它,從中得到所有單詞。 */ public void execute(Tuple input){ String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //發布這個單詞 List a = new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } //對元組做出應答 collector.ack(input); } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } /** * 這個*bolt*只會發布“word”域 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } ~~~ **NOTE:**通過這個例子,我們了解了在一次**execute**調用中發布多個元組。如果這個方法在一次調用中接收到句子“This is the Storm book”,它將會發布五個元組。 下一個*bolt*,**WordCounter**,負責為單詞計數。這個拓撲結束時(**cleanup()**方法被調用時),我們將顯示每個單詞的數量。 **NOTE**: 這個例子的*bolt*什么也沒發布,它把數據保存在map里,但是在真實的場景中可以把數據保存到數據庫。 ~~~ package bolts; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt{ Integer id; String name; Map<String,Integer> counters; private OutputCollector collector; /** * 這個spout結束時(集群關閉的時候),我們會顯示單詞數量 */ @Override public void cleanup(){ System.out.println("-- 單詞數 【"+name+"-"+id+"】 --"); for(Map.Entry<String,Integer> entry : counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue()); } } /** * 為每個單詞計數 */ @Override public void execute(Tuple input) { String str=input.getString(0); /** * 如果單詞尚不存在于map,我們就創建一個,如果已在,我們就為它加1 */ if(!counters.containsKey(str)){ conters.put(str,1); }else{ Integer c = counters.get(str) + 1; counters.put(str,c); } //對元組作為應答 collector.ack(input); } /** * 初始化 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){ this.counters = new HashMap<String, Integer>(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} } ~~~ execute方法使用一個map收集單詞并計數。拓撲結束時,將調用**clearup()**方法打印計數器map。(雖然這只是一個例子,但是通常情況下,當拓撲關閉時,你應當使用**cleanup()**方法關閉活動的連接和其它資源。) ### 主類 你可以在主類中創建拓撲和一個本地集群對象,以便于在本地測試和調試。**LocalCluster**可以通過**Config**對象,讓你嘗試不同的集群配置。比如,當使用不同數量的工作進程測試你的拓撲時,如果不小心使用了某個全局變量或類變量,你就能夠發現錯誤。(更多內容請見[第三章](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/Topologies.md)) **NOTE:**所有拓撲節點的各個進程必須能夠獨立運行,而不依賴共享數據(也就是沒有全局變量或類變量),因為當拓撲運行在真實的集群環境時,這些進程可能會運行在不同的機器上。 接下來,**TopologyBuilder**將用來創建拓撲,它決定Storm如何安排各節點,以及它們交換數據的方式。 ~~~ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader", new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer"); ~~~ 在*spout*和*bolts*之間通過**shuffleGrouping**方法連接。這種分組方式決定了Storm會以隨機分配方式從源節點向目標節點發送消息。 下一步,創建一個包含拓撲配置的**Config**對象,它會在運行時與集群配置合并,并通過**prepare**方法發送給所有節點。 ~~~ Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(true); ~~~ 由*spout*讀取的文件的文件名,賦值給**wordFile**屬性。由于是在開發階段,設置**debug**屬性為**true**,Strom會打印節點間交換的所有消息,以及其它有助于理解拓撲運行方式的調試數據。 正如之前講過的,你要用一個**LocalCluster**對象運行這個拓撲。在生產環境中,拓撲會持續運行,不過對于這個例子而言,你只要運行它幾秒鐘就能看到結果。 ~~~ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology()); Thread.sleep(2000); cluster.shutdown(); ~~~ 調用**createTopology**和**submitTopology**,運行拓撲,休眠兩秒鐘(拓撲在另外的線程運行),然后關閉集群。 例2-3是完整的代碼 ~~~ //例2-3 src/main/java/TopologyMain.java import spouts.WordReader; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import bolts.WordCounter; import bolts.WordNormalizer; public class TopologyMain { public static void main(String[] args) throws InterruptedException { //定義拓撲 TopologyBuilder builder = new TopologyBuilder()); builder.setSpout("word-reader", new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word")); //配置 Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(false); //運行拓撲 conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology(); Thread.sleep(1000); cluster.shutdown(); } } ~~~ [**觀察運行情況**](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter2/Hello%20World%20Storm.md#%E8%A7%82%E5%AF%9F%E8%BF%90%E8%A1%8C%E6%83%85%E5%86%B5) 你已經為運行你的第一個拓撲準備好了。在這個目錄下面創建一個文件,**/src/main/resources/words.txt**,一個單詞一行,然后用下面的命令運行這個拓撲:**mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt**。 舉個例子,如果你的*words.txt*文件有如下內容:?**Storm test are great is an Storm simple application but very powerful really Storm is great**?你應該會在日志中看到類似下面的內容:?**is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1**?在這個例子中,每類節點只有一個實例。但是如果你有一個非常大的日志文件呢?你能夠很輕松的改變系統中的節點數量實現并行工作。這個時候,你就要創建兩個**WordCounter**實例。 ~~~ builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer"); ~~~ 程序返回時,你將看到:?**— 單詞數 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 單詞數 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1**?棒極了!修改并行度實在是太容易了(當然對于實際情況來說,每個實例都會運行在單獨的機器上)。不過似乎有一個問題:單詞*is*和*great*分別在每個**WordCounter**各計數一次。怎么會這樣?當你調用**shuffleGrouping**時,就決定了Storm會以隨機分配的方式向你的*bolt*實例發送消息。在這個例子中,理想的做法是相同的單詞問題發送給同一個**WordCounter**實例。你把**shuffleGrouping(“word-normalizer”)**換成**fieldsGrouping(“word-normalizer”, new Fields(“word”))**就能達到目的。試一試,重新運行程序,確認結果。 你將在后續章節學習更多分組方式和消息流類型。 ## 結論 我們已經討論了Storm的本地和遠程操作模式之間的不同,以及Storm的強大和易于開發的特性。你也學習了一些Storm的基本概念,我們將在后續章節深入講解它們。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看