时间概念
Flink在流程序中支持三种时间概念:
- 处理时间(Processing Time):处理时间是指执行相应操作的机器的系统时间。
当流程序在处理时间上运行时,所有基于时间的操作(如时间窗口)将使用运行各自操作符的机器的系统时间。
处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它很容易受到记录到达系统的速度,记录在系统内部操作符之间流动的速度以及中断的影响。
- 事件时间(Event Time):事件时间是每个单独事件在其生成设备上发生的时间。
这个时间通常在记录输入Flink之前嵌入到记录中,并且可以从每个记录中提取事件时间戳。在事件时间中,时间的进展取决于数据,而不是任何挂钟。
事件时间程序必须指定如何生成事件时间Watermarks,这是表示事件时间进度的机制。
- 摄入时间(Ingestion time):摄入时间是事件进入Flink的时间。
在源操作符中,每个记录以时间戳的形式获取源的当前时间,基于时间的操作(如时间窗口)引用该时间戳。
从概念上讲,摄入时间介于事件时间和处理时间之间。与事件时间相比,摄入时间程序不能处理任何无序事件或延迟数据,但程序不必指定如何生成Watermarks,因为在内部,它自动进行时间戳分配和自动Watermarks生成。
设定时间特征
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
event time和watermarks
设置时间特征为事件时间的流处理器需要一种方法来衡量事件时间的进度,Flink衡量event time 进度的机制是watermarks,watermark 带有一个时间戳,作为数据流的一部分随数据流流动,Watermark(t) 表示event time 小于等于 t 的都已经到达,watermarks可以解决实时系统中最常见的问题:乱序与延迟。
生成watermark的方法:
- 在source中,直接生成watermark,由source生成的watermark 优先级比较低,可以被另一个方法产生的watermark覆盖掉。
- 通过时间戳分配器(timestamp assigner)来生成水印(watermark)。时间戳分配器分两种:Periodic: 周期性(一定时间间隔或一定数据量)产生watermark;Punctuated: 间断的 watermark,一般根据event 决定是否产生新watermark。
方法一:直接在source中生成watermark
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
方法二:通过时间戳分配器生成watermark
首先需要指定时间戳分配器,时间戳分配器通常在数据源之后立即指定,但并非严格要求这样做。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
周期性时间戳分配器,如下所示:可以看出,自定义的时间戳分配器需要实现AssignerWithPeriodicWatermarks 接口,其中getCurrentWatermark 产生新的watermark,如果返回非空且大于原来的watermark,则生成了新的watermark;另外,extractTimestamp 用于给数据加上时间戳,这个时间戳在后续所有基于event time的计算中使用。
ExecutionConfig.setAutoWatermarkInterval(...) 定义了watermark产生的时间间隔,单位是毫秒。
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
间断性的时间戳分配器,如下所示:根据event来确定是否需要产生新的watermark,定义Punctuated 分配器需要实现AssignerWithPunctuatedWatermarks接口,包括函数extractTimestamp,checkAndGetNextWatermark,其中extractTimestamp 同Periodic Assigner,首先调用;然后调用checkAndGetNextWatermark ,用于确定是否需要产生新的watermark,当checkAndGetNextWatermark 产生一个非空且大于上一个watermark时就产生了新的watermark。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
Flink允许程序员分配自己的时间戳并发出自己的watermarks。更具体地说,可以通过实现其中一个AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口来实现,具体取决于用例。简而言之,第一个会周期性地发出watermarks,而第二个会根据传入记录的某些属性发出watermarks。为了进一步简化此类任务的编程工作,Flink提供了两种已经实现的时间戳分配器:AscendingTimestampExtractor和BoundedOutOfOrdernessTimestampExtractor。
- AscendingTimestampExtractor: 这是AssignerWithPeriodicWatermarks 的最简单的情况,数据流是按时间戳升序到达Flink的,这种情况下,数据里的时间戳就可以作为watermark。
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
- BoundedOutOfOrdernessTimestampExtractor: 这也是一个AssignerWithPeriodicWatermarks 的实现,表示已知数据的最大延迟,在丢弃元素之前允许元素延迟的最长时间,大于最大延迟时长的元素将被丢弃。
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))