# Windowing Support in Core Storm
Storm core 支持處理落在窗口內的一組元組。窗口操作指定了一下兩個參數
```
1.窗口的長度 - 窗口的長度或持續時間
2.滑動間隔 - 窗口滑動的時間間隔
```
## 滑動窗口
元組被分組在窗口和每個滑動間隔窗口中。 一個元組可以屬于多個窗口。
例如一個持續時間長度為 10 秒和滑動間隔 5 秒的滑動窗口。 `........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -> time |<------- w1 -->| |<---------- w2 ----->| |<-------------- w3 ---->|` 窗口每5秒進行一次評估,第一個窗口中的某些元組與第二個窗口重疊。
注意:窗口第一次滑動在 t = 5s,并且將包含在前 5 秒鐘內收到的事件。
## Tumbling Window
元組根據時間或數量被分組在一個窗口中。任何元組只屬于其中一個窗口。
例如一個持續時間長度為 5s 的 tumbling window。
```
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
w1 w2 w3
```
窗口每五秒進行一次評估,并且沒有窗口重疊。
Storm 支持指定窗口長度和滑動間隔作為元組數的計數或持續時間。 bolt 接口 `IWindowedBolt` 需要由窗口支持的bolts來實現。
```
public interface IWindowedBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
* Process tuples falling within the window and optionally emit
* new tuples based on the tuples in the input window.
*/
void execute(TupleWindow inputWindow);
void cleanup();
}
```
每次窗口激活時,都會調用 `execute` 方法。TupleWindow 的參數允許訪問窗口中的當前元組,過期的元組以及自上次窗口計算后添加的新元組,這對于高效的窗口計算將是有用的。
需要窗口支持的 Bolts 一般會擴展為 `BaseWindowedBolt`,它有用來指定窗口長度和滑動間隔的apis.
例如
```
public class SlidingWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation
...
}
// emit the results
collector.emit(new Values(computedValue));
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("slidingwindowbolt",
new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
```
支持以下窗口配置
```
withWindow(Count windowLength, Count slidingInterval)
基于元組計數的滑動窗口,在多個tuples進行 `slidingInterval`滑動之后。
withWindow(Count windowLength)
基于元組計數的窗口,它與每個傳入的元組一起滑動。
withWindow(Count windowLength, Duration slidingInterval)
基于元組計數的滑動窗口,在`slidingInterval`持續時間滑動之后。
withWindow(Duration windowLength, Duration slidingInterval)
基于持續時間的滑動窗口,在`slidingInterval`持續時間滑動之后。
withWindow(Duration windowLength)
基于持續時間的窗口,它與每個傳入的元組一起滑動。
withWindow(Duration windowLength, Count slidingInterval)
基于時間的滑動窗口配置在`slidingInterval`多個元組之后滑動。
withTumblingWindow(BaseWindowedBolt.Count count)
計數的tumbling窗口在指定的元組數之后tumbles.
withTumblingWindow(BaseWindowedBolt.Duration duration)
基于持續時間的tumbling窗口在指定的持續時間后tumbles。
```
## 元組時間戳和亂序元組
默認情況下,在窗口中追蹤的時間戳是 bolt 處理元組的時間。窗口計算是根據正在處理的時間戳進行的。 Storm 支持基于源生成的時間戳的追蹤窗口。
```
/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)
```
上述`fieldName`的值將從傳入的元組中查找并考慮進行窗口計算。如果該元組中不存在該字段,將拋出異常。或者,[TimestampExtractor](../storm-core/src/jvm/org/apache/storm/windowing/TimestampExtractor.java)可以用于從元組導出時間戳值(例如,從元組中的嵌套字段提取時間戳)。
```
/**
* Specify the timestamp extractor implementation.
*
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
```
與時間戳字段 name/extractor 一起,可以指定一個時間滯后參數,它指示具有無序時間戳的元組的最大時間限制。
```
/**
* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)
```
例如:如果滯后是5秒,并且元組`t1`到達時間戳為`06:00:05`沒有元組可能會在早于`06:00:00`的元組時間戳到達。 如果一個元組在`t1`之后到達時間戳`05:59:59`,并且窗口已經移動過`t1`了,它將被視為遲到的元組。 默認情況下不處理遲到的元組,只需在INFO級別打印到工作日志文件。 ```java /** * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. * It must be defined on a per-component basis, and in conjunction with the * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * * @param streamId the name of the stream used to emit late tuples on */ public BaseWindowedBolt withLateTupleStream(String streamId)
```
通過指定上述 `streamId` 來更改此行為。 在這種情況下,遲到的元組將在指定的流中發出并可通過`WindowedBoltExecutor.LATE_TUPLE_FIELD` 訪問
字段。
### Watermarks
為了處理具有時間戳字段的元組,storm 根據傳入的元組時間戳內部計算 watermarks。Watermark 是所有輸入流中最新的元組時間戳(減去滯后)的最小值。在較高級別,watermark類似于 Flink 和 Google 的 MillWheel 用于跟蹤基于事件的時間戳的概念。
定期的(默認每秒),watermark時間戳被發出,如果基于元組的時間戳被使用,這被認為是窗口計算的 clock tick(時鐘勾)。可以用下面的api來改變發出 watermarks 的時間間隔。
```java
/**
* Specify the watermark event generation interval. For tuple based timestamps, watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)
```
當接收到watermark時,將對所有時間戳記進行評估。
例如,考慮具有以下窗口參數基于元組的時間戳處理,
`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s`
```
|-----|-----|-----|-----|-----|-----|-----|
0 10 20 30 40 50 60 70
```
當前 ts = `09:00:00`
在`9:00:00`到`9:00:01`收到的元組`e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)`
在time t = `09:00:01`, watermark w1 = `6:00:31`被發出,沒有早于`6:00:31`的元組可以到達。
三個窗口將被評估。通過采取最早的事件時間戳(06:00:03)并基于滑動間隔(10s)計算上限來計算第一個窗口結束在 ts(06:00:10)。
1. `5:59:50 - 06:00:10` 有元組 e1, e2, e3
2. `6:00:00 - 06:00:20` 有元組 e1, e2, e3, e4
3. `6:00:10 - 06:00:30` 有元組 e4, e5
e6未被評估,因為 watermark 時間戳`6:00:31`比元組 ts`6:00:36`更舊。
在`9:00:01`和 `9:00:02`之間,接收到的元組`e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)`
在 time t = `09:00:02`另一個 watermark w2 = `08:00:34`被發出,沒有元組比`8:00:34`更早到達。
三個窗口將被評估
1. `6:00:20 - 06:00:40` 有元組 e5, e6 (從早期批次)
2. `6:00:30 - 06:00:50` 有元組 e6 (從早期批次)
3. `8:00:10 - 08:00:30` 有元組 e7, e8, e9
e10 不被評估,因為元組 ts `8:00:39`超出了watermark time `8:00:34`.
窗口計算考慮時間間隔,并基于元組時間戳計算窗口。
## Guarantees
storm core的窗口功能目前提供一致性保證。`執行(TupleWindow inputWindow)`方法發出的值將自動鎖定到 inputWindow 中的所有元組。預計下游 bolts 將確認接收的元組(即從窗口 bolt 發出的元組)以完成元組樹。如果不是,元組將重播,并且重新評估窗口計算。
窗口中的元組會在過期后被自動確認,即當它們在`windowLength + slidingInterval`之后從窗口中滑落出來。請注意,配置`topology.message.timeout.secs`應該遠遠超過基于時間窗口的`windowLength + slidingInterval`; 否則元組將超時并重播,并可能導致重復的評估。對于基于計數的窗口,應該調整配置,使得在超時時間段內可以接收到`windowLength + slidingInterval`元組。
## 拓撲示例
示例拓撲`滑動窗口拓撲`顯示了如何使用apis來計算滑動窗口總和和滾動窗口平均值。
- Storm 基礎
- 概念
- Scheduler(調度器)
- Configuration
- Guaranteeing Message Processing
- 守護進程容錯
- 命令行客戶端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 綜述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 語言參考
- Storm SQL 內部實現
- Flux
- Storm 安裝和部署
- 設置Storm集群
- 本地模式
- 疑難解答
- 在生產集群上運行 Topology
- Maven
- 安全地運行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 資源感知調度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各種內部行為的 Metrics
- Windows 用戶指南
- Storm 中級
- 序列化
- 常見 Topology 模式
- Clojure DSL
- 使用沒有jvm的語言編輯storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 狀態管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 調試
- 動態日志級別設置
- Storm Logs
- 動態員工分析
- 拓撲事件檢查器
- Storm 與外部系統, 以及其它庫的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息隊列遙測傳輸) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高級
- 針對 Storm 定義一個不是 JVM 的 DSL
- 多語言協議
- Storm 內部實現
- 翻譯進度