<ruby id="bdb3f"></ruby>

    <p id="bdb3f"><cite id="bdb3f"></cite></p>

      <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
        <p id="bdb3f"><cite id="bdb3f"></cite></p>

          <pre id="bdb3f"></pre>
          <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

          <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
          <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

          <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                <ruby id="bdb3f"></ruby>

                合規國際互聯網加速 OSASE為企業客戶提供高速穩定SD-WAN國際加速解決方案。 廣告
                # Windowing Support in Core Storm Storm core 支持處理落在窗口內的一組元組。窗口操作指定了一下兩個參數 ``` 1.窗口的長度 - 窗口的長度或持續時間 2.滑動間隔 - 窗口滑動的時間間隔 ``` ## 滑動窗口 元組被分組在窗口和每個滑動間隔窗口中。 一個元組可以屬于多個窗口。 例如一個持續時間長度為 10 秒和滑動間隔 5 秒的滑動窗口。 `........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -&gt; time |&lt;------- w1 --&gt;| |&lt;---------- w2 -----&gt;| |&lt;-------------- w3 ----&gt;|` 窗口每5秒進行一次評估,第一個窗口中的某些元組與第二個窗口重疊。 注意:窗口第一次滑動在 t = 5s,并且將包含在前 5 秒鐘內收到的事件。 ## Tumbling Window 元組根據時間或數量被分組在一個窗口中。任何元組只屬于其中一個窗口。 例如一個持續時間長度為 5s 的 tumbling window。 ``` | e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... 0 5 10 15 -> time w1 w2 w3 ``` 窗口每五秒進行一次評估,并且沒有窗口重疊。 Storm 支持指定窗口長度和滑動間隔作為元組數的計數或持續時間。 bolt 接口 `IWindowedBolt` 需要由窗口支持的bolts來實現。 ``` public interface IWindowedBolt extends IComponent { void prepare(Map stormConf, TopologyContext context, OutputCollector collector); /** * Process tuples falling within the window and optionally emit * new tuples based on the tuples in the input window. */ void execute(TupleWindow inputWindow); void cleanup(); } ``` 每次窗口激活時,都會調用 `execute` 方法。TupleWindow 的參數允許訪問窗口中的當前元組,過期的元組以及自上次窗口計算后添加的新元組,這對于高效的窗口計算將是有用的。 需要窗口支持的 Bolts 一般會擴展為 `BaseWindowedBolt`,它有用來指定窗口長度和滑動間隔的apis. 例如 ``` public class SlidingWindowBolt extends BaseWindowedBolt { private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(TupleWindow inputWindow) { for(Tuple tuple: inputWindow.get()) { // do the windowing computation ... } // emit the results collector.emit(new Values(computedValue)); } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 1); builder.setBolt("slidingwindowbolt", new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(1); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } ``` 支持以下窗口配置 ``` withWindow(Count windowLength, Count slidingInterval) 基于元組計數的滑動窗口,在多個tuples進行 `slidingInterval`滑動之后。 withWindow(Count windowLength) 基于元組計數的窗口,它與每個傳入的元組一起滑動。 withWindow(Count windowLength, Duration slidingInterval) 基于元組計數的滑動窗口,在`slidingInterval`持續時間滑動之后。 withWindow(Duration windowLength, Duration slidingInterval) 基于持續時間的滑動窗口,在`slidingInterval`持續時間滑動之后。 withWindow(Duration windowLength) 基于持續時間的窗口,它與每個傳入的元組一起滑動。 withWindow(Duration windowLength, Count slidingInterval) 基于時間的滑動窗口配置在`slidingInterval`多個元組之后滑動。 withTumblingWindow(BaseWindowedBolt.Count count) 計數的tumbling窗口在指定的元組數之后tumbles. withTumblingWindow(BaseWindowedBolt.Duration duration) 基于持續時間的tumbling窗口在指定的持續時間后tumbles。 ``` ## 元組時間戳和亂序元組 默認情況下,在窗口中追蹤的時間戳是 bolt 處理元組的時間。窗口計算是根據正在處理的時間戳進行的。 Storm 支持基于源生成的時間戳的追蹤窗口。 ``` /** * Specify a field in the tuple that represents the timestamp as a long value. If this * field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown. * * @param fieldName the name of the field that contains the timestamp */ public BaseWindowedBolt withTimestampField(String fieldName) ``` 上述`fieldName`的值將從傳入的元組中查找并考慮進行窗口計算。如果該元組中不存在該字段,將拋出異常。或者,[TimestampExtractor](../storm-core/src/jvm/org/apache/storm/windowing/TimestampExtractor.java)可以用于從元組導出時間戳值(例如,從元組中的嵌套字段提取時間戳)。 ``` /** * Specify the timestamp extractor implementation. * * @param timestampExtractor the {@link TimestampExtractor} implementation */ public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor) ``` 與時間戳字段 name/extractor 一起,可以指定一個時間滯后參數,它指示具有無序時間戳的元組的最大時間限制。 ``` /** * Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps * cannot be out of order by more than this amount. * * @param duration the max lag duration */ public BaseWindowedBolt withLag(Duration duration) ``` 例如:如果滯后是5秒,并且元組`t1`到達時間戳為`06:00:05`沒有元組可能會在早于`06:00:00`的元組時間戳到達。 如果一個元組在`t1`之后到達時間戳`05:59:59`,并且窗口已經移動過`t1`了,它將被視為遲到的元組。 默認情況下不處理遲到的元組,只需在INFO級別打印到工作日志文件。 ```java /** * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. * It must be defined on a per-component basis, and in conjunction with the * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * * @param streamId the name of the stream used to emit late tuples on */ public BaseWindowedBolt withLateTupleStream(String streamId) ``` 通過指定上述 `streamId` 來更改此行為。 在這種情況下,遲到的元組將在指定的流中發出并可通過`WindowedBoltExecutor.LATE_TUPLE_FIELD` 訪問 字段。 ### Watermarks 為了處理具有時間戳字段的元組,storm 根據傳入的元組時間戳內部計算 watermarks。Watermark 是所有輸入流中最新的元組時間戳(減去滯后)的最小值。在較高級別,watermark類似于 Flink 和 Google 的 MillWheel 用于跟蹤基于事件的時間戳的概念。 定期的(默認每秒),watermark時間戳被發出,如果基于元組的時間戳被使用,這被認為是窗口計算的 clock tick(時鐘勾)。可以用下面的api來改變發出 watermarks 的時間間隔。 ```java /** * Specify the watermark event generation interval. For tuple based timestamps, watermark events * are used to track the progress of time * * @param interval the interval at which watermark events are generated */ public BaseWindowedBolt withWatermarkInterval(Duration interval) ``` 當接收到watermark時,將對所有時間戳記進行評估。 例如,考慮具有以下窗口參數基于元組的時間戳處理, `Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s` ``` |-----|-----|-----|-----|-----|-----|-----| 0 10 20 30 40 50 60 70 ``` 當前 ts = `09:00:00` 在`9:00:00`到`9:00:01`收到的元組`e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)` 在time t = `09:00:01`, watermark w1 = `6:00:31`被發出,沒有早于`6:00:31`的元組可以到達。 三個窗口將被評估。通過采取最早的事件時間戳(06:00:03)并基于滑動間隔(10s)計算上限來計算第一個窗口結束在 ts(06:00:10)。 1. `5:59:50 - 06:00:10` 有元組 e1, e2, e3 2. `6:00:00 - 06:00:20` 有元組 e1, e2, e3, e4 3. `6:00:10 - 06:00:30` 有元組 e4, e5 e6未被評估,因為 watermark 時間戳`6:00:31`比元組 ts`6:00:36`更舊。 在`9:00:01`和 `9:00:02`之間,接收到的元組`e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)` 在 time t = `09:00:02`另一個 watermark w2 = `08:00:34`被發出,沒有元組比`8:00:34`更早到達。 三個窗口將被評估 1. `6:00:20 - 06:00:40` 有元組 e5, e6 (從早期批次) 2. `6:00:30 - 06:00:50` 有元組 e6 (從早期批次) 3. `8:00:10 - 08:00:30` 有元組 e7, e8, e9 e10 不被評估,因為元組 ts `8:00:39`超出了watermark time `8:00:34`. 窗口計算考慮時間間隔,并基于元組時間戳計算窗口。 ## Guarantees storm core的窗口功能目前提供一致性保證。`執行(TupleWindow inputWindow)`方法發出的值將自動鎖定到 inputWindow 中的所有元組。預計下游 bolts 將確認接收的元組(即從窗口 bolt 發出的元組)以完成元組樹。如果不是,元組將重播,并且重新評估窗口計算。 窗口中的元組會在過期后被自動確認,即當它們在`windowLength + slidingInterval`之后從窗口中滑落出來。請注意,配置`topology.message.timeout.secs`應該遠遠超過基于時間窗口的`windowLength + slidingInterval`; 否則元組將超時并重播,并可能導致重復的評估。對于基于計數的窗口,應該調整配置,使得在超時時間段內可以接收到`windowLength + slidingInterval`元組。 ## 拓撲示例 示例拓撲`滑動窗口拓撲`顯示了如何使用apis來計算滑動窗口總和和滾動窗口平均值。
                  <ruby id="bdb3f"></ruby>

                  <p id="bdb3f"><cite id="bdb3f"></cite></p>

                    <p id="bdb3f"><cite id="bdb3f"><th id="bdb3f"></th></cite></p><p id="bdb3f"></p>
                      <p id="bdb3f"><cite id="bdb3f"></cite></p>

                        <pre id="bdb3f"></pre>
                        <pre id="bdb3f"><del id="bdb3f"><thead id="bdb3f"></thead></del></pre>

                        <ruby id="bdb3f"><mark id="bdb3f"></mark></ruby><ruby id="bdb3f"></ruby>
                        <pre id="bdb3f"><pre id="bdb3f"><mark id="bdb3f"></mark></pre></pre><output id="bdb3f"></output><p id="bdb3f"></p><p id="bdb3f"></p>

                        <pre id="bdb3f"><del id="bdb3f"><progress id="bdb3f"></progress></del></pre>

                              <ruby id="bdb3f"></ruby>

                              哎呀哎呀视频在线观看