# Storm Cassandra Integration

### Bolt API implementation for Apache Cassandra

This library provides core Storm bolts on top of Apache Cassandra. It offers a simple DSL to map a Storm _Tuple_ to a Cassandra Query Language _Statement_.

### Configuration

The following properties may be passed to the Storm configuration.

| **Property name** | **Description** | **Default** |
| --- | --- | --- |
| **cassandra.keyspace** | - | |
| **cassandra.nodes** | - | {"localhost"} |
| **cassandra.username** | - | - |
| **cassandra.password** | - | - |
| **cassandra.port** | - | 9092 |
| **cassandra.output.consistencyLevel** | - | ONE |
| **cassandra.batch.size.rows** | - | 100 |
| **cassandra.retryPolicy** | - | DefaultRetryPolicy |
| **cassandra.reconnectionPolicy.baseDelayMs** | - | 100 (ms) |
| **cassandra.reconnectionPolicy.maxDelayMs** | - | 60000 (ms) |

### CassandraWriterBolt

#### Static import

```
import static org.apache.storm.cassandra.DynamicStatementBuilder.*;
```

#### Insert Query Builder

##### Insert query including only the specified tuple fields

```
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .with(
                fields("title", "year", "performer", "genre", "tracks")
            )
    )
);
```

##### Insert query including all tuple fields

```
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .with( all() )
    )
);
```

##### Insert multiple queries from one input tuple

```
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
        simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
    )
);
```

##### Insert query using QueryBuilder

```
new CassandraWriterBolt(
    async(
        simpleQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .with(all())
    )
);
```

##### Insert query with static bound query

```
new CassandraWriterBolt(
    async(
        boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);")
            .bind(all())
    )
);
```

##### Insert query with static bound query using named setters and aliases

```
new CassandraWriterBolt(
    async(
        boundQuery("INSERT INTO album (title,year,performer,genre,tracks) VALUES (:ti, :ye, :pe, :ge, :tr);")
            .bind(
                field("ti").as("title"),
                field("ye").as("year"),
                field("pe").as("performer"),
                field("ge").as("genre"),
                field("tr").as("tracks")
            ).byNamedSetters()
    )
);
```

##### Insert query with bound statement loaded from the Storm configuration

```
new CassandraWriterBolt(
    boundQuery(named("insertIntoAlbum"))
        .bind(all()));
```

##### Insert query with bound statement loaded from a tuple field

```
new CassandraWriterBolt(
    boundQuery(namedByField("cql"))
        .bind(all()));
```

##### Insert query with batch statement

```
// Logged
new CassandraWriterBolt(loggedBatch(
    simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
    simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
));

// UnLogged
new CassandraWriterBolt(unLoggedBatch(
    simpleQuery("INSERT INTO titles_per_album (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all()),
    simpleQuery("INSERT INTO titles_per_performer (title,year,performer,genre,tracks) VALUES (?, ?, ?, ?, ?);").with(all())
));
```

### How to handle query execution results

The _ExecutionResultHandler_ interface can be used to customize how an execution result is handled.

```
public interface ExecutionResultHandler extends Serializable {
    void onQueryValidationException(QueryValidationException e, OutputCollector collector, Tuple tuple);

    void onReadTimeoutException(ReadTimeoutException e, OutputCollector collector, Tuple tuple);

    void onWriteTimeoutException(WriteTimeoutException e, OutputCollector collector, Tuple tuple);

    void onUnavailableException(UnavailableException e, OutputCollector collector, Tuple tuple);

    void onQuerySuccess(OutputCollector collector, Tuple tuple);
}
```

By default, the CassandraBolt fails a tuple on every Cassandra exception (see [BaseExecutionResultHandler](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/BaseExecutionResultHandler.java)).

```
new CassandraWriterBolt(insertInto("album").values(with(all()).build()))
    .withResultHandler(new MyCustomResultHandler());
```

### Declare Output fields

A CassandraBolt can declare output fields and stream output fields. For instance, this can be used to emit a new tuple on error, or to chain queries.

```
new CassandraWriterBolt(insertInto("album").values(withFields(all()).build()))
    .withResultHandler(new EmitOnDriverExceptionResultHandler())
    .withStreamOutputFields("stream_error", new Fields("message"));

public static class EmitOnDriverExceptionResultHandler extends BaseExecutionResultHandler {
    @Override
    protected void onDriverException(DriverException e, OutputCollector collector, Tuple tuple) {
        LOG.error("An error occurred while executing cassandra statement", e);
        // Emit the error message on the dedicated stream, then ack the input tuple.
        collector.emit("stream_error", new Values(e.getMessage()));
        collector.ack(tuple);
    }
}
```

### Murmur3FieldGrouping

[Murmur3StreamGrouping](https://github.com/apache/storm/tree/master/external/storm-cassandra/blob/master/src/main/java/org/apache/storm/cassandra/Murmur3StreamGrouping.java) can be used to optimize Cassandra writes. The stream is partitioned among the bolt's tasks based on the specified row partition keys.

```
CassandraWriterBolt bolt = new CassandraWriterBolt(
    insertInto("album")
        .values(
            with(fields("title", "year", "performer", "genre", "tracks")
            ).build()));

builder.setBolt("BOLT_WRITER", bolt, 4)
    .customGrouping("spout", new Murmur3StreamGrouping("title"));
```

### Trident API support

storm-cassandra supports the Trident `state` API for `inserting` data into Cassandra.

```java
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
CQLStatementTupleMapper insertTemperatureValues = boundQuery(
        "INSERT INTO weather.temperature(weather_station_id, weather_station_name, event_time, temperature) VALUES(?, ?, ?, ?)")
        .bind(with(field("weather_station_id"), field("name").as("weather_station_name"), field("event_time").now(), field("temperature")));
options.withCQLStatementTupleMapper(insertTemperatureValues);
CassandraStateFactory insertValuesStateFactory = new CassandraStateFactory(options);

// selectWeatherStationStateFactory is the query-side factory built in the querying example.
TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
stream = stream.each(new Fields("name"), new PrintFunction(), new Fields("name_x"));
stream.partitionPersist(insertValuesStateFactory, new Fields("weather_station_id", "name", "event_time", "temperature"), new CassandraStateUpdater(), new Fields());
```

The following `state` API is used for `querying` data from Cassandra.

```java
CassandraState.Options options = new CassandraState.Options(new CassandraContext());
// Maps the incoming weather_station_id field onto the id bind variable of the SELECT.
CQLStatementTupleMapper selectWeatherStationName = boundQuery("SELECT name FROM weather.station WHERE id = ?")
        .bind(with(field("weather_station_id").as("id")));
options.withCQLStatementTupleMapper(selectWeatherStationName);
options.withCQLResultSetValuesMapper(new TridentResultSetValuesMapper(new Fields("name")));
CassandraStateFactory selectWeatherStationStateFactory = new CassandraStateFactory(options);

TridentState selectState = topology.newStaticState(selectWeatherStationStateFactory);
stream = stream.stateQuery(selectState, new Fields("weather_station_id"), new CassandraQuery(), new Fields("name"));
```
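
The properties listed in the Configuration table are ordinary key/value entries in the topology configuration. The following is a minimal sketch of how they might be assembled, using a plain `java.util.Map` so that it runs without Storm on the classpath; in a real topology the same `put` calls would be made on `org.apache.storm.Config`, which is itself a `HashMap<String, Object>`. The class name `CassandraConfSketch`, the keyspace name `music`, the host-string format for `cassandra.nodes`, and the numeric value types are assumptions made for illustration and are not taken from the library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class for illustration; it is not part of storm-cassandra.
final class CassandraConfSketch {

    // Builds a configuration map keyed by the property names from the Configuration table.
    static Map<String, Object> buildConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("cassandra.keyspace", "music");                    // assumed keyspace name
        conf.put("cassandra.nodes", "localhost");                   // assumed format: host name as a string
        conf.put("cassandra.output.consistencyLevel", "ONE");       // same as the documented default
        conf.put("cassandra.batch.size.rows", 100);                 // same as the documented default
        conf.put("cassandra.reconnectionPolicy.baseDelayMs", 100);  // milliseconds, documented default
        conf.put("cassandra.reconnectionPolicy.maxDelayMs", 60000); // milliseconds, documented default
        return conf;
    }

    public static void main(String[] args) {
        // Print each property so the resulting configuration can be inspected.
        buildConf().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With Storm available, a map like this would typically be passed as the configuration argument when the topology is submitted, so that the bolts and Trident state factories shown above can read their connection settings from it.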