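TimestampAssigner extracts the event timestamp from each record, and WatermarkGenerator generates watermarks. WatermarkStrategy ties the two together: it extends TimestampAssignerSupplier and WatermarkGeneratorSupplier, so it acts as a factory for both. It offers static methods for common strategies out of the box, but you can also implement the interface yourself to define a custom watermark strategy.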
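For example, a built-in bounded-out-of-orderness strategy that takes the timestamp from the first tuple field:

```java
WatermarkStrategy<Tuple2<Long, String>> strategy = WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
```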
The interface itself (simplified):

```java
public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>,
                WatermarkGeneratorSupplier<T> {
    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
```
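Because the strategy only supplies assigners and generators, it is really a factory, and every generator it creates keeps its own state. The sketch below models this in plain Java without any Flink dependency. SimpleTimestampAssigner, SimpleWatermarkGenerator, SimpleWatermarkStrategy and MaxTimestampGenerator are hypothetical stand-ins that only mirror the shape of the Flink interfaces. They are not the real API.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for TimestampAssigner. */
interface SimpleTimestampAssigner<T> {
    long extractTimestamp(T event, long recordTimestamp);
}

/** Hypothetical stand-in for WatermarkGenerator; returns the watermark instead of using an output. */
interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp);

    long currentWatermark();
}

/** Hypothetical stand-in for WatermarkStrategy: it only holds factories, no per-stream state. */
final class SimpleWatermarkStrategy<T> {
    private final Supplier<SimpleTimestampAssigner<T>> assignerFactory;
    private final Supplier<SimpleWatermarkGenerator<T>> generatorFactory;

    SimpleWatermarkStrategy(Supplier<SimpleTimestampAssigner<T>> assignerFactory,
                            Supplier<SimpleWatermarkGenerator<T>> generatorFactory) {
        this.assignerFactory = assignerFactory;
        this.generatorFactory = generatorFactory;
    }

    SimpleTimestampAssigner<T> createTimestampAssigner() {
        return assignerFactory.get();
    }

    SimpleWatermarkGenerator<T> createWatermarkGenerator() {
        return generatorFactory.get(); // a fresh generator with its own state on every call
    }
}

/** Example generator: the watermark trails the highest timestamp seen so far by 1 ms. */
final class MaxTimestampGenerator<T> implements SimpleWatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public long currentWatermark() {
        // No event seen yet: report the lowest possible watermark (and avoid underflow).
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }
}
```

Two generators created from the same strategy do not share state, so advancing one leaves the other untouched.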
A WatermarkStrategy can be used in two places:
- directly on the data source
- after a non-source operation
For example, after a filter:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// myFormat, myFilePath and typeInfo stand for your own input format, path and type information
DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter(event -> event.severity() == WARNING)
        .assignTimestampsAndWatermarks(<watermark strategy>);

// continue on the stream that carries the timestamps and watermarks
withTimestampsAndWatermarks
        .keyBy(event -> event.getGroup())
        .reduce((a, b) -> a.add(b));
```
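If one of the inputs (for example a source split or a Kafka partition) receives no events for a while, its WatermarkGenerator has nothing to base new watermarks on. A downstream operator's watermark is the minimum of the watermarks of all its inputs, so the overall watermark then stops advancing. Such an input is called idle. WatermarkStrategy provides withIdleness() for this case. An input that produces no events within the given timeout is marked idle and is ignored when the minimum is computed.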
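```java
WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1)); // example: treat an input as idle after 1 minute without events
```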
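A silent input stalls the whole operator, and marking it idle releases the stall. A small plain-Java model of the minimum computation shows both effects. MinWatermarkCombiner is a hypothetical, simplified class, not Flink code. Inputs are marked idle explicitly rather than through a timeout, and the combined watermark is kept monotonic so that an input returning from idleness cannot pull it back.

```java
import java.util.Arrays;

/**
 * Hypothetical model of an operator with several inputs: its watermark is the minimum of the
 * watermarks of all non-idle inputs, and it never moves backwards.
 * Simplification: inputs are marked idle explicitly instead of via a timeout.
 */
final class MinWatermarkCombiner {
    private final long[] inputWatermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    MinWatermarkCombiner(int inputs) {
        inputWatermarks = new long[inputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // nothing received yet
        idle = new boolean[inputs];
    }

    void onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        idle[input] = false; // new activity makes the input active again
    }

    void markIdle(int input) {
        idle[input] = true;
    }

    /** Recomputes and returns the combined watermark. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        if (anyActive) {
            // Monotonic: an input returning from idleness cannot pull the watermark back.
            combined = Math.max(combined, min);
        }
        return combined;
    }
}
```

With two inputs where only input 0 has sent a watermark (5000), the combined watermark stays at Long.MIN_VALUE. Once input 1 is marked idle, the combined watermark moves to 5000.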
The WatermarkGenerator interface:

```java
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine
     * and remember the event timestamps, or to emit a watermark based on
     * the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks
     * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
```
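A watermark generator works in one of two styles:
- Periodic: it observes incoming events in onEvent() and emits a watermark when the framework calls onPeriodicEmit().
- Punctuated: it also observes events in onEvent(), but decides when to emit a watermark based on special marker events in the stream, not in onPeriodicEmit().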
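The two styles differ in when they emit, and a plain-Java simulation of the calling pattern makes that visible. Generator, RecordingOutput and Driver are hypothetical names, not Flink classes. The driver calls onEvent() for every event and calls onPeriodicEmit() each time a fixed interval of simulated processing time has passed, which loosely models the auto watermark interval. Flink uses a real processing-time timer, but this model only checks the interval when an event arrives.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for WatermarkOutput: records the emitted watermarks. */
final class RecordingOutput {
    final List<Long> emitted = new ArrayList<>();

    void emitWatermark(long watermark) {
        emitted.add(watermark);
    }
}

/** Hypothetical stand-in for WatermarkGenerator, reduced to event timestamps. */
interface Generator {
    void onEvent(long eventTimestamp, RecordingOutput output);

    void onPeriodicEmit(RecordingOutput output);
}

final class Driver {
    private Driver() {}

    /**
     * Feeds events to the generator in arrival order. arrivalTimes[i] is the simulated processing
     * time at which event i arrives (non-decreasing). Before handling an event, onPeriodicEmit()
     * is called once for every interval boundary that has passed. Simplification: without
     * further events no periodic calls happen, unlike a real processing-time timer.
     */
    static RecordingOutput run(Generator generator, long[] eventTimestamps,
                               long[] arrivalTimes, long interval) {
        RecordingOutput output = new RecordingOutput();
        long nextPeriodicCall = interval;
        for (int i = 0; i < eventTimestamps.length; i++) {
            while (arrivalTimes[i] >= nextPeriodicCall) {
                generator.onPeriodicEmit(output);
                nextPeriodicCall += interval;
            }
            generator.onEvent(eventTimestamps[i], output);
        }
        return output;
    }
}
```

Consider a periodic generator that emits "highest timestamp - 1" with a 200 ms interval and events arriving at 0, 250 and 450 ms. It emits twice, at the interval boundaries. A punctuated generator emits only when it sees a qualifying event, regardless of the interval.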
Defining a periodic WatermarkGenerator
```java
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}
```
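The arithmetic can be checked without a cluster. The sketch below copies the logic into plain Java. BoundedOutOfOrdernessModel is a hypothetical class, and its isLate() helper treats an event as late when its timestamp is not greater than the current watermark. One deliberate difference from the generator above is the starting value of currentMaxTimestamp. It is Long.MIN_VALUE + maxOutOfOrderness + 1 instead of 0, so the watermark before the first event is Long.MIN_VALUE and the subtraction cannot underflow.

```java
/**
 * Plain-Java model (no Flink dependency) of the bounded-out-of-orderness logic:
 * watermark = highest timestamp seen - maxOutOfOrderness - 1.
 */
final class BoundedOutOfOrdernessModel {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start low enough that currentWatermark() is Long.MIN_VALUE and cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** The value onPeriodicEmit() would emit right now. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness - 1;
    }

    /** An event is late if its timestamp is not greater than the current watermark. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }
}
```

The -1 keeps an event that is exactly maxOutOfOrderness behind the maximum on time. With a maximum of 12000 ms and a bound of 3500 ms the watermark is 8499, so an event at 8500 is still accepted.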
```java
/**
 * This generator generates watermarks that are lagging behind processing time
 * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxTimeLag = 5000; // 5 seconds
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
```
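Because this generator reads the wall clock, it is hard to test as written. The plain-Java model below injects the clock as a LongSupplier so the behaviour becomes deterministic. This is a design tweak for illustration only, and TimeLagModel is a hypothetical class. The model also shows the consequence of ignoring event timestamps: any event that reaches Flink more than maxTimeLag after its own timestamp is late.

```java
import java.util.function.LongSupplier;

/**
 * Plain-Java model of the time-lag generator: the watermark is "now" minus a fixed lag,
 * independent of event timestamps. The clock is injected so the model can be tested.
 */
final class TimeLagModel {
    private final long maxTimeLag;
    private final LongSupplier clock;

    TimeLagModel(long maxTimeLag, LongSupplier clock) {
        this.maxTimeLag = maxTimeLag;
        this.clock = clock;
    }

    /** Uses the wall clock, like the generator above. */
    TimeLagModel(long maxTimeLag) {
        this(maxTimeLag, System::currentTimeMillis);
    }

    /** The watermark the generator would emit on a periodic call. */
    long periodicWatermark() {
        return clock.getAsLong() - maxTimeLag;
    }

    /** An event is late if its timestamp is not greater than the current watermark. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= periodicWatermark();
    }
}
```

In production the one-argument constructor behaves like the original generator. In tests a fake clock, for example a lambda over a mutable array, lets you step time forward explicitly.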
Defining a punctuated WatermarkGenerator
```java
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
```
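A plain-Java model of the punctuated generator collects what it emits into a list. MarkedEvent and PunctuatedModel are hypothetical classes that mirror hasWatermarkMarker() and getWatermarkTimestamp() from the example above. The model adds one extra guard that the example does not have: it skips a marker whose watermark would go backwards, which keeps the emitted sequence monotonic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event type mirroring hasWatermarkMarker()/getWatermarkTimestamp(). */
final class MarkedEvent {
    private final boolean watermarkMarker;
    private final long watermarkTimestamp;

    MarkedEvent(boolean watermarkMarker, long watermarkTimestamp) {
        this.watermarkMarker = watermarkMarker;
        this.watermarkTimestamp = watermarkTimestamp;
    }

    boolean hasWatermarkMarker() {
        return watermarkMarker;
    }

    long getWatermarkTimestamp() {
        return watermarkTimestamp;
    }
}

/** Plain-Java model of a punctuated generator, collecting emitted watermarks in a list. */
final class PunctuatedModel {
    final List<Long> emitted = new ArrayList<>();
    private long lastEmitted = Long.MIN_VALUE;

    void onEvent(MarkedEvent event) {
        // Extra guard (not in the example above): never emit a watermark that goes backwards.
        if (event.hasWatermarkMarker() && event.getWatermarkTimestamp() > lastEmitted) {
            lastEmitted = event.getWatermarkTimestamp();
            emitted.add(lastEmitted);
        }
    }

    void onPeriodicEmit() {
        // Nothing to do: watermarks are emitted in reaction to marker events only.
    }
}
```

Ordinary events leave the output untouched. Only marker events produce watermarks, so the watermark rate is driven entirely by how often markers appear in the stream.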
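It is also possible to emit a watermark for every single event. However, each watermark causes some computation downstream, so emitting too many watermarks degrades performance.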
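When Kafka is used as a data source, each Kafka partition may have a simple event-time pattern (ascending timestamps or bounded out-of-orderness). When a source consumes several partitions in parallel, however, events from the partitions are interleaved and this per-partition pattern is destroyed. In this case you can use Flink's Kafka-partition-aware watermark generation. Watermarks are generated per Kafka partition inside the Kafka consumer, and the per-partition watermarks are merged in the same way watermarks are merged when streams are combined, by taking the minimum.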
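Here is a rough plain-Java model of the per-partition idea. Each partition tracks its own highest timestamp, assuming ascending timestamps within a partition, and the consumer-level watermark is the minimum over all partitions. PerPartitionWatermarks is hypothetical and not part of the Kafka connector, and it assumes the number of partitions is known up front.

```java
import java.util.Arrays;

/** Hypothetical model: one ascending-timestamp watermark per partition, merged by minimum. */
final class PerPartitionWatermarks {
    private final long[] maxPerPartition;

    PerPartitionWatermarks(int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        maxPerPartition = new long[partitions];
        Arrays.fill(maxPerPartition, Long.MIN_VALUE); // no event seen yet
    }

    void onEvent(int partition, long timestamp) {
        maxPerPartition[partition] = Math.max(maxPerPartition[partition], timestamp);
    }

    /** Watermark of one partition with ascending timestamps: its max timestamp - 1. */
    private static long partitionWatermark(long maxTimestamp) {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }

    /** Merged watermark: the minimum over all partitions, as when streams are combined. */
    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long max : maxPerPartition) {
            min = Math.min(min, partitionWatermark(max));
        }
        return min;
    }
}
```

Suppose partition 0 delivers 1000 ms and then partition 1 delivers 500 ms. The merged watermark only reaches 499, so the 500 ms event is on time. A single ascending-timestamp generator over the interleaved stream could already have emitted 999 after the first event, and would then treat the 500 ms event as late.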
Built-in watermark generators
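Flink provides two built-in generators, available as static methods on WatermarkStrategy:
- monotonously increasing timestamps: WatermarkStrategy.forMonotonousTimestamps()
- a fixed amount of lateness (bounded out-of-orderness): WatermarkStrategy.forBoundedOutOfOrderness(Duration)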
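The sketch below illustrates in plain Java how the two built-in generators relate. It treats monotonous timestamps as the bounded-out-of-orderness formula with a bound of zero, which matches how Flink's AscendingTimestampsWatermarks builds on BoundedOutOfOrdernessWatermarks. BuiltInModels is a hypothetical helper, not Flink API.

```java
/**
 * Plain-Java model of the two built-in strategies. Both derive the watermark from the highest
 * timestamp seen so far; the monotonous one is the bounded one with a bound of zero.
 */
final class BuiltInModels {
    private BuiltInModels() {}

    /** Watermark of a bounded-out-of-orderness generator whose highest timestamp is maxTimestamp. */
    static long boundedWatermark(long maxTimestamp, long boundMillis) {
        return maxTimestamp - boundMillis - 1;
    }

    /** Monotonous timestamps: no out-of-orderness is tolerated. */
    static long monotonousWatermark(long maxTimestamp) {
        return boundedWatermark(maxTimestamp, 0L);
    }

    /** An event is late if its timestamp is not greater than the watermark. */
    static boolean isLate(long eventTimestamp, long watermark) {
        return eventTimestamp <= watermark;
    }
}
```

With a highest timestamp of 60000 ms, the monotonous watermark is 59999, so an event even 1 ms older than the maximum is late. With a 20 s bound the watermark is 39999 and that event is still on time. This is why forMonotonousTimestamps() only suits streams that are really in order.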