# Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
## About
MQTT是IoT(Internet of Things)應用程序中經常使用的輕量經發布/訂閱協議。
Further information can be found at [http://mqtt.org](http://mqtt.org). The HiveMQ website has a great series on [MQTT Essentials](http://www.hivemq.com/mqtt-essentials/).
功能點如下:
* 完整的MQTT支持 (e.g. last will, QoS 0-2, retain, etc.)
* Spout(噴口) 實現訂閱MQTT主題
* 用于發布MQTT消息的bolt(螺栓)實現
* 用于發布MQTT消息的trident(三叉線)功能實現
* 身份驗證和TLS/SSL支持
* 用戶定義的"映射器"用于將MQTT消息轉換為元組(訂閱者)
* 用戶定義的"映射器" User-defined "mappers" 用于將元組轉換為MQTT消息(發布者)
## 快速開始
要快速查看MQTT集成操作,請按照以下說明進行操作。
**啟動 MQTT 的代理和發布者**
以下命令將在端口1883上創建一個MQTT代碼,并啟動一個隨機發布的發布者溫度/濕度值到MQTT主題。
打開終端并執行以下命令(根據需要更改路徑):
```
java -cp examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.mqtt.examples.MqttBrokerPublisher
```
**運行示例拓撲**
使用Flux運行示例拓撲。這將啟動由MQTT Spout(噴口)組成的本地模式集群和拓撲發布到只收錄信息的 bolt(螺栓)。
在單獨的終端中,運行以下命令(請注意,"storm" 可執行文件必須位于你的PATH上):
```
storm jar ./examples/target/storm-mqtt-examples-*-SNAPSHOT.jar org.apache.storm.flux.Flux ./examples/src/main/flux/sample.yaml --local
```
您應該可以看到來自MQTT的數據被bolt(螺栓)記錄:
```
27020 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=67.0, humidity=65.0}
27030 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=47.0, humidity=85.0}
27040 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=69.0, humidity=94.0}
27049 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=4.0, humidity=98.0}
27059 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=51.0, humidity=12.0}
27069 [Thread-17-log-executor[3 3]] INFO o.a.s.f.w.b.LogInfoBolt - {user=tgoetz, deviceId=1234, location=office, temperature=27.0, humidity=65.0}
```
允許本地集群退出,或者通過鍵入Cntrl-C 來停止。
**MQTT容錯實戰**
在拓撲關閉之后,MQTT Spout(噴口)創建的MQTT訂閱將與代理持續存在,并且它將繼續接收和排隊消息(只要代理正在運行)。
如果您再次運行拓撲(當代理程序仍在運行時),當spout(噴口)最初連接到MQTT代理時,它會收到所有的信息, 錯過的信息就當作失敗的消息。您應該看到這是一個消息的爆發,接著再以每秒兩個消息的速率顯示。
這是因為在默認情況下,MQTT Spout(噴口)在訂閱時會創建一個_會話_這意味著它要求它的代理在離線時持有并重新 提交其錯過的任何消息。另外一個重要因素是'MqttBorkerPublisher'發布MQTT Qos 為'1'的消息,這意味著_至少一次交付_。
有關MQTT容錯的更多信息,請參閱下的的**交付保證**部分。
## 交付保證
在Storm中,**_MQTT Spout(噴口)至少提供一次傳遞_**,具體取決于發布者的配置以及MQTT的spout(噴口)。
MQTT協議定義了以下QoS級別:
* `0` - 最多一次 (AKA "Fire and Forget")
* `1` - 起碼一次
* `2` - 完全一次
這里可能有點混亂,因為MQTT協議規范并沒有真正解決一個節點補一個災難性事件完全焚燒的事實。這與Storm的可靠性形成了一個鮮明的對比,該模型期望并擁抱節點的概念。
所以彈性最終取決于基礎的MQTT實現和基礎架構。
### 推薦
*你將永遠不會得到一次處理這個spout(噴口)。它可以與三叉線一起使用,但不會提供事物語義。
如果您需要可靠性的保證(即 _至少一次處理_):
1. 對于MQTT 發布者(Storm之外),發布QoS為'1'的消息,以便在spout脫機時,代理保存消息。
2. 使用spout的默認值 (`cleanSession = false` and `qos = 1`)
3. 如果可以,請確保接收和MQTT消息和任何結果是冪等的。
4. 確保您的MQTT代理不會因為網絡分區而死亡或孤立。為自然災害和人為災害及網絡分區做好準備。以及焚化和破壞的發生。
## 配置
有關完整的配置選項,請參閱JavaDoc for `org.apache.storm.mqtt.common.MqttOptions`.
### 信息映射器
要定義MQTT消息如何映射到Storm元組,您可以使用該實現配置MQTT spout `org.apache.storm.mqtt.MqttMessageMapper` 接口, 如下所示:
```
public interface MqttMessageMapper extends Serializable {
Values toValues(MqttMessage message);
Fields outputFields();
}
```
`MqttMessage` 類包含消息發布的主題("String")和消息的有效負載(`byte[]`). 例如,這是一個 `MqttMessageMapper` 實現,它基于這兩者的內容生成元組消息主題和有效負載:
```
/**
* Given a topic name: "users/{user}/{location}/{deviceId}"
* and a payload of "{temperature}/{humidity}"
* emits a tuple containing user(String), deviceId(String), location(String), temperature(float), humidity(float)
*
*/
public class CustomMessageMapper implements MqttMessageMapper {
private static final Logger LOG = LoggerFactory.getLogger(CustomMessageMapper.class);
public Values toValues(MqttMessage message) {
String topic = message.getTopic();
String[] topicElements = topic.split("/");
String[] payloadElements = new String(message.getMessage()).split("/");
return new Values(topicElements[2], topicElements[4], topicElements[3], Float.parseFloat(payloadElements[0]),
Float.parseFloat(payloadElements[1]));
}
public Fields outputFields() {
return new Fields("user", "deviceId", "location", "temperature", "humidity");
}
}
```
### 元組映射器
使用MQTT bolt(螺栓)或Trident功能發布MQTT消息時,您需要將元組數據映射到MQTT消息(主題/負載)。這是通過實現 `org.apache.storm.mqtt.MqttTupleMapper` 接口完成的:
```
public interface MqttTupleMapper extends Serializable{
MqttMessage toMessage(ITuple tuple);
}
```
例如,一個簡單的 `MqttTupleMapper` 實現可能如下所示:
```
public class MyTupleMapper implements MqttTupleMapper {
public MqttMessage toMessage(ITuple tuple) {
String topic = "users/" + tuple.getStringByField("userId") + "/" + tuple.getStringByField("device");
byte[] payload = tuple.getStringByField("message").getBytes();
return new MqttMessage(topic, payload);
}
}
```
### MQTT Spout(噴口)平行度
建議您對MQTT spout(噴口)使用并行度1,否則最終會出現多個實例的端口訂閱機同的主題,導致重復消費。
如果您要并行化spout(噴口),建議您在拓撲中使用多個spout(噴口)實例,并使用MQTT主題選擇器對數據進行分組。 如何實現分區的策略的方法最終由您的MQTT主題結構決定。舉個例子,如果你按區域劃分主題(如東/西),你可以做類似于以下內容:
```
String spout1Topic = "users/east/#";
String spout2Topic = "users/west/#";
```
然后通過預訂一個(bolt)螺栓將每個流加入到結果流中。
### 使用 Flux
以上Flux YAML 配置創建了示例中使用的拓撲:
```
name: "mqtt-topology"
components:
########## MQTT Spout Config ############
- id: "mqtt-type"
className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
- id: "mqtt-options"
className: "org.apache.storm.mqtt.common.MqttOptions"
properties:
- name: "url"
value: "tcp://localhost:1883"
- name: "topics"
value:
- "/users/tgoetz/#"
# topology configuration
config:
topology.workers: 1
topology.max.spout.pending: 1000
# spout definitions
spouts:
- id: "mqtt-spout"
className: "org.apache.storm.mqtt.spout.MqttSpout"
constructorArgs:
- ref: "mqtt-type"
- ref: "mqtt-options"
parallelism: 1
# bolt definitions
bolts:
- id: "log"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
streams:
- from: "mqtt-spout"
to: "log"
grouping:
type: SHUFFLE
```
### 使用 Java
同樣,你可以使用Storm Core Java API 來創建相同的拓撲結構:
```
TopologyBuilder builder = new TopologyBuilder();
MqttOptions options = new MqttOptions();
options.setTopics(Arrays.asList("/users/tgoetz/#"));
options.setCleanConnection(false);
MqttSpout spout = new MqttSpout(new StringMessageMapper(), options);
MqttBolt bolt = new LogInfoBolt();
builder.setSpout("mqtt-spout", spout);
builder.setBolt("log-bolt", bolt).shuffleGrouping("mqtt-spout");
return builder.createTopology();
```
## SSL/TLS
如果要連接的MQTT代理需要SSL或SSL客戶端身份驗證,則需要配置具有適當URI的端口以及包含必要證書的秘鑰庫/信任庫文件的位置。
### SSL/TLS URIs
要通過SSL/TLS連接使用前綴為 `ssl://` 或 `tls://` 而不是 `tcp://`. 進一步控制該算法可以指定一個特定的協議:
* `ssl://` 使用JVM默認版本的SSL 協議。
* `sslv*://` 使用特定版本的SSL協議,其中 `*` 替換為版本 (e.g. `sslv3://`)
* `tls://` 使用JVM默認版本的TLS 協議。
* `tlsv*://` 使用特定版本的TLS 協議,其中 `*` 替換為版本 (e.g. `tlsv1.1://`)
### 指定 密鑰庫/信任域的位置
`MqttSpout`, `MqttBolt` and `MqttPublishFunction` 都有構造函數,它們使用一個 `KeyStoreLoader` 實例來加載 TLS/SSL連接所需的證書。例如:
```
public MqttSpout(MqttMessageMapper type, MqttOptions options, KeyStoreLoader keyStoreLoader)
```
`DefaultKeyStoreLoader` 類可用于從本地文件系統加載證書。 請注意,密鑰庫/信任庫需要在可能執行spout(噴口)/bolt(螺栓)的所有工作節點上可用。要使用 `DefaultKeyStoreLoader` 指定密鑰庫/信任庫文件的位置,并設置必要的密碼:
```
DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks", "/path/to/truststore.jks");
ksl.setKeyStorePassword("password");
ksl.setTrustStorePassword("password");
//...
```
如果您的密鑰庫/信任庫證書存儲在單個文件中,則可以使用單參數構架函數:
```
DefaultKeyStoreLoader ksl = new DefaultKeyStoreLoader("/path/to/keystore.jks");
ksl.setKeyStorePassword("password");
//...
```
還可以使用Flux來配置SSL/TLS:
```
name: "mqtt-topology"
components:
########## MQTT Spout Config ############
- id: "mqtt-type"
className: "org.apache.storm.mqtt.examples.CustomMessageMapper"
- id: "keystore-loader"
className: "org.apache.storm.mqtt.ssl.DefaultKeyStoreLoader"
constructorArgs:
- "keystore.jks"
- "truststore.jks"
properties:
- name: "keyPassword"
value: "password"
- name: "keyStorePassword"
value: "password"
- name: "trustStorePassword"
value: "password"
- id: "mqtt-options"
className: "org.apache.storm.mqtt.common.MqttOptions"
properties:
- name: "url"
value: "ssl://raspberrypi.local:8883"
- name: "topics"
value:
- "/users/tgoetz/#"
# topology configuration
config:
topology.workers: 1
topology.max.spout.pending: 1000
# spout definitions
spouts:
- id: "mqtt-spout"
className: "org.apache.storm.mqtt.spout.MqttSpout"
constructorArgs:
- ref: "mqtt-type"
- ref: "mqtt-options"
- ref: "keystore-loader"
parallelism: 1
# bolt definitions
bolts:
- id: "log"
className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt"
parallelism: 1
streams:
- from: "mqtt-spout"
to: "log"
grouping:
type: SHUFFLE
```
## Committer Sponsors
* P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度