watermarker
watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp。watermark的含义表示晚于这个时间的数据都已经处理完,如果还有的话直接抛弃掉
作用
watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
structed streaming
表示方法
.withWatermark("timestamp", "10 minutes")
图表解释
上图为trigger=5min,window=10min,watermark-threshold=10min的图表
一个trigger的水位线等于上一个trigger中的最大eventtime-threshold
如上图
12:15-12:20这个trigger的水位线等于12:10-12:15这个trigger的最大eventTime(12:14-10) = 12:04
12:20-12:25这个trigger的水位线等于12:15-12:20这个trigger的最大eventTime(12:21-10) = 12:11
flink watermark
表示方法
- source之后直接加watermark
val processedData = env.addSource(consumer).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks)
- 做一定的预处理之后加watermark
val processedData = env.addSource(consumer)
.map(log => {
preprocess(log)
}).filter(_ != null).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks)
Watermakers的实现
基于事件时间
processedData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[FlowLog] {
var currentMaxTimestamp = 0l
final val maxOutOfOrderness = 30000l
override def getCurrentWatermark: Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
override def extractTimestamp(element: FlowLog, previousElementTimestamp: Long): Long = {
val timestamp = element.start_timestamp
currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
timestamp
}
})
基于系统时间
processedData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[FlowLog] {
final val maxOutOfOrderness = 30000l
override def getCurrentWatermark: Watermark = {
new Watermark(System.currentTimeMillis() - maxOutOfOrderness)
}
override def extractTimestamp(element: FlowLog, previousElementTimestamp: Long): Long = {
val timestamp = element.start_timestamp
timestamp
}
})
可以看到watermark=math.max(element.timestamp, currentMaxTimestamp) - maxOutOfOrderness
- maxOutOfOrderness就是上面strcuted streaming中所说的threshold,所以从上面代码来看flink的watermark也是之前数据的最大事件时间-threshold