<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                ??一站式輕松地調用各大LLM模型接口,支持GPT4、智譜、豆包、星火、月之暗面及文生圖、文生視頻 廣告
                # Storm SQL 集成 Storm SQL 使用戶在 Storm 中的流數據上運行 SQL 查詢. SQL 接口不僅可以加快流數據分析的開發周期, 同時還創造了一個機遇, 統一如 [Apache Hive](///hive.apache.org) 和實時流數據分析之類的批量數據處理. 在很高的級別, StromSQL 把 SQL 編譯為 [Trident](Trident-API-Overview.html) 拓撲并在 Strom 集群中執行. 本文檔提供了作為一個末端用戶如何使用 StormSQL 的相關信息. 對于想更深入了解 StormSQL 的設計和實現的朋友請參考[這個](storm-sql-internal.html) 頁面. Storm SQL 是一個 `試驗性` 的功能, 因此其內部邏輯和支持的特性可能在將來會有變化. 但是小的改動不會影響用戶體驗. 在引入 UX 更改時,我們會提醒和通知用戶. ## 使用 運行 `storm sql` 命令把 SQL 語句編譯為 Trident topology, 并且提交到 Storm 集群. ``` $ bin/storm sql <sql-file> <topo-name> ``` `sql-file` 文件中包含需要被執行的 SQL 語句的列表, `topo-name` 是 topology 的名稱. 當用戶把 `topo-name` 設置為 `--explain` 的時候, StormSQL 激活 `explain mode` 以顯示查詢計劃而不是提交拓撲. 詳細的解釋請參見 `顯示查詢計劃(explain mode)` 一節. ## 支持的特性 當前版本支持以下特性: * 讀出和流入外部數據源 * 過濾 tuples * 投影 * 用戶自定義函數 (標量) 特意不支持聚合和連接. 當 Storm SQL 要支持本地 `Streaming SQL` 時, 將會介紹這些特性. ## 指定外部數據源 在 StormSQL中, 數據表現為外部表. 用戶可以使用語句 `CREATE EXTERNAL TABLE` 指定數據源. `CREATE EXTERNAL TABLE` 語法與 [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL)中的非常接近. ``` CREATE EXTERNAL TABLE table_name field_list [ STORED AS INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname ] LOCATION location [ PARALLELISM parallelism ] [ TBLPROPERTIES tbl_properties ] [ AS select_stmt ] ``` 各種屬性的詳細解釋參考 [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). `PARALLELISM` 是 StormSQL 特有的關鍵詞, 用于描述輸入數據源的并行度. 等同于為 Trident Spout 設置并行度. 默認值是 1, 這個選項對于輸出數據源沒有任何影響. (如果需要的話, 以后可能會改變. 正常情況下應當避免重新分區). 例如, 下面的語句指定了一個 Kafka Spout 和 sink: ``` CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}' ``` ## 植入外部數據源 用戶通過實現 `ISqlTridentDataSource` 接口并且利用 Java 的 service loader 機制注冊他們, 以植入外部數據源. 外部數據源將根據表的 URI模式 進行選擇. 更多細節請參考 `storm-sql-kafka`. ## 指定 User Defined Function (UDF) 用戶可以使用 `CREATE FUNCTION` 語句來定義 user defined function (標量 或者 聚合). 例如, 下面的語句使用`org.apache.storm.sql.TestUtils$MyPlus` 類定義了一個名為 `MYPLUS` 的函數. ``` CREATE FUNCTION MYPLUS AS 'org.apache.storm.sql.TestUtils$MyPlus' ``` Storm SQL 通過檢查用了什么方法來決定這個函數作為一個 標量 還是 聚合. 如果類中定義了 `evaluate` 方法, Storm SQL 將這個函數作為 `scalar`. 標量函數類的示例: ``` public class MyPlus { public static Integer evaluate(Integer x, Integer y) { return x + y; } } ``` ## 例子: 過濾 Kafka 流 假設有一個 Kafka stream 代表訂單交易. 每個 stream 中的消息包含訂單的 id, 產品的單價, 產品數量. 目標是過濾重要交易的訂單(譯注:總價格大于50的訂單),并將這些訂單插入到另一個 Kafka stream 用于進一步分析. 用戶可以在 SQL 文件中指定下列 SQL 語句: ``` CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}' INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50 ``` 第一個語句定義一個表 `ORDER` 代表輸入流. `LOCATION` 從句指定 ZkHost (`localhost:2181`), broker的路徑(`/brokers`), 和 topic名稱(`orders`). 類似的, 第二個語句指定了表 `LARGE_ORDERS` 代表一個輸出流. `TBLPROPERTIES` 從句指定了一個 [KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs) 的配置, 這個從句是 Kafka sink 表必須的. 第三個語句是一個定義拓撲的 `SELECT` 語句: 它指示 StormSQL 過濾 `ORDERS` 表中的所有訂單, 計算各訂單總價并將匹配的記錄插入 `LARGE_ORDER` 指定的 Kafka流 中. 要想運行這個例子, 用戶需要在 classpath 中包含數據源 (本例中 `storm-sql-kafka`)和它的所有依賴. 當運行 `storm sql` 的時候 Storm SQL 的依賴會自動處理. 用戶可以在提交的步驟中包含數據源依賴, 如下所示: ``` $ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` 上面的命令提交 SQL 語句到 StormSQL. 如果用戶使用了不同版本的 Storm 或者 Kafka, 需要替換每個 artifacts 的版本. 現在, 應該能在 Storm UI 中看到 `order_filtering` 拓撲. ## 顯示查詢計劃(explain mode) 就像 SQL 語句上的 `explain`, StormSQL 在運行 Storm SQL 執行器時提供 `explain mode`. 在分析模式下, StormSQL 分析每一個查詢語句(僅DML)并顯示執行計劃而不是提交拓撲. 為了運行 `explain mode`, 需要設置拓撲名稱為 `--explain` 并像用和提交相同的方式執行 `storm sql` 命令. 例如, 當以分析模式運行上面的例子的時: ``` $ bin/storm sql order_filtering.sql --explain --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2" ``` StormSQL 輸出打印如下: ``` =========================================================== query> CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' ----------------------------------------------------------- 16:53:43.951 [main] INFO o.a.s.s.r.DataSourcesRegistry - Registering scheme kafka with org.apache.storm.sql.kafka.KafkaDataSourcesProvider@4d1bf319 No plan presented on DDL =========================================================== =========================================================== query> CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.storm.kafka.ByteBufferSerializer"}}' ----------------------------------------------------------- No plan presented on DDL =========================================================== =========================================================== query> INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50 ----------------------------------------------------------- plan> LogicalTableModify(table=[[LARGE_ORDERS]], operation=[INSERT], updateColumnList=[[]], flattened=[true]), id = 8 LogicalProject(ID=[$0], TOTAL=[*($1, $2)]), id = 7 LogicalFilter(condition=[>(*($1, $2), 50)]), id = 6 EnumerableTableScan(table=[[ORDERS]]), id = 5 =========================================================== ``` ## 局限 * Windowing 尚未實現. * 不支持聚合和連接(待到 `流SQL` 成熟)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看