<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                企業??AI智能體構建引擎,智能編排和調試,一鍵部署,支持知識庫和私有化部署方案 廣告
                # Storm Druid 集成 ## Storm Druid Bolt 和 TridentState 該模塊提供了將數據寫入[Druid](http://druid.io/) 數據存儲的核心Strom和Trident bolt(螺栓)的實現。 該實現使用Druid's的[Tranquility庫](https://github.com/druid-io/tranquility)向druid發送消息。 一些實施細節從現有的借用 [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md). 這個新的Bolt(螺栓)增加了支持最新的storm釋放,并保持在storm回購的bolt(螺栓)。 ### Core Bolt 下面的例子描述了使用 `org.apache.storm.druid.bolt.DruidBeamBolt`的核心bolt(螺栓)默認情況下,該bolt(螺栓)希望收到元組,其中"事件"字段提供您的事件類型。可以通過實現ITupleDruidEventMapper接口來更改此邏輯。 ``` DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>()); DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build(); ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME); DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig); topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen"); topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId()); ``` ### Trident State ``` DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>()); ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME); final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10)); stream.peek(new Consumer() { @Override public void accept(TridentTuple input) { LOG.info("########### Received tuple: [{}]", input); } }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater()); ``` ### 樣品工廠實現 Druid bolt 必須配置一個 BeamFactory. 您可以使用它們其中一個來實現 [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method. See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details. For more details refer [Tranquility library](https://github.com/druid-io/tranquility) docs. ``` public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> { @Override public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) { final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node. final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables. final List<String> dimensions = ImmutableList.of("publisher", "advertiser"); List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of( new CountAggregatorFactory( "click" ) ); // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>). final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() { @Override public DateTime timestamp(Map<String, Object> theMap) { return new DateTime(theMap.get("timestamp")); } }; // Tranquility uses ZooKeeper (through Curator) for coordination. final CuratorFramework curator = CuratorFrameworkFactory .builder() .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) .build(); curator.start(); // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null); // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```. // In this case, we won't provide one, so we're just using Jackson. final Beam<Map<String, Object>> beam = DruidBeams .builder(timestamper) .curator(curator) .discoveryPath(discoveryPath) .location(DruidLocation.create(indexService, dataSource)) .timestampSpec(timestampSpec) .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE)) .tuning( ClusteredBeamTuning .builder() .segmentGranularity(Granularity.HOUR) .windowPeriod(new Period("PT10M")) .partitions(1) .replicants(1) .build() ) .druidBeamConfig( DruidBeamConfig .builder() .indexRetryPeriod(new Period("PT10M")) .build()) .buildBeam(); return beam; } } ``` Example code is available [here.](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid)
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看