Event Time / Processing Time / Ingestion Time
Flink 在流应用中支持不同的时间概念:
- Processing Time: 处理时间是指执行相应操作所在的机器的系统时间。
当一个流应用使用“处理时间”的概念时,所有的基于时间的操作符(如 时间window)会使用执行该操作符所在的机器的时间。一个按小时聚合数据的window函数会包含两个整点时间之间,所有到达该操作符的数据。例如,如果一个应用在上午9:15开始运行,那么第一个按小时聚合的window会聚合9:15到10:00的数据,下一个window会聚合10:00到11:00的数据,以此类推。
处理时间是最简单的时间概念,它不需要在数据流与机器间进行协调。它提供了最好的性能与最低的延迟。但是,在分布式,异步的环境中,处理时间不会提供结果一致性,因为结果会受到各种因素的影响,如:数据到达系统的速度(如从消息队列消费的速度),系统内两个operator间的数据流的速度,以及停电(有计划停电或其他因素)。(注:这很容易理解,processing time语义下,消息的速度会对结果产生很大的影响,比如window操作符,同样的数据,如果消息速度快,则第一个window会计算100个数据,消息速度慢,则第一个window可能只会计算10个数据,显然两种情况下的window的计算结果不会相同) - Event Time:事件时间是指数据从设备中产生的时间。这个时间一般都内置在数据中,作为事件时间戳,在进入flink前就存在,且可以被flink从数据中抽取出来。在Event time语义中,时间的快慢取决于数据,而不是系统时间。Event Time语义的程序必须定义如何产生 Event Time Watermarks,watermark机制用于标识当前事件时间的进度(即:在事件时间语义下,当前时间是多少)。关于watermark机制会在本文后面介绍。
在理想情况下,事件时间语义下的处理可以做到结果的一致性与完整性,而不管数据到达的时间与顺序。然而,除非数据确定是按(时间戳)顺序到达的,否则事件时间语义下的处理会因为等待可能产生的乱序(迟到)数据而造成一定的延迟。但是又由于它只能等待一个有限的一段时间,因此它限制了应用在事件时间语义下的结果完整性的程度。
假设所有的数据都已经到达,事件时间语义下操作符会按照预期的那样执行操作,并产生准确且一致性的结果,即便因为有迟到数据而造成数据的乱序,或是处理历史数据。例如,按小时聚合的window仅会包含该window所代表的时间段的数据,不管数据到达的window的顺序或是它们在真实世界的什么时间被处理(注:这里与processing time形成对比,对于历史数据,event time语义下的window不管何时运行应用,每次都会聚合相同的数据进入同一个window,这是因为对window的定义是按照数据内置的时间戳来定义的,而在processing time语义下处理历史数据,则可能不应该在同一个window中执行聚合操作的数据被分配到了同一个window,造成结果的不准确,多次运行也可能出现每次结果都不一致)
需要注意的是,有些时候,在事件时间语义下处理实时数据时,为了保证及时处理,会使用一些processing time 操作符(注:这一句话现在理解了,在读完整篇文档后,就会有所理解,详细的解释,在后面的“空转的Source”部分) -
Ingestion time:摄入时间是指数据进入flink的时间。在Source操作符中,每个数据都会获取source操作符的当前时间作为其时间戳,下游的基于时间操作符(如window)处理时使用的就是这个时间。
摄入时间的概念介于事件时间与处理时间之间。对比于处理时间,摄入时间比其消耗更多一些,但是提供可预测的结果。因为摄入时间使用一个稳定的时间戳(由source指定),不同的window操作符操作该数据时,都会被引用相同的时间戳;而处理时间语义下的window操作符可能会将同一个数据放入不同的window中(取决于本地系统时间或者数据传输延迟)。
对比于事件时间,摄入时间语义下的程序不能处理乱序或者迟到数据,但是程序不再需要指定如何生成watermark。
在flink内部,对摄入时间的处理更像是使用自动分配timestamp以及自动生成watermark的事件时间。
设置时间语义
Flink DataStream 程序的第一部分通常会设置 时间语义(time characteristic)。这个设置定义了生成数据流的数据源的行为(如,是否需要指定timestamp),以及window操作符应该使用哪种时间概念对数据进行处理。
下面的例子展示了Flink程序按小时聚合数据。window的行为和时间语义相对应。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
需要注意的是,想要在事件时间语义下运行该程序,需要使用能够对每个数据定义事件时间以及生成watermark的source,如果source不满足这个条件,程序需要在source后,指定timestamp assigner以及watermark generator。这些函数描述了如何从数据中提取事件时间以及该程序所能够处理的数据最大乱序程度。
接下来的部分描述了timestamp与watermark背后的一些机制。对于如何使用timestamp assigner以及watermark generator,可以参阅 Generating Timestamps / Watermarks
Event Time 与 Watermarks
注意:Flink实现了Dataflow Model中的许多技术。为了更好的介绍 event time 与 watermarks,建议先阅读下面的文章:
- Streaming 101 by Tyler Akidau
- The Dataflow Model paper
支持event time 的流处理器需要一个方法来衡量当前event time时间标尺下的处理进度。例如,一个按小时聚合数据的window操作,流处理器需要在event time超过window的end time后,提醒window你可以执行操作并关闭window了。
event time与processing time(注:这一段话中,将processing time替换为 系统时间 会更好理解写2)是独立处理的。例如:在一个应用中(注:处理实时数据时),当前event time可能稍稍落后于processing time,但是两者都会以相同的速度流逝。但是另一方面,另一个流程序可能会在几秒内就能够处理在event time语义下的几周的数据,比如读取kafka topic中的历史数据。
(注:为了简单的理解event time与processing time 的不同,可以理解为event time是以数据自带的时间戳的时间作为时间坐标系或者时间标尺,而processing time是以真实世界中,程序正在运行时的时间作为时间坐标系或时间标尺)
Flink中衡量event time的概念称作 watermark。Watermark作为数据流的一部分,并且会携带一个时间戳 t。Watermark(t) 表示当前event time坐标系下的数据流已经达到了时间t,这意味着应该不会再有时间戳小于等于t的数据出现(注:也就是不会再有迟到数据)
(注:为什么需要watermark?processing time是真实世界的时间,时间是平滑流逝的,而event time是从数据中抽取出来的时间,是离散的时间,因此引入watermark的概念,就是为了分隔不同的数据到不同的window中去,并且告知window数据全部到达,可以执行计算了。一个很简单的例子:一个window要聚合event time 在 10:00:00 到 10:10:00的数据,而真实流入window操作符的event time可能不会这么凑巧,最后一个数据的时间戳恰好就是 10:10:00,因此watermark的作用就是告知window,event time的坐标系下,已经把所有合适的数据聚合起来了,你这个window可以执行计算了)
下面的图展示了流中数据的时间戳以及流中的watermark。示例中的数据是按照时间戳的顺序进行的排列,意味着watermark在流中是周期性产生的。
对于处理乱序的数据流,watermark起到很关键的作用,如下图,数据并没有按照时间戳的顺序排列好。一般来说,watermark意味着在watermark产生的时刻起,所有时间戳小于watermark的数据都已经到达了。一旦watermark到达一个操作符,操作符就会将它内部的event time时间表调整到watermark所代表的时间。
注意event time的值要么来自刚刚流入source的数据的内置时间戳,要么是来自该数据所触发的watermark的时间戳。
并发流中的Watermark
watermark在source function中生成或是在其后通过方法来生成。source的每个并发的subtask一般都会生成各自的watermark。这些watermark定义了在某个source中的event time的值(即,在event time语义下,当前时间是几点)。
因为watermark会随着数据流在程序中流动,因此当它到达某个operator后,就会修改这个operator的event time的值。一旦operator修改了event time的值,就会生成一个新的watermark向下游传播。
一些操作符会消费多个数据流;如:union操作,或者是紧跟着keyBy/partition函数的操作符。这些操作符当前的event time是所有数据流中最小的event time值。当输入流更新时间后,操作符也会更新时间。
下图展示了并发流中watermark的流动以及操作符如何追踪event time的。
注意:kafka source支持 per-partition watermark,你可以参阅这里
迟到数据
事实上,即便watermark的概念可以理解为,所有时间戳小于watermark的数据都已经到达。但是真实情况下,确实会有数据违反了这样的约定,这意味着,即便生成了watermark(t),但是仍然会有时间戳t' <= t的数据在随后进入数据流。事实上,在许多真实案例中,某些数据确实会延迟到达,这使得定义一个所有数据都已经进入数据流的时间变得不可能。此外,即便迟到数据的迟到时间是有最大时间界限的,但是推迟太长时间再生成watermark(这样才能保证迟到的数据被正确处理)也不是我们期望的,因为这会造成处理的延迟。
针对这个情况,流程序需要明确迟到数据。迟到数据是指那些,当数据到达时,系统event time表的时间已经流过了这个时间戳的数据。可以查阅 Allowed Lateness 文档学习如何处理event time语义下window操作符的迟到数据。
空转的source Idling Source
目前,仅使用event time watermark generator时,如果没有数据流入,就不会有watermark产生。这意味着,如果数据流突然没有数据流入了,event time不会再向前流动(系统中的所有操作符的event time不会更新)。如:window操作符不会被新的watermark触发计算操作,也就不会有这window的结果输出。
为了规避这个问题,可以使用 periodic watermark assigner,它并不仅仅基于数据的时间戳生成watermark。一个示例的解决方法可以是:当检测到有一段时间没有数据流入时,可以使用processing time生成watermark。
Source可以使用 SourceFunction.SourceContext#markAsTemporarilyIdle来定义怎样情况下该source处于空转状态。更多信息可以参阅Javadoc的文档以及 StreamStatus。
调试watermark
参阅 Debugging Window & Event Time 部分学习如何在运行时调试。
操作符如何处理watermark
操作符需要处理完给定的watermark后才可以将其转发给下游操作符。如:WindowOperator 首先会计算那个window应该被触发,并且直到得到所有应该触发计算的window的结构后,才会吧watermark发送到下游。换句话说,触发watermark生成的数据,应该比watermark更早的加入流(注:这样才会保证计算结果的完整性和正确性)。
相同的规则适用于 TwoInpuStreamOperatro 。不同的是,这时,operator中的watermark是其所有输入流中watermark最小的那个。
详细的过程在这几个接口的实现类的方法中被定义:OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 与TwoInputStreamOperator#processWatermark2