我們的這個例子有三個主要組件(見圖6-1)
* 一個基于Node.js的web應用,用于測試系統
* 一個Redis服務器,用于持久化數據
* 一個Storm拓撲,用于分布式實時處理數據
[](http://ifeve.com/getting-started-with-storm6/%e5%9b%be6-1/)
圖6-1 ?架構概覽
**NOTE:**你如果想先把這個例子運行起來,請首先閱讀[附錄C](http://ifeve.com/getting-started-with-storm-appendixc)
[**基于Node.js的web應用**](http://ifeve.com/getting-started-with-storm6/#the-nodejs-web-application)
我們已經偽造了簡單的電子商務網站。這個網站只有三個頁面:一個主頁、一個產品頁和一個產品統計頁面。這個應用基于[Express](http://expressjs.com/)和[Socket.io](http://socket.io/)兩個框架實現了向瀏覽器推送內容更新。制作這個應用的目的是為了讓你體驗Storm集群功能并看到處理結果,但它不是本書的重點,所以我們不會對它的頁面做更詳細描述。
*主頁*
這個頁面提供了全部有效產品的鏈接。它從Redis服務器獲取數據并在頁面上把它們顯示出來。這個頁面的URL是http://localhost:3000/。(見圖6-2,譯者注,圖6-2翻譯如下,全是文字就不制圖了)
**有效產品:**
DVD播放器(帶環繞立體聲系統)
全高清藍光dvd播放器
媒體播放器(帶USB 2.0接口)
全高清攝像機
防水高清攝像機
防震防水高清攝像機
反射式攝像機
雙核安卓智能手機(帶64GB SD卡)
普通移動電話
衛星電話
64GB SD卡
32GB SD卡
16GB SD卡
粉紅色智能手機殼
黑色智能手機殼
小山羊皮智能手機殼
圖6-2 首頁
*產品頁*
產品頁用來顯示指定產品的相關信息,例如,價格、名稱、分類。這個頁面的URL是:http://localhost:3000/product/:id。(見圖6-3,譯者注:全是文字不再制圖,翻譯如下)
**產品頁:32英寸液晶電視**
分類:電視機
價格:400
相關分類
圖6-3,產品頁
*產品統計頁*
這個頁面顯示通過收集用戶瀏覽站點,用Storm集群計算的統計信息。可以顯示為如下概要:瀏覽這個產品的用戶,在那些分類下面瀏覽了n次產品。該頁的URL是:http://localhost:3000/product/:id/stats。(見圖6-4,譯者注:全是文字,不再制圖,翻譯如下)
**瀏覽了該產品的用戶也瀏覽了以下分類的產品:**
1.攝像機
2.播放器
3.手機殼
4.存儲卡
圖6-4\. 產品統計視圖
## **啟動這個Node.js web應用**
首先啟動Redis服務器,然后執行如下命令啟動web應用:
~~~
node webapp/app.js
~~~
為了向你演示,這個應用會自動向Redis填充一些產品數據作為樣本。
## **Storm拓撲**
為這個系統搭建Storm拓撲的目標是改進產品統計的實時性。產品統計頁顯示了一個分類計數器列表,用來顯示訪問了其它同類產品的用戶數。這樣可以幫助賣家了解他們的用戶需求。拓撲接收瀏覽日志,并更新產品統計結果(見圖6-5)。
[](http://ifeve.com/getting-started-with-storm6/figure6-5/)
圖6-5 ?Storm拓撲的輸入與輸出
我們的Storm拓撲有五個組件:一個*spout*向拓撲提供數據,四個*bolt*完成統計任務。
UsersNavigationSpout
從用戶瀏覽數據隊列讀取數據發送給拓撲
GetCategoryBolt
從Redis服務器讀取產品信息,向數據流添加產品分類
UserHistoryBolt
讀取用戶以前的產品瀏覽記錄,向下一步分發Product:Category鍵值對,在下一步更新計數器
**ProductCategoriesCounterBolt**
追蹤用戶瀏覽特定分類下的商品次數
NewsNotifierBolt
通知web應用立即更新用戶界面
下圖展示了拓撲的工作方式(見圖6-6)
~~~
package storm.analytics;
...
public class TopologyStarter {
public static void main(String[] args) {
Logger.getRootLogger().removeAllAppenders();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("read-feed", new UsersNavigationSpout(),3);
builder.setBolt("get-categ", new GetCategoryBolt(),3)
.shuffleGrouping("read-feed");
builder.setBolt("user-history", new UserHistoryBolt(),5)
.fieldsGrouping("get-categ", new Fields("user"));
builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(),5)
.fieldsGrouping("user-history", new Fields("product"));
builder.setBolt("news-notifier", new NewsNotifierBolt(),5)
.shuffleGrouping("product-categ-counter");
Config conf = new Config();
conf.setDebug(true);
conf.put("redis-host",REDIS_HOST);
conf.put("redis-port",REDIS_PORT);
conf.put("webserver", WEBSERVER);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("analytics", conf, builder.createTopology());
}
}
~~~
[](http://ifeve.com/getting-started-with-storm6/figure6-6storm-topology/)
Figure 6-6 Storm拓撲
**UsersNavigationSpout**
UsersNavigationSpout負責向拓撲提供瀏覽數據。每條瀏覽數據都是一個用戶瀏覽過的產品頁的引用。它們都被web應用保存在Redis服務器。我們一會兒就要看到更多信息。
你可以使用[https://github.com/xetorthio/jedis](https://github.com/xetorthio/jedis)從Redis服務器讀取數據,這是個極為輕巧簡單的Java Redis客戶端。
**NOTE:**下面的代碼塊就是相關代碼。
~~~
package storm.analytics;
public class UsersNavigationSpout extends BaseRichSpout {
Jedis jedis;
...
@Override
public void nextTuple() {
String content = jedis.rpop("navigation");
if(content==null || "nil".equals(content)){
try { Thread.sleep(300); } catch (InterruptedException e) {}
} else {
JSONObject obj=(JSONObject)JSONValue.parse(content);
String user = obj.get("user").toString();
String product = obj.get("product").toString();
String type = obj.get("type").toString();
HashMap<String, String> map = new HashMap<String, String>();
map.put("product", product);
NavigationEntry entry = new NavigationEntry(user, type, map);
collector.emit(new Values(user, entry));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("user", "otherdata"));
}
}
~~~
*spout*首先調用**jedis.rpop(“navigation”)**從Redis刪除并返回”navigation”列表最右邊的元素。如果列表已經是空的,就休眠0.3秒,以免使用忙等待循環阻塞服務器。如果得到一條數據(數據是JSON格式),就解析它,并創建一個包含該數據的**NavigationEntry**?POJO:
* 瀏覽頁面的用戶
* 用戶瀏覽的頁面類型
* 由頁面類型決定的額外頁面信息。“產品”頁的額外信息就是用戶瀏覽的產品ID。
*spout*調用**collector.emit(new Values(user, entry))**分發包含這些信息的元組。這個元組的內容是拓撲里下一個*bolt*的輸入。
**GetCategoryBolt**
這個*bolt*非常簡單。它只負責反序列化前面的*spout*分發的元組內容。如果這是產品頁的數據,就通過ProductsReader類從Redis讀取產品信息,然后基于輸入的元組再分發一個新的包含具體產品信息的元組:
* 用戶
* 產品
* 產品類別
~~~
package storm.analytics;
public class GetCategoryBolt extends BaseBasicBolt {
private ProductReader reader;
...
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
NavigationEntry entry = (NavigationEntry)input.getValue(1);
if("PRODUCT".equals(entry.getPageType())){
try {
String product = (String)entry.getOtherData().get("product");
//調用產品條目API,得到產品信息
Product itm = reader.readItem(product);
if(itm == null) {
return;
}
String categ = itm.getCategory();
collector.emit(new Values(entry.getUserId(), product, categ));
} catch (Exception ex) {
System.err.println("Error processing PRODUCT tuple"+ ex);
ex.printStackTrace();
}
}
}
...
}
~~~
正如前面所提到的, 使用ProductsReader類讀取產品具體信息。
~~~
package storm.analytics.utilities;
...
public class ProductReader {
...
public Product readItem(String id) throws Exception{
String content = jedis.get(id);
if(content == null || ("nil".equals(content))){
return null;
}
Object obj = JSONValue.parse(content);
JSONObjectproduct = (JSONObject)obj;
Product i = new Product((Long)product.get("id"),
(String)product.get("title"),
(Long)product.get("price"),
(String)product.get("category"));
return i;
}
...
}
~~~
**UserHistoryBolt**
UserHistoryBolt是整個應用的核心。它負責持續追蹤每個用戶瀏覽過的產品,并決定應當增加計數的鍵值對。
我們使用Redis保存用戶的產品瀏覽歷史,同時基于性能方面的考慮,還應該保留一份本地副本。我們把數據訪問細節隱藏在方法**getUserNavigationHistory(user)**和**addProductToHistory(user,prodKey)**里,分別用來讀/寫訪問。它們的實現如下
~~~
package storm.analytics;
...
public class UserHistoryBolt extends BaseRichBolt{
@Override
public void execute(Tuple input) {
String user = input.getString(0);
String prod1 = input.getString(1);
String cat1 = input.getString(2);
//產品鍵嵌入了產品類別信息
String prodKey = prod1+":"+cat1;
Set productsNavigated = getUserNavigationHistory(user);
//如果用戶以前瀏覽過->忽略它
if(!productsNavigated.contains(prodKey)) {
//否則更新相關條目
for (String other : productsNavigated) {
String[] ot = other.split(":");
String prod2 = ot[0];
String cat2 = ot[1];
collector.emit(new Values(prod1, cat2));
collector.emit(new Values(prod2, cat1));
}
addProductToHistory(user, prodKey);
}
}
}
~~~
需要注意的是,這個*bolt*的輸出是那些類別計數應當獲得增長的產品。
看一看代碼。這個*bolt*維護著一組被每個用戶瀏覽過的產品。值得注意的是,這個集合包含產品:類別鍵值對,而不是只有產品。這是因為你會在接下來的調用中用到類別信息,而且這樣也比每次從數據庫獲取更高效。這樣做的原因是基于以下考慮,產品可能只有一個類別,而且它在整個產品的生命周期當中不會改變。
讀取了用戶以前瀏覽過的產品集合之后(以及它們的類別),檢查當前產品以前有沒有被瀏覽過。如果瀏覽過,這條瀏覽數據就被忽略了。如果這是首次瀏覽,遍歷用戶瀏覽歷史,并執行**collector.emit(new Values(prod1,cat2))**分發一個元組,這個元組包含當前產品和所有瀏覽歷史類別。第二個元組包含所有瀏覽歷史產品和當前產品類別,由**collectior.emit(new Values(prod2,cat1))**。最后,將當前產品和它的類別添加到集合。
比如,假設用戶John有以下瀏覽歷史:
[](http://ifeve.com/getting-started-with-storm6/johns-navigation-history/)
下面是將要處理的瀏覽數據
[](http://ifeve.com/getting-started-with-storm6/johns-navigation-entry-to-be-processed/)
該用戶沒有瀏覽過產品8,因此你需要處理它。
因此要分發以下元組:
[](http://ifeve.com/getting-started-with-storm6/to-emit-tuples-for-johns-navigation/)
注意,左邊的產品和右邊的類別之間的關系應當作為一個整體遞增。
現在,讓我們看看這個*Bolt*用到的持久化實現。
~~~
public class UserHistoryBolt extends BaseRichBolt{
...
private Set getUserNavigationHistory(String user) {
Set userHistory = usersNavigatedItems.get(user);
if(userHistory == null) {
userHistory = jedis.smembers(buildKey(user));
if(userHistory == null)
userHistory = new HashSet();
usersNavigatedItems.put(user, userHistory);
}
return userHistory;
}
private void addProductToHistory(String user, String product) {
Set userHistory = getUserNavigationHistory(user);
userHistory.add(product);
jedis.sadd(buildKey(user), product);
}
...
}
~~~
**getUserNavigationHistory**方法返回用戶瀏覽過的產品集。首先,通過**usersNavigatedItems.get(*user*)**方法試圖從本地內存得到用戶瀏覽歷史,否則,使用**jedis.smembers(buildKey(user))**從Redis服務器獲取,并把數據添加到本地數據結構**usersNavigatedItems**。
當用戶瀏覽一個新產品時,調用**addProductToHistory**,通過**userHistory.add(product)**和**jedis.sadd(buildKey(user),product)**同時更新內存數據結構和Redis服務器。
需要注意的是,當你需要做并行化處理時,只要*bolt*在內存中維護著用戶數據,你就得首先通過用戶做域數據流分組(譯者注:原文是fieldsGrouping,詳細情況請見[第三章的域數據流組](http://ifeve.com/getting-started-with-storm-3#fields-grouping)),這是一件很重要的事情,否則集群內將會有用戶瀏覽歷史的多個不同步的副本。
### **ProductCategoriesCounterBolt**
該類持續追蹤所有的產品-類別關系。它通過由UsersHistoryBolt分發的產品-類別數據對更新計數。
每個數據對的出現次數保存在Redis服務器。基于性能方面的考慮,要使用一個本地讀寫緩存,通過一個后臺線程向Redis發送數據。
該*Bolt*會向拓撲的下一個*Bolt*——NewsNotifierBolt——發送包含最新記數的元組,這也是最后一個*Bolt*,它會向最終用戶廣播實時更新的數據。
~~~
public class ProductCategoriesCounterBolt extends BaseRichBolt {
...
@Override
public void execute(){
String product = input.getString(0);
String categ = input.getString(1);
int total = count(product, categ);
collector.emit(new Values(product, categ, total));
}
...
private int count(String product, String categ) {
int count = getProductCategoryCount(categ, product);
count++;
storeProductCategoryCount(categ, product, count);
return count;
}
...
}
~~~
這個*bolt*的持久化工作隱藏在**getProductCategoryCount**和**storeProductCategoryCount**兩個方法中。它們的具體實現如下:
~~~
package storm.analytics;
...
public class ProductCategoriesCounterBolt extends BaseRichBolt {
// 條目:分類 -> 計數
HashMap<String,Integer> counter = new HashMap<String, Integer>();
//條目:分類 -> 計數
HashMap<String,Integer> pendingToSave = new HashMap<String,Integer>();
...
public int getProductCategoryCount(String categ, String product) {
Integer count = counter.get(buildLocalKey(categ, product));
if(count == null) {
String sCount = jedis.hget(buildRedisKey(product), categ);
if(sCount == null || "nil".equals(sCount)) {
count = 0;
} else {
count = Integer.valueOf(sCount);
}
}
return count;
}
...
private void storeProductCategoryCount(String categ, String product, int count) {
String key = buildLocalKey(categ, product);
counter.put(key, count);
synchronized (pendingToSave) {
pendingToSave.put(key, count);
}
}
...
}
~~~
方法**getProductCategoryCount**首先檢查內存緩存計數器。如果沒有有效令牌,就從Redis服務器取得數據。
方法**storeProductCategoryCount**更新計數器緩存和pendingToSae緩沖。緩沖數據由下述后臺線程持久化。
~~~
package storm.analytics;
public class ProductCategoriesCounterBolt extends BaseRichBolt {
...
private void startDownloaderThread() {
TimerTask t = startDownloaderThread() {
@Override
public void run () {
HashMap<String, Integer> pendings;
synchronized (pendingToSave) {
pendings = pendingToSave;
pendingToSave = new HashMap<String,Integer>();
}
for (String key : pendings.keySet) {
String[] keys = key.split(":");
String product = keys[0];
String categ = keys[1];
Integer count = pendings.get(key);
jedis.hset(buildRedisKey(product), categ, count.toString());
}
}
};
timer = new Timer("Item categories downloader");
timer.scheduleAtFixedRate(t, downloadTime, downloadTime);
}
...
}
~~~
下載線程鎖定pendingToSave, 向Redis發送數據時會為其它線程創建一個新的緩沖。這段代碼每隔downloadTime毫秒運行一次,這個值可由拓撲配置參數download-time配置。download-time值越大,寫入Redis的次數就越少,因為一對數據的連續計數只會向Redis寫一次。
**NewsNotifierBolt**
為了讓用戶能夠實時查看統計結果,由NewsNotifierBolt負責向web應用通知統計結果的變化。通知機制由[Apache HttpClient](http://hc.apache.org/httpcomponents-client-4.3.x/index.html)通過HTTP POST訪問由拓撲配置參數指定的URL。POST消息體是JSON格式。
測試時把這個*bolt*從拜年中刪除。
| `01` | `package`?`storm.analytics;` |
| `02` | `...` |
| `03` | `public`?`class`?`NewsNotifierBolt?``extends`?`BaseRichBolt {` |
| `04` | `...` |
| `05` | `@Override` |
| `06` | `public`?`void`?`execute(Tuple input) {` |
| `07` | `String product = input.getString(``0``);` |
| `08` | `String categ = input.getString(``1``);` |
| `09` | `int`?`visits = input.getInteger(``2``);</code>` |
| `10` | ? |
| `11` | `String content =?``"{\"product\":\"+product+"``\``",\"categ\":\""``+categ+``"\",\"visits\":"``+visits+``"}"``;` |
| `12` | `HttpPost post =?``new`?`HttpPost(webserver);` |
| `13` | `try`?`{` |
| `14` | `post.setEntity(``new`?`StringEntity(content));` |
| `15` | `HttpResponse response = client.execute(post);` |
| `16` | `org.apache.http.util.EntityUtils.consume(response.getEntity());` |
| `17` | `}?``catch`?`(Exception e) {` |
| `18` | `e.printStackTrace();` |
| `19` | `reconnect();` |
| `20` | `}` |
| `21` | `}` |
| `22` | `...` |
| `23` | `}` |
Redis服務器
Redis是一種先進的、基于內存的、支持持久化的鍵值存儲(見[http://redis.io](http://redis.io/))。本例使用它存儲以下信息:
* 產品信息,用來為web站點服務
* 用戶瀏覽隊列,用來為Storm拓撲提供數據
* Storm拓撲的中間數據,用于拓撲發生故障時恢復數據
* Storm拓撲的處理結果,也就是我們期望得到的結果。
**產品信息**
Redis服務器以產品ID作為鍵,以JSON字符串作為值保存著產品信息。
| `1` | `redis-cli` |
| `2` | `redis 127.0.0.1:6379&``gt``; get 15` |
| `3` | `"{\"title\":\"Kids smartphone cover\",\"category\":\"Covers\",\"price\":30,\"``id``\":` |
| `4` | `15}"` |
**用戶瀏覽隊列**
用戶瀏覽隊列保存在Redis中一個鍵為navigation的先進先出隊列中。用戶瀏覽一個產品頁時,服務器從隊列左側添加用戶瀏覽數據。Storm集群不斷的從隊列右側獲取并移除數據。
| `01` | `redis 127.0.0.1:6379&``gt``; llen navigation` |
| `02` | `(integer) 5` |
| `03` | `redis 127.0.0.1:6379&``gt``; lrange navigation 0 4` |
| `04` | `1) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"``type``\":` |
| `05` | `\``"PRODUCT\"}"` |
| `06` | `2) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"``type``\":` |
| `07` | `\``"PRODUCT\"}"` |
| `08` | `3) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"2\",\"``type``\":` |
| `09` | `\``"PRODUCT\"}"` |
| `10` | `4) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"3\",\"``type``\":` |
| `11` | `\``"PRODUCT\"}"` |
| `12` | `5) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"5\",\"``type``\":` |
| `13` | `\``"PRODUCT\"}"` |
**中間數據**
集群需要分開保存每個用戶的歷史數據。為了實現這一點,它在Redis服務器上保存著一個包含所有用戶瀏覽過的產品和它們的分類的集合。
| `1` | `redis 127.0.0.1:6379&``gt``; smembers?``history``:59c34159-0ecb-4ef3-a56b-99150346f8d5` |
| `2` | `1)?``"1:Players"` |
| `3` | `2)?``"5:Cameras"` |
| `4` | `3)?``"2:Players"` |
| `5` | `4)?``"3:Cameras"` |
**結果**
Storm集群生成關于用戶瀏覽的有用數據,并把它們的產品ID保存在一個名為“prodcnt”的Redis hash中。
| `1` | `redis 127.0.0.1:6379&``gt``; hgetall prodcnt:2` |
| `2` | `1)?``"Players"` |
| `3` | `2)?``"1"` |
| `4` | `3)?``"Cameras"` |
| `5` | `4)?``"2"` |
**測試拓撲**
使用LocalCluster和一個本地Redis服務器執行測試(見圖6-7)。向Redis填充產品數據,偽造訪問日志。我們的斷言會在讀取拓撲向Redis輸出的數據時執行。測試用戶用java和groovy完成。
[](http://ifeve.com/getting-started-with-storm6/figure6-7/)
圖6-7\. 測試架構
**初始化測試**
初始化由以下三步組成:
**啟動LocalCluster并提交拓撲。**初始化在AbstractAnalyticsTest實現,所有測試用例都繼承該類。當初始化多個AbstractAnalyticsTest子類對象時,由一個名為topologyStarted的靜態標志屬性確定初始化工作只會進行一次。
需要注意的是,sleep語句是為了確保在試圖獲取結果之前LocalCluster已經正確啟動了。
| `01` | `public`?`abstract?``class`?`AbstractAnalyticsTest?``extends`?`Assert {` |
| `02` | `def`?`jedis` |
| `03` | `static`?`topologyStarted = false` |
| `04` | `static`?`sync=?``new`?`Object()` |
| `05` | `private`?`void`?`reconnect() {` |
| `06` | `jedis =?``new`?`Jedis(TopologyStarter.REDIS_HOST, TopologyStarter.REDIS_PORT)` |
| `07` | `}` |
| `08` | `@Before` |
| `09` | `public`?`void`?`startTopology(){` |
| `10` | `synchronized(sync){` |
| `11` | `reconnect()` |
| `12` | `if``(!topologyStarted){` |
| `13` | `jedis.flushAll()` |
| `14` | `populateProducts()` |
| `15` | `TopologyStarter.testing = true` |
| `16` | `TopologyStarter.main(``null``)` |
| `17` | `topologyStarted = true` |
| `18` | `sleep?``1000` |
| `19` | `}` |
| `20` | `}` |
| `21` | `}` |
| `22` | `...` |
| `23` | `public`?`void`?`populateProducts() {` |
| `24` | `def`?`testProducts = [` |
| `25` | `[id:?``0``, title:``"Dvd player with surround sound system"``,` |
| `26` | `category:``"Players"``, price:?``100``],` |
| `27` | `[id:?``1``, title:``"Full HD Bluray and DVD player"``,` |
| `28` | `category:``"Players"``, price:``130``],` |
| `29` | `[id:?``2``, title:``"Media player with USB 2.0 input"``,` |
| `30` | `category:``"Players"``, price:``70``],` |
| `31` | `...` |
| `32` | `[id:?``21``, title:``"TV Wall mount bracket 50-55 Inches"``,` |
| `33` | `category:``"Mounts"``, price:``80``]` |
| `34` | `]` |
| `35` | `testProducts.``each``() { product ->` |
| `36` | `def`?`val =` |
| `37` | `"{ \"title\": \"${product.title}\" , \"category\": \"${product.category}\","`?`+` |
| `38` | `" \"price\": ${product.price}, \"id\": ${product.id} }"` |
| `39` | `println`?`val` |
| `40` | `jedis.set(product.id.toString(), val.toString())` |
| `41` | `}` |
| `42` | `}` |
| `43` | `...` |
| `44` | `}` |
**在AbstractAnalyticsTest中實現一個名為navigate的方法。**為了測試不同的場景,我們要模擬用戶瀏覽站點的行為,這一步向Redis的瀏覽隊列(譯者注:就是前文提到的鍵是navigation的隊列)插入瀏覽數據。
| `01` | `public`?`abstract?``class`?`AbstractAnalyticsTest?``extends`?`Assert {` |
| `02` | `...` |
| `03` | `public`?`void`?`navigate(user, product) {` |
| `04` | `String nav =` |
| `05` | `"{\"user\": \"${user}\", \"product\": \"${product}\", \"type\": \"PRODUCT\"}"``.toString()` |
| `06` | `println`?`"Pushing navigation: ${nav}"` |
| `07` | `jedis.lpush(``'navigation'``, nav)` |
| `08` | `}` |
| `09` | `...` |
| `10` | `}` |
**實現一個名為getProductCategoryStats的方法,用來讀取指定產品與分類的數據。**不同的測試同樣需要斷言統計結果,以便檢查拓撲是否按照期望的那樣執行了。
| `01` | `public`?`abstract?``class`?`AbstractAnalyticsTest?``extends`?`Assert {` |
| `02` | `...` |
| `03` | `public`?`int`?`getProductCategoryStats(String product, String categ) {` |
| `04` | `String?``count`?`= jedis.hget(``"prodcnt:${product}"``, categ)` |
| `05` | `if``(``count`?`==?``null`?`||?``"nil"``.equals(``count``))` |
| `06` | `return`?`0` |
| `07` | `return`?`Integer.valueOf(``count``)` |
| `08` | `}` |
| `09` | `...` |
| `10` | `}` |
**一個測試用例**
下一步,為用戶“1”模擬一些瀏覽記錄,并檢查結果。注意執行斷言之前要給系統留出兩秒鐘處理數據。(記住**ProductCategoryCounterBolt**維護著一份計數的本地副本,它是在后臺異步保存到Redis的。)
| `01` | `package`?`functional` |
| `02` | `class`?`StatsTest?``extends`?`AbstractAnalyticsTest {` |
| `03` | `@Test` |
| `04` | `public`?`void`?`testNoDuplication(){` |
| `05` | `navigate(``"1"``,?``"0"``)?``// Players` |
| `06` | `navigate(``"1"``,?``"1"``)?``// Players` |
| `07` | `navigate(``"1"``,?``"2"``)?``// Players` |
| `08` | `navigate(``"1"``,?``"3"``)?``// Cameras` |
| `09` | `Thread.sleep(``2000``)?``// Give two seconds for the system to process the data.` |
| `10` | `assertEquals?``1``, getProductCategoryStats(``"0"``,?``"Cameras"``)` |
| `11` | `assertEquals?``1``, getProductCategoryStats(``"1"``,?``"Cameras"``)` |
| `12` | `assertEquals?``1``, getProductCategoryStats(``"2"``,?``"Cameras"``)` |
| `13` | `assertEquals?``2``, getProductCategoryStats(``"0"``,?``"Players"``)` |
| `14` | `assertEquals?``3``, getProductCategoryStats(``"3"``,?``"Players"``)` |
| `15` | `}` |
| `16` | `}` |
**對可擴展性和可用性的提示**
為了能在一章的篇幅中講明白整個方案,它已經被簡化了。正因如此,一些與可擴展性和可用性有關的必要復雜性也被去掉了。這方面主要有兩個問題。
Redis服務器不只是一個故障的節點,還是性能瓶頸。你能接收的數據最多就是Redis能處理的那些。Redis可以通過分片增強擴展性,它的可用性可以通過主從配置得到改進。這都需要修改拓撲和web應用的代碼實現。
另一個缺點就是web應用不能通過增加服務器成比例的擴展。這是因為當產品統計數據發生變化時,需要通知所有關注它的瀏覽器。這一“通知瀏覽器”的機制通過Socket.io實現,但是它要求監聽器和通知器在同一主機上。這一點只有當**GET /product/:id/stats**和**POST /news**滿足以下條件時才能實現,那就是這二者擁有相同的分片標準,確保引用相同產品的請求由相同的服務器處理。
**原創文章,轉載請注明:**?轉載自[并發編程網 – ifeve.com](http://ifeve.com/)
**本文鏈接地址:**?[Storm入門之第6章一個實際的例子](http://ifeve.com/getting-started-with-storm6/)