- 基于flink-1.8.1
- 基于flink官网
概述
-
实时计算中,数据时间比较敏感。有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink中一个亮点就是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。
Flink WaterMark介绍
- watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
- 流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
- 但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
并行流中的watermark
- 在source functions处或者之后生成watermark,source functions的parallel subtask 通常会独立的生成watermark;这些watermarks定义了该特定parallel source的事件事件;
- 对于流式处理中的,当watermarks达到某个算子时,watermark会将event time提前。每当算子将流中的event time提前,这个算子都会为下游算子生成新的watermark;
- 一些算子会有多个source stream, 例如,一个union,或者跟随keyBy(...)或partition(...)函数的算子。当前输入stream中的event time是多个source stream中的event time的最小值;(
Such an operator’s current event time is the minimum of its input streams’ event times
);由于input stream更新了event time,算子同样会更新event time。如下图所示:
生成 Timestamps / Watermarks
分配Timestamps
- 为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。 这通常通过从元素中的某个字段访问/提取时间戳来完成。
- 时间戳分配与生成watermark密切相关,watermark告诉系统事件时间的进展。
- 有两种方法可以分配时间戳并生成水印:
- 直接在数据流源中;
- 通过时间戳分配器/watermatk生成器:在Flink中,时间戳分配器还定义要发出的watermark;
- Attention:Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.
Source Functions with Timestamps and Watermarks
- source functions可以直接为它们生成的元素分配时间戳,它们也可以发出watermark。当完成此操作时,不需要再使用时间戳分配器。请注意,如果使用时间戳分配器,则将覆盖源提供的任何时间戳和水印。
- 要直接为source中的元素分配时间戳,源必须在SourceContext上使用collectWithTimestamp(...)方法。 要生成水印,源必须调用emitWatermark(水印)功能。
- 语法:
java
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
scala
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
Timestamp Assigners / Watermark Generators
- Timestamp Assigners获取stream并生成带有带时间戳元素和水印的新流。 如果原始stream中已经有时间戳和/或水印,则时间戳分配器会覆盖它们。
- 时间戳分配器通常在数据源之后立即指定,但并非严格要求这样做。 例如,常见的模式是在时间戳分配器之前解析(MapFunction)和过滤(FilterFunction)。 在任何情况下,需要在事件时间的第一个操作之前指定时间戳分配器(例如第一个窗口操作)。 作为一种特殊情况,当使用Kafka作为流式传输作业的源时,Flink允许在源(或消费者)本身内指定时间戳分配器/水印发射器。 有关如何执行此操作的更多信息,请参阅Kafka Connector文档。
- java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
- scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
With Periodic Watermarks周期性watermark
定时提取watermark,这种方式会定时提取更新wartermark。
- AssignerWithPeriodicWatermarks定期分配时间戳并生成水印(可能取决于流元素,或纯粹基于处理时间)。
- 生成水印的间隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定义。 每次调用分配器的getCurrentWatermark()方法,如果返回的水印非空且大于前一个水印,则会发出新的水印。
- 这里我们展示了两个使用周期性水印生成的时间戳分配器的简单示例。 请注意,Flink附带了一个BoundedOutOfOrdernessTimestampExtractor,类似于下面显示的BoundedOutOfOrdernessGenerator;
- java
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
- scala
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
With Punctuated Watermarks
伴随event的到来就提取watermark,就是每一个event到来的时候,就会提取一次Watermark。这样的方式当然设置watermark更为精准,但是当数据量大的时候,频繁的更新wartermark会比较影响性能。通常情况下采用定时提取就足够了。
- 当生成watermark的过程中某个event指示器 可能生成新wateramrk,请使用AssignerWithPunctuatedWatermarks。 对于此类,Flink将首先调用extractTimestamp(...)方法为元素分配时间戳,然后立即调用该元素上的checkAndGetNextWatermark(...)方法。
- checkAndGetNextWatermark(...)方法传递在extractTimestamp(...)方法中分配的时间戳,并可以决定是否要生成watermark。 每当checkAndGetNextWatermark(...)方法返回非空watermark,并且该watermark大于最新的先前watermark时,将发出该新watermark。
- demo
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
- scala
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
- 注意:可以在每个事件上生成水印。 然而,因为每个水印在下游引起一些计算,所以过多的水印会降低性能.
Timestamps per Kafka Partition
- 当使用Apache Kafka作为数据源时,每个Kafka分区可能具有简单的事件时间模式(升序时间戳或有界无序)。但是,当从Kafka消费流数据时,多个分区通常并行消费,交错来自分区的事件并破坏每个分区模式(这是Kafka的消费者客户端工作的固有方式)。
- 在这种情况下,您可以使用Flink的Kafka分区感知水印生成。使用该功能,根据Kafka分区在Kafka使用者内部生成水印,并且每个分区水印的合并方式与在流shuffle上合并水印的方式相同。
- 例如,如果事件时间戳严格按每个Kafka分区升序,则使用升序时间戳水印生成器生成每分区水印将产生完美的整体水印。
- 下图显示了如何使用per-Kafka分区水印生成,以及在这种情况下水印如何通过流数据流传播。
- demo
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
- scala
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
Pre-defined Timestamp Extractors / Watermark Emitters
- 这部分比较简单,是两个类的讲解,详见官网;
- 建议大家认真阅读一下官网;