# Storm and Kestrel
本頁介紹如何使用Storm消費來自Kestrel集群的項目。
## 準備階段
### Storm
本教程使用的示例來自于 [storm-kestrel](https://github.com/nathanmarz/storm-kestrel) 項目和 [storm-starter](http://github.com/apache/storm/blob//examples/storm-starter) 項目。建議你克隆這些項目并根著示例走。 閱讀 [Setting up development environment](Setting-up-development-environment.html) 和 [Creating a new Storm project](Creating-a-new-Storm-project.html) 來設置你的機器。
### Kestrel
它假設您可以在本地選擇Kestrel 服務器,如上所述 [here](https://github.com/nathanmarz/storm-kestrel).
## Kestrel 服務 和 隊列
單個kestrel服務器具有一組隊列。Kestrel 隊列是在JVM上運行的非常簡單的消息隊列,并使用memcache協議(具有一定的擴展名)與客戶端進行通信。對于更加詳細的信息,你可查看 [KestrelThriftClient](https://github.com/nathanmarz/storm-kestrel/blob/master/src/jvm/org/apache/storm/spout/KestrelThriftClient.java) 類里面提供的 [storm-kestrel](https://github.com/nathanmarz/storm-kestrel) 項目.
每個隊列按照FIFO(先進先出)的原則進行嚴格排序。跟隨性能項目緩存在系統內存中;但是,只有前128MB保存在內存中。當服務器停止時,隊列狀態存儲在日志文件中。
此外,還可以從 [here](https://github.com/nathanmarz/kestrel/blob/master/docs/guide.md) 找到細節。
Kestrel is: * fast * small * durable(耐久) * reliable(穩定)
例如,Twitter 使用 Kestrel 作為其消息傳遞基礎設施的骨干,如上所述[here](http://bhavin.directi.com/notes-on-kestrel-the-open-source-twitter-queue/).
## 添加項目至 Kestrel
首先,我們需要一個可以將項目添加到Kestrel隊列的程序。以下方法受益于 KestrelClient的實現 [storm-kestrel](https://github.com/nathanmarz/storm-kestrel). 它將句子添加到從包含五個可能句子的數組中隨機選擇的Kestrel隊列中。
```
private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
throws ParseError, IOException {
String[] sentences = new String[] {
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature"};
Random _rand = new Random();
for(int i=1; i<=10; i++){
String sentence = sentences[_rand.nextInt(sentences.length)];
String val = "ID " + i + " " + sentence;
boolean queueSucess = kestrelClient.queue(queueName, val);
System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
}
}
```
## 將項目從 Kestrel 移除
此方法將隊列中的項目排隊,而不是刪除。 ``` private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){
```
Item item = kestrelClient.dequeue(queueName);
if(item==null){
System.out.println("The queue (" + queueName + ") contains no items.");
}
else
{
byte[] data = item._data;
String receivedVal = new String(data);
System.out.println("receivedItem=" + receivedVal);
}
}
```
```
此方法將隊列中的項目排隊,然后將其刪除。
This method dequeues items from a queue and then removes them.
```
```
private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
throws IOException, ParseError
{
for(int i=1; i<=12; i++){
Item item = kestrelClient.dequeue(queueName);
if(item==null){
System.out.println("The queue (" + queueName + ") contains no items.");
}
else
{
int itemID = item._id;
byte[] data = item._data;
String receivedVal = new String(data);
kestrelClient.ack(queueName, itemID);
System.out.println("receivedItem=" + receivedVal);
}
}
}
```
```
## 連續添加項目至 Kestrel
這是我們的最終運行程序,以便連續地將句子項添加到本地運行的Kestrel服務器的名為 **sentence_queue** 的隊列中。
為了阻止它在控制臺中鍵入一個關閉括號 char ']' ,然后按 'Enter'。
```
```
import java.io.IOException;
import java.io.InputStream;
import java.util.Random;
import org.apache.storm.spout.KestrelClient;
import org.apache.storm.spout.KestrelClient.Item;
import org.apache.storm.spout.KestrelClient.ParseError;
public class AddSentenceItemsToKestrel {
/**
* @param args
*/
public static void main(String[] args) {
InputStream is = System.in;
char closing_bracket = ']';
int val = closing_bracket;
boolean aux = true;
try {
KestrelClient kestrelClient = null;
String queueName = "sentence_queue";
while(aux){
kestrelClient = new KestrelClient("localhost",22133);
queueSentenceItems(kestrelClient, queueName);
kestrelClient.close();
Thread.sleep(1000);
if(is.available()>0){
if(val==is.read())
aux=false;
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (ParseError e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("end");
}
}
```
```
## 使用 KestrelSpout
該拓撲結構使用KestrelSpout從Kestrel隊列中讀取句子,將句子分解成其組成詞(Bolt: SplitSentence),然后為每個單詞發出它之前(Bolt: WordCount)所見到的次數。數據的處理方式如下所述 [Guaranteeing message processing](Guaranteeing-message-processing.html).
```
```
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
.fieldsGrouping("split", new Fields("word"));
```
```
## 執行
首先,在生產或開發模式下啟動您本地的Kestrel服務器。
等大約5秒,以避免ConnectionException。
現在執行程序將項目添加到隊列并啟動Storm 拓撲。啟動程序的排序并不重要。
如果您使用TOPOLOGY_DEBUG運行拓撲,您應該會看到在拓撲中發出的元組。
```
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度