Event Time / Processing Time / Ingestion Time
Flink在streaming程序中支持不同的时间概念。
- Processing time:处理时间是指执行相应操作机器的系统时间。当流程序在处理时间上运行时,所有基于时间的操作(如时间窗口)将使用当前运行机器的系统时间。每小时处理时间窗口包括在系统时间每小时内到达的所有指定操作记录。例如:如果应用程序在上午9:15开始运行,第一个小时处理时间窗口将包括上午9:15到10:00之间处理的事件,下一个窗口将包含上午10:00到11:00之间处理的事件,以此类推。
处理时间是最简单的时间概念,不需要流和物理机之间的协调,它有最好的性能和最低的延迟。然而,在分布式和异步环境中,处理时间具有不确定性,因为它容易受到系统之间操作记录传输速度以及中断(计划或其他)的影响,导致数据延迟。
- Event time:事件时间是每个独立事件在其生成设备上发生的时间,通常是在进入到Flink之前就嵌入在记录中的时间。并且可以从每个记录中提取事件时间戳,在事件时间中,决于数据产生的时间,而不是当前系统时间。事件时间程序必须指定如何生成事件
Watermarks,用来保证事件时间的有序性
。Watermarks
机制将在下一节中进行描述。
事件时间处理将产生完全一致和确定的结果,无论事件何时到达或顺序如何。但是,除非事件是按顺序(通过时间戳)到达的,否则在等待无序事件时,事件时间处理会产生延迟。因为等待的时间是有限的,这就限制了事件时间应用程序的确定性。
假设所有数据都已到达,事件时间操作将按照预期进行,即使在处理无序延迟的事件或重新处理历史数据,也会产生正确一致的结果。例如,每小时事件时间窗口将包含所有带有属于该小时的事件时间戳的记录,而不管它们到达的顺序如何,也不管它们是在什么时候处理的。(有关延迟事件的更多信息,请参见“延迟事件”一节)。
请注意,有时当事件时间程序实时处理数据时,将使用一些处理时间操作,以确保及时地进行处理。
-
Ingestion time:摄入时间(Ingestion Time)是事件进入Flink的时间,在源操作中每个记录都会获得源的当前时间作为时间戳,后续基于时间的操作(如: time window)会依赖这个时间戳
摄入时间从概念上来讲是处在事件时间和处理时间之间,与处理时间相比,成本可能会高一点,但是会提供更加可预测的结果。因为摄入时间使用的是固定的时间戳(都是在源处指定的),记录中的不同窗口操作依赖同一个时间戳,而在处理时间中每个窗口操作可能将记录赋给不同的窗口(根据本地的系统时钟和传输时延)。
与事件时间相比,摄入时间程序不能处理任何无序事件或者延迟事件,但是程序无需指定如何产生水印。
与事件时间相比,摄入时间程序不能处理无序的事件或延迟数据,所以不必指定生成Watermark。在内部,摄入时间与事件时间非常相似,但是是自动时间戳分配和自动产生水印。
设置时间特性(Setting a Time Characteristic)
Flink DataStream 程序的第一部分常常是设置时间特性,这个设置定义了数据流源的行为(例如: 它们是否分配时间戳) 以及window操作需要用哪个时间概念,如:KeyedStream.timeWindow(Time.seconds(30))
。
下面的例子中展示了一个在小时级时间窗口中聚合事件的Flink程序,window的行为与时间特性相适应。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)
注意,为了在事件时间中运行此示例,程序需要使用定义事件时间和自己触发watermarks
的源数据,或者程序必须在源文件之后注入一个Timestamp Assigner
和 Watermark Generator
。这些函数描述了如何获取事件的时间戳以及哪些程度的无序事件流需要展示。
下一节描述Timestamp Assigner
和 Watermark Generator
背后的机制。有关如何在Flink DataStream API中使用Timestamp Assigner
和 Watermark Generator
的指南,请参考Generating Timestamps / Watermarks
事件时间和水印(Event Time and Watermarks)
注意:Flink实现了很多Dataflow Model
中的技术。想要获得更多关于event time和watermark
的介绍,请查看下面的文章:
Streaming 101 by Tyler Akidau
The Dataflow Model paper
一个支持事件时间的流处理器需要一种方式来衡量事件时间的进度,例如:一个小时级的窗口操作在事件时间达到小时的末尾之后需要被通知,以便操作能够在进程中关闭窗口。
Event Time可以独立于处理时间来进行,例如:程序中一个操作的当前事件时间可能会稍微晚于处理中的时间,而它们是以相同的速度进行的;另一方面,一个流程序可能处理一周的事件时间,通过快速转发缓存在Kafka topic中的数据,仅用几秒的时间来处理。
Flink衡量event time 进度的机制是watermark,watermark流作为数据流的一部分,携带了一个时间戳t
。一个Watermark(t)
声明了数据流中的事件时间已经到t
了,也就是说数据流中不应该再有时间戳t'<=t
的元素了。
下图中展示了一个带时间戳的事件流,并且内联一个水印。在这个例子中,事件是有序的,也就是说水印仅是在流中做简单标记而已。
Watermark
对于无序流至关重要,如下图所示,事件不是按时间戳排序的。一般来说,Watermark
是一种声明在流中的那个点,在某个时间戳之前的所有事件都应该已经到达。一旦watermark
到达一个操作,这个操作就可以将事件时间的时钟提前到watermark
值的前面。请注意,事件时间由新创建的流元素(或多个元素)继承,这些流元素或来自生成它们的事件,或来自触发这些元素创建的水印。
平行流中的水印(Watermarks in Parallel Streams)
Watermarks
是在源函数处或之后直接生成的。源函数的每个并行子任务通常独立生成Watermarks
。这些水印定义了特定并行源的事件时间。
因为watermark流贯穿整个流程序,他们在到达的操作处提前了事件时间。当一个操作提前它的事件时间,它都会为下游的后继操作生成一个新的watermark
。
一些操作使用多个输入流;例如,union,keyBy(…), partition(…)函数后面的操作。这样一个操作符的当前事件时间是其输入流的事件时间的最小值。当输入流更新它们的事件时间时,操作也会更新。
下图展示了一个事件和水印流经并行流的示例,以及跟踪事件时间的操作。
延迟元素(Late Elements)
某些元素有可能会违反水印条件,这意味着在Watermark(t)
发生之后,还会出现更多带有时间戳t ' <= t
的元素。实际上,在许多设置中,某些元素可以任意延迟,因此不可能指定某个事件时间戳的所有元素发生的时间。此外,延迟是有界的,水印延迟太多通常也是不可取的,因为它会在事件时间窗的评估中造成太多的延迟。
为此,流程序可能会显式地预期一些延迟元素。延迟元素是在系统时间(如水印所示)已经超过延迟元素的时间戳时间之后到达的元素。有关如何在事件时间窗口中处理延迟元素的更多信息,请参见“允许延迟”。
闲置资源(Idling sources)
目前,使用纯事件时间水印生成器(watermarks generators)
,如果没有要处理的元素,水印无法进行处理。这意味着在传入数据中出现间隙时,事件时间不会进展,例如窗口操作不会被触发,因此现有的窗口将无法生成任何输出数据。为了避免这种情况,可以使用周期性的水印分配器,而不是基于元素时间戳进行分配。一个示例解决方案可以是切换的分配程序在一段时间没有观察到新的事件之后,使用当前的处理时间作为时间基础。
可以使用SourceFunction.SourceContext#markAsTemporarilyIdle
将源标记为空闲。有关详细信息,请参考此方法的Javadoc以及StreamStatus
。
如何操作处理水印(How operators are processing watermarks)
一般来说,在将指定的水印转发给下游之前,操作人员需要对其进行完整的处理。例如,WindowOperator
首先评估应该触发哪些窗口,并且只有在产生所有由水印触发的输出后,水印本身才会被发送到下游。换句话说,由于出现水印而产生的所有元素都将在水印之前发出。
同样的规则也适用于TwoInputStreamOperator
。但是,在这种情况下,运算符的当前水印被定义为两个输入的最小值。
实现定义的详细信息OneInputStreamOperator.processWatermark
、TwoInputStreamOperator.processWatermark1
和TwoInputStreamOperator.processWatermark2
方法。