# Flux
一個框架,為了讓創建和部署Apache Storm“流”計算遇上更方便快捷
## 定義
**flux** |fl?ks| _名詞_
1. 流動或者流出的這個動作過程 這個動作或者流出的過程
2. 持續不斷的改變
3. 在物理中,表明液體、輻射能或者顆粒在指定區域內的流速
4. 一個混合了固體用來降低其熔點的物質
## 基本原理
當配置很難被編程的時候會發生糟糕的事情。沒有人應因為需要修改配置而重新編譯或者重新打包一個應用。
## 相關
Flux是一個用來讓規定和部署Apache Storm拓撲不那么費勁的框架和一系列工具。
你是否發現你總是重復這樣的模式?:
```
public static void main(String[] args) throws Exception {
// 返回的邏輯值用來判斷我們是否在本地上運行
// 創建必要的配置選項...
boolean runLocal = shouldRunLocal();
if(runLocal){
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(name, conf, topology);
} else {
StormSubmitter.submitTopology(name, conf, topology);
}
}
```
像這樣的操作會不會更容易一些呢:
```
storm jar mytopology.jar org.apache.storm.flux.Flux --local config.yaml
```
或者:
```
storm jar mytopology.jar org.apache.storm.flux.Flux --remote config.yaml
```
另一個比較經常提及的麻煩點在于由于寫拓撲圖常常是和Java代碼緊密結合的,所以任何對它的修改都需要重新編譯和重新打包拓撲的jar文件。Flux的目標是允許你將你所有的Storm組件打包在一個單獨的jar文件中,然后使用另一個文本文件來規定你的拓撲的布局和配置。通過這樣的方式,緩解這一個麻煩。
## 特點
* 安裝和部署Storm拓撲(包括Storm core和Microbatch API)簡單,而不是用內嵌的配置方法在你的拓撲代碼中安裝和部署。
* 支持已有的拓撲代碼(如下可見)
* 通過使用靈活的YAML DSL定義Storm Core API(Spouts/Bolts)。
* YAML DSL支持大多數的Storm組件 (storm-kafka, storm-hdfs, storm-hbase, 等等.)
* 對多語言的組件有便捷的支持
* 為了更簡便地在配置/環境間轉換,使用了外部屬性的置換/過濾(類似于Maven風格的`${variable.name}`置換)。
## 用法
為了使用Flux,把它添加到依賴包中,然后把你所有的Storm組件打包成一個很大的jar文件,再然后創建一個YAML文件來定義你的拓撲(下面有YAML配置選項)。
### 通過源碼來構建
使用Flux最簡單的方法就是將它作為Maven依賴包添加到項目中,如下面描述的那樣。
如果你要從源代碼中創建Flux并進行單元/集成的測試,你需要在你的系統上按照如下操作來安裝:
* Python 2.6.x or later
* Node.js 0.10.x or later
#### 創建能夠使用單元測試的命令:
```
mvn clean install
```
#### 創建不能夠使用單元測試的命令:
如果你希望在不安裝Python和Node.js的情況下構建Flux,你可以跳過這個單元測試:
```
mvn clean install -DskipTests=true
```
需要注意,如果你打算使用Flux來給遠程的簇部署拓撲,你仍然需要安裝Python,因為Apche Storm要求這么做。
#### 創建能夠使用集成測試的命令:
```
mvn clean install -DskipIntegration=false
```
### 和Maven一起打包
為了保證Flux能對你的Storm組件有效,你需要把Flux當做依賴包添加,這樣才能讓它包含在Storm的拓撲jar文件中。這個可以通過Maven shade插件(推薦)或者Maven assembly插件(不推薦)來完成。
#### Flux Maven依賴包
Flux現在的版本可以在以下的合作方的Maven中心獲得: `xml <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> <version>${storm.version}</version> </dependency>`
使用shell的spouts和bolt要求附加的Flux Wrappers庫: `xml <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-wrappers</artifactId> <version>${storm.version}</version> </dependency>`
#### 創建一個允許使用Flux的拓撲JAR文件
下述的例子闡述了如何配合Maven shade插件使用Flux:
```
<!-- 在shaded jar文件中包含FLux和用戶依賴包 -->
<dependencies>
<!-- Flux include -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- Flux Wrappers include -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>flux-wrappers</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- 在這里添加用戶依賴包... -->
</dependencies>
<!-- 創建一個包括所有依賴包的大大的jar文件 -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.storm.flux.Flux</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
```
### 部署和運行Flux拓撲
一旦你的拓撲組件和Flux的依賴包一起打包后,你就可以通過使用 `storm jar` 命令在本地或者遠端運行不同的拓撲。比如說,如果你的大大的jar文件命名為 `myTopology-0.1.0-SNAPSHOT.jar` ,你可以使用以下的命令在本地運行它:
```
storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml
```
### 命令行的參數選項
```
usage: storm jar <my_topology_uber_jar.jar> org.apache.storm.flux.Flux
[options] <topology-config.yaml>
-d,--dry-run 不運行/部署這個拓撲,僅僅是構建、驗證和打印這個拓撲的相關信息。
-e,--env-filter 執行環境變量的替換。 以形式為 `${ENV-[NAME]}` 定義的替換關鍵字將會替換 `NAME` 對應的環境變量值。
-f,--filter <file> 執行屬性替換。使用一個指定的文件作為屬性的源,然后形式為 {$[property name]} 的替換關鍵字將會替換在這個屬性文件中的值。.
-i,--inactive 部署但是不激活這個拓撲。
-l,--local 在local的模式下運行拓撲。
-n,--no-splash 抑制版權頁的輸出。
-q,--no-detail 抑制拓撲詳情的輸出。
-r,--remote 將拓撲部署到遠端的簇。
-R,--resource 使用提供的路徑來作為classpath的源文件以代替文件。
-s,--sleep <ms> 當在本地運行時,在killing一個拓撲和關閉本地簇之前需要sleep的時間(以ms為單位)
-z,--zookeeper <host:port> 當以local模式運行時,使用ZooKeeper的特定<host:port>而不是同進程的的ZooKeeper。(要求在Storm的0.9.3或之后的版本)
```
**注意:** Flux為了避免在使用到 `storm` 命令時產生命令行開關沖突,所以允許任何其他的命令行開關來表達 `storm` 這一命令。
舉例來說,你可以使用`storm`的命令開關`-c`來覆蓋拓撲配置性能。下述舉例的命令就可以運行Flux并且覆蓋`nimbus.seeds`這一配置:
```
storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --remote my_config.yaml -c 'nimbus.seeds=["localhost"]'
```
### 輸出例子
```
███████╗██╗ ██╗ ██╗██╗ ██╗
██╔════╝██║ ██║ ██║╚██╗██╔╝
█████╗ ██║ ██║ ██║ ╚███╔╝
██╔══╝ ██║ ██║ ██║ ██╔██╗
██║ ███████╗╚██████╔╝██╔╝ ██╗
╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝
+- Apache Storm -+
+- data FLow User eXperience -+
Version: 0.3.0
Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
---------- TOPOLOGY DETAILS ----------
Name: shell-topology
--------------- SPOUTS ---------------
sentence-spout[1](org.apache.storm.flux.wrappers.spouts.FluxShellSpout)
---------------- BOLTS ---------------
splitsentence[1](org.apache.storm.flux.wrappers.bolts.FluxShellBolt)
log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
count[1](org.apache.storm.testing.TestWordCounter)
--------------- STREAMS ---------------
sentence-spout --SHUFFLE--> splitsentence
splitsentence --FIELDS--> count
count --SHUFFLE--> log
--------------------------------------
Submitting topology: 'shell-topology' to remote cluster...
```
## YAML配置
Flux拓撲在YAML文件中被頂迎來描述一個拓撲。一個Flux拓撲定義由以下幾項組成:
1. 一個拓撲的名字
2. 一個拓撲組件的列表(被命名為Java對象,用以可以在環境中可以調用)
3. **第三選項或者第四選項** (一個DSL拓撲定義):
* 一個spouts的列表,每一個項通過一個唯一的ID被識別
* 一個bolts的列表,每一個項通過一個唯一的ID被識別
* 一個“stream”對象的列表,用來表示spouts和bolts之間的流的元組。
4. **第三選項或者第四選項** (一個可以創建 `org.apache.storm.generated.StormTopology` 實例的JVM類):
* 一個 `topologySource` 定義。
舉個例子,這里有一個使用YAML DSL的簡單wordcount拓撲:
```
name: "yaml-topology"
config:
topology.workers: 1
# spout定義
spouts:
- id: "spout-1"
className: "org.apache.storm.testing.TestWordSpout"
parallelism: 1
# bolt定義
bolts:
- id: "bolt-1"
className: "org.apache.storm.testing.TestWordCounter"
parallelism: 1
- id: "bolt-2"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
# stream定義
streams:
- name: "spout-1 --> bolt-1" #name暫時未用上(可以在logging,UI等中作為placeholder)
from: "spout-1"
to: "bolt-1"
grouping:
type: FIELDS
args: ["word"]
- name: "bolt-1 --> bolt2"
from: "bolt-1"
to: "bolt-2"
grouping:
type: SHUFFLE
```
## 屬性替換/過濾
對于開發者而言,想要簡單地轉換配置是常有的事,例如在開發環境和生產環境中轉換部署。這可以通過使用分開的YAML配置文件來實現,但是這個方法會導致配置文件中多出一些多余的重復內容,尤其是在一些Storm拓撲沒有改變但是配置設置例如主機名,端口和并行性參數改變了的情況。
對于這種情況,Flux提供了屬性過濾(properties filtering)方法允許你個給一個 `.properties` 文件賦兩個具體的值,并且讓他們在 `.yaml` 文件被解析前被替代。
為了實現屬性過濾功能,使用 `--filter` 命令行選項,并且具體制定一個 `.properties` 文件。舉個例子,如果你像這樣調用flux:
```
storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_config.yaml --filter dev.properties
```
并且 `dev.properties` 內容如下:
```
kafka.zookeeper.hosts: localhost:2181
```
你在這之后就可以通過你 `.yaml` 文件中的屬性關鍵字,使用 `${}` 語法來引用它:
```
- id: "zkHosts"
className: "org.apache.storm.kafka.ZkHosts"
constructorArgs:
- "${kafka.zookeeper.hosts}"
```
在這個例子中,Flux可以在YAML內容被解析前使用 `localhost:2181` 來替換 `${kafka.zookeeper.hosts}` 。
### 環境變量替換/過濾Environment Variable Substitution/Filtering
Flux同樣也允許環境變量替換。舉個例子,如果名為`ZK_HOSTS` 的環境變量名被定義了,你可以通過以下的語法在Flux的YAML文件中引用它:
```
${ENV-ZK_HOSTS}
```
## 組件
組件從本質來說是對象實例,用來在對spouts和bolts的配置選項中獲取。如果你對Spring框架很熟悉,這里的組件大概就類比于Spring中的beans
每一個組件都是可被識別的,至少是可以通過一個唯一的標識符(字符串)和一個類名(字符串)。舉個例子,以下的例子將會創建一個 `org.apache.storm.kafka.StringScheme` 類的實例作為關鍵字 `"stringScheme"` 的引用。這里我們假設這個類 `org.apache.storm.kafka.StringScheme` 有一個默認的構造函數。
```
components:
- id: "stringScheme"
className: "org.apache.storm.kafka.StringScheme"
```
### 構造函數參數,引用,屬性和配置方法
#### 構造函數參數
為了給一個類的構造函數添加參數,我們添加 `contructorArgs` 這個元素給組件。 `contructorArgs` 是一個列表,其元素是對象。這個列表會被傳遞給類的構造函數們。以下的這個例子通過調用一個把單個字符串作為參數的構造函數來創建一個對象:
```
- id: "zkHosts"
className: "org.apache.storm.kafka.ZkHosts"
constructorArgs:
- "localhost:2181"
```
#### 引用
每一個組件實例都通過一個唯一的id可悲其他組件重復使用。為了引用一個已存在的組件,你需要在使用 `ref` 這個標簽的時候指明這個組件的id。
在以下的例子中,一個名為的組件被創建,之后將被作為另一個組件的構造函數的參數被引用:
```
components:
- id: "stringScheme"
className: "org.apache.storm.kafka.StringScheme"
- id: "stringMultiScheme"
className: "org.apache.storm.spout.SchemeAsMultiScheme"
constructorArgs:
- ref: "stringScheme" # component with id "stringScheme" must be declared above.
```
**注意:** 引用只能在對象被聲明后使用。
#### 屬性
除去允許在調用構造函數的時候傳進不同的參數,Flux同樣允許在配置組件的時候使用被聲明為 `public` 的類似JavaBean的setter方法和域:
```
- id: "spoutConfig"
className: "org.apache.storm.kafka.SpoutConfig"
constructorArgs:
# brokerHosts
- ref: "zkHosts"
# topic
- "myKafkaTopic"
# zkRoot
- "/kafkaSpout"
# id
- "myId"
properties:
- name: "ignoreZkOffsets"
value: true
- name: "scheme"
ref: "stringMultiScheme"
```
在上述的例子中,如果聲明了 `properties` ,Flux將會在 `SpoutConfig` 中找一個名字為 `setIgnoreZkOffsets(boolean b)` 的函數并試圖調用它。如果這樣的一個setter函數沒有找到,Flux就會找一個公有的叫 `ignoreZkOffsets` 的實例變量并且將它進行設置。
引用也可能被作為屬性值來使用。
#### 配置方法
從概念上來說,配置方法可能類似于屬性和構造函數的參數 —— 他們允許一個對象在創建后可以調用任意的方法。對于有一些類,它們沒有暴露JavaBean的方法或者沒有能夠將整個對象都配置好的構造函數,配置方法對這種類就十分有用。一些常見的例子包括了哪些使用構造器模式來配置/整合的類。
接下來的YAML例子創建了一個bolt并且通過幾個方法進行了配置:
```
bolts:
- id: "bolt-1"
className: "org.apache.storm.flux.test.TestBolt"
parallelism: 1
configMethods:
- name: "withFoo"
args:
- "foo"
- name: "withBar"
args:
- "bar"
- name: "withFooBar"
args:
- "foo"
- "bar"
```
對應方法的標識如下:
```
public void withFoo(String foo);
public void withBar(String bar);
public void withFooBar(String foo, String bar);
```
傳遞給配置方法的參數和構造函數中的參數作用一樣,并且也支持引用。
### 在溝早期的參數,引用,屬性和配制方法中使用Java的 `enum`s in Contructor Arguments, References, Properties and Configuration Methods
你可以在Flux YAML文件中輕松通過引用 `enum` 的名字將其值作為參數。
比如, 包含了以下 `enum` 的定義(為了簡潔而簡化過):
```
public static enum Units {
KB, MB, GB, TB
}
```
`org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy` 這個類有如下的構造器:
```
public FileSizeRotationPolicy(float count, Units units)
```
以下的Flux `component` 定義可以被用來調用這個構造器:
```
- id: "rotationPolicy"
className: "org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"
constructorArgs:
- 5.0
- MB
```
上述的定義和下面的Java代碼從功能上來說是一樣的:
```
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
```
## 拓撲配置
`config` 這個區段僅僅是Storm拓撲配置參數的一個圖,將會作為 `org.apache.storm.Config` 類的實例傳給 `org.apache.storm.StormSubmitter` :
```
config:
topology.workers: 4
topology.max.spout.pending: 1000
topology.message.timeout.secs: 30
```
## 已經存在的拓撲
如果你有已經存在的Storm拓撲,你仍然可以用Flux來部署/運行/測試它們。這個特點允許你按照已有的拓撲類來改變Flux構造參數,引用,屬性和拓撲配置聲明。
使用已有拓撲類最簡單的方法就是通過下面的方法定義一個名為 `getTopology()` 的實例方法:
```
public StormTopology getTopology(Map<String, Object> config)
```
或者:
```
public StormTopology getTopology(Config config)
```
你接下來就可以使用YAML來部署你的拓撲:
```
name: "existing-topology"
topologySource:
className: "org.apache.storm.flux.test.SimpleTopology"
```
如果你想用來作為拓撲源的類有一個不同的方法名(比如不叫),那么你可以把它重寫:
```
name: "existing-topology"
topologySource:
className: "org.apache.storm.flux.test.SimpleTopology"
methodName: "getTopologyWithDifferentMethodName"
```
**注意:** 這個指定的方法必須接受一個單一的類型是 `java.util.Map<String, Object>` 或者 `org.apache.storm.Config` 的類,然后返回一個 `org.apache.storm.generated.StormTopology` 對象。
## YAML DSL
## Spouts 和 Bolts
Spout和Bolts是在YAML配置中各自的區域中被配置。Spout和Bolt的定義是 `組件(component)` 定義的拓展。在組件的基礎上添加了 `并行度(parallelism)`參數,用于當一個拓撲被部署的時候設置組件的并行度。
因為spout和bolt定義繼承了 `組件(component)` ,所以它們也支持構造函數參數,引用,屬性。Because spout and bolt definitions extendthey support constructor arguments, references, and properties as well.
Shell spout的例子:
```
spouts:
- id: "sentence-spout"
className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
# shell spout constructor takes 2 arguments: String[], String[]
constructorArgs:
# command line
- ["node", "randomsentence.js"]
# output fields
- ["word"]
parallelism: 1
```
Kafka spout的例子:
```
components:
- id: "stringScheme"
className: "org.apache.storm.kafka.StringScheme"
- id: "stringMultiScheme"
className: "org.apache.storm.spout.SchemeAsMultiScheme"
constructorArgs:
- ref: "stringScheme"
- id: "zkHosts"
className: "org.apache.storm.kafka.ZkHosts"
constructorArgs:
- "localhost:2181"
# 可選的kafka配置
# - id: "kafkaConfig"
# className: "org.apache.storm.kafka.KafkaConfig"
# constructorArgs:
# # brokerHosts
# - ref: "zkHosts"
# # topic
# - "myKafkaTopic"
# # clientId (optional)
# - "myKafkaClientId"
- id: "spoutConfig"
className: "org.apache.storm.kafka.SpoutConfig"
constructorArgs:
# brokerHosts
- ref: "zkHosts"
# topic
- "myKafkaTopic"
# zkRoot
- "/kafkaSpout"
# id
- "myId"
properties:
- name: "ignoreZkOffsets"
value: true
- name: "scheme"
ref: "stringMultiScheme"
config:
topology.workers: 1
# spout definitions
spouts:
- id: "kafka-spout"
className: "org.apache.storm.kafka.KafkaSpout"
constructorArgs:
- ref: "spoutConfig"
```
Bolt 例子:
```
# bolt definitions
bolts:
- id: "splitsentence"
className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
constructorArgs:
# command line
- ["python", "splitsentence.py"]
# output fields
- ["word"]
parallelism: 1
# ...
- id: "log"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
# ...
- id: "count"
className: "org.apache.storm.testing.TestWordCounter"
parallelism: 1
# ...
```
## Streams and Stream Groupings
Flux中的“流”被表示為一列在Spouts和Bolts之間的“連接”(如圖的邊,數據流等),在連接定義的同時有一個關聯的“分組”定義。
一個“流”定義有如下的屬性:
**`name`:** 一個“連接”的名字(可選的,當下不會馬上使用)
**`from`:** 作為源頭的Spout或者Bolt的 `id`(類似于出版商)
**`to`:** 作為目的地的Spout或者Bolt的 `id` (類似于訂閱者)
**`grouping`:** 為了“流”而產生的“流分組”定義
一個“分組”定義有以下的屬性:
**`type`:** 分組的類型。下列值中任選一個 `ALL`、`CUSTOM`、`DIRECT`、`SHUFFLE`、`LOCAL_OR_SHUFFLE`、`FIELDS`、`GLOBAL`、或者 `NONE`。
**`streamId`:** Storm “流”的ID(可選的,如果沒有指定則會使用默認流)
**`args`:** 當 `type` 的值為 `FIELDS` 時,一系列域的名字。
**`customClass`** 當 `type` 的值為 `CUSTOM` 時,自定義的“分組”類實例的定義
如下例的 `流(streams)` 的定義案例建立起了一個如下的線路拓撲:
```
kafka-spout --> splitsentence --> count --> log
```
```
# 流(stream)定義
# “流”的定義定了在spouts和bolts之間的“連接”。
# 注意這樣的“連接”可能是循環的
# 自定義的“流分組”也被支持
streams:
- name: "kafka --> split" # name isn't used (placeholder for logging, UI, etc.)
from: "kafka-spout"
to: "splitsentence"
grouping:
type: SHUFFLE
- name: "split --> count"
from: "splitsentence"
to: "count"
grouping:
type: FIELDS
args: ["word"]
- name: "count --> log"
from: "count"
to: "log"
grouping:
type: SHUFFLE
```
### 自定義“流分組”
自定義的流分組是通過設置分組的類型為 `CUSTOM` 并且定義一個 `customClass` 參數,該參數告訴Flux如何實例化一個自定義類。這個 `customClass` 定義繼承自 `組件(component)`,所以它也支持構造函數參數,引用和屬性。
如下的例子創建了一個”流“,并且使用了一個類型為 `org.apache.storm.testing.NGrouping` 的自定義“流分組”類。
```
- name: "bolt-1 --> bolt2"
from: "bolt-1"
to: "bolt-2"
grouping:
type: CUSTOM
customClass:
className: "org.apache.storm.testing.NGrouping"
constructorArgs:
- 1
```
## “包含”和“重寫”
FLux允許包含其他YAML文件的內容,并且把它們當做在一個文件中定義的一樣。可以包含文件或者路徑源文件。
“包含”是通過一系列的maps來指定的:
```
includes:
- resource: false
file: "src/test/resources/configs/shell_test.yaml"
override: false
```
如果 `resource` 的值為 `true`,“包含”將會從 `file` 這個屬性值中來加載路徑源文件,否則它會被當做是普通的文件。
`override` 屬性控制著“包含”要如何影響定義在當前文件中的值。如果 `override` 的值是 `true`,那么file值將會替代現在的文件被解析。如果 `override` 的值是 `false`,那么當前文件正在解析的值會有優先權,并且解析器會拒絕將它們替換掉。
**注意:** “包含”現今不是循環的,包含文件中的包含將會被忽視。
## 基本的Word Count例子
這個例子使用了在JavaScript中實現的spout,Python中實現的bolt,和另一個在Java中實現的bolt。
拓撲 YAML 配置:
```
---
name: "shell-topology"
config:
topology.workers: 1
# spout 定義
spouts:
- id: "sentence-spout"
className: "org.apache.storm.flux.wrappers.spouts.FluxShellSpout"
# shell spout constructor takes 2 arguments: String[], String[]
constructorArgs:
# command line
- ["node", "randomsentence.js"]
# output fields
- ["word"]
parallelism: 1
# bolt 定義
bolts:
- id: "splitsentence"
className: "org.apache.storm.flux.wrappers.bolts.FluxShellBolt"
constructorArgs:
# command line
- ["python", "splitsentence.py"]
# output fields
- ["word"]
parallelism: 1
- id: "log"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
- id: "count"
className: "org.apache.storm.testing.TestWordCounter"
parallelism: 1
#stream 定義
# “流”定義定義了在spouts和bolts之間的連接
# 注意“連接”可能是循環的
# 自定義“流分組”也是被支持的
streams:
- name: "spout --> split" # name沒有被使用( 是logging,UI等中的占位符)
from: "sentence-spout"
to: "splitsentence"
grouping:
type: SHUFFLE
- name: "split --> count"
from: "splitsentence"
to: "count"
grouping:
type: FIELDS
args: ["word"]
- name: "count --> log"
from: "count"
to: "log"
grouping:
type: SHUFFLE
```
## Micro-Batching (Trident) API 支持
雖然目前Flux DSL只支持核心Storm API(the COre Storm API),但是對Storm的micro-batching API的支持正在計劃中。
為了和Trident拓撲一起使用Flux,在你的YAML配置中定義一個拓撲的getter方法和引用: ```yaml name: "my-trident-topology"
config: topology.workers: 1
topologySource: className: "org.apache.storm.flux.test.TridentTopologySource" # FLux將會尋找 "getTopology"方法, 這個會用來重寫之前那個 methodName: "getTopologyWithDifferentMethodName" ```
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度