新旧版本对比
早在flink1.10以前的版本,有三种时间语义:
- processing time,即当前集群的处理时间
- event time,即消息中附带的消息产生时间
- ingestion time,即消息进入集群的摄取时间
彼时,时间语义可以通过如下方式设置
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2020年年中,flink1.11发布,其中,FLIP-126: Unify (and separate) Watermark Assigners。水位线的产生之前有两个接口(AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks),改为由WatermarkGenerator接口实现,降低了开发的复杂度,并且脱离了TimestampAssigner
2020年年底,flink1.12发布,流批统一,另外flink将上述时间语义设置方式标记过期:env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
水印生成策略
// version flink-1.12.1
//官方策略-无水印生成
assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks())
//官方策略-单调递增的水印
assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
//官方策略-延迟20秒的水印
assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))
//自定义策略-从消息中提取水印
class MyWatermark implements WatermarkGenerator<String> {
@Override
public void onEvent(String event, long timestamp, WatermarkOutput output) {
LocalDateTime dateTime = LocalDateTime.parse(event, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
long eventTimestamp = dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
log.info("event[{}] timestamp[{}] eventTimestamp[{}]" ,event , timestamp, eventTimestamp);
output.emitWatermark(new Watermark(eventTimestamp));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
}
void onPeriodicEmit(WatermarkOutput output):周期性触发watermark生成,默认200ms。可以通过env.getConfig().setAutoWatermarkInterval(200)配置。
void onEvent(String event, long eventTimestamp, WatermarkOutput output):事件触发watermark生成。
withIdleness(Duration.ofMinutes(1)):此方法为闲置watermark生成。如果1分钟内没有事件产生,则下发一个watermark。
两条时间线
processing time:flink处理消息时的时间,窗口开启的依据时间
-
watermark:消息中的消息产生时间,窗口关闭的依据时间
note:如果当前watermark大于窗口关闭的时间,你会发现,数据无法在窗口收集
源码分析
//窗口分配器参数中 timestamp为当前处理事件
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > -9223372036854775808L) {
if (this.staggerOffset == null) {
this.staggerOffset = this.windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), this.size);
}
long start = TimeWindow.getWindowStartWithOffset(timestamp, (this.globalOffset + this.staggerOffset) % this.size, this.size);
return Collections.singletonList(new TimeWindow(start, start + this.size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
//窗口触发器 TriggerResult.FIRE表示窗口关闭,触发计算
//onElement表示窗口收集动作
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
//onEventTime表示窗口计算动作
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
注意事项
- 单调递增的watermark的下发一般越贴近数据源越好
- 每个算子默认的watermark是Long.MIN_VALUE
- 算子的watermark时间不会出现回溯(下一个水印如果小于当前水印则无效)
- 如果并行度大于1,则watermark依据木桶原理,同一算子的每个slot的watermark时间都大于关窗时间才触发计算
- 上述均为watermark的生成及下发,还有关于执行时间的生成及下发,虽然他很少用。具体可以了解了解BoundedOutOfOrdernessTimestampExtractor这个类
参考资料
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html