Flink -- Watermark 那些事

Watermark 是 Flink 实时处理计算平台的一个重要概念,也是 Google 的著名实时计算论文 The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing 里面经常提到的名词。看了很多的参考文献,和文档。有了一些自己的理解和体会,做个笔记。最近 Flink 1.5 版本正式发布,也算蹭个热度吧。


1. Watermark 的理解

  最早看到 Watermark 的概念就是在 Flink 的官方文档里面:

The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry a timestamp t. A Watermark(t) declares that event time has reached time t in that stream, meaning that there should be no more elements from the stream with a timestamp t’ <= t (i.e. events with timestamps older or equal to the watermark)

 上面这句话中有一个词叫做 event-time,了解 DataFlow 模型的同学都知道, event-time 表示的是每个独立事件在各自设备上产生的时间,是这个事件特有的属性,好比是一个人的生日。与之对应的还有 processing-timeingestion-time。理解起来也不难,processing-time 就是 Flink 的 window 操作该事件的时间,ingestion-time 是事件作为数据源进入 Flink 系统的时间。参考下面这个图1,来源于 Flink 文档

图1:Event-Time Ingestion-Time Processing-Time 示例

  重新回到引用中的这句英文,翻译过来的大致意思是," Watermarks 是 Flink 用来度量 event-time 事件处理进度的一种机制,Watermarks 作为数据流的一部分,携带一个时间 t。表示这次的数据流中 event-time 已经处理到时间 t 了,所有 event-time 早于或等于 t 时刻的事件都不应该再出现在这个数据流中了。" 有些朋友可能会有疑问,为什么要弄这么一个奇怪的限制。这是因为一般来说,在实际环境下,由于网络阻塞,延迟等问题,processing-time 相比 event-time 会有延迟滞后的现象,而且这种现象非常普遍。也就是说,并不是生产出一个事件, 就能在第一时间被处理。参考下面这个图2,来源于 Dataflow Model 论文

图2:时间域的偏离

 图2可以看到,横坐标是 event-time,纵坐标是 processing-time。理想的处理机制是,12:01分的 event-time 时间,应该就在 12:01分的 processing-time 时刻被处理。但是实际情况下,横向蓝色实线对应的 processing-time 晚于12:01分。所以图中横向黑色实线表示 event-time 的偏离,纵向红色实线表示 processing-time 的偏离,这些都比较容易理解。 接下来,按照原图的注释,浅色虚线表示 理想的 watermark,深色虚线表示 实际的 watermark。看到这里似乎有些困惑,该如何理解呢?

 直到,我看到另外一篇由 Tyler Akidau 大神写的文章 Stream 102,是这样描述 watermark 的:

Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events.

  大致意思是," Watermark 是从 event-time 维度描述输入数据的完整性, 换句话说,是系统评价数据流中处理 event-time 事件进度和完整性的方式"。在图2中,随着 processing-time 的推移,深色虚线逐步获取了 event-time 的完成进度。从概念上说,可以理解成一个函数F(P) -> E,给定一个 processing-time 值,函数返回对应的 event-time 值。这个 event-time 时间值 E 就被系统认为所有早于 E 时刻的 event-time 事件都已经被观察处理过,或者说系统断言,以后都不会再有早于 E 时刻的 event-time 事件出现了。至此,对于流处理的算子(window ... )而言,对于这个时间段的计算告一段落,可以生成结果了。

 当然,很多朋友会问,如果过了 watermark 之后还有比估计时刻早的数据,姗姗来迟,怎么办?Flink 框架自带 Allowed Lateness 机制,这部分又是涉及到另一个概念 trigger,这篇文章先不提了。

2. Watermark 的例子

  了解了 watermark 概念,引用 vishnuviswanath 论坛上的例子更好的加深理解。

case 1: 消息到达没有延迟

  如图3,假设数据源生成3条消息,分别是第13秒,第13秒,第16秒。计算窗口为10秒,每隔5秒滑动一次。


图3:数据生成

 这些消息在图4中会落入对应时间窗口。前两个在第13秒生成的消息会落入 [5s - 15s] 的窗口1和 [10s - 20s] 的窗口2。而第16秒生成的消息 会落入 [10s - 20s] 的窗口2和 [15s - 25s] 的窗口3。最终每隔窗口期望得到的结果分别是 (a,2), (a,3) 和 (a,1)。

图4:期望结果

case 2: 消息到达存在延迟

 假设有一个第13秒生成的消息到达时,延迟了6秒(在第19秒到达),原因可能是网络阻塞。那么情况就变成了图5这样。


图5:数据到达存在延迟

  图中可以看到,延迟的消息落入了窗口2和窗口3,因为19秒属于 10-20 和 15-25的范围内。虽然这个例子中,对于窗口2的结果没有影响,但是窗口1和窗口3的结果都发生了变化,导致发生错误的原因就是处理消息时间的时候用的是 processing-time 而不是 event-time

case 3:用基于 event-time 的系统来处理问题

 使用 event-time 处理机制,需要对每个消息提取 event-time 信息。这个时间信息一般消息自带,或者手动生成。看下面一个例子,暂时忽略 getCurrentWatermark 这个函数,之后来讨论。

public class TimestampExtractor implements AssignerWithPeriodicWatermarks<String> {
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis());
    }

    @Override
    public long extractTimestamp(String e, long previousElementTimestamp) {
        return Long.parseLong(e.split(",")[1]);
    }
}

  有了 event-time 信息提取器之后,主函数流程如下:

public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> text = env.socketTextStream("localhost", 9999)
                                     .assignTimestampsAndWatermarks(new TimestampExtractor());

        DataStream<Tuple2<String, Integer>> count = text.map((MapFunction<String, Tuple2<String, Integer>>) m -> Tuple2.of(m.split(",")[0], 1))
                                                        .keyBy(0)
                                                        .timeWindow(Time.seconds(10), Time.seconds(5))
                                                        .sum(1);
        count.print();
        env.execute("EventTime processing example");
}

  产生的结果见图6,图中可以看出窗口2和窗口3产生了正确的结果,但是窗口1还是错的。Flink 不把延迟的消息指派给窗口3是因为它知道消息的 event-time不属于窗口3。但是为什么 Flink 不将延迟消息指派给窗口1呢?是因为,当延迟消息到达的时候(第19秒)窗口1已经完成计算了。那好,接下来解决这个问题。


图6:基于 event-time 的计算

case 4:使用 Watermark 机制

  对于我们的案例来说,我们的目的是告诉 Flink 一个消息可以延迟多久。在 case 3 中提到了 getCurrentWatermark 函数,我们设置了 watermark 值为当前系统时间,这表示没有考虑到延迟的消息。现在设置 watermark 值为 当前时间 - 5秒,这就表明 Flink 期望的消息延迟最大是5秒。这是因为窗口在输出最终计算结果的条件是,当 watermark 超过窗口结束时间。所以只有当前时间到达第20秒的时候,窗口1 [5s - 15s] 才符合输出条件,同样的窗口2 [10s - 20s] 要第25秒时才会输出结果。代码如下:

public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - 5000);
}

  运行的结果如图7,最终,三个窗口的结果都正确。

图7:使用 Watermark 机制

3. Watermark 的本地验证

  看了上面的案例,突然又有了一个奇葩的想法,如果把 当前时间 - Lateness 的方法改成 当前时间 + Lateness 的情况呢?是不是窗口会提前给出结果呢?想到这里,打算在本地试一试。Flink 分配消息时间戳的方式有两种:1. 直接在数据源 (source)指派; 2. 通过定义 timestamp assigner / watermark generator。具体教程参考官方文档。上一节内容中的代码主要是第二种方式,刚好本地验证的时候试试第一种方式。

public class EventTimeDoc {
    
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 设置并行度为1,方便调试
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Long>> input = env.addSource(new SourceFunction<Tuple2<String, Long>>() {
            String[] letters = new String[]{"A", "B", "C", "D", "E", "F"};
            Random random = new Random();

            @Override
            public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
                while (true) {
                    String key = letters[random.nextInt(6)];
                    Tuple2<String, Long> next = Tuple2.of(key, System.currentTimeMillis());

                    // 打印产生的消息
                    long w = showWindowsStart(next.f1, 0L, 7000L);
                    System.out.println(next + " -> " + LocalDateTime.fromDateFields(new Date(next.f1)) + " -> 所属的窗口起始点是:" + LocalDateTime.fromDateFields(new Date(w)));
                    // event-time 设置为消息本身时间
                    ctx.collectWithTimestamp(next, next.f1);
                    // 消息时间 + Lateness
                    ctx.emitWatermark(new Watermark(next.f1 + 8000));

                    // 生成一条消息,暂停2秒
                    TimeUnit.SECONDS.sleep(2);
                }
            }

            @Override
            public void cancel() {

            }
        });

        input.keyBy(0).timeWindow(Time.seconds(7))
             .reduce((ReduceFunction<Tuple2<String, Long>>) (value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1))
             .map(new MapFunction<Tuple2<String, Long>, Object>() {
                 @Override
                 public Object map(Tuple2<String, Long> value) throws Exception {
                     return Tuple2.of(value.f0, LocalDateTime.fromDateFields(new Date(value.f1)));
                 }
             })
             .print();

        env.execute();
    }

    private static long showWindowsStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
}

  本地测试构造了一个 <String, Long>的 Tuple 消息序列,String 是 字母 A-F 的随机取值,Long 是每次生成消息时的时间戳。计算窗口选择 TumblingEventTimeWindows,固定为 7 秒。伴随每条消息生成发送 watermark 值,并休眠 2 秒。其中 watermark 的值设置成 消息的EventTime + Lateness (Lateness 这里设置的是8秒)

  本地测试时,第一个消息产生在第49秒,根据窗口的生命周期我们知道,所属窗口的第一个消息一旦到达,窗口就立刻生成,窗口范围计算方法是 org.apache.flink.streaming.api.windowing.windows.TimeWindow 的:

public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

  对照图8的分析,窗口1 的范围是 [48s - 55s) 左闭右开区间。伴随着该消息的生成,watermark 也被发送,此时 watermark 是57秒,已经超过了窗口1的结束时间,窗口1输出最终结果 -- 第49秒的消息,结束自己的生命周期,窗口1被移除。所以第51秒,第53秒的消息没有被观察到。

  接下来,第55秒的消息进入窗口2 [55s - 1分2s) 左闭右开区间,和第49秒消息一样 watermark 值已经超过了窗口2的结束时间,窗口2输出最终结果 -- 第55秒的消息,结束自己的生命周期,窗口2被移除。所以第 57秒,第59秒,第1分1秒的消息没有被观察到。

  时间继续推移,当第1分1秒的消息到达的时候,对应的 watermark 值为 1分9秒,已经超过了 窗口3 [1分2s - 1分9s) 左闭右开区间 的结束时间。结果窗口3中的消息还没到达,没来得及计算,就已经被移除了,第 1分3秒,第1分5秒,第1分7秒的消息没有被观察到。

  以此类推,在 窗口4 [1分9s - 1分16s) 左闭右开区间最终得到的结果只有第1分9秒的消息。

图8:执行分析图

  附上控制台输出的结果,见图9。可以发现最终结果和图8的分析是一致的。(红色框的内容就是各个窗口最终结果)

图9:控制台输出信息

4. 总结

  实时计算中,处理延迟消息是系统的重要一部分。整篇文章,可以了解迟到的消息如何对系统造成不同结果,以及不同 watermark 值在 ApacheFlink 中对窗口结果的影响。

参考资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容