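# Storm Druid Integration
## Storm Druid Bolt and TridentState
This module provides core Storm and Trident bolt implementations for writing data to the [Druid](http://druid.io/) data store. The implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to Druid.
Some of the implementation details are borrowed from the existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md). This new bolt adds support for the latest Storm release and is maintained in the Storm repository.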
### Core Bolt
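The example below shows how to use the core bolt, `org.apache.storm.druid.bolt.DruidBeamBolt`. By default, the bolt expects to receive tuples in which the "event" field carries your event type. This logic can be changed by implementing the `ITupleDruidEventMapper` interface.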
```
// Factory that creates the Tranquility Beam used to send events to Druid.
DruidBeamFactory<Map<String, Object>> druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build();
// Reads each event from the default field of the incoming tuple.
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);
DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig);
topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen");
// Subscribe a downstream bolt to the discard stream of the Druid bolt.
topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt", druidConfig.getDiscardStreamId());
```
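As a rough illustration of the mapper customization described above, the sketch below builds an event map from several named tuple fields rather than reading a single "event" field. It is self-contained and does not use the Storm API: `EventMapperSketch`, `FieldsToEventMapper`, and the use of a plain `Map` in place of a Storm tuple are all hypothetical stand-ins. A real mapper would implement `ITupleDruidEventMapper` from storm-druid, whose exact method signature should be checked against that module.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the mapper contract: turn one incoming tuple into one event.
// It is NOT the storm-druid interface; a plain Map stands in for the Storm tuple.
interface EventMapperSketch<E> {
    E getEvent(Map<String, Object> tupleFields);
}

// Builds the event from selected tuple fields instead of reading a single "event" field.
class FieldsToEventMapper implements EventMapperSketch<Map<String, Object>> {
    private final String[] fieldNames;

    FieldsToEventMapper(String... fieldNames) {
        this.fieldNames = fieldNames.clone();
    }

    @Override
    public Map<String, Object> getEvent(Map<String, Object> tupleFields) {
        Map<String, Object> event = new HashMap<>();
        for (String name : fieldNames) {
            // Fail fast when an expected field is absent from the tuple.
            if (!tupleFields.containsKey(name)) {
                throw new IllegalArgumentException("Tuple is missing field: " + name);
            }
            event.put(name, tupleFields.get(name));
        }
        return event;
    }
}
```

In a topology, the same per-field copying logic would sit inside the `ITupleDruidEventMapper` implementation passed to `DruidBeamBolt`.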
### Trident State
```
DruidBeamFactory<Map<String, Object>> druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>());
ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);

final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10));

// Log every tuple, then persist the "event" field of each tuple to Druid.
stream.peek(new Consumer() {
    @Override
    public void accept(TridentTuple input) {
        LOG.info("########### Received tuple: [{}]", input);
    }
}).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater());
```
### Sample Beam Factory Implementation
The Druid bolt must be supplied with a BeamFactory. You can implement one using the [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) `buildBeam()` method. See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details. For more information, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
```
public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {

    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {
        final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node.
        final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path
        final String dataSource = "test"; // The name of the ingested datasource. Datasources can be thought of as tables.
        final List<String> dimensions = ImmutableList.of("publisher", "advertiser");
        List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of(
                new CountAggregatorFactory("click")
        );

        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {
            @Override
            public DateTime timestamp(Map<String, Object> theMap) {
                return new DateTime(theMap.get("timestamp"));
            }
        };

        // Tranquility uses ZooKeeper (through Curator) for coordination.
        final CuratorFramework curator = CuratorFrameworkFactory
                .builder()
                .connectString((String) conf.get("druid.tranquility.zk.connect")) // Take config from the Storm conf.
                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
                .build();
        curator.start();

        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);

        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via `.objectWriter(...)`.
        // In this case, we won't provide one, so we're just using Jackson.
        final Beam<Map<String, Object>> beam = DruidBeams
                .builder(timestamper)
                .curator(curator)
                .discoveryPath(discoveryPath)
                .location(DruidLocation.create(indexService, dataSource))
                .timestampSpec(timestampSpec)
                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE))
                .tuning(
                        ClusteredBeamTuning
                                .builder()
                                .segmentGranularity(Granularity.HOUR)
                                .windowPeriod(new Period("PT10M"))
                                .partitions(1)
                                .replicants(1)
                                .build()
                )
                .druidBeamConfig(
                        DruidBeamConfig
                                .builder()
                                .indexRetryPeriod(new Period("PT10M"))
                                .build())
                .buildBeam();

        return beam;
    }
}
```
Example code is available [here](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid).
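
For orientation, here is a minimal sketch of an event that matches the sample factory above: a `timestamp` field in ISO 8601 form plus the `publisher` and `advertiser` dimensions. The `SampleEvents` class and its `clickEvent` helper are hypothetical names introduced only for illustration, and the sketch assumes events are plain `Map<String, Object>` values, as in the factory's type parameter.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of storm-druid or Tranquility.
class SampleEvents {

    // Builds one event with the fields the sample factory refers to:
    // "timestamp" plus the "publisher" and "advertiser" dimensions.
    static Map<String, Object> clickEvent(Instant when, String publisher, String advertiser) {
        Map<String, Object> event = new HashMap<>();
        // Instant.toString() produces an ISO 8601 timestamp in UTC.
        event.put("timestamp", when.toString());
        event.put("publisher", publisher);
        event.put("advertiser", advertiser);
        return event;
    }

    public static void main(String[] args) {
        System.out.println(clickEvent(Instant.now(), "example-publisher", "example-advertiser"));
    }
}
```

No `click` field is set because a count aggregator counts ingested rows rather than reading a value from the event.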