# Storm JDBC 集成
Storm/Trident集成JDBC.該包中包含的核心bolts 和 trident states ,允許storm topology把storm tuples插入數據庫表中或者執行數據庫查詢,并且豐富了storm topology 中的tuples.
**注**:在下面的示例中,我們使用com.google.common.collect.Lists和com.google.common.collect.Maps。
## Inserting into a database. 插入數據庫.
該包的中bolt 和 trident state可以將數據插入到數據庫表中綁定到單個表。
### ConnectionProvider
由不同的連接池機制實現的接口 `org.apache.storm.jdbc.common.ConnectionProvider`
java public interface ConnectionProvider extends Serializable { /** * method must be idempotent. */ void prepare();
```
/**
*
* @return a DB connection over which the queries can be executed.
*/
Connection getConnection();
/**
* called once when the system is shutting down, should be idempotent.
*/
void cleanup();
```
} ```
即插即用,我們支持'org.apache.storm.jdbc.common.HikariCPConnectionProvider',這是一個使用HikariCP的實現。
### JdbcMapper
使用JDBC在表中插入數據的主要API是org.apache.storm.jdbc.mapper.JdbcMapper 接口:
```
public interface JdbcMapper extends Serializable {
List<Column> getColumns(ITuple tuple);
}
```
`getColumns()` 方法定義了storm tuple如何映射到數據庫中表示一行的列的列表。
**返回的列表的順序很重要。 查詢中的占位符以與返回列表相同的順序進行解析。**
例如,如果用戶的插入查詢是 `insert into user(user_id, user_name, create_date) values (?,?, now())` , `getColumns` 方法返回列表中的第一項將映射到第一位,第二位到第二位,依此類推。我們不會解析提供的查詢,以列名稱來嘗試和解析占位符。沒有對查詢語法進行任何假設,允許這個連接器被一些非標準的sql框架(如僅支持upsert的Pheonix)使用。
### JdbcInsertBolt
使用 `JdbcInsertBolt`,你可以通過指定一個 `ConnectionProvider` 實例和將storm tuple轉換為DB行的 `JdbcMapper` 實例來構造一個 `JdbcInsertBolt`實例。另外,您必須使用 `withTableName` 方法提供表名或使用 `withInsertQuery`插入查詢。如果您指定了一個插入查詢,那么您應該確保您的 `JdbcMapper`實例將按照插入查詢中的順序返回一列列表。您可以選擇指定一個查詢超時秒參數,指定插入查詢可以執行的最大秒數。默認設置為topology.message.timeout.secs的值為-1將表示不設置任何查詢超時。 您應該將查詢超時值設置為<= topology.message.timeout.secs。
```
Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("user")
.withQueryTimeoutSecs(30);
Or
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withInsertQuery("insert into user values (?,?)")
.withQueryTimeoutSecs(30);
```
### SimpleJdbcMapper
`storm-jdbc` 包括一個通用的 `JdbcMapper` 實現,稱為 `SimpleJdbcMapper` ,可以映射Storm元組到數據庫行。 `SimpleJdbcMapper`假定storm tuple中有與列名相同名稱的字段您要寫入的數據庫表。
要使用 `SimpleJdbcMapper`,你只需要告訴你要寫入的tableName并提供一個connectionProvider實例。
以下代碼創建一個 `SimpleJdbcMapper` 實例:
1.允許映射器將 storm tuple轉換為映射到表test.user_details中的行的列的列表。 2.將使用提供的HikariCP配置來建立具有指定數據庫配置的連接池自動找出您要寫入的表的列名稱和相應的數據類型。 請參閱[https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby了解有關hikari配置屬性的更多信息。](https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby%E4%BA%86%E8%A7%A3%E6%9C%89%E5%85%B3hikari%E9%85%8D%E7%BD%AE%E5%B1%9E%E6%80%A7%E7%9A%84%E6%9B%B4%E5%A4%9A%E4%BF%A1%E6%81%AF%E3%80%82)
```
Map hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
String tableName = "user_details";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
```
在上面的示例中初始化的映射器假定storm tuple具有要插入數據的表的所有列的值,其 `getColumn`方法將按照Jdbc連接實例的 `connection.getMetaData().getColumns();`的順序返回列。
**如果您為 `JdbcInsertBolt` 指定了自己的插入查詢,則必須使用顯式列顯示方式初始化 `SimpleJdbcMapper` ,以使模式具有與插入查詢相同順序的列。**
例如,如果您的插入查詢是 `Insert into user (user_id, user_name) values (?,?)` ,那么您的 `SimpleJdbcMapper` 應該使用以下語句進行初始化: `java List<Column> columnSchema = Lists.newArrayList( new Column("user_id", java.sql.Types.INTEGER), new Column("user_name", java.sql.Types.VARCHAR)); JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);`
如果您的 storm tuple僅具有子集列的字段i.e。如果表中的某些列具有默認值,并且您只想為沒有默認值的列插入值,則可以通過顯示的指定columnschema初始化`SimpleJdbcMapper`。例如,如果你有一個user_details表`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`在此表中,create_time列具有默認值。 要確保只插入沒有默認值的列你可以初始化`jdbcMapper` 如下:
```
List<Column> columnSchema = Lists.newArrayList(
new Column("user_id", java.sql.Types.INTEGER),
new Column("user_name", java.sql.Types.VARCHAR),
new Column("dept_name", java.sql.Types.VARCHAR));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
```
### JdbcTridentState
我們還支持持久化trident state 通過使用trident topologies。要創建一個jdbc 持久化的tridentstate,您需要使用表名或插入查詢、JdbcMapper實例和連接提供程序實例進行初始化。見下面的例子:
```
JdbcState.Options options = new JdbcState.Options()
.withConnectionProvider(connectionProvider)
.withMapper(jdbcMapper)
.withTableName("user_details")
.withQueryTimeoutSecs(30);
JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
```
類似于 `JdbcInsertBolt`,你可以使用 `withInsertQuery`來指定一個自定義的插入查詢,而不是指定一個表名。
## Lookup from Database 查詢數據庫
我們支持數據庫的 `select` 查詢以豐富拓撲中的storm tuples。使用JDBC執行數據庫查詢的主要API是 `org.apache.storm.jdbc.mapper.JdbcLookupMapper` 接口:
```
void declareOutputFields(OutputFieldsDeclarer declarer);
List<Column> getColumns(ITuple tuple);
List<Values> toTuple(ITuple input, List<Column> columns);
```
`declareOutputFields` 方法用于指明將作為處理storm tuple的輸出tuple的一部分發出哪些字段。
`getColumns` 方法指定選擇查詢中的占位符列及其SQL類型和要使用的值。例如在上面提到的user_details表中,如果你正在執行一個查詢'select user_name from user_details where user_id =?and create_time>?`getColumns`方法將需要一個storm輸入tuple,并返回一個包含兩個項目的列表。 `Column`類型的 `getValue()`方法的第一個實例將被用作 `user_id`的值進行查找, `Column`類型的 `getValue()`方法的第二個實例將被用作 `create_time`的值。 **注意:返回的列表中的順序決定了占位符的價值。 換句話說,列表中的第一個項目映射在select查詢中第一個'?',第二個項目是第二個'?',依次類推。**
`toTuple` 方法將select查詢的結果接收輸入tuple和表示DB行的列的列表,并返回要發射的值的列表。
**請注意,它返回一個 `Values` 列表,而不僅僅是一個 `Values` 的實例。** 這允許將單個DB行映射到多個輸出storm tuples。
### SimpleJdbcLookupMapper
`storm-jdbc`包括一個通用的`JdbcLookupMapper`實現,叫做 `SimpleJdbcLookupMapper`。
要使用 `SimpleJdbcMapper`,必須使用您的bolt輸出的字段和查詢語句中占位符的列的列表來初始化它。以下示例展示如何初始化一個 `SimpleJdbcLookupMapper`,它將' `user_id,user_name,create_date` 聲明為輸出字段, `user_id`作為查詢語句中的占位符列。`SimpleJdbcMapper` 假定您的tuple中的字段名稱等于占位符列名稱,即在我們的示例中,`SimpleJdbcMapper` 將在輸入tuple中查找一個字段 `use_id` ,并將其值用作查詢語句中占位符的值。對于構造輸出tuple,它首先在輸入元組中查找`outputFields`中指定的字段,如果在輸入元組中找不到,那么它會查看select query的輸出行中與列名稱相同的列。所以在下面的例子中,如果輸入tuple有字段 `user_id, create_date` ,查詢語句是`select user_name from user_details where user_id = ?`,對于每個輸入tuple `SimpleJdbcLookupMapper.getColumns(tuple)`將返回`tuple.getValueByField("user_id")` 用作select查詢的`?` 中的值。對于DB的每個輸出行,`SimpleJdbcLookupMapper.toTuple()`將使用輸入元組中的 `user_id, create_date`,因為從結果行只添加`user_name`,并將這3個字段作為單個輸出元組返回。
```
Fields outputFields = new Fields("user_id", "user_name", "create_date");
List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
```
### JdbcLookupBolt
要使用 `JdbcLookupBolt`,使用 `ConnectionProvider`實例, `JdbcLookupMapper`實例和select查詢來構造一個它的實例。你可以選擇指定一個查詢超時秒參數,指定select查詢可以采用的最大秒數。默認值為topology.message.timeout.secs的值。 您應該將此值設置為<= topology.message.timeout.secs。
```
String selectSql = "select user_name from user_details where user_id = ?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns)
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
.withQueryTimeoutSecs(30);
```
### JdbcTridentState for lookup
我們還可以在trident topologies 中查詢trident state。
```
JdbcState.Options options = new JdbcState.Options()
.withConnectionProvider(connectionProvider)
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
.withSelectQuery("select user_name from user_details where user_id = ?");
.withQueryTimeoutSecs(30);
```
## Example:
可以在 `src/test/java/topology` 目錄中找到一個可運行的例子。
### Setup
* 確保您為您選擇的數據庫包含JDBC實現依賴關系,作為構建配置的一部分。
* 測試拓撲執行以下查詢,因此您的預期數據庫必須支持這些查詢才能使測試拓撲工作。
```
create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
create table if not exists department (dept_id integer, dept_name varchar(100));
create table if not exists user_department (user_id integer, dept_id integer);
insert into department values (1, 'R&D');
insert into department values (2, 'Finance');
insert into department values (3, 'HR');
insert into department values (4, 'Sales');
insert into user_department values (1, 1);
insert into user_department values (2, 2);
insert into user_department values (3, 3);
insert into user_department values (4, 4);
select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
```
### Execution
使用storm jar命令運行`org.apache.storm.jdbc.topology.UserPersistanceTopology`類。 參考該課程第5小節storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <datasourceclassname><datasource.url><user><password>[拓撲名稱]</password></user></datasource.url></datasourceclassname>
要使其與Mysql一起工作,您可以將以下內容添加到pom.xml中
```
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.31</version>
</dependency>
```
您可以使用maven程序集插件生成具有依賴關系的單個jar。 要使用插件,將以下內容添加到您的pom.xml并執行 `mvn clean compile assembly:single`
```
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>fully.qualified.MainClass</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
```
Mysql Example: `storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology`
您可以針對應顯示新插入的行的用戶表執行select查詢:
```
select * from user;
```
For trident you can view `org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`.
對于trident,您可以查看`org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`。
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度