有時候你可能想使用不是基于JVM的語言開發一個Storm工程,你可能更喜歡使用別的語言或者想使用用某種語言編寫的庫。
Storm是用Java實現的,你看到的所有這本書中的*spout*和*bolt*都是用java編寫的。那么有可能使用像Python、Ruby、或者JavaScript這樣的語言編寫*spout*和*bolt*嗎?答案是當然
可以!可以使用*多語言協議*達到這一目的。
多語言協議是Storm實現的一種特殊的協議,它使用標準輸入輸出作為*spout*和*bolt*進程間的通訊通道。消息以JSON格式或純文本格式在通道中傳遞。
我們看一個用非JVM語言開發*spout*和*bolt*的簡單例子。在這個例子中有一個*spout*產生從1到10,000的數字,一個*bolt*過濾素數,二者都用PHP實現。
**NOTE:?**在這個例子中,我們使用一個很笨的辦法驗證素數。有更好當然也更復雜的方法,它們已經超出了這個例子的范圍。
有一個專門為Storm實現的PHP DSL(譯者注:領域特定語言),我們將會在例子中展示我們的實現。首先定義拓撲。
~~~
...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new
PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...
~~~
**NOTE:**有一種使用非JVM語言定義拓撲的方式。既然Storm拓撲是Thrift架構,而且*Nimbus*是一個Thrift守護進程,你就可以使用任何你想用的語言創建并提交拓撲。但是這已經超出了本書的范疇了。
這里沒什么新鮮了。我們看一下**NumbersGeneratorSpout**的實現。
~~~
public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
public NumberGeneratorSpout(Integer from, Integer to) {
super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
~~~
你可能已經注意到了,這個*spout*繼承了**ShellSpout**。這是個由Storm提供的特殊的類,用來幫助你運行并控制用其它語言編寫的*spout*。在這種情況下它告訴Storm如何執行你的PHP腳本。
NumberGeneratorSpout的PHP腳本向標準輸出分發元組,并從標準輸入讀取確認或失敗信號。
在開始實現NumberGeneratorSpout.php腳本之前,多觀察一下多語言協議是如何工作的。
*spout*按照傳遞給構造器的參數從**from**到**to**順序生成數字。
接下來看看**PrimeNumbersFilterBolt**。這個類實現了之前提到的殼。它告訴Storm如何執行你的PHP腳本。Storm為這一目的提供了一個特殊的叫做**ShellBolt**的類,你惟一要做的事就是指出如何運行腳本以及聲明要分發的屬性。
~~~
public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
public PrimeNumbersFilterBolt() {
super("php", "-f", "PrimeNumbersFilterBolt.php");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number"));
}
}
~~~
在這個構造器中只是告訴Storm如何運行PHP腳本。它與下列命令等價。
~~~
php -f PrimeNumbersFilterBolt.php
~~~
PrimeNumbersFilterBolt.php腳本從標準輸入讀取元組,處理它們,然后向標準輸出分發、確認或失敗。在開始這個腳本之前,我們先多了解一些多語言協議的工作方式。
1. 發起一次握手
2. 開始循環
3. 讀/寫元組
**NOTE:**有一種特殊的方式可以使用Storm的內建日志機制在你的腳本中記錄日志,所以你不需要自己實現日志系統。
下面我們來看一看上述每一步的細節,以及如何用PHP實現它。
**發起握手**
為了控制整個流程(開始以及結束它),Storm需要知道它執行的腳本進程號(PID)。根據多語言協議,你的進程開始時發生的第一件事就是Storm要向標準輸入(譯者注:根據上下文理解,本章提到的標準輸入輸出都是從非JVM語言的角度理解的,這里提到的標準輸入也就是PHP的標準輸入)發送一段JSON數據,它包含Storm配置、拓撲上下文和一個進程號目錄。它看起來就像下面的樣子:
~~~
{
"conf": {
"topology.message.timeout.secs": 3,
// etc
},
"context": {
"task->component": {
"1": "example-spout",
"2": "__acker",
"3": "example-bolt"
},
"taskid": 3
},
"pidDir": "..."
}
~~~
腳本進程必須在**pidDir**指定的目錄下以自己的進程號為名字創建一個文件,并以JSON格式把進程號寫到標準輸出。
~~~
{"pid": 1234}
~~~
舉個例子,如果你收到**/tmp/example\n**而你的腳本進程號是123,你應該創建一個名為**/tmp/example/123**的空文件并向標準輸出打印文本行?**{“pid”: 123}\n**(譯者注:此處原文只有一個n,譯者猜測應是排版錯誤)和**end\n**。這樣Storm就能持續追蹤進程號并在它關閉時殺死腳本進程。下面是PHP實現:
~~~
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
~~~
你已經實現了一個叫做**read_msg**的函數,用來處理從標準輸入讀取的消息。按照多語言協議的聲明,消息可以是單行或多行JSON文本。一條消息以**end\n**結束。
~~~
function read_msg() {
$msg = "";
while(true) {
$l = fgets(STDIN);
$line = substr($l,0,-1);
if($line=="end") {
break;
}
$msg = "$msg$line\n";
}
return substr($msg, 0, -1);
}
function storm_send($json) {
write_line(json_encode($json));
write_line("end");
}
function write_line($line) {
echo("$line\n");
}
~~~
**NOTE:**flush()方法非常重要;有可能字符緩沖只有在積累到一定程度時才會清空。這意味著你的腳本可能會為了等待一個來自Storm的輸入而永遠掛起,而Storm卻在等待來自你的腳本的輸出。因此當你的腳本有內容輸出時立即清空緩沖是很重要的。
**開始循環以及讀/寫元組**
這是整個工作中最重要的一步。這一步的實現取決于你開發的*spout*和*bolt*。
如果是*spout*,你應當開始分發元組。如果是*bolt*,就循環讀取元組,處理它們,分發它發,確認成功或失敗。
下面我們就看看用來分發數字的*spout*。
~~~
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
$msg = read_msg();
$cmd = json_decode($msg, true);
if ($cmd['command']=='next') {
if ($from<$to) {
storm_emit(array("$from"));
$task_ids = read_msg();
$from++;
} else {
sleep(1);
}
}
storm_sync();
}
~~~
從命令行獲取參數**from**和**to**,并開始迭代。每次從Storm得到一條**next**消息,這意味著你已準備好分發下一個元組。
一旦你發送了所有的數字,而且沒有更多元組可發了,就休眠一段時間。
為了確保腳本已準備好發送下一個元組,Storm會在發送下一條之前等待**sync\n**文本行。調用**read_msg()**,讀取一條命令,解析JSON。
對于*bolts*來說,有少許不同。
~~~
while(true) {
$msg = read_msg();
$tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
if (!empty($tuple["id"])) {
if (isPrime($tuple["tuple"][0])) {
storm_emit(array($tuple["tuple"][0]));
}
storm_ack($tuple["id"]);
}
}
~~~
循環的從標準輸入讀取元組。解析讀取每一條JSON消息,判斷它是不是一個元組,如果是,再檢查它是不是一個素數,如果是素數再次分發一個元組,否則就忽略掉,最后不論如何都要確認成功。
**NOTE:**在**json_decode**函數中使用的**JSON_BIGINT_AS_STRING**是為了解決一個在JAVA和PHP之間的數據轉換問題。JAVA發送的一些很大的數字,在PHP中會丟失精度,這樣就會導致問題。為了避開這個問題,告訴PHP把大數字當作字符串處理,并在JSON消息中輸出數字時不使用雙引號。PHP5.4.0或更高版本要求使用這個參數。
**emit,ack,fail,**以及**log**消息都是如下結構:
**emit**
~~~
{
"command": "emit",
"tuple": ["foo", "bar"]
}
~~~
其中的數組包含了你分發的元組數據。
**ack**
~~~
{
"command": "ack",
"id": 123456789
}
~~~
其中的id就是你處理的元組的ID。
**fail**
~~~
{
"command": "fail",
"id": 123456789
}
~~~
與**ack**(譯者注:原文是**emit**從上下JSON的內容和每個方法的功能上判斷此處就是**ack**,可能是排版錯誤)相同,其中id就是你處理的元組ID。
**log
**
~~~
{
"command": "log",
"msg": "some message to be logged by storm."
}
~~~
下面是完整的的PHP代碼。
~~~
//你的spout:
<?php
function read_msg() {
$msg = "";
while(true) {
$l = fgets(STDIN);
$line = substr($l,0,-1);
if ($line=="end") {
break;
}
$msg = "$msg$line\n";
}
return substr($msg, 0, -1);
}
function write_line($line) {
echo("$line\n");
}
function storm_emit($tuple) {
$msg = array("command" => "emit", "tuple" => $tuple);
storm_send($msg);
}
function storm_send($json) {
write_line(json_encode($json));
write_line("end");
}
function storm_sync() {
storm_send(array("command" => "sync"));
}
function storm_log($msg) {
$msg = array("command" => "log", "msg" => $msg);
storm_send($msg);
flush();
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
$from = intval($argv[1]);
$to = intval($argv[2]);
while(true) {
$msg = read_msg();
$cmd = json_decode($msg, true);
if ($cmd['command']=='next') {
if ($from<$to) {
storm_emit(array("$from"));
$task_ids = read_msg();
$from++;
} else {
sleep(1);
}
}
storm_sync();
}
?>
//你的bolt:
<?php
function isPrime($number) {
if ($number < 2) {
return false;
}
if ($number==2) {
return true;
}
for ($i=2; $i<=$number-1; $i++) {
if ($number % $i == 0) {
return false;
}
}
return true;
}
function read_msg() {
$msg = "";
while(true) {
$l = fgets(STDIN);
$line = substr($l,0,-1);
if ($line=="end") {
break;
}
$msg = "$msg$line\n";
}
return substr($msg, 0, -1);
}
function write_line($line) {
echo("$line\n");
}
function storm_emit($tuple) {
$msg = array("command" => "emit", "tuple" => $tuple);
storm_send($msg);
}
function storm_send($json) {
write_line(json_encode($json));
write_line("end");
}
function storm_ack($id) {
storm_send(["command"=>"ack", "id"=>"$id"]);
}
function storm_log($msg) {
$msg = array("command" => "log", "msg" => "$msg");
storm_send($msg);
}
$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];
$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
while(true) {
$msg = read_msg();
$tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
if (!empty($tuple["id"])) {
if (isPrime($tuple["tuple"][0])) {
storm_emit(array($tuple["tuple"][0]));
}
storm_ack($tuple["id"]);
}
}
?>
~~~
**NOTE:**需要重點指出的是,應當把所有的腳本文件保存在你的工程目錄下的一個名為**multilang/resources**的子目錄中。這個子目錄被包含在發送給工人進程的jar文件中。如果你不把腳本包含在這個目錄中,Storm就不能運行它們,并拋出一個錯誤。
**原創文章,轉載請注明:**?轉載自[并發編程網 – ifeve.com](http://ifeve.com/)
**本文鏈接地址:**?[Storm入門之第7章使用非JVM語言開發](http://ifeve.com/getting-started-with-storm7/)