# Clojure DSL
Storm配有Clojure DSL,用于定義spouts(噴口),bolts(螺栓)和topologies(拓撲)。 Clojure DSL可以訪問Java API暴露的所有內容,因此如果您是Clojure用戶,您可以直接編寫Storm拓撲,根本不需要使用Java。 Clojure DSL 的源碼在 [org.apache.storm.clojure](http://github.com/apache/storm/blob/master%0A/storm-core/src/clj/org/apache/storm/clojure.clj)命名空間中定義。
本頁概述了Clojure DSL的所有功能,包括:
1. Defining topologies(定義拓撲)
2. `defbolt`
3. `defspout`
4. Running topologies in local mode or on a cluster(在本地模式或集群上運行拓撲)
5. Testing topologies(測試拓撲)
### Defining topologies(定義拓撲)
請使用`topology`函來定義topology(拓撲)。`topology`有兩個參數:“spout specs(規格)”的映射和“bolt specs(規格)” 的映射。每個spouts specs(規格)和bolt specs(規格) 通過指定輸入和并行度 來將組件的代碼連接到topology中。
我們來看一下[storm-starter](http://github.com/apache/storm/blob/master%0A/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj) :項目中的topology定義示例:
```
(topology {"1" (spout-spec sentence-spout) "2" (spout-spec (sentence-spout-parameterized ["the cat jumped over the door" "greetings from a faraway land"]) :p 2)} {"3" (bolt-spec {"1" :shuffle "2" :shuffle} split-sentence :p 5) "4" (bolt-spec {"3" ["word"]} word-count :p 6)})
```
spout和bolt specs(規格)的映射是從組件ID到相應規格的映射。組件ID必須在映射上是唯一的。就像在Java中定義topologies(拓撲)一樣,在聲明topologies(拓撲)中的bolts 輸入時使用的組件ID。
#### spout-spec(spout-規格)
`spout-spec`作為spout實現(實現 [IRichSpout](javadocs/org/apache/storm/topology/IRichSpout.html))的對象)的可選關鍵字參數的參數 。當前存在的唯一選項是:`:p`選項,它指定了spout的并行性。如果您省略`:p`,則spout將作為單個任務執行。
#### bolt-spec(bolt-規格)
`bolt-spec`作為bolt實現(實現IRichBolt的對象)的可選關鍵字參數的參數
輸入聲明是從stream ids到stream groupings的映射。stream id 可以有以下兩種形式中的一種:
1. `[==component id== ==stream id==]`: Subscribes to a specific stream on a component(訂閱組件上的特定流)
2. `==component id==`: Subscribes to the default stream on a component(訂閱組件上的默認流)
stream grouping可以是以下之一:
1. `:shuffle`: 用shuffle grouping進行訂閱
2. Vector of field names, like ["id" "name"](%E5%A4%9A%E5%AD%97%E6%AE%B5%E5%90%8D,%20%E5%83%8F%C2%A0%60%5B%22id%22%20%22name%22%5D%60): 使用fields grouping訂閱指定的字段
3. `:global`: 使用global grouping進行訂閱
4. `:all`: 使用all grouping進行訂閱
5. `:direct`: 使用direct grouping進行訂閱
See [Concepts](Concepts.html) for more info on stream groupings. Here's an example input declaration showcasing the various ways to declare inputs: 有關stream groupings的更多信息,請參閱[概念](Concepts.html)。下面是一個輸入聲明的示例,展示各種聲明輸入的方法:
```
{["2" "1"] :shuffle "3" ["field1" "field2"] ["4" "2"] :global}
```
此輸入聲明共計三個流。它通過隨機分組來訂閱組件“2”上的流“1”,在字段“field1”和“field2”上以fields grouping的方式訂閱組件“3”上的默認流,使用全局分組在組件“4”上訂閱流“2”。
像`spout-spec`一樣,bolt-spec唯一當前支持的關鍵字參數是:p,它指定了bolt的并行性。
#### shell-bolt-spec (shell-bolt-規格)
`shell-bolt-spec` is used for defining bolts that are implemented in a non-JVM language. It takes as arguments the input declaration, the command line program to run, the name of the file implementing the bolt, an output specification, and then the same keyword arguments that `bolt-spec` accepts. `shell-bolt-spec`用于定義以非JVM語言實現的bolts。它作為輸入聲明參數,在命令行程序中運行,用文件的名稱實現bolt,輸出規范 以及接受的相同關鍵字參數作為參數的 `bolt-spec` 。
這有一個shell-bolt-spec的例子:
```
(shell-bolt-spec {"1" :shuffle "2" ["id"]} "python" "mybolt.py" ["outfield1" "outfield2"] :p 25)
```
輸出聲明的語法在下面的defbolt部分中有更詳細的描述。有關Storm的工作原理的詳細信息,請參閱[使用Storm的非JVM語言](Using-non-JVM-languages-with-Storm.html) 。
### defbolt
`defbolt` 用于在Clojure中定義bolts。這里對bolts有一個限制,那就是他必須是可序列化的,這就是為什么你不能僅僅具體化`IRichBolt`來實現一個bolts(closures不可序列化)。 `defbolt` 在這個限制的基礎上為定義bolts提供了一種更好的語法,而不僅僅是實現一個Java接口的。
在最充分的表現形勢下,`defbolt`支持參數化bolts,并在bolts執行期間保持關閉狀態。它還提供了用于定義不需要額外功能的bolts的快捷方式。 `defbolt`的簽名如下所示:
(defbolt _name_ _output-declaration_ *_option-map_ & _impl_)
省略option map(選項映射)相當于具有{:prepare false}的option map(選項映射)。
#### Simple bolts (簡單 bolts)
我們從最簡單的defbolt形式開始吧。這是一個將包含句子的元組分割成每個單詞的元組的示例bolt:
```
(defbolt split-sentence ["word"] [tuple collector] (let [words (.split (.getString tuple 0) " ")] (doseq [w words] (emit-bolt! collector [w] :anchor tuple)) (ack! collector tuple) ))
```
Since the option map is omitted, this is a non-prepared bolt. The DSL simply expects an implementation for the `execute` method of `IRichBolt`. The implementation takes two parameters, the tuple and the `OutputCollector`, and is followed by the body of the `execute` function. The DSL automatically type-hints the parameters for you so you don't need to worry about reflection if you use Java interop. (由于感覺有不準確的地方,先留著方便優化。) 由于省略了option map(選項映射),這是一個non-prepared bolt。 DSL只是期望執行一個IRichBolt的`execute`方法。該實現需要兩個參數,即tuple(元組)和`OutputCollector`,后面是`execute`函數的正文。 DSL會為你自動提示參數,所以如果您使用Java交互,不需要擔心反射問題。
This implementation binds `split-sentence` to an actual `IRichBolt` object that you can use in topologies, like so: 此實現將`split-sentence`綁定到一個可用于topologies實現的`IRichBolt`對象,如下所示: `clojure (bolt-spec {"1" :shuffle} split-sentence :p 5)`
#### Parameterized bolts (參數化 bolts)
有時候你想用其他參數來參數化你的bolts。例如,假設你想有一個可以接收到每個輸入字符串后綴的bolts,并且希望在運行時設置該后綴。你可以在defbolt中通過在option map(選項映射)中包含:`:params`選項來執行此操作,如下所示:
```
(defbolt suffix-appender ["word"] {:params [suffix]} [tuple collector] (emit-bolt! collector [(str (.getString tuple 0) suffix)] :anchor tuple) )
```
與前面的示例不同,`suffix-appender`將綁定到一個返回`IRichBolt`而不是直接作為`IRichBolt`對象的函數。這是通過在其option map(選項映射)中指定`:params`引起的。因此,在topology中使用`suffix-appender`,您可以執行以下操作:
```
(bolt-spec {"1" :shuffle} (suffix-appender "-suffix") :p 10)
```
#### Prepared bolts (準備 bolts)
要做更復雜的bolts,如加入和流聚合的bolt,bolt需要存儲狀態。您可以通過在option map(選項映射)中創建一個通過包含`{:prepare true}`指定的prepared bolt 來實現此目的。例如,思考下這個實現單詞計數的bolt:
```
(defbolt word-count ["word" "count"] {:prepare true} [conf context collector] (let [counts (atom {})] (bolt (execute [tuple] (let [word (.getString tuple 0)] (swap! counts (partial merge-with +) {word 1}) (emit-bolt! collector [word (@counts word)] :anchor tuple) (ack! collector tuple) )))))
```
prepared bolt的實現是通過一個函數 ,它將topology的配置“TopologyContext”和“OutputCollector”作為輸入,并返回“IBolt”接口的一個實現。此設計允許您圍繞`execute`和`cleanup`的實現時進行閉包。
在這個例子中,單詞計數存儲在一個名為`counts`的映射的閉包中。 `bolt`宏用于創建`IBolt`實現。 `bolt`宏是一種比簡化實現界面更簡潔的方法,它會自動提示所有的方法參數。該bolt實現了更新映射中的計數并發出新的單詞計數的執行方法。
請注意, prepared bolts 中的`execute`方法只能作為元組的輸入,因為`OutputCollector`已經在函數的閉包中(對于簡單的bolts,collector是`execute`函數的第二個參數)。
Prepared bolts 可以像 simple bolts 一樣進行參數化。
#### Output declarations (輸出聲明)
Clojure DSL具有用于bolt輸出的簡明語法。聲明輸出的最通用的方法就是從stream id到stream spec的映射。例如:
```
{"1" ["field1" "field2"] "2" (direct-stream ["f1" "f2" "f3"]) "3" ["f1"]}
```
stream id 是一個字符串,而stream spec(流規范)是個字段的向量或由`direct-stream`包裝的字段的向量。 `direct stream`將流標記為direct stream(有關直接流的更多詳細信息,請參閱[Concepts](Concepts.html) 和[Direct groupings](%E7%A9%BA%E7%9A%84%E3%80%82%E3%80%82))。
如果bolt只有一個輸出流,您可以使用向量而不用輸出聲明的映射來定義bolt的默認流。例如:
```
["word" "count"]
```
這段bolt輸出的聲明 為默認 stream id 上的字段[“word” “count”]。
#### Emitting, acking, and failing (發射,確認和失敗)
DSL可以使用`OutputCollector`:`emit-bolt!`,`emit-direct-bolt!`,`ack!`和`fail !`,而不用直接在`OutputCollector`上使用Java方法.
1. `emit-bolt!`:將“OutputCollector”,發出的值(一個Clojure sequence)和`:anchor`以及`:stream`的關鍵字參數作為參數。 `:anchor`可以是個single tuple或一個list of tuples,`:stream`是要發送到的流的id。 若省略關鍵字參數則默認流會發出一個unanchored tuple。
2. `emit-direct-bolt!`:將`OutputCollector`作為參數,發送元組的任務id,發送的值,以及把`:anchor`和`:stream`的關鍵字參數作為參數。 此函數只能發出聲明為direct streams的流。
3. `ack!`: 將“OutputCollector”作為元組確認參數。
4. `fail!`: 將“OutputCollector”作為元組失敗參數
有關確認和錨定的更多信息,請參閱[保證消息處理](Guaranteeing-message-processing.html)。
### defspout
`defspout`用于定義Clojure中的噴口。像螺栓一樣,噴口必須是可序列化的,所以您不能只是在“Clojure”中引用“IRichSpout”來執行噴口實現。 `defspout`圍繞這個限制,為定義spouts提供了一個更好的語法,而不僅僅是實現一個Java接口。
`defspout`的簽名如下:
(defspout _name_ _output-declaration_ *_option-map_ & _impl_)
如果你省略選項映射,則默認為{:prepare true}。 `defspout`的輸出聲明與`defbolt`語法相同。
這里有個實現`defspout`的一個例子[storm-starter](http://github.com/apache/storm/blob/master%0A/examples/storm-starter/src/clj/org/apache/storm/starter/clj/word_count.clj):
```
(defspout sentence-spout ["sentence"] [conf context collector] (let [sentences ["a little brown dog" "the man petted the dog" "four score and seven years ago" "an apple a day keeps the doctor away"]] (spout (nextTuple [] (Thread/sleep 100) (emit-spout! collector [(rand-nth sentences)]) ) (ack [id] ;; You only need to define this method for reliable spouts
;; (such as one that reads off of a queue like Kestrel)
;; This is an unreliable spout, so it does nothing here
))))
```
該實現將topology配置的“TopologyContext”和“SpoutOutputCollector”作為輸入。該實現返回一個`ISpout`對象。這里,`nextTuple`函數從`sentence`發出一個隨機語句。
這個spout不是可靠的,所以`ack`和`fail`方法永遠不會被調用。一個可靠的端口將在發出元組時添加一條消息ID,然后當元組完成或失敗時,將會調用`ack`或`fail`。有關Storm中可靠性如何工作的更多信息,請參閱[保證消息處理](Guaranteeing-message-processing.html)。
`emit-spout!`將“SpoutOutputCollector”和新元組的參數作為參數發送,并接受作為關鍵字參數`:stream`和`:id`。 `:stream`為指定要發送的流,`:id`為指定元組的消息ID(在`ack'`和`fail`回調中使用)。省略這些參數會為默認輸出流發出一個unanchored tuple。
這還有一個`emit-direct-spout !`函數,他會發出一個direct stream的元組,并附加一個任務id作為的第二個參數來發送這個元組。
Spouts可以像bolts一樣進行參數化,在這種情況下,symbol綁定到返回“IRichSpout”的函數而不是“IRichSpout”本身。您還可以聲明一個unprepared spout,它只定義`nextTuple`方法。以下是在運行時發出隨機語句參數化的unprepared spout示例:
```
(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false} [collector] (Thread/sleep 500) (emit-spout! collector [(rand-nth sentences)]))
```
以下示例說明了如何在`spout-spec`中使用此spout: `clojure (spout-spec (sentence-spout-parameterized ["the cat jumped over the door" "greetings from a faraway land"]) :p 2)`
### Running topologies in local mode or on a cluster (在本地模式或集群上運行topologies)
要想使用遠程模式或本地模式提交topologies,只需像Java一樣使用“StormSubmitter”或“LocalCluster”類。這就是Clojure DSL。
要創建topology配置,最簡單的方法是使用org.apache.storm.config命名空間來定義所有可能配置的常量。常量與“Config”類中的靜態常量相同,但是使用的是破折號而不是下劃線。例如,這有一個topology配置,將workers數設置為15,并以調試模式配置topology:
```
{TOPOLOGY-DEBUG true TOPOLOGY-WORKERS 15}
```
### Testing topologies (測試topologies)
關于測試Clojure中的topologies ,[博文](http://www.pixelmachine.org/2011/12/17/Testing-Storm-Topologies.html)及其[后續](http://www.pixelmachine.org/2011/%2012/21%20/%20Testing-Storm-Topology-Part-2.html) 很好地概述了Storm的強大內置功能。
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度