Flink Watermark相关整理

定义与作用

官方定义:A Watermark tells operators that no elements with a timestamp older or equal to the watermark timestamp should arrive at the operator.

作用:watermark是一个时间标记,用于上游告诉下游,该时间点以前的数据都已经发送完毕。主要用于解决乱序和延迟数据的问题。

产生方式

Watermark的产生必然伴随着数据的Timestamp抽取(通过调用assignTimestampsAndWatermarks方法)。可以在产生DataStreamSource的时候就直接产生(如用FlinkKafkaConsumer类中的assignTimestampsAndWatermarks来配置),也可以在后面为DataStream中设置(用DataStream类中的assignTimestampsAndWatermarks来配置)。

Watermark的产生逻辑有两种:Periodic和Punctuated。

Periodic

使用方式:

 Datastream.assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner);

这种方式下,Datastream向下游发送Watermark的时机总是固定周期的,但是发送的Watermark值却是根据数据中的Timestamp来计算的。

AssignerWithPeriodicWatermarks<T>包含一个方法:

Watermark getCurrentWatermark();

用来实现获取Watermark的逻辑。Datastream在会注册一个定时器,定时器触发的时候会调用getCurrentWatermark来计算一个Watermark,并将其注入输出的数据流中。

可以通过继承AssignerWithPeriodicWatermarks<T>来自定义Watermark的计算逻辑。当然通常更多地使用预定义的类:

BoundedOutOfOrdernessTimestampExtractor

对于所有数据元素抽取Timestamp后,保存迄今最大的值保存在currentMaxTimestamp中,然后产生watermark的计算方法:watermark = currentMaxTimestamp - maxOutOfOrderness。其中maxOutOfOrderness表示最大乱序时间,可以自定义。即当前timestamp最大值减去一定的延时作为watermark,这是比较常用的一种

Punctuated

使用方式:

 Datastream.assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner);

这种方式下,无论是Datastream向下游发送Watermark的时机,还是发送的Watermark值都是由用户自行定义的。

AssignerWithPunctuatedWatermarks<T>包含一个方法:

Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);

它会作用于每一个数据元素,其中用户需要自己定义是否根据这个元素输出和怎样输出watermark。

这种方式的主要使用场景是数据中本身包含有标志着该数据以前的数据已经发送完毕的信息,或者用户想自己控制watermark产生的方法与时机。

传递方式

image

watermark和record、stream status一样,都以StreamElement的形式在DataStream中,由上游算子发送给下游算子。

每个算子都使用了StatusWatermarkValve来保存和处理上游发来的watermark状态,并且决定如何传递给它的下游。

StatusWatermarkValve内部又包含了一个数组

private final InputChannelStatus[] channelStatuses;

用于保存每个上游通道的最大watermark以及该通道的stream status等数据

处理流程

一个算子接收到watermark,并进行处理的整个流程:

  • 每个算子都有一个StreamInputProcessor(或StreamTwoInputProcessor,根据上游算子的数量决定),它从数据流中读数据,并判断数据的类型。如果是watermark,则交给其所属的StatusWatermarkValve(状态水位阀)去处理这个watermark。

  • StatusWatermarkValve内部包含了一个数组InputChannelStatus[] channelStatuses,它保存了每个上游通道的最大watermark。其中每个通道对应上游算子的一个并发。

  • StatusWatermarkValve拿到这个watermark时,会在InputChannelStatus[]数组中找到其对应通道的之前的watermark,如果当前的比原来的大,则更新该通道的watermark;

  • 如果有更新,则还要判断一下InputChannelStatus[]数组中的watermark最小值是否有更新,如果更新了,则调用outputHandler的handleWatermark来产生该算子的新watermark。

  • 随后,算子的timerService首先会更新算子内部的watermark状态,并判断是否触发一些窗口、定时事件等,等数据都处理完毕后调用output.emitWatermark将当前的watermark下发给下游。注意必须等该Watermark所触发的计算完成、并保证数据下发后,再行下发Watermark。

image
image

使用方式

触发窗口

这是最常见的用法,当Watermark没过算子的某些窗口的结束时间,算子会触发这些窗口,对窗口中的数据进行处理,然后输出处理结果,并将Watermark也输出到下游。

在此之后如果有早于该Watermark时间戳的数据元素流入该算子,则会被当成late element被输出到side output中。

注意:WindowedStream中还有一个方法

allowedLateness(Time lateness);

这里的lateness与maxOutOfOrderness类似,都是用于设定延时时间的,只是maxOutOfOrderness是在curMaxTimestamp的基础上减去一个延时时间,来产生Watermark。而lateness是在Watermark的基础上再减去一个延时时间。设定了这个参数后,是当Watermark-lateness大于窗口结束时间时,才出发窗口。

触发定时器

有些算子中虽然没有使用窗口,但是注册使用了定时器:

ctx.timerService().registerEventTimeTimer(value.getBizTime());

这种情况下,当Watermark没过所注册的定时时间的时候,就会触发定时器,去调用用户自定义的onTimer方法。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容