Flink 中提供了3种时间模型:EventTime、ProcessingTime、IngestionTime。
底层实现上分为2种:Processing Time 与 Event Time,Ingestion Time 本质上也是一种 Processing Time,官方文档 上对于3者的描述(参考下图):
- EventTime 是事件创建的时间,即数据产生时自带时间戳。
- IngestionTime 是事件进入 Flink 的时间,即进入 source operator 是给定的时间戳。
- ProcessingTime 是每一个执行 window 操作的本地时间。
可以参考以下两篇 Blog 和 Paper 帮助对时间域的理解,也是官方推荐的
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
https://static.googleusercontent.com/media/research.google.com/zh-CN//pubs/archive/43864.pdf
附笔者 翻译1 翻译2 翻译3
Flink 如何设置时间域?
调用 setStreamTimeCharacteristic
设置时间域,枚举类 TimeCharacteristic
预设了三种时间域,不显式设置的情况下,默认使用 TimeCharacteristic.ProcessTime
。这也是 Flink 程序一般最开始的工作。
# Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// 可选的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
EventTime 与 WaterMarks
为什么必须处理事件时间?
在大多数情况下,消息进入系统中是无序的(网络、硬件、分布式逻辑都可能影响),并且会有消息延迟到达(例如移动场景中,由于手机无信号,导致一系列的操作消息在手机重新连接信号后发送),如果按照消息进入系统的时间计算,结果会与实时严重不符合。理想情况是 event time 和 processing time 是一致的(发生时间即处理时间),但是现实情况是不一致的,两者存在歪斜(skew)。
因此,支持事件时间的流式处理程序需要一种方法来测量事件时间的进度。例如,有一个按小时构建的窗口,当事件时间超过了一小时的时间范围,需要通知该窗口,以便关闭正在进行的窗口。
什么是水印(watermarks)
Flink 中检测事件时间处理进度的机制就是水印,Watermark 作为数据处理流中的一部分进行传输,并且携带一个时间戳t
。一个 Watermark(t) 表示流中应该不再有事件时间比t
小的元素(某个事件的时间戳比 Watermark 时间大)。
Watermark 有助于解决乱序问题
下图表示一个顺序的事件流中的 Watermark, Watermark 只代表一个简单的标记,
下图表示一个乱序的事件流中的 Watermark,表示所有事件时间戳小于 Watermark 时间戳的数据都已经处理完了,任何事件大于 Watermark 的元素都不应该再出现,当然这只是一种推测性的结果(基于多种信息的推测),
并行流中的水印
水印是在 Source function(源函数)处或之后生成的。源函数的每个并行子任务通常独立地生成水印。这些水印定义了该特定并行源的事件时间。
当水印经过流处理程序时,会将该算子的事件时间向前推进。当算子提前其事件时间时,会为后续的算子生成新水印。
一些算子使用多个输入流,例如,使用 union 或者 keyBy/partition 函数的算子。此类算子的当前事件时间是其输入流事件时间的最小值。
当它的输入流更新它们的事件时间时,算子也会更新。
下图显示了事件和水印经过并行流的示例,以及跟踪事件时间的运算符。
延迟记录(Late Elements)
某些记录可能会违反水印的条件,事件时间小于t但是晚于水印t到达。实际运行过程中,事件可能被延迟任意的时间,所以不可能指定一个时间,保证该时间之前的所有事件都被处理了。而且,即使延时时间是有界限的,过多的延迟水印的时间也是不理想的,会造成时间窗口处理的太多延时。
生成时间戳和水印
- 首先设置时间域
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- 分配时间戳
处理事件事件需要知道事件发生时间的时间戳,通常从流中数据元的某个字段提取时间戳。时间戳分配与生成水印密切相关,水印告诉系统事件时间的进展。
有两种方法可以分配时间戳并生成水印:
- 直接在数据流 source 中
- 通过时间戳分配器/水印生成器:在 Flink 中,时间戳分配器也会定义要发出的水印
带时间戳和水印的 Source Functions
Stream source 可以直接为生成的数据元分配时间戳,也可以发出水印。完成此 算子操作后,不需要时间戳分配器。如果使用了时间戳分配器,则 source 函数提供的任何时间戳和水印都将被覆盖。
要直接为源中的数据元分配时间戳,源必须使用 collectWithTimestamp(...)
方法作用域 SourceContext
。要生成水印,源必须调用 emitWatermark(Watermark)
函数。
下面是一个分配时间戳并生成水印的简单示例:
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
时间戳分配器/水印生成器
时间戳分配器(Timestamp assigners)获取流并生成带有带时间戳数据元和水印的新流。如果原始流已经有时间戳或水印,时间戳分配器会覆盖它们。
时间戳分配器通常在数据源生成之后立即指定,但并非被严格要求这样做。常见的模式是在时间戳分配器之前执行解析(MapFunction)和过滤(FilterFunction)。在任何情况下,需要在第一个操作事件时间的算子执行之前指定时间戳分配器(例如第一个窗口算子操作)。
作为一种特殊情况,当使用 Kafka 作为流式作业的数据源时 ,Flink 允许在源(或消费者)本身内部指定时间戳分配器/水印发射器。更多信息相关信息请参考 Kafka Connector 文档。
下面是一个时间戳分配器/水印生成器的简单示例(只介绍了必须实现的主要接口):
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// create stream source
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
// assign timestamp and watermark assigner after filter function
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
// window function and sink
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
使用周期性(periodically)水印
AssignerWithPeriodicWatermarks 分配时间戳并定期生成水印(可能取决于流数据元,或纯粹基于处理时间)。
生成水印的间隔(每n毫秒)使用 ExecutionConfig.setAutoWatermarkInterval(...)
。每次调用分配器的方法 getCurrentWatermark()
,如果返回的水印非空并且大于先前的水印,则将发出新的水印。
下面有两个例子,时间戳分配器使用周期性水印:
该例子假定元素到达时在一定程度上是无序的,某个时间戳t
的最后达到元素相比时间戳t
的最早到达元素,最大延迟n毫秒。
/**
The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
该例子假设元素在有界延迟后到达,生成器生成的水印比处理时间滞后固定时间长度。
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
第二个例子比较容易理解,使用系统时间减去允许的延时时间作为 watermark 的时间。只跟当前系统时间有关系,如果大批事件出现延时的情况,可能很多在 watermark 的时间之后出现了,会被被丢弃。
第一个例子,在当前事件的事件时间和当前最大时间(记录最大的事件时间)中取最大值,得到最大的事件时间。用这个最大值减去一个允许的延时时间作为 watermark 时间。同样的如果大批事件发生延时,那么对应的 watermark 的时间就会向后推。
带标记(Punctuated)水印
使用 AssignerWithPunctuatedWatermarks
在某个事件指定生成新的水印的时候生成水印。这种情况下,Flink 首先会调用 extractTimestamp(...)
方法为数据分配时间戳,然后立即调用 checkAndGetNextWatermark(...)
。
checkAndGetNextWatermark(...)
方法传递在 extractTimestamp(...)
生成的时间戳,并且界定是否要生成水印。每当 checkAndGetNextWatermark(...)
方法返回非空水印,并且该水印大于先一个水印时,将向后发出新水印。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
每个事件都可以生成水印。但是,由于水印会导致一些后续的计算,因此过多的水印会降低性能。
每个 Kafka 分区一个时间戳
当使用 Kafka 作为数据源的时候,每个分区可能有一个简单的事件时间模式(按时间戳升序或其他)。当消费来自 Kafka 的流时,多个分区一般会并行消费,分区中的事件交替消费,会破坏分区中的模式(Kafka 的消费者客户端工作方式)。
在这种情况下,可以使用 Flink 的 Kafka-partition-aware(分区感知)水印生成器。使用这个特性的时候,水印会在 Kafka 消费者内部为每个分区生成,并且每个分区水印的合并方式与在流shuffle时合并水印的方式相同。
例如,如果事件时间戳严格按每个 Kafka 分区升序排列,那么使用升序时间戳水印生成器,为每分区生成水印 将产生完美的总体水印。
下图显示了如何为每个 Kafka 分区生成水印,以及在这种情况下水印如何通过流式数据流传播。
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
// kafka source set timestamp assigner
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
具有递增时间戳的 Assigner
定期生成水印的最简单的特殊情况是,给定的源任务看到的时间戳按升序出现的情况。在这种情况下,当前时间戳始终可以充当水印。
时间戳只需要在每个并行数据源任务中是升序的。例如,如果在特定设置中,每个并发的源实例读取一个 Kafka 分区,则只需要在每个 Kafka 分区中时间戳是递增。水印合并机制将生成正确的水印,当并行流被shuffle,union,connect 或 merge 时。
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
允许固定时间延迟的 Assigner
另一个定期水印的例子是,当水印滞后于流中看到的最大时间戳(事件时间)一段固定的时间。包括,预先知道流中可能遇到的最大延迟的情况。Flink 提供了 BoundedOutOfOrdernessTimestampExtractor
,使用参数 maxOutOfOrderness
,计算给定窗口的最终结果时,允许元素延迟的最长时间,超过的会被忽略。延迟为 t - t_w
(t
是数据的事件时间时间戳,t_w
是前一个水印的时间戳),如果延迟大于0,数据被认为是迟到的,默认会在计算窗口的作业结果时被忽略。
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamps_watermarks.html
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html
http://vishnuviswanath.com/flink_eventtime.html