# Storm SQL 集成
Storm SQL 使用戶在 Storm 中的流數據上運行 SQL 查詢. SQL 接口不僅可以加快流數據分析的開發周期, 同時還創造了一個機遇, 統一如 [Apache Hive](///hive.apache.org) 和實時流數據分析之類的批量數據處理.
在很高的級別, StromSQL 把 SQL 編譯為 [Trident](Trident-API-Overview.html) 拓撲并在 Strom 集群中執行. 本文檔提供了作為一個末端用戶如何使用 StormSQL 的相關信息. 對于想更深入了解 StormSQL 的設計和實現的朋友請參考[這個](storm-sql-internal.html) 頁面.
Storm SQL 是一個 `試驗性` 的功能, 因此其內部邏輯和支持的特性可能在將來會有變化. 但是小的改動不會影響用戶體驗. 在引入 UX 更改時,我們會提醒和通知用戶.
## 使用
運行 `storm sql` 命令把 SQL 語句編譯為 Trident topology, 并且提交到 Storm 集群.
```
$ bin/storm sql <sql-file> <topo-name>
```
`sql-file` 文件中包含需要被執行的 SQL 語句的列表, `topo-name` 是 topology 的名稱.
當用戶把 `topo-name` 設置為 `--explain` 的時候, StormSQL 激活 `explain mode` 以顯示查詢計劃而不是提交拓撲. 詳細的解釋請參見 `顯示查詢計劃(explain mode)` 一節.
## 支持的特性
當前版本支持以下特性:
* 讀出和流入外部數據源
* 過濾 tuples
* 投影
* 用戶自定義函數 (標量)
特意不支持聚合和連接. 當 Storm SQL 要支持本地 `Streaming SQL` 時, 將會介紹這些特性.
## 指定外部數據源
在 StormSQL中, 數據表現為外部表. 用戶可以使用語句 `CREATE EXTERNAL TABLE` 指定數據源. `CREATE EXTERNAL TABLE` 語法與 [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL)中的非常接近.
```
CREATE EXTERNAL TABLE table_name field_list
[ STORED AS
INPUTFORMAT input_format_classname
OUTPUTFORMAT output_format_classname
]
LOCATION location
[ PARALLELISM parallelism ]
[ TBLPROPERTIES tbl_properties ]
[ AS select_stmt ]
```
各種屬性的詳細解釋參考 [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL).
`PARALLELISM` 是 StormSQL 特有的關鍵詞, 用于描述輸入數據源的并行度. 等同于為 Trident Spout 設置并行度.
默認值是 1, 這個選項對于輸出數據源沒有任何影響. (如果需要的話, 以后可能會改變. 正常情況下應當避免重新分區).
例如, 下面的語句指定了一個 Kafka Spout 和 sink:
```
CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
```
## 植入外部數據源
用戶通過實現 `ISqlTridentDataSource` 接口并且利用 Java 的 service loader 機制注冊他們, 以植入外部數據源. 外部數據源將根據表的 URI模式 進行選擇. 更多細節請參考 `storm-sql-kafka`.
## 指定 User Defined Function (UDF)
用戶可以使用 `CREATE FUNCTION` 語句來定義 user defined function (標量 或者 聚合). 例如, 下面的語句使用`org.apache.storm.sql.TestUtils$MyPlus` 類定義了一個名為 `MYPLUS` 的函數.
```
CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus'
```
Storm SQL 通過檢查用了什么方法來決定這個函數作為一個 標量 還是 聚合. 如果類中定義了 `evaluate` 方法, Storm SQL 將這個函數作為 `scalar`.
標量函數類的示例:
```
public class MyPlus {
public static Integer evaluate(Integer x, Integer y) {
return x + y;
}
}
```
## 例子: 過濾 Kafka 流
假設有一個 Kafka stream 代表訂單交易. 每個 stream 中的消息包含訂單的 id, 產品的單價, 產品數量. 目標是過濾重要交易的訂單(譯注:總價格大于50的訂單),并將這些訂單插入到另一個 Kafka stream 用于進一步分析.
用戶可以在 SQL 文件中指定下列 SQL 語句:
```
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders'
CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
```
第一個語句定義一個表 `ORDER` 代表輸入流. `LOCATION` 從句指定 ZkHost (`localhost:2181`), broker的路徑(`/brokers`), 和 topic名稱(`orders`).
類似的, 第二個語句指定了表 `LARGE_ORDERS` 代表一個輸出流. `TBLPROPERTIES` 從句指定了一個 [KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs) 的配置, 這個從句是 Kafka sink 表必須的.
第三個語句是一個定義拓撲的 `SELECT` 語句: 它指示 StormSQL 過濾 `ORDERS` 表中的所有訂單, 計算各訂單總價并將匹配的記錄插入 `LARGE_ORDER` 指定的 Kafka流 中.
要想運行這個例子, 用戶需要在 classpath 中包含數據源 (本例中 `storm-sql-kafka`)和它的所有依賴. 當運行 `storm sql` 的時候 Storm SQL 的依賴會自動處理. 用戶可以在提交的步驟中包含數據源依賴, 如下所示:
```
$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
上面的命令提交 SQL 語句到 StormSQL. 如果用戶使用了不同版本的 Storm 或者 Kafka, 需要替換每個 artifacts 的版本.
現在, 應該能在 Storm UI 中看到 `order_filtering` 拓撲.
## 顯示查詢計劃(explain mode)
就像 SQL 語句上的 `explain`, StormSQL 在運行 Storm SQL 執行器時提供 `explain mode`. 在分析模式下, StormSQL 分析每一個查詢語句(僅DML)并顯示執行計劃而不是提交拓撲.
為了運行 `explain mode`, 需要設置拓撲名稱為 `--explain` 并像用和提交相同的方式執行 `storm sql` 命令.
例如, 當以分析模式運行上面的例子的時:
```
$ bin/storm sql order_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
StormSQL 輸出打印如下:
```
===========================================================
query>
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-----------------------------------------------------------
16:53:43.951 [main] INFO o.a.s.s.r.DataSourcesRegistry - Registering scheme kafka with org.apache.storm.sql.kafka.KafkaDataSourcesProvider@4d1bf319
No plan presented on DDL
===========================================================
===========================================================
query>
CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
-----------------------------------------------------------
No plan presented on DDL
===========================================================
===========================================================
query>
INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
-----------------------------------------------------------
plan>
LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
LogicalProject(ID=[$0], TOTAL=[*($1, $2)]), id = 7
LogicalFilter(condition=[>(*($1, $2), 50)]), id = 6
EnumerableTableScan(table=[[ORDERS]]), id = 5
===========================================================
```
## 局限
* Windowing 尚未實現.
* 不支持聚合和連接(待到 `流SQL` 成熟)
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度