<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ThinkChat2.0新版上線,更智能更精彩,支持會話、畫圖、視頻、閱讀、搜索等,送10W Token,即刻開啟你的AI之旅 廣告
                # Storm and Kestrel 本頁介紹如何使用Storm消費來自Kestrel集群的項目。 ## 準備階段 ### Storm 本教程使用的示例來自于 [storm-kestrel](https://github.com/nathanmarz/storm-kestrel) 項目和 [storm-starter](http://github.com/apache/storm/blob//examples/storm-starter) 項目。建議你克隆這些項目并根著示例走。 閱讀 [Setting up development environment](Setting-up-development-environment.html) 和 [Creating a new Storm project](Creating-a-new-Storm-project.html) 來設置你的機器。 ### Kestrel 它假設您可以在本地選擇Kestrel 服務器,如上所述 [here](https://github.com/nathanmarz/storm-kestrel). ## Kestrel 服務 和 隊列 單個kestrel服務器具有一組隊列。Kestrel 隊列是在JVM上運行的非常簡單的消息隊列,并使用memcache協議(具有一定的擴展名)與客戶端進行通信。對于更加詳細的信息,你可查看 [KestrelThriftClient](https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/org/apache/storm/spout/KestrelThriftClient.java) 類里面提供的 [storm-kestrel](https://github.com/nathanmarz/storm-kestrel) 項目. 每個隊列按照FIFO(先進先出)的原則進行嚴格排序。跟隨性能項目緩存在系統內存中;但是,只有前128MB保存在內存中。當服務器停止時,隊列狀態存儲在日志文件中。 此外,還可以從 [here](https://github.com/nathanmarz/kestrel/blob/master/docs/guide.md) 找到細節。 Kestrel is: * fast * small * durable(耐久) * reliable(穩定) 例如,Twitter 使用 Kestrel 作為其消息傳遞基礎設施的骨干,如上所述[here](http://bhavin.directi.com/notes-on-kestrel-the-open-source-twitter-queue/). ## 添加項目至 Kestrel 首先,我們需要一個可以將項目添加到Kestrel隊列的程序。以下方法受益于 KestrelClient的實現 [storm-kestrel](https://github.com/nathanmarz/storm-kestrel). 它將句子添加到從包含五個可能句子的數組中隨機選擇的Kestrel隊列中。 ``` private static void queueSentenceItems(KestrelClient kestrelClient, String queueName) throws ParseError, IOException { String[] sentences = new String[] { "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; Random _rand = new Random(); for(int i=1; i<=10; i++){ String sentence = sentences[_rand.nextInt(sentences.length)]; String val = "ID " + i + " " + sentence; boolean queueSucess = kestrelClient.queue(queueName, val); System.out.println("queueSucess=" +queueSucess+ " [" + val +"]"); } } ``` ## 將項目從 Kestrel 移除 此方法將隊列中的項目排隊,而不是刪除。 ``` private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i&lt;=12; i++){ ``` Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { byte[] data = item._data; String receivedVal = new String(data); System.out.println("receivedItem=" + receivedVal); } } ``` ``` 此方法將隊列中的項目排隊,然后將其刪除。 This method dequeues items from a queue and then removes them. ``` ``` private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){ Item item = kestrelClient.dequeue(queueName); if(item==null){ System.out.println("The queue (" + queueName + ") contains no items."); } else { int itemID = item._id; byte[] data = item._data; String receivedVal = new String(data); kestrelClient.ack(queueName, itemID); System.out.println("receivedItem=" + receivedVal); } } } ``` ``` ## 連續添加項目至 Kestrel 這是我們的最終運行程序,以便連續地將句子項添加到本地運行的Kestrel服務器的名為 **sentence_queue** 的隊列中。 為了阻止它在控制臺中鍵入一個關閉括號 char ']' ,然后按 'Enter'。 ``` ``` import java.io.IOException; import java.io.InputStream; import java.util.Random; import org.apache.storm.spout.KestrelClient; import org.apache.storm.spout.KestrelClient.Item; import org.apache.storm.spout.KestrelClient.ParseError; public class AddSentenceItemsToKestrel { /** * @param args */ public static void main(String[] args) { InputStream is = System.in; char closing_bracket = ']'; int val = closing_bracket; boolean aux = true; try { KestrelClient kestrelClient = null; String queueName = "sentence_queue"; while(aux){ kestrelClient = new KestrelClient("localhost",22133); queueSentenceItems(kestrelClient, queueName); kestrelClient.close(); Thread.sleep(1000); if(is.available()>0){ if(val==is.read()) aux=false; } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ParseError e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("end"); } } ``` ``` ## 使用 KestrelSpout 該拓撲結構使用KestrelSpout從Kestrel隊列中讀取句子,將句子分解成其組成詞(Bolt: SplitSentence),然后為每個單詞發出它之前(Bolt: WordCount)所見到的次數。數據的處理方式如下所述 [Guaranteeing message processing](Guaranteeing-message-processing.html). ``` ``` TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme())); builder.setBolt("split", new SplitSentence(), 10) .shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20) .fieldsGrouping("split", new Fields("word")); ``` ``` ## 執行 首先,在生產或開發模式下啟動您本地的Kestrel服務器。 等大約5秒,以避免ConnectionException。 現在執行程序將項目添加到隊列并啟動Storm 拓撲。啟動程序的排序并不重要。 如果您使用TOPOLOGY_DEBUG運行拓撲,您應該會看到在拓撲中發出的元組。 ```
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看