# Storm SQL 示例
本文通過處理 Apache 日志的例子來展示如何使用 strom SQL. 本文使用 "how-to" 風格書寫, 因此可以根據步驟, 一步一步的學習如何使用 Storm SQL.
## 準備
本文假定 Apache Zookeeper、Apache Storm、Apache Kafka 都本地安裝并且正確配置運行. 方便起見, 本文假設 Apache Kafka 0.10.0 是通過 `brew` 安裝.
我們將使用下列工具為輸入數據源生成 JSON 數據. 因為他們都是 Python 工程, 本頁假設已經安裝了 Python 2.7、`pip`、`virtualenv`. 如果你使用 Python 3, 需要在生成數據的時候修改一些與 Python 3 不兼容的代碼.
* [https://github.com/kiritbasu/Fake-Apache-Log-Generator](https://github.com/kiritbasu/Fake-Apache-Log-Generator)
* [https://github.com/rory/apache-log-parser](https://github.com/rory/apache-log-parser)
## 創建 topic
在本頁, 我們會使用3個 topic, `apache-logs`, `apache-errorlogs`, `apache-slowlogs`. 請根據你的環境來創建 topic.
對于使用 brew 安裝的 Apache Kafka 0.10.0,
```
kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5
```
## 灌入數據
讓我們提供數據給輸入 topic. 在本頁中我們將生成假的 Apache 日志, 轉換為 JSON 格式, 并把 JSON 灌入 Kafka topic.
開始創建你的工作目錄, 用于克隆項目并且設置 virtualenv.
在你的工作目錄, 命令 `virtualenv env` 把 evn 目錄設置為 virtualenv 目錄, 然后激活虛擬環境.
```
$ virtualenv env
$ source env/bin/activate
```
完成例子以后, 可以隨時 `deactivate`, 退出 python 虛擬環境.
### 安裝和修改 Fake-Apache-Log-Generator
`Fake-Apache-Log-Generator` 對包不可見, 我們還需要修改一下腳本.
```
$ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$ cd Fake-Apache-Log-Generator
```
打開 `apache-fake-log-gen.py`, 將 `while (flag):` 語句替換成下面的語句:
```
elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec
seconds=random.randint(30,300)
increment = datetime.timedelta(seconds=seconds)
otime += increment
ip = faker.ipv4()
dt = otime.strftime('%d/%b/%Y:%H:%M:%S')
tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z')
vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2])
uri = random.choice(resources)
if uri.find("apps")>0:
uri += `random.randint(1000,10000)`
resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04])
byt = int(random.gauss(5000,50))
referer = faker.uri()
useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )()
f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent))
log_lines = log_lines - 1
flag = False if log_lines == 0 else True
```
要確保 elapsed_us 包含在假日志中.
為了方便, 你可以跳過克隆項目這一步, 直接從這里下載修改過的文件: [apache-fake-log-gen.py (gist)](https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2)
### 安裝 apache-log-parser 并編寫轉換腳本
`apache-log-parser` 模塊可以通過 `pip` 命令安裝.
```
$ pip install apache-log-parser
```
因為 `apache-log-parser` 是一個 python 庫, 為了轉換日志我們需要寫編寫一個小腳本. 我們創建一個文件 `parse-fake-log-gen-to-json-with-incrementing-id.py` 包含以下內容:
```
import sys
import apache_log_parser
import json
auto_incr_id = 1
parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(parser_format)
while True:
# we'll use pipe
line = sys.stdin.readline()
if not line:
break
parsed_dict = line_parser(line)
parsed_dict['id'] = auto_incr_id
auto_incr_id += 1
# works only python 2, but I don't care cause it's just a test module :)
parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')}
print json.dumps(parsed_dict)
```
### 將轉換后的 JSON Apache Log 灌入 Kafka
好了! 我們已經準備好將數據寫入 Kafka topic. 下面使用 `kafka-console-producer` 來灌入 JSON.
```
$ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs
```
打開另一個終端執行下面的命令, 確認數據已經進入 topic.
```
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs
```
如果看到如下的 json, 就說明搞定了:
```
{"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"}
```
## 例子: 過濾錯誤日志
在這個例子中, 我們將從所有的日志中過濾出 error 日志并且存儲到另一個 topic 中. 將會用到 `project` 和 `filter` 特性.
腳本文件的內容如下:
```
CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs'
CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4
```
把文件保存為 `apache_log_error_filtering.sql`.
讓我們過一遍這個腳本.
第一個語句定義了一個表 `APACHE_LOGS` 代表輸入流. `LOCATION` 從句指定了 ZkHost (`localhost:2181`), brokers路徑(`/brokers`) 和 topic (`apache-logs`). 注意 Kafka 數據源必須定義一個主鍵. 這就是為什么我們為 JSON 數據設置了一個整數 id.
同樣, 第二個語句指定了表 `APACHE_ERROR_LOGS` 代表輸出流. `TBLPROPERTIES` 從句指定了[KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs)的配置, 從句對于 Kafka sink表 是必須的.
最后的語句定義了一個 topology. Storm SQL 只會在 DML 語句上定義和運行 topology. DDL 語句定義輸入數據源、輸出數據源、以及可以被 DML 語句引用的用戶定義函數(user defined function).
L我們先看 where 語句. 由于我們想過濾 error 日志, 我們使用狀態碼除以 100, 驗證得到的商是否等于或者大于4.(簡單的說就是 statu_code >= 400) 由于JSON中的狀態碼是字符串格式(因此在 APACHE_LOGS 表中是 VARCHAR 格式). 我們在應用除法之前, 使用 CAST(STATUS AS INT) 先把狀態碼轉換為整數. 現在我們只有 error 日志了.
我們轉換一些列以和輸出流想匹配. 在這個語句中, 我們使用 CAST(STATUS AS INT) 轉化為整數類型, 然后使用 1000 除 TIME_US 將毫秒轉換成秒.
最后, insert 語句將過濾和轉換后的行(tuples)存入輸出流.
要運行這個例子, 用戶需要包含數據源(本例中是 `storm-sql-kafka`) 和的所有依賴到 class path 中. 當用戶運行 `storm sql` 命令的時候 Storm SQL 的依賴會被自動處理. 用戶可以在提交階段包含數據源依賴, 如下:
```
$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
上面的命令提交 SQL 語句到 StormSQL. storm sql 命令的選項是 `storm sql [script file] [topology name]`. 如果用戶使用了不同版本的 Storm 或者 Kafka ,需要修改每個 artifacts 的版本號與之對應.
如果你的語句通過了驗證階段, 會在 Storm UI 頁面上顯示 topology.
你可以在控制臺上看到下面的輸出:
```
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs
```
輸出類似下面的內容:
```
{"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222} {"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851} ...
```
你可以運行 Storm SQL runner, 將 topology 名稱替換為 `--explain` 來查看邏輯執行計劃.
```
$ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
輸入類似下面的內容:
```
LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8
LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7
LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6
EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5
```
如果 Storm SQL 應用了查詢優化, 你可能看到的輸出會和上面的不一樣.
我們正在執行第一個 Storm SQL topology! 如果你看到了足夠多的輸出和日志, 請殺掉 topology.
為了簡潔, 我們不再解釋我們已經看到的東西.
## 例子: 過濾訪問慢的日志
在這個例子中我們將過濾訪問慢的日志, 把他們存儲到另一個 topic. 用到的特性有 `project`、`filter`、`User Defined Function (UDF)`. 這個例子與上一個例子 `filtering error logs` 非常相似, 我們主要看如何定義 `User Defined Function (UDF)`.
腳本文件的內容如下:
```
CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apacheslowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}'
CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2'
INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100
```
內容保存為文件 `apache_log_slow_filtering.sql`.
由于前兩個語句和上一個例子相似, 我們直接跳過.
第三個語句定義了一個 `User defined function`. 我們使用 `org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2` 定義了一個 `GET_TIME`.
`GetTime2` 函數的實現如下:
```
package org.apache.storm.sql.runtime.functions.scalar.datetime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
public class GetTime2 {
public static Long evaluate(String dateString, String dateFormat) {
try {
DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC();
return df.parseDateTime(dateString).getMillis();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
```
由于這個類定義了靜態方法 `evaluate` 可以用于 UDF. SQL 的參數和返回的類型取決于 Storm SQL 依賴哪種風格.
注意, 這個類應該放在 classpath 路徑下, 因此為了定義 UDF, 你需要創建一個包含 UDF 類的 jar 文件, 并在執行 `storm sql` 命令的時候使用 `-- jar` 選項.
最后一個語句和過濾錯誤日志的例子相似. 唯一新鮮的東西就是我們調用了 `GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')` 將字符串格式的時間轉換為 unix timestamp (BIGINT).
執行:
```
$ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
```
可以在控制臺看到下面的輸出:
```
$ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs
輸出類似下面的內容:
```
{"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579} {"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957} ... ```
好了! 假設我們有通過遠程 IP 查詢 geo 信息的 UDF, 我們可以通過 geo 位置做過濾, 或者將 geo 位置添加到轉換結果中.
## Summary
我們通讀了幾個 Storm SQL 的簡單的用例來學習 Storm SQL 的特性. 如果還沒有看過[Storm SQL integration](storm-sql.html) 和 [Storm SQL language](storm-sql-reference.html), 你需要閱讀這些章節來查看所有支持的特性.
注意, Storm SQL 運行在小批量和非強類型的 Trident 庫之上. Sink 實際并不檢查類型. (你可能注意到一些輸出字段的類型與輸出表模式的定義不同).
當 Storm SQL 的后端 API 修改為核心(tuple by tuple, low-level, high-level) 時, Storm SQL 的行為會相應改變.
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度