定义与作用
官方定义:A Watermark tells operators that no elements with a timestamp older or equal to the watermark timestamp should arrive at the operator.
作用:watermark是一个时间标记,用于上游告诉下游,该时间点以前的数据都已经发送完毕。主要用于解决乱序和延迟数据的问题。
产生方式
Watermark的产生必然伴随着数据的Timestamp抽取(通过调用assignTimestampsAndWatermarks方法)。可以在产生DataStreamSource的时候就直接产生(如用FlinkKafkaConsumer类中的assignTimestampsAndWatermarks来配置),也可以在后面为DataStream中设置(用DataStream类中的assignTimestampsAndWatermarks来配置)。
Watermark的产生逻辑有两种:Periodic和Punctuated。
Periodic
使用方式:
Datastream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);
这种方式下,Datastream向下游发送Watermark的时机总是固定周期的,但是发送的Watermark值却是根据数据中的Timestamp来计算的。
AssignerWithPeriodicWatermarks<T>包含一个方法:
Watermark getCurrentWatermark();
用来实现获取Watermark的逻辑。Datastream在会注册一个定时器,定时器触发的时候会调用getCurrentWatermark来计算一个Watermark,并将其注入输出的数据流中。
可以通过继承AssignerWithPeriodicWatermarks<T>来自定义Watermark的计算逻辑。当然通常更多地使用预定义的类:
BoundedOutOfOrdernessTimestampExtractor
对于所有数据元素抽取Timestamp后,保存迄今最大的值保存在currentMaxTimestamp中,然后产生watermark的计算方法:watermark = currentMaxTimestamp - maxOutOfOrderness。其中maxOutOfOrderness表示最大乱序时间,可以自定义。即当前timestamp最大值减去一定的延时作为watermark,这是比较常用的一种
Punctuated
使用方式:
Datastream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);
这种方式下,无论是Datastream向下游发送Watermark的时机,还是发送的Watermark值都是由用户自行定义的。
AssignerWithPunctuatedWatermarks<T>包含一个方法:
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
它会作用于每一个数据元素,其中用户需要自己定义是否根据这个元素输出和怎样输出watermark。
这种方式的主要使用场景是数据中本身包含有标志着该数据以前的数据已经发送完毕的信息,或者用户想自己控制watermark产生的方法与时机。
传递方式
watermark和record、stream status一样,都以StreamElement的形式在DataStream中,由上游算子发送给下游算子。
每个算子都使用了StatusWatermarkValve来保存和处理上游发来的watermark状态,并且决定如何传递给它的下游。
StatusWatermarkValve内部又包含了一个数组
private final InputChannelStatus[] channelStatuses;
用于保存每个上游通道的最大watermark以及该通道的stream status等数据
处理流程
一个算子接收到watermark,并进行处理的整个流程:
每个算子都有一个StreamInputProcessor(或StreamTwoInputProcessor,根据上游算子的数量决定),它从数据流中读数据,并判断数据的类型。如果是watermark,则交给其所属的StatusWatermarkValve(状态水位阀)去处理这个watermark。
StatusWatermarkValve内部包含了一个数组InputChannelStatus[] channelStatuses,它保存了每个上游通道的最大watermark。其中每个通道对应上游算子的一个并发。
StatusWatermarkValve拿到这个watermark时,会在InputChannelStatus[]数组中找到其对应通道的之前的watermark,如果当前的比原来的大,则更新该通道的watermark;
如果有更新,则还要判断一下InputChannelStatus[]数组中的watermark最小值是否有更新,如果更新了,则调用outputHandler的handleWatermark来产生该算子的新watermark。
随后,算子的timerService首先会更新算子内部的watermark状态,并判断是否触发一些窗口、定时事件等,等数据都处理完毕后调用output.emitWatermark将当前的watermark下发给下游。注意必须等该Watermark所触发的计算完成、并保证数据下发后,再行下发Watermark。
使用方式
触发窗口
这是最常见的用法,当Watermark没过算子的某些窗口的结束时间,算子会触发这些窗口,对窗口中的数据进行处理,然后输出处理结果,并将Watermark也输出到下游。
在此之后如果有早于该Watermark时间戳的数据元素流入该算子,则会被当成late element被输出到side output中。
注意:WindowedStream中还有一个方法
allowedLateness(Time lateness);
这里的lateness与maxOutOfOrderness类似,都是用于设定延时时间的,只是maxOutOfOrderness是在curMaxTimestamp的基础上减去一个延时时间,来产生Watermark。而lateness是在Watermark的基础上再减去一个延时时间。设定了这个参数后,是当Watermark-lateness大于窗口结束时间时,才出发窗口。
触发定时器
有些算子中虽然没有使用窗口,但是注册使用了定时器:
ctx.timerService().registerEventTimeTimer(value.getBizTime());
这种情况下,当Watermark没过所注册的定时时间的时候,就会触发定时器,去调用用户自定义的onTimer方法。