<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Storm SQL 示例 本文通過處理 Apache 日志的例子來展示如何使用 strom SQL. 本文使用 "how-to" 風格書寫, 因此可以根據步驟, 一步一步的學習如何使用 Storm SQL. ## 準備 本文假定 Apache Zookeeper、Apache Storm、Apache Kafka 都本地安裝并且正確配置運行. 方便起見, 本文假設 Apache Kafka 0.10.0 是通過 `brew` 安裝. 我們將使用下列工具為輸入數據源生成 JSON 數據. 因為他們都是 Python 工程, 本頁假設已經安裝了 Python 2.7、`pip`、`virtualenv`. 如果你使用 Python 3, 需要在生成數據的時候修改一些與 Python 3 不兼容的代碼. * [https://github.com/kiritbasu/Fake-Apache-Log-Generator](https://github.com/kiritbasu/Fake-Apache-Log-Generator) * [https://github.com/rory/apache-log-parser](https://github.com/rory/apache-log-parser) ## 創建 topic 在本頁, 我們會使用3個 topic, `apache-logs`, `apache-errorlogs`, `apache-slowlogs`. 請根據你的環境來創建 topic. 對于使用 brew 安裝的 Apache Kafka 0.10.0, ``` kafka-topics --create --topic apache-logs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 kafka-topics --create --topic apache-errorlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 kafka-topics --create --topic apache-slowlogs --zookeeper localhost:2181 --replication-factor 1 --partitions 5 ``` ## 灌入數據 讓我們提供數據給輸入 topic. 在本頁中我們將生成假的 Apache 日志, 轉換為 JSON 格式, 并把 JSON 灌入 Kafka topic. 開始創建你的工作目錄, 用于克隆項目并且設置 virtualenv. 在你的工作目錄, 命令 `virtualenv env` 把 evn 目錄設置為 virtualenv 目錄, 然后激活虛擬環境. ``` $ virtualenv env $ source env/bin/activate ``` 完成例子以后, 可以隨時 `deactivate`, 退出 python 虛擬環境. ### 安裝和修改 Fake-Apache-Log-Generator `Fake-Apache-Log-Generator` 對包不可見, 我們還需要修改一下腳本. ``` $ git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git $ cd Fake-Apache-Log-Generator ``` 打開 `apache-fake-log-gen.py`, 將 `while (flag):` 語句替換成下面的語句: ``` elapsed_us = random.randint(1 * 1000,1000 * 1000) # 1 ms to 1 sec seconds=random.randint(30,300) increment = datetime.timedelta(seconds=seconds) otime += increment ip = faker.ipv4() dt = otime.strftime('%d/%b/%Y:%H:%M:%S') tz = datetime.datetime.now(pytz.timezone('US/Pacific')).strftime('%z') vrb = numpy.random.choice(verb,p=[0.6,0.1,0.1,0.2]) uri = random.choice(resources) if uri.find("apps")>0: uri += `random.randint(1000,10000)` resp = numpy.random.choice(response,p=[0.9,0.04,0.02,0.04]) byt = int(random.gauss(5000,50)) referer = faker.uri() useragent = numpy.random.choice(ualist,p=[0.5,0.3,0.1,0.05,0.05] )() f.write('%s - - [%s %s] %s "%s %s HTTP/1.0" %s %s "%s" "%s"\n' % (ip,dt,tz,elapsed_us,vrb,uri,resp,byt,referer,useragent)) log_lines = log_lines - 1 flag = False if log_lines == 0 else True ``` 要確保 elapsed_us 包含在假日志中. 為了方便, 你可以跳過克隆項目這一步, 直接從這里下載修改過的文件: [apache-fake-log-gen.py (gist)](https://gist.github.com/HeartSaVioR/79fd4e461604fabecf535ffece47e6c2) ### 安裝 apache-log-parser 并編寫轉換腳本 `apache-log-parser` 模塊可以通過 `pip` 命令安裝. ``` $ pip install apache-log-parser ``` 因為 `apache-log-parser` 是一個 python 庫, 為了轉換日志我們需要寫編寫一個小腳本. 我們創建一個文件 `parse-fake-log-gen-to-json-with-incrementing-id.py` 包含以下內容: ``` import sys import apache_log_parser import json auto_incr_id = 1 parser_format = '%a - - %t %D "%r" %s %b "%{Referer}i" "%{User-Agent}i"' line_parser = apache_log_parser.make_parser(parser_format) while True: # we'll use pipe line = sys.stdin.readline() if not line: break parsed_dict = line_parser(line) parsed_dict['id'] = auto_incr_id auto_incr_id += 1 # works only python 2, but I don't care cause it's just a test module :) parsed_dict = {k.upper(): v for k, v in parsed_dict.iteritems() if not k.endswith('datetimeobj')} print json.dumps(parsed_dict) ``` ### 將轉換后的 JSON Apache Log 灌入 Kafka 好了! 我們已經準備好將數據寫入 Kafka topic. 下面使用 `kafka-console-producer` 來灌入 JSON. ``` $ python apache-fake-log-gen.py -n 0 | python parse-fake-log-gen-to-json-with-incrementing-id.py | kafka-console-producer --broker-list localhost:9092 --topic apache-logs ``` 打開另一個終端執行下面的命令, 確認數據已經進入 topic. ``` $ kafka-console-consumer --zookeeper localhost:2181 --topic apache-logs ``` 如果看到如下的 json, 就說明搞定了: ``` {"TIME_US": "757467", "REQUEST_FIRST_LINE": "GET /wp-content HTTP/1.0", "REQUEST_METHOD": "GET", "RESPONSE_BYTES_CLF": "4988", "TIME_RECEIVED_ISOFORMAT": "2021-06-30T22:02:53", "TIME_RECEIVED_TZ_ISOFORMAT": "2021-06-30T22:02:53-07:00", "REQUEST_HTTP_VER": "1.0", "REQUEST_HEADER_USER_AGENT__BROWSER__FAMILY": "Firefox", "REQUEST_HEADER_USER_AGENT__IS_MOBILE": false, "REQUEST_HEADER_USER_AGENT__BROWSER__VERSION_STRING": "3.6.13", "REQUEST_URL_FRAGMENT": "", "REQUEST_HEADER_USER_AGENT": "Mozilla/5.0 (X11; Linux x86_64; rv:1.9.7.20) Gecko/2010-10-13 13:52:34 Firefox/3.6.13", "REQUEST_URL_SCHEME": "", "REQUEST_URL_PATH": "/wp-content", "REQUEST_URL_QUERY_SIMPLE_DICT": {}, "TIME_RECEIVED_UTC_ISOFORMAT": "2021-07-01T05:02:53+00:00", "REQUEST_URL_QUERY_DICT": {}, "STATUS": "200", "REQUEST_URL_NETLOC": "", "REQUEST_URL_QUERY_LIST": [], "REQUEST_URL_QUERY": "", "REQUEST_URL_USERNAME": null, "REQUEST_HEADER_USER_AGENT__OS__VERSION_STRING": "", "REQUEST_URL_HOSTNAME": null, "REQUEST_HEADER_USER_AGENT__OS__FAMILY": "Linux", "REQUEST_URL": "/wp-content", "ID": 904128, "REQUEST_HEADER_REFERER": "http://white.com/terms/", "REQUEST_URL_PORT": null, "REQUEST_URL_PASSWORD": null, "TIME_RECEIVED": "[30/Jun/2021:22:02:53 -0700]", "REMOTE_IP": "88.203.90.62"} ``` ## 例子: 過濾錯誤日志 在這個例子中, 我們將從所有的日志中過濾出 error 日志并且存儲到另一個 topic 中. 將會用到 `project` 和 `filter` 特性. 腳本文件的內容如下: ``` CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apache-logs' CREATE EXTERNAL TABLE APACHE_ERROR_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apache-error-logs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' INSERT INTO APACHE_ERROR_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, (TIME_US / 1000) AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (CAST(STATUS AS INT) / 100) >= 4 ``` 把文件保存為 `apache_log_error_filtering.sql`. 讓我們過一遍這個腳本. 第一個語句定義了一個表 `APACHE_LOGS` 代表輸入流. `LOCATION` 從句指定了 ZkHost (`localhost:2181`), brokers路徑(`/brokers`) 和 topic (`apache-logs`). 注意 Kafka 數據源必須定義一個主鍵. 這就是為什么我們為 JSON 數據設置了一個整數 id. 同樣, 第二個語句指定了表 `APACHE_ERROR_LOGS` 代表輸出流. `TBLPROPERTIES` 從句指定了[KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs)的配置, 從句對于 Kafka sink表 是必須的. 最后的語句定義了一個 topology. Storm SQL 只會在 DML 語句上定義和運行 topology. DDL 語句定義輸入數據源、輸出數據源、以及可以被 DML 語句引用的用戶定義函數(user defined function). L我們先看 where 語句. 由于我們想過濾 error 日志, 我們使用狀態碼除以 100, 驗證得到的商是否等于或者大于4.(簡單的說就是 statu_code &gt;= 400) 由于JSON中的狀態碼是字符串格式(因此在 APACHE_LOGS 表中是 VARCHAR 格式). 我們在應用除法之前, 使用 CAST(STATUS AS INT) 先把狀態碼轉換為整數. 現在我們只有 error 日志了. 我們轉換一些列以和輸出流想匹配. 在這個語句中, 我們使用 CAST(STATUS AS INT) 轉化為整數類型, 然后使用 1000 除 TIME_US 將毫秒轉換成秒. 最后, insert 語句將過濾和轉換后的行(tuples)存入輸出流. 要運行這個例子, 用戶需要包含數據源(本例中是 `storm-sql-kafka`) 和的所有依賴到 class path 中. 當用戶運行 `storm sql` 命令的時候 Storm SQL 的依賴會被自動處理. 用戶可以在提交階段包含數據源依賴, 如下: ``` $ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql apache_log_error_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 上面的命令提交 SQL 語句到 StormSQL. storm sql 命令的選項是 `storm sql [script file] [topology name]`. 如果用戶使用了不同版本的 Storm 或者 Kafka ,需要修改每個 artifacts 的版本號與之對應. 如果你的語句通過了驗證階段, 會在 Storm UI 頁面上顯示 topology. 你可以在控制臺上看到下面的輸出: ``` $ kafka-console-consumer --zookeeper localhost:2181 --topic apache-error-logs ``` 輸出類似下面的內容: ``` {"ID":854643,"REMOTE_IP":"4.227.214.159","REQUEST_URL":"/wp-content","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows 98; Win 9x 4.90; it-IT; rv:1.9.2.20) Gecko/2015-06-03 11:20:16 Firefox/3.6.17","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T19:14:44+00:00","TIME_RECEIVED_TIMESTAMP":1616958884000,"TIME_ELAPSED_MS":274.222} {"ID":854693,"REMOTE_IP":"223.50.249.7","REQUEST_URL":"/apps/cart.jsp?appID=5578","REQUEST_METHOD":"GET","STATUS":404,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_6_6; rv:1.9.2.20) Gecko/2015-11-06 00:20:43 Firefox/3.8","TIME_RECEIVED_UTC_ISOFORMAT":"2021-03-28T21:41:02+00:00","TIME_RECEIVED_TIMESTAMP":1616967662000,"TIME_ELAPSED_MS":716.851} ... ``` 你可以運行 Storm SQL runner, 將 topology 名稱替換為 `--explain` 來查看邏輯執行計劃. ``` $ $STORM_DIR/bin/storm sql apache_log_error_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 輸入類似下面的內容: ``` LogicalTableModify(table=[[APACHE_ERROR_LOGS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8 LogicalProject(ID=[$0], REMOTE_IP=[$1], REQUEST_URL=[$2], REQUEST_METHOD=[$3], STATUS=[CAST($4):INTEGER NOT NULL], REQUEST_HEADER_USER_AGENT=[$5], TIME_RECEIVED_UTC_ISOFORMAT=[$6], TIME_ELAPSED_MS=[/($7, 1000)]), id = 7 LogicalFilter(condition=[>=(/(CAST($4):INTEGER NOT NULL, 100), 4)]), id = 6 EnumerableTableScan(table=[[APACHE_LOGS]]), id = 5 ``` 如果 Storm SQL 應用了查詢優化, 你可能看到的輸出會和上面的不一樣. 我們正在執行第一個 Storm SQL topology! 如果你看到了足夠多的輸出和日志, 請殺掉 topology. 為了簡潔, 我們不再解釋我們已經看到的東西. ## 例子: 過濾訪問慢的日志 在這個例子中我們將過濾訪問慢的日志, 把他們存儲到另一個 topic. 用到的特性有 `project`、`filter`、`User Defined Function (UDF)`. 這個例子與上一個例子 `filtering error logs` 非常相似, 我們主要看如何定義 `User Defined Function (UDF)`. 腳本文件的內容如下: ``` CREATE EXTERNAL TABLE APACHE_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS VARCHAR, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_US DOUBLE) LOCATION 'kafka://localhost:2181/brokers?topic=apachelogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' CREATE EXTERNAL TABLE APACHE_SLOW_LOGS (ID INT PRIMARY KEY, REMOTE_IP VARCHAR, REQUEST_URL VARCHAR, REQUEST_METHOD VARCHAR, STATUS INT, REQUEST_HEADER_USER_AGENT VARCHAR, TIME_RECEIVED_UTC_ISOFORMAT VARCHAR, TIME_RECEIVED_TIMESTAMP BIGINT, TIME_ELAPSED_MS INT) LOCATION 'kafka://localhost:2181/brokers?topic=apacheslowlogs' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' CREATE FUNCTION GET_TIME AS 'org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2' INSERT INTO APACHE_SLOW_LOGS SELECT ID, REMOTE_IP, REQUEST_URL, REQUEST_METHOD, CAST(STATUS AS INT) AS STATUS_INT, REQUEST_HEADER_USER_AGENT, TIME_RECEIVED_UTC_ISOFORMAT, GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ') AS TIME_RECEIVED_TIMESTAMP, TIME_US / 1000 AS TIME_ELAPSED_MS FROM APACHE_LOGS WHERE (TIME_US / 1000) >= 100 ``` 內容保存為文件 `apache_log_slow_filtering.sql`. 由于前兩個語句和上一個例子相似, 我們直接跳過. 第三個語句定義了一個 `User defined function`. 我們使用 `org.apache.storm.sql.runtime.functions.scalar.datetime.GetTime2` 定義了一個 `GET_TIME`. `GetTime2` 函數的實現如下: ``` package org.apache.storm.sql.runtime.functions.scalar.datetime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; public class GetTime2 { public static Long evaluate(String dateString, String dateFormat) { try { DateTimeFormatter df = DateTimeFormat.forPattern(dateFormat).withZoneUTC(); return df.parseDateTime(dateString).getMillis(); } catch (Exception ex) { throw new RuntimeException(ex); } } } ``` 由于這個類定義了靜態方法 `evaluate` 可以用于 UDF. SQL 的參數和返回的類型取決于 Storm SQL 依賴哪種風格. 注意, 這個類應該放在 classpath 路徑下, 因此為了定義 UDF, 你需要創建一個包含 UDF 類的 jar 文件, 并在執行 `storm sql` 命令的時候使用 `-- jar` 選項. 最后一個語句和過濾錯誤日志的例子相似. 唯一新鮮的東西就是我們調用了 `GET_TIME(TIME_RECEIVED_UTC_ISOFORMAT, 'yyyy-MM-dd''T''HH:mm:ssZZ')` 將字符串格式的時間轉換為 unix timestamp (BIGINT). 執行: ``` $ $STORM_DIR/bin/storm sql apache_log_slow_filtering.sql apache_log_slow_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 可以在控制臺看到下面的輸出: ``` $ kafka-console-consumer --zookeeper localhost:2181 --topic apache-slow-logs 輸出類似下面的內容: ``` {"ID":890502,"REMOTE_IP":"136.156.159.160","REQUEST_URL":"/list","REQUEST_METHOD":"GET","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (Windows NT 5.01) AppleWebKit/5311 (KHTML, like Gecko) Chrome/13.0.860.0 Safari/5311","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T03:44:59+00:00","TIME_RECEIVED_TIMESTAMP":1622864699000,"TIME_ELAPSED_MS":638.579} {"ID":890542,"REMOTE_IP":"105.146.3.190","REQUEST_URL":"/search/tag/list","REQUEST_METHOD":"DELETE","STATUS":200,"REQUEST_HEADER_USER_AGENT":"Mozilla/5.0 (X11; Linux i686) AppleWebKit/5332 (KHTML, like Gecko) Chrome/13.0.891.0 Safari/5332","TIME_RECEIVED_UTC_ISOFORMAT":"2021-06-05T05:54:27+00:00","TIME_RECEIVED_TIMESTAMP":1622872467000,"TIME_ELAPSED_MS":403.957} ... ``` 好了! 假設我們有通過遠程 IP 查詢 geo 信息的 UDF, 我們可以通過 geo 位置做過濾, 或者將 geo 位置添加到轉換結果中. ## Summary 我們通讀了幾個 Storm SQL 的簡單的用例來學習 Storm SQL 的特性. 如果還沒有看過[Storm SQL integration](storm-sql.html) 和 [Storm SQL language](storm-sql-reference.html), 你需要閱讀這些章節來查看所有支持的特性. 注意, Storm SQL 運行在小批量和非強類型的 Trident 庫之上. Sink 實際并不檢查類型. (你可能注意到一些輸出字段的類型與輸出表模式的定義不同). 當 Storm SQL 的后端 API 修改為核心(tuple by tuple, low-level, high-level) 時, Storm SQL 的行為會相應改變.
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看