Watermark Strategies
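A TimestampAssigner extracts the event-time timestamp from each event, and a WatermarkGenerator produces watermarks. The WatermarkStrategy interface brings the two together. It extends TimestampAssignerSupplier and WatermarkGeneratorSupplier, which are factories that create a TimestampAssigner and a WatermarkGenerator respectively, and it provides several ready-to-use static factory methods for common strategies. Users can also implement the interface themselves to define a custom strategy. For example, the following strategy tolerates events that arrive up to 20 seconds out of order and takes each event's timestamp from the first tuple field: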
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
public interface WatermarkStrategy<T>
        extends TimestampAssignerSupplier<T>,
                WatermarkGeneratorSupplier<T> {

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
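To make the division of labour concrete, here is a minimal, dependency-free Java sketch that mimics the two halves of a WatermarkStrategy: a timestamp assigner that reads the first field of the event (like `event.f0` in the example above) and a bounded-out-of-orderness generator with a 20-second bound. It does not use Flink. The names `SimpleEvent`, `SimpleTimestampAssigner`, `SimpleBoundedGenerator` and `SimpleStrategy` are hypothetical stand-ins, and the `max - bound - 1` rule assumes the same convention as the BoundedOutOfOrdernessGenerator example later in this section.

```java
import java.time.Duration;

// Hypothetical stand-in for Tuple2<Long, String>: f0 holds the event time in milliseconds.
final class SimpleEvent {
    final long f0;
    final String f1;

    SimpleEvent(long f0, String f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Mimics the role of TimestampAssigner: extracts the event-time timestamp from an event.
interface SimpleTimestampAssigner<T> {
    long extractTimestamp(T event, long recordTimestamp);
}

// Mimics the role of a bounded-out-of-orderness WatermarkGenerator (not a Flink class).
final class SimpleBoundedGenerator {
    private final long boundMillis;
    private long maxTimestamp;

    SimpleBoundedGenerator(Duration bound) {
        this.boundMillis = bound.toMillis();
        // Start low enough that "max - bound - 1" cannot underflow before the first event.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }
}

// Mimics the role of WatermarkStrategy: pairs one timestamp assigner with one generator.
final class SimpleStrategy<T> {
    private final SimpleTimestampAssigner<T> assigner;
    private final SimpleBoundedGenerator generator;

    SimpleStrategy(SimpleTimestampAssigner<T> assigner, Duration bound) {
        this.assigner = assigner;
        this.generator = new SimpleBoundedGenerator(bound);
    }

    // Extracts the timestamp, lets the generator observe it, and returns the timestamp.
    long process(T event) {
        long ts = assigner.extractTimestamp(event, Long.MIN_VALUE); // no prior timestamp
        generator.onEvent(ts);
        return ts;
    }

    long currentWatermark() {
        return generator.currentWatermark();
    }
}
```

Used as `new SimpleStrategy<SimpleEvent>((e, ts) -> e.f0, Duration.ofSeconds(20))`, events at 100 s, 130 s and 120 s leave the watermark at 130 000 - 20 000 - 1 = 109 999 ms: the late-arriving 120 s event does not move it backwards.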
Using Watermark Strategies
There are two places in a Flink program where a WatermarkStrategy can be used:
- directly on a source
- after a non-source operation
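The first option is preferable, because a source can use what it knows about the shards/partitions/splits it reads to generate more accurate watermarks. It does, however, require a source that supports this through the corresponding interface (for example, a source passed to env.fromSource(source, watermarkStrategy, sourceName)).
The second option, setting the strategy after an arbitrary operation, should only be used when you cannot set it directly on the source. In the following example, a DataStream is first read from a file and filtered, and assignTimestampsAndWatermarks then derives a new DataStream that carries timestamps and watermarks: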
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>);

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
Dealing with Idle Sources
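If one of the input splits/partitions/shards carries no events for a while, the WatermarkGenerator for that input receives no new information on which to base a watermark, so its watermark stops advancing. Because the watermark of a downstream operator is the minimum over the watermarks of all its inputs, this single input then holds back the watermark of the whole pipeline. We call such an input idle. WatermarkStrategy provides the withIdleness method for this case: an input that has received no events within the configured timeout (one minute below) is marked idle and is ignored when the minimum is computed, until it receives events again.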
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
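The effect of withIdleness can be modelled without Flink: the combined watermark is the minimum over the inputs' watermarks, and an input that has produced no events within the timeout is simply left out of that minimum. The sketch below is a simplified, hypothetical model (`IdleAwareMerger` is not a Flink class). It takes processing time as an explicit `now` argument so that it is deterministic, whereas Flink tracks idleness internally.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified model of combining input watermarks with idleness detection.
final class IdleAwareMerger {
    private final long idleTimeoutMillis;
    private final long[] watermarks;   // latest watermark per input
    private final long[] lastActivity; // processing time of the last event per input

    IdleAwareMerger(int inputs, long idleTimeoutMillis, long startTime) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.watermarks = new long[inputs];
        this.lastActivity = new long[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        Arrays.fill(lastActivity, startTime);
    }

    // Records activity on one input together with the watermark derived from it.
    void onInput(int input, long watermark, long now) {
        watermarks[input] = Math.max(watermarks[input], watermark);
        lastActivity[input] = now;
    }

    // Minimum watermark over all inputs that are not idle at time 'now';
    // empty when every input is idle, so there is nothing to base a watermark on.
    OptionalLong combinedWatermark(long now) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (now - lastActivity[i] < idleTimeoutMillis) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

With two inputs and a 60-second timeout, a silent second input keeps the combined watermark at `Long.MIN_VALUE` for the first minute; once it has been quiet longer than the timeout, the combined watermark follows the active input alone.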
WatermarkGenerators
/**
 * The {@code WatermarkGenerator} generates watermarks either based on events or
 * periodically (in a fixed interval).
 *
 * <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the
 * {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
 */
@Public
public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine
     * and remember the event timestamps, or to emit a watermark based on
     * the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks
     * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
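WatermarkGenerators come in two styles:
- Periodic: observes incoming events via onEvent() and emits a watermark when the framework calls onPeriodicEmit().
- Punctuated: also inspects events in onEvent(), but waits for special marker events in the stream that tell it when to emit a watermark, and emits directly from onEvent() rather than from onPeriodicEmit().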
Writing a Periodic WatermarkGenerator
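The interval at which a periodic generator produces watermarks, i.e. how often the framework calls onPeriodicEmit(), is configured in milliseconds via ExecutionConfig.setAutoWatermarkInterval(...). Each time onPeriodicEmit() is called, the watermark it produces is only forwarded if it is larger than the previous one. Two example implementations of periodic generators follow.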
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // only remember the highest timestamp seen so far
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}
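The framework side of periodic generation can be sketched in plain Java: events are handed to the generator as they arrive, and every auto-watermark interval the generator is asked for its current watermark, which is forwarded only if it is larger than the last one. This is a hypothetical, single-threaded model with a simulated clock (`OutOfOrdernessLogic` and `PeriodicDriver` are not Flink classes); it re-implements the `max - 3500 - 1` rule of the BoundedOutOfOrdernessGenerator above without Flink types.

```java
import java.util.ArrayList;
import java.util.List;

// The BoundedOutOfOrdernessGenerator rule, without Flink types.
final class OutOfOrdernessLogic {
    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1; // no underflow

    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    long periodicWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness - 1;
    }
}

// Hypothetical model of the framework: invokes the periodic hook every intervalMillis of
// simulated processing time and forwards only strictly increasing watermarks.
final class PeriodicDriver {
    private final OutOfOrdernessLogic generator = new OutOfOrdernessLogic();
    private final long intervalMillis;
    private long nextEmitTime;
    private long lastEmitted = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    PeriodicDriver(long intervalMillis) {
        this.intervalMillis = intervalMillis;
        this.nextEmitTime = intervalMillis;
    }

    // Fires any periodic emits due up to processing time 'now', then passes on the event.
    void onEvent(long now, long eventTimestamp) {
        advanceTo(now);
        generator.onEvent(eventTimestamp);
    }

    // Advances simulated processing time, calling the periodic hook at each interval.
    void advanceTo(long now) {
        while (nextEmitTime <= now) {
            long wm = generator.periodicWatermark();
            if (wm > lastEmitted) { // non-increasing watermarks are not forwarded
                emitted.add(wm);
                lastEmitted = wm;
            }
            nextEmitTime += intervalMillis;
        }
    }
}
```

With a 200 ms interval and events at 10 000, 12 000 and 11 000 ms, the first periodic call emits 12 000 - 3 500 - 1 = 8 499; later calls are suppressed until a higher timestamp (say 20 000 ms) arrives and moves the watermark to 16 499.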
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/**
 * This generator generates watermarks that are lagging behind processing time
 * by a fixed amount. It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
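Because TimeLagWatermarkGenerator reads the wall clock directly, its output is hard to check. One common refactoring, shown here as a hypothetical plain-Java sketch (`TimeLagLogic` is not a Flink class), is to inject the clock as a `LongSupplier`; the watermark is then simply `now - 5000`, whatever event timestamps have been seen.

```java
import java.util.function.LongSupplier;

// Same rule as TimeLagWatermarkGenerator, but with an injectable processing-time clock.
final class TimeLagLogic {
    private final long maxTimeLag = 5000; // 5 seconds
    private final LongSupplier clock;     // e.g. System::currentTimeMillis in production

    TimeLagLogic(LongSupplier clock) {
        this.clock = clock;
    }

    // Event timestamps are ignored: the watermark depends only on processing time.
    void onEvent(long eventTimestamp) {
    }

    long periodicWatermark() {
        return clock.getAsLong() - maxTimeLag;
    }
}
```

Passing `System::currentTimeMillis` reproduces the original behaviour, while a test can pass a fixed or manually advanced clock.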
Writing a Punctuated WatermarkGenerator
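import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // emit only when the event itself carries a watermark marker
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}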
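The punctuated pattern can also be modelled in plain Java. In this hypothetical sketch, `MarkedEvent` stands in for a MyEvent that may carry a watermark marker (`hasWatermarkMarker()`/`getWatermarkTimestamp()` above), and `PunctuatedLogic` records what would be emitted. It assumes, as described for the periodic case, that only increasing watermarks are forwarded, so a marker that would move the watermark backwards is dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type: some events carry an embedded watermark marker.
final class MarkedEvent {
    final long timestamp;
    final Long watermarkMarker; // null when the event carries no marker

    MarkedEvent(long timestamp, Long watermarkMarker) {
        this.timestamp = timestamp;
        this.watermarkMarker = watermarkMarker;
    }
}

// Plain-Java model of a punctuated generator: emits only when a marker event arrives.
final class PunctuatedLogic {
    final List<Long> emitted = new ArrayList<>();
    private long lastEmitted = Long.MIN_VALUE;

    void onEvent(MarkedEvent event) {
        // ordinary events are ignored; markers that would go backwards are dropped
        if (event.watermarkMarker != null && event.watermarkMarker > lastEmitted) {
            lastEmitted = event.watermarkMarker;
            emitted.add(lastEmitted);
        }
    }

    void onPeriodicEmit() {
        // nothing to do: watermarks are emitted in reaction to events
    }
}
```

Feeding it a stream where only the second and fifth events carry markers (1 500 and 3 900 ms) yields exactly two watermarks; a stale marker of 1 200 ms in between is ignored.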
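It is possible to generate a watermark for every single event. However, because each watermark causes some computation downstream, an excessive number of watermarks degrades performance.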
Watermark Strategies and the Kafka Connector
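When Kafka is used as a data source, each Kafka partition may have a simple event-time pattern (ascending timestamps or bounded out-of-orderness). However, when a consumer reads several partitions at once, the events of those partitions are interleaved, which destroys the per-partition patterns. In this case you can use Flink's Kafka-partition-aware watermark generation: watermarks are generated inside the Kafka consumer, one generator per partition, and the per-partition watermarks are merged in the same way watermarks are merged when streams are shuffled, i.e. by taking the minimum.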
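Why per-partition generation matters can be seen in a small, hypothetical plain-Java model (`PartitionWatermarks` is not part of the Kafka connector). Two partitions are each in timestamp order, but partition 1 lags far behind partition 0. The sketch compares a single bounded-out-of-orderness generator over the interleaved stream with one generator per partition merged by taking the minimum.

```java
import java.util.Arrays;

// Contrasts one generator over an interleaved stream with per-partition generators.
final class PartitionWatermarks {
    private final long boundMillis;
    private final long[] perPartitionMax;
    private long globalMax;

    PartitionWatermarks(int partitions, long boundMillis) {
        this.boundMillis = boundMillis;
        long initial = Long.MIN_VALUE + boundMillis + 1; // avoids underflow in "max - bound - 1"
        this.perPartitionMax = new long[partitions];
        Arrays.fill(perPartitionMax, initial);
        this.globalMax = initial;
    }

    void onEvent(int partition, long timestamp) {
        perPartitionMax[partition] = Math.max(perPartitionMax[partition], timestamp);
        globalMax = Math.max(globalMax, timestamp);
    }

    // A single bounded-out-of-orderness generator that only sees the interleaved stream.
    long globalWatermark() {
        return globalMax - boundMillis - 1;
    }

    // One generator per partition, merged like watermarks on a shuffle: the minimum wins.
    long partitionAwareWatermark() {
        long min = Long.MAX_VALUE;
        for (long max : perPartitionMax) {
            min = Math.min(min, max - boundMillis - 1);
        }
        return min;
    }
}
```

With a 5-second bound, partition 0 at 101 000 ms and partition 1 at 11 000 ms, the single generator reports 95 999 ms, so every further event from partition 1 would look late, whereas the partition-aware watermark stays at 5 999 ms until partition 1 catches up.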
How Operators Process Watermarks
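When an operator receives a watermark, it should first complete all the work that the watermark triggers and only then forward the watermark downstream. For example, a window operator first evaluates all windows that the watermark closes and emits their results, and only then passes the watermark on, so the results always precede the watermark that caused them.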
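For a window operator, "completing the work" means firing every window covered by the watermark before passing the watermark on. The hypothetical plain-Java model below (not Flink's window operator) counts events per tumbling event-time window and records, in order, what it would send downstream. Following Flink's convention, a window [start, end) is treated as complete once the watermark reaches end - 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of an event-time tumbling-window count operator.
final class TumblingCountOperator {
    private final long sizeMillis;
    private final TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();
    final List<String> output = new ArrayList<>(); // window results and forwarded watermarks

    TumblingCountOperator(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    void processElement(long timestamp) {
        long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
        countsByWindowStart.merge(start, 1, Integer::sum);
    }

    void processWatermark(long watermark) {
        // 1. fire every window whose last timestamp (end - 1) is covered by the watermark
        while (!countsByWindowStart.isEmpty()) {
            Map.Entry<Long, Integer> first = countsByWindowStart.firstEntry();
            long end = first.getKey() + sizeMillis;
            if (end - 1 > watermark) {
                break;
            }
            output.add("window[" + first.getKey() + "," + end + ")=" + first.getValue());
            countsByWindowStart.pollFirstEntry();
        }
        // 2. only then forward the watermark downstream
        output.add("watermark " + watermark);
    }
}
```

With 10-second windows and events at 1 000, 9 000 and 12 000 ms, a watermark of 10 500 ms first emits the count for [0, 10000) and then the watermark itself; the second window fires only when a watermark reaches 19 999 ms.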
Built-in Watermark Generators
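- Monotonously increasing timestamps, for streams whose timestamps arrive in ascending order:
    WatermarkStrategy.forMonotonousTimestamps();
- Fixed amount of lateness (bounded out-of-orderness), for streams where events arrive at most a known amount of time out of order, here 10 seconds:
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));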
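Both built-in strategies are periodic. As far as we can tell from the Flink implementation they are based on (BoundedOutOfOrdernessWatermarks, which the monotonous variant reuses with a zero bound), the watermark emitted after events with timestamps t_1, ..., t_n (in milliseconds) and bound d is, as a sketch:

```latex
W = \max_{1 \le i \le n} t_i \; - \; d \; - \; 1,
\qquad d = 0 \ \text{for} \ \texttt{forMonotonousTimestamps()}
```

For example, with forBoundedOutOfOrderness(Duration.ofSeconds(10)) and events at 20 000, 35 000 and 30 000 ms, W = 35 000 - 10 000 - 1 = 24 999 ms, so the event at 30 000 ms still counts as on time. With ascending events at 20 000, 30 000 and 35 000 ms, forMonotonousTimestamps() gives W = 35 000 - 0 - 1 = 34 999 ms.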