[TOC]
在流处理中,时间是一个非常核心的概念,是整个系统的基石。比如,我们经常会遇到这样的需求:给定一个时间窗口,比如一个小时,统计时间窗口的内数据指标。那如何界定哪些数据将进入这个窗口呢?在窗口的定义之前,首先需要确定一个应用使用什么样的时间语义。
本文将介绍Flink的Event Time、Processing Time和Ingestion Time三种时间语义,接着会详细介绍Event Time和Watermark的工作机制,以及如何对数据流设置Event Time并生成Watermark。
Flink的三种时间语义
如上图所示,Flink支持三种时间语义:
Event Time
Event Time指的是数据流中每个元素或者每个事件自带的时间属性,一般是事件发生的时间。由于事件从发生到进入Flink时间算子之间有很多环节,一个较早发生的事件因为延迟可能较晚到达,因此使用Event Time意味着事件到达有可能是乱序的。
使用Event Time时,最理想的情况下,我们可以一直等待所有的事件到达后再进行时间窗口的处理。假设一个时间窗口内的所有数据都已经到达,基于Event Time的流处理会得到正确且一致的结果:无论我们是将同一个程序部署在不同的计算环境还是在相同的环境下多次计算同一份数据,都能够得到同样的计算结果。我们根本不同担心乱序到达的问题。但这只是理想情况,现实中无法实现,因为我们既不知道究竟要等多长时间才能确认所有事件都已经到达,更不可能无限地一直等待下去。在实际应用中,当涉及到对事件按照时间窗口进行统计时,Flink会将窗口内的事件缓存下来,直到接收到一个Watermark,以确认不会有更晚数据的到达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上降低了计算结果的绝对准确性,而且增加了系统的延迟。在流处理领域,比起其他几种时间语义,使用Event Time的好处是某个事件的时间是确定的,这样能够保证计算结果在一定程度上的可预测性。
一个基于Event Time的Flink程序中必须定义Event Time,以及如何生成Watermark。我们可以使用元素中自带的时间,也可以在元素到达Flink后人为给Event Time赋值。
使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。
Processing Time
对于某个算子来说,Processing Time指算子使用当前机器的系统时钟来定义时间。在Processing Time的时间窗口场景下,无论事件什么时候发生,只要该事件在某个时间段达到了某个算子,就会被归结到该窗口下,不需要Watermark机制。对于一个程序在同一个计算环境来说,每个算子都有一定的耗时,同一个事件的Processing Time,第n个算子和第n+1个算子不同。如果一个程序在不同的集群和环境下执行时,限于软硬件因素,不同环境下前序算子处理速度不同,对于下游算子来说,事件的Processing Time也会不同,不同环境下时间窗口的计算结果会发生变化。因此,Processing Time在时间窗口下的计算会有不确定性。
Processing Time只依赖当前执行机器的系统时钟,不需要依赖Watermark,无需缓存。Processing Time是实现起来非常简单也是延迟最小的一种时间语义。
Ingestion Time
Ingestion Time是事件到达Flink Souce的时间。从Source到下游各个算子中间可能有很多计算环节,任何一个算子的处理速度快慢可能影响到下游算子的Processing Time。而Ingestion Time定义的是数据流最早进入Flink的时间,因此不会被算子处理速度影响。
Ingestion Time通常是Event Time和Processing Time之间的一个折中方案。比起Event Time,Ingestion Time可以不需要设置复杂的Watermark,因此也不需要太多缓存,延迟较低。比起Processing Time,Ingestion Time的时间是Souce赋值的,一个事件在整个处理过程从头至尾都使用这个时间,而且后续算子不受前序算子处理速度的影响,计算结果相对准确一些,但计算成本稍高。
设置时间语义
在Flink中,我们需要在执行环境层面设置使用哪种时间语义。下面的代码使用Event Time:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
如果想用另外两种时间语义,需要替换为:TimeCharacteristic.ProcessingTime和TimeCharacteristic.IngestionTime。
Event Time和Watermark
Flink的三种时间语义中,Processing Time和Ingestion Time都可以不用设置Watermark。如果我们要使用Event Time语义,以下两项配置缺一不可:第一,使用一个时间戳为数据流中每个事件的Event Time赋值;第二,生成Watermark。
实际上,Event Time是每个事件的元数据,Flink并不知道每个事件的发生时间是什么,我们必须要为每个事件的Event Time赋值一个时间戳。关于时间戳,包括Flink在内的绝大多数系统都支持Unix时间戳系统(Unix time或Unix epoch)。Unix时间戳系统以1970-01-01 00:00:00.000 为起始点,其他时间记为距离该起始时间的整数差值,一般是毫秒(millisecond)精度。
有了Event Time时间戳,我们还必须生成Watermark。Watermark是Flink插入到数据流中的一种特殊的数据结构,它包含一个时间戳,并假设后续不会有小于该时间戳的数据。下图展示了一个乱序数据流,其中方框是单个事件,方框中的数字是其对应的Event Time时间戳,圆圈为Watermark,圆圈中的数字为Watermark对应的时间戳。
- Watermark与事件的时间戳紧密相关。一个时间戳为T的Watermark假设后续到达的事件时间戳都大于T。
- 假如Flink算子接收到一个违背上述规则的事件,该事件将被认定为迟到数据,如上图中时间戳为19的事件比Watermark(20)更晚到达。Flink提供了一些其他机制来处理迟到数据。
- Watermark时间戳必须单调递增,以保证时间不会倒流。
- Watermark机制允许用户来控制准确度和延迟。Watermark设置得与事件时间戳相距紧凑,会产生不少迟到数据,影响计算结果的准确度,整个应用的延迟很低;Watermark设置得非常宽松,准确度能够得到提升,但应用的延迟较高,因为Flink必须等待更长的时间才进行计算。
Flink的Watermark细节介绍
Watermark是什么?从不同的维度可以有不同的理解
- 从Watermark的计算角度看:可以将Watermark理解为一个函数:
,它的输入是当前的系统时间,输出是一个Event Time(一个时间戳),而且输出的这个时间戳是严格单调递增的。这样看,Watermark就是一个函数。 - 从Watermark的具体形式来看:可以将Watermark当成一个个时间戳,值就是1中输出的那个时间戳。
- 从Watermark流转的角度看:可以将Watermark理解成夹杂在正常流事件中的一个个特殊事件。
这3种描述方式,看似不同,实则一样,只是从不同的角度去看了而已。不管怎么理解,我们必须知道:流处理系统规定,如果某个时刻Watermark的值为T1,那系统就认为凡是早于或等于T1时间的事件都已经收到了。注意,这个就是Watermark所代表的含义,实际因为现实中各种情况,未必能严格做到这样,但目标就是要达到上面规定的这样,或者无限逼近。
Why?
为什么需要Watermark?这个也有很多种描述方式,往大了说就是提供一种理论上解决分布式系统中消息乱序问题(这是分布式系统中一个经典难题)的方案。说小点就是在有状态的流计算中,当我们关注事件的顺序或者完整性时,需要有这么一种机制能实现这个需求。
这里的完整性我举个例子解释一下:比如我们基于事件发生时间统计每5min的用户PV总量,那比如12:00-12:05这个5min的统计该在什么时间点计算呢?假设没有Watermark这个概念,你就永远不知道什么时候12:00-12:05区间的所有事件才全到齐。你不能假定收到12:00-12:05的数据就认为之前的数据已经全部来了,因为数据可能延迟+乱序了。而Watermark就是为了解决这个问题而提出的,当你收到Watermark的值为12:00-12:05的事件时,你就可以认为早于这个时间的数据已经都到了,数据已经完整了,可以进行12:00-12:05这个5min区间的数据计算了。至于如何保证,这个是框架要做的事情(当然一般需要开发者参与)。
Where?
哪里需要Watermark?这里我给一个简单粗暴的结论,当同时满足下面两个条件的时候才会需要Watermark:
- 计算中使用了时间相关的算子(time-based operations),其实再明确点,就是使用了Window的时候(注:Flink的Global Window除外,这个Window不是基于时间的)。
- 1中使用的时间相关的算子选择使用事件时间,即Event Time(注:如果是Flink的话,也包含Ingestion Time)。
这里再解释一下2。前文我们介绍过有两种时间Event Time和Processing Time(Flink独有的Ingestion Time在Watermark这里可以归结为Event Time,后文不再另行说明),时间相关的算子选择时间时必然是二选一。并不是选择Processing Time的时候就没有Watermark了,只是这个时候Processing Time自身就是一个完美的Watermark(因为时间一去不复返,Processing Time永远是单调递增的),并不需要产生单独的Watermark了。所以在Processing Time里面,你可以认为Watermark没有意义了,所以去掉了,或者认为Processing Time自身就是Watermark都行。
实战
场景介绍
为了方便说明,我构造了一个简单的场景,假设有一个设备产生了一组事件,如下:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
一共9个事件,id是事件名称,timestamp是设备端事件真实产生的时间。也就是事件真实产生顺序是:
event1, event2, event3, event4, event5, event6, event7, event8, event9
但在传输过程中因为各种现实原因乱序了,到Flink这里的时候,事件顺序变成了:
event1, event2, event4, event5, event7, event3, event6, event8, event9
现在我们要做的事情就是计算每5秒中的事件个数,以此来判断事件高峰期。
说明:
- 这个计算是非常有代表性的,比如电商统计每小时的pv就能知道每天用户高峰期发生在哪几个时段,这里为了方便说明问题,把问题简化了,并且为了快速出结果,把时间粒度缩短为5秒钟。
- 计算时,要想结果准确,就不能使用Processing Time,这样如果数据从产生到被处理延迟比较大的话,最终计算的结果也会不准确。除非这个延迟可控或者可接受,则可简单的使用Processing Time,否则就必须用Event Time进行计算。
Flink提供的Watermark机制
Flink提供了3种方式来生成Watermark:
- 在Source中生成Watermark;
- 通过AssignerWithPeriodicWatermarks生成Watermark;
- 通过AssignerWithPunctuatedWatermarks生成Watermark;
前面介绍过了Watermark是在使用Event Time的场景下才使用的,所以给事件增加Event Time和生成Watermark是一对操作,一般都是一起使用的。方式1是直接在Flink的最源头Source那里就生成了Event Time和Watermark。方式2和方式3则是流处理中的某一步骤(可以理解为一个特殊点的算子),它的输入是流,输出还是流,只不过经过这个流之后事件就会有Event Timestamp和Watermark了,一般这一步放在Source之后,最晚也要在时间算子之前,也就是Window之前。而且他两的优先级高,如果Source中生成了Watermark,后面又使用了方式2或3,则会覆盖之前的Event Timestamp和Watermark。
下面我们分别介绍每种方式。
Watermark In Source
package com.niyanchun.watermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
/**
* Assign timestamp and watermark at Source Function Demo.
*
* @author NiYanchun
**/
public class AssignAtSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new CustomSource())
.timeWindowAll(Time.seconds(5))
.process(new CustomProcessFunction())
.print();
env.execute();
}
public static class CustomSource extends RichSourceFunction<JSONObject> {
@Override
public void run(SourceContext<JSONObject> ctx) throws Exception {
System.out.println("event in source:");
getOutOfOrderEvents().forEach(e -> {
System.out.println(e);
long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
ctx.collectWithTimestamp(e, timestampInMills);
ctx.emitWatermark(new Watermark(timestampInMills));
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cancel() {
}
}
/**
* generate out of order events
*
* @return List<JSONObject>
*/
private static List<JSONObject> getOutOfOrderEvents() {
// 2020-05-24 12:00:00
JSONObject event1 = new JSONObject().fluentPut("id", "event1")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
// 2020-05-24 12:00:01
JSONObject event2 = new JSONObject().fluentPut("id", "event2")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
// 2020-05-24 12:00:03
JSONObject event3 = new JSONObject().fluentPut("id", "event3")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
// 2020-05-24 12:00:04
JSONObject event4 = new JSONObject().fluentPut("id", "event4")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
// 2020-05-24 12:00:05
JSONObject event5 = new JSONObject().fluentPut("id", "event5")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
// 2020-05-24 12:00:06
JSONObject event6 = new JSONObject().fluentPut("id", "event6")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
// 2020-05-24 12:00:07
JSONObject event7 = new JSONObject().fluentPut("id", "event7")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
// 2020-05-24 12:00:08
JSONObject event8 = new JSONObject().fluentPut("id", "event8")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
// 2020-05-24 12:00:09
JSONObject event9 = new JSONObject().fluentPut("id", "event9")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
// 这里把消息打乱,模拟实际中的消息乱序
// 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
// 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
}
public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {
@Override
public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
TimeWindow window = context.window();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));
int count = 0;
for (JSONObject element : elements) {
System.out.println(element.getString("id"));
count++;
}
System.out.println("Total:" + count);
}
}
}
这里自定义了一个Source,然后接了一个Window(timeWindowAll),做了一个简单的处理,最终输出。这里需要注意一个点:timeWindowAll底层其实是定义了一个TumblingWindows,至于使用Processing Time(TumblingProcessingTimeWindows),还是Event Time(TumblingEventTimeWindows)则由env.setStreamTimeCharacteristic来确定的,该选项的默认值是TimeCharacteristic.ProcessingTime,即使用Processing Time。
作为演示,修改一下上面代码,先使用Processing Time,看下结果:
event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
window{2020-05-24 20:12:30 - 2020-05-24 20:12:35}
event1
event2
event4
event5
event7
event3
event6
event8
event9
Total:9
Process finished with exit code 0
可以看到,只有一个Window,其范围是window{2020-05-24 20:12:30 - 2020-05-24 20:12:35},即我代码运行的时间,显然这样的统计结果是没有意义的,因为它体现不出业务真正的高峰期。后面我们只讨论使用Event Time的情况。
现在重新改为env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);,然后运行:
event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3
window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5
Process finished with exit code 0
我们看下现在的输出,有两个Window:window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}和window{2020-05-24 12:00:05 - 2020-05-24 12:00:10},可以看到就是5秒钟一个Window。然后12:00:00-12:00:05这个Window里面包含了3个事件:event1,event2,event4;12:00:05-12:00:10这个Window里面包含了5个事件:event5、event7、event6、event8、event9。
从这个结果看event3丢了,其它数据都在,为什么呢?如果我说因为event3乱序了,排在了后边,你肯定会说event6也排到了event7后边,为什么event6却没有丢呢?要解释清楚这个问题还需要涉及到触发器以及窗口的原理和机制,为了保证行文的连贯性,这里我先直接给出结论:因为窗口默认的触发器实现机制是本该在一个窗口内的数据乱序了以后,只要在这个窗口结束(即被触发)之前来,那是不影响的,不认为是迟到数据,不会被丢掉;但如果这个窗口已经结束了才来,就会被丢掉了。比如event3本应该属于12:00:00-12:00:05这个窗口,当event5这条数据来的时候,这个窗口就就认为数据完整了,于是触发计算,接着就销毁了。等event3来的时候已经是12:00:05-12:00:10窗口了,所以它直接被丢掉了。也就是在时间窗口这里,对于“乱序”的定义不是要求每个到来事件的时间戳都严格升序,而是看属于这个窗口的事件能否在窗口时间范围内来,如果能来,就不算乱序,至于在这个时间范围内来的先后顺序无所谓。这个其实也是合理的。
另外还有两个细节点要注意一下:
- 当Source是有界数据时,当所有数据发送完毕后,系统会自动发一个值为Long.MAX_VALUE的Watermark,表示数据发送完了。
- Window是一个左闭右开区间,比如12:00:00的数据属于12:00:00-12:00:05窗口,而12:00:05的数据属于12:00:05-12:00:10窗口。
AssignerWithPeriodicWatermarks && AssignerWithPunctuatedWatermarks
AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks其实非常像,哪怕是用法都非常像,他两个的主要区别是Watermark的产生机制或者时机:AssignerWithPriodicWatermarks是根据一个固定的时间周期性的产生Watermark,而AssignerWithPunctuatedWatermarks则是由事件驱动,然后代码自己控制何时以何种方式产生Watermark,比如一个event就产生一个,还是几个event产生一个,或者满足什么条件时产生Watermark等,就是用户可以灵活控制。
package com.niyanchun.watermark;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
/**
* Assign timestamp and watermark at Source Function Demo.
*
* @author NiYanchun
**/
public class AssignerWatermarksDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())
// .assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())
.timeWindowAll(Time.seconds(5))
.process(new CustomProcessFunction())
.print();
env.execute();
}
public static class CustomSource extends RichSourceFunction<JSONObject> {
@Override
public void run(SourceContext<JSONObject> ctx) throws Exception {
System.out.println("event in source:");
getOutOfOrderEvents().forEach(e -> {
System.out.println(e);
ctx.collect(e);
});
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cancel() {
}
}
/**
* generate out of order events
*
* @return List<JSONObject>
*/
private static List<JSONObject> getOutOfOrderEvents() {
// 2020-05-24 12:00:00
JSONObject event1 = new JSONObject().fluentPut("id", "event1")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
// 2020-05-24 12:00:01
JSONObject event2 = new JSONObject().fluentPut("id", "event2")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
// 2020-05-24 12:00:03
JSONObject event3 = new JSONObject().fluentPut("id", "event3")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
// 2020-05-24 12:00:04
JSONObject event4 = new JSONObject().fluentPut("id", "event4")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
// 2020-05-24 12:00:05
JSONObject event5 = new JSONObject().fluentPut("id", "event5")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
// 2020-05-24 12:00:06
JSONObject event6 = new JSONObject().fluentPut("id", "event6")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
// 2020-05-24 12:00:07
JSONObject event7 = new JSONObject().fluentPut("id", "event7")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
// 2020-05-24 12:00:08
JSONObject event8 = new JSONObject().fluentPut("id", "event8")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
// 2020-05-24 12:00:09
JSONObject event9 = new JSONObject().fluentPut("id", "event9")
.fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
// 可以把消息打乱,模拟实际中的消息乱序。
// 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
// 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
}
public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {
@Override
public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
TimeWindow window = context.window();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));
int count = 0;
for (JSONObject element : elements) {
System.out.println(element.getString("id"));
count++;
}
System.out.println("Total:" + count);
}
}
/**
* AssignerWithPeriodicWatermarks demo
*/
public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark getCurrentWatermark() {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
System.currentTimeMillis(), sdf.format(currentTimestamp)));
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
long timestamp = ((DateTime) element.get("timestamp")).getMillis();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
currentTimestamp = timestamp;
return timestamp;
}
}
/**
* AssignerWithPunctuatedWatermarks demo.
*/
public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
System.currentTimeMillis(), sdf.format(currentTimestamp)));
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
long timestamp = ((DateTime) element.get("timestamp")).getMillis();
Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
currentTimestamp = timestamp;
return timestamp;
}
}
}
先分别看下AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks部分吧:
/**
* AssignerWithPeriodicWatermarks demo
*/
public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark getCurrentWatermark() {
// 省略一些逻辑
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
// 省略一些逻辑
return timestamp;
}
}
/**
* AssignerWithPunctuatedWatermarks demo.
*/
public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {
private long currentTimestamp;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
// 省略一些逻辑
return new Watermark(currentTimestamp);
}
@Override
public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
// 省略一些逻辑
return timestamp;
}
}
为了突出重点,我删掉了具体实现。可以看到这两个类都有一个extractTimestamp方法,这个方法每个Event都会调用,作用就是给Event赋一个Event Time。另外一个方法稍微有点差异,AssignerWithPeriodicWatermarks的方法叫getCurrentWatermark(),而AssignerWithPunctuatedWatermarks的方法是checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp),它们的主要区别是方法的调用机制:
- getCurrentWatermark()没有参数,它是框架根据用户设置的固定时间周期性的调用。这个固定的时间可以通过以下方式设置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(500);
上面的代码设置每500毫秒调用一次getCurrentWatermark(),即每500毫秒产生一个Watermark。不显式的设置的话,默认值是0,但实际效果是每200ms调用一次。
- checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp)有两个参数:一个是event,一个是extractTimestamp方法返回的时间戳。这个方法被调用的时间点是:每个事件来了先调用extractTimestamp,然后马上调用checkAndGetNextWatermark。在checkAndGetNextWatermark中你可以通过返回值控制是否产生新的Watermark,如果你不想返回新的Watermark,可以返回null或者一个小于等于上一个Watermark的时间戳,这样就相当于本次不返回Watermark或者返回的Watermark不是递增的被丢弃了,继续使用原来的Watermark。因为Watermark不能为null,且必须单调递增。
AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks的区别就这些,最佳实践的话我个人觉得优先考虑AssignerWithPriodicWatermarks,如果不能满足需求,再考虑AssignerWithPunctuatedWatermarks。一方面是前者简单一些,另一方面是一般没有必要每个事件就计算一个Watermark,这样会增加不是很有必要的计算量。
迟到数据
从上面的部分看到event3因为迟到被默默的丢掉了,现实中数据是重要资产,肯定是不能随便丢弃的。Flink提供了两种解决方案:
- 允许一定的延迟。这个延迟可以在两个地方设置:第一种是可以在上面的AssignerWithXXXWatermarks方法里面给计算出的时间戳减去一个时间,这个时间就是你允许延迟的时间。第二种就是在时间窗口那里可以通过allowedLateness来设置一个允许的延迟时间,
但允许一定延迟的方式只能治标,不能治本。我们只能根据实际情况允许一定限度的延迟,但总归是有个限度的,原因主要有两个:1)延迟太高会丧失实时性,如果你的场景对实时性要求比较高,那就无法设置太大的延迟。2)延迟实际是延长了窗口的生命周期,所以资源消耗会增加。
- 在Window那里通过sideOutputLateData将迟到的数据以流的形式旁路出去。这个是治本的手段,它没有时间的限制,如果有迟到数据,就会发送到这个单独的流里面去,然后可以为这个流单独设置处理方式。