Watermark 是 Flink 实时处理计算平台的一个重要概念,也是 Google 的著名实时计算论文 The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing 里面经常提到的名词。看了很多的参考文献,和文档。有了一些自己的理解和体会,做个笔记。最近 Flink 1.5 版本正式发布,也算蹭个热度吧。
1. Watermark 的理解
最早看到 Watermark 的概念就是在 Flink 的官方文档里面:
The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark)
上面这句话中有一个词叫做 event-time,了解 DataFlow 模型的同学都知道, event-time 表示的是每个独立事件在各自设备上产生的时间,是这个事件特有的属性,好比是一个人的生日。与之对应的还有 processing-time 和 ingestion-time。理解起来也不难,processing-time 就是 Flink 的 window 操作该事件的时间,ingestion-time 是事件作为数据源进入 Flink 系统的时间。参考下面这个图1,来源于 Flink 文档
重新回到引用中的这句英文,翻译过来的大致意思是," Watermarks 是 Flink 用来度量 event-time 事件处理进度的一种机制,Watermarks 作为数据流的一部分,携带一个时间 t。表示这次的数据流中 event-time 已经处理到时间 t 了,所有 event-time 早于或等于 t 时刻的事件都不应该再出现在这个数据流中了。" 有些朋友可能会有疑问,为什么要弄这么一个奇怪的限制。这是因为一般来说,在实际环境下,由于网络阻塞,延迟等问题,processing-time 相比 event-time 会有延迟滞后的现象,而且这种现象非常普遍。也就是说,并不是生产出一个事件, 就能在第一时间被处理。参考下面这个图2,来源于 Dataflow Model 论文。
图2可以看到,横坐标是 event-time,纵坐标是 processing-time。理想的处理机制是,12:01分的 event-time 时间,应该就在 12:01分的 processing-time 时刻被处理。但是实际情况下,横向蓝色实线对应的 processing-time 晚于12:01分。所以图中横向黑色实线表示 event-time 的偏离,纵向红色实线表示 processing-time 的偏离,这些都比较容易理解。 接下来,按照原图的注释,浅色虚线表示 理想的 watermark,深色虚线表示 实际的 watermark。看到这里似乎有些困惑,该如何理解呢?
直到,我看到另外一篇由 Tyler Akidau 大神写的文章 Stream 102,是这样描述 watermark 的:
Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events.
大致意思是," Watermark 是从 event-time 维度描述输入数据的完整性, 换句话说,是系统评价数据流中处理 event-time 事件进度和完整性的方式"。在图2中,随着 processing-time 的推移,深色虚线逐步获取了 event-time 的完成进度。从概念上说,可以理解成一个函数F(P) -> E,给定一个 processing-time 值,函数返回对应的 event-time 值。这个 event-time 时间值 E 就被系统认为所有早于 E 时刻的 event-time 事件都已经被观察处理过,或者说系统断言,以后都不会再有早于 E 时刻的 event-time 事件出现了。至此,对于流处理的算子(window ... )而言,对于这个时间段的计算告一段落,可以生成结果了。
当然,很多朋友会问,如果过了 watermark 之后还有比估计时刻早的数据,姗姗来迟,怎么办?Flink 框架自带 Allowed Lateness 机制,这部分又是涉及到另一个概念 trigger,这篇文章先不提了。
2. Watermark 的例子
了解了 watermark 概念,引用 vishnuviswanath 论坛上的例子更好的加深理解。
case 1: 消息到达没有延迟
如图3,假设数据源生成3条消息,分别是第13秒,第13秒,第16秒。计算窗口为10秒,每隔5秒滑动一次。
这些消息在图4中会落入对应时间窗口。前两个在第13秒生成的消息会落入 [5s - 15s] 的窗口1和 [10s - 20s] 的窗口2。而第16秒生成的消息 会落入 [10s - 20s] 的窗口2和 [15s - 25s] 的窗口3。最终每隔窗口期望得到的结果分别是 (a,2), (a,3) 和 (a,1)。
case 2: 消息到达存在延迟
假设有一个第13秒生成的消息到达时,延迟了6秒(在第19秒到达),原因可能是网络阻塞。那么情况就变成了图5这样。
图中可以看到,延迟的消息落入了窗口2和窗口3,因为19秒属于 10-20 和 15-25的范围内。虽然这个例子中,对于窗口2的结果没有影响,但是窗口1和窗口3的结果都发生了变化,导致发生错误的原因就是处理消息时间的时候用的是 processing-time 而不是 event-time。
case 3:用基于 event-time 的系统来处理问题
使用 event-time 处理机制,需要对每个消息提取 event-time 信息。这个时间信息一般消息自带,或者手动生成。看下面一个例子,暂时忽略 getCurrentWatermark
这个函数,之后来讨论。
public class TimestampExtractor implements AssignerWithPeriodicWatermarks<String> {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(String e, long previousElementTimestamp) {
return Long.parseLong(e.split(",")[1]);
}
}
有了 event-time 信息提取器之后,主函数流程如下:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> text = env.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor());
DataStream<Tuple2<String, Integer>> count = text.map((MapFunction<String, Tuple2<String, Integer>>) m -> Tuple2.of(m.split(",")[0], 1))
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1);
count.print();
env.execute("EventTime processing example");
}
产生的结果见图6,图中可以看出窗口2和窗口3产生了正确的结果,但是窗口1还是错的。Flink 不把延迟的消息指派给窗口3是因为它知道消息的 event-time不属于窗口3。但是为什么 Flink 不将延迟消息指派给窗口1呢?是因为,当延迟消息到达的时候(第19秒)窗口1已经完成计算了。那好,接下来解决这个问题。
case 4:使用 Watermark 机制
对于我们的案例来说,我们的目的是告诉 Flink 一个消息可以延迟多久。在 case 3 中提到了 getCurrentWatermark
函数,我们设置了 watermark 值为当前系统时间,这表示没有考虑到延迟的消息。现在设置 watermark 值为 当前时间 - 5秒,这就表明 Flink 期望的消息延迟最大是5秒。这是因为窗口在输出最终计算结果的条件是,当 watermark 超过窗口结束时间。所以只有当前时间到达第20秒的时候,窗口1 [5s - 15s] 才符合输出条件,同样的窗口2 [10s - 20s] 要第25秒时才会输出结果。代码如下:
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis() - 5000);
}
运行的结果如图7,最终,三个窗口的结果都正确。
3. Watermark 的本地验证
看了上面的案例,突然又有了一个奇葩的想法,如果把 当前时间 - Lateness 的方法改成 当前时间 + Lateness 的情况呢?是不是窗口会提前给出结果呢?想到这里,打算在本地试一试。Flink 分配消息时间戳的方式有两种:1. 直接在数据源 (source)指派; 2. 通过定义 timestamp assigner / watermark generator。具体教程参考官方文档。上一节内容中的代码主要是第二种方式,刚好本地验证的时候试试第一种方式。
public class EventTimeDoc {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 设置并行度为1,方便调试
env.setParallelism(1);
DataStreamSource<Tuple2<String, Long>> input = env.addSource(new SourceFunction<Tuple2<String, Long>>() {
String[] letters = new String[]{"A", "B", "C", "D", "E", "F"};
Random random = new Random();
@Override
public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
while (true) {
String key = letters[random.nextInt(6)];
Tuple2<String, Long> next = Tuple2.of(key, System.currentTimeMillis());
// 打印产生的消息
long w = showWindowsStart(next.f1, 0L, 7000L);
System.out.println(next + " -> " + LocalDateTime.fromDateFields(new Date(next.f1)) + " -> 所属的窗口起始点是:" + LocalDateTime.fromDateFields(new Date(w)));
// event-time 设置为消息本身时间
ctx.collectWithTimestamp(next, next.f1);
// 消息时间 + Lateness
ctx.emitWatermark(new Watermark(next.f1 + 8000));
// 生成一条消息,暂停2秒
TimeUnit.SECONDS.sleep(2);
}
}
@Override
public void cancel() {
}
});
input.keyBy(0).timeWindow(Time.seconds(7))
.reduce((ReduceFunction<Tuple2<String, Long>>) (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
.map(new MapFunction<Tuple2<String, Long>, Object>() {
@Override
public Object map(Tuple2<String, Long> value) throws Exception {
return Tuple2.of(value.f0, LocalDateTime.fromDateFields(new Date(value.f1)));
}
})
.print();
env.execute();
}
private static long showWindowsStart(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
}
本地测试构造了一个 <String, Long>的 Tuple 消息序列,String 是 字母 A-F 的随机取值,Long 是每次生成消息时的时间戳。计算窗口选择 TumblingEventTimeWindows
,固定为 7 秒。伴随每条消息生成发送 watermark 值,并休眠 2 秒。其中 watermark 的值设置成 消息的EventTime + Lateness (Lateness 这里设置的是8秒)。
本地测试时,第一个消息产生在第49秒,根据窗口的生命周期我们知道,所属窗口的第一个消息一旦到达,窗口就立刻生成,窗口范围计算方法是 org.apache.flink.streaming.api.windowing.windows.TimeWindow
的:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
对照图8的分析,窗口1 的范围是 [48s - 55s) 左闭右开区间。伴随着该消息的生成,watermark 也被发送,此时 watermark 是57秒,已经超过了窗口1的结束时间,窗口1输出最终结果 -- 第49秒的消息,结束自己的生命周期,窗口1被移除。所以第51秒,第53秒的消息没有被观察到。
接下来,第55秒的消息进入窗口2 [55s - 1分2s) 左闭右开区间,和第49秒消息一样 watermark 值已经超过了窗口2的结束时间,窗口2输出最终结果 -- 第55秒的消息,结束自己的生命周期,窗口2被移除。所以第 57秒,第59秒,第1分1秒的消息没有被观察到。
时间继续推移,当第1分1秒的消息到达的时候,对应的 watermark 值为 1分9秒,已经超过了 窗口3 [1分2s - 1分9s) 左闭右开区间 的结束时间。结果窗口3中的消息还没到达,没来得及计算,就已经被移除了,第 1分3秒,第1分5秒,第1分7秒的消息没有被观察到。
以此类推,在 窗口4 [1分9s - 1分16s) 左闭右开区间最终得到的结果只有第1分9秒的消息。
附上控制台输出的结果,见图9。可以发现最终结果和图8的分析是一致的。(红色框的内容就是各个窗口最终结果)
4. 总结
实时计算中,处理延迟消息是系统的重要一部分。整篇文章,可以了解迟到的消息如何对系统造成不同结果,以及不同 watermark 值在 ApacheFlink 中对窗口结果的影响。