flink时间语义指北

新旧版本对比

早在flink1.10以前的版本,有三种时间语义:

  • processing time,即当前集群的处理时间
  • event time,即消息中附带的消息产生时间
  • ingestion time,即消息进入集群的摄取时间

彼时,时间语义可以通过如下方式设置

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

2020年年中,flink1.11发布,其中,FLIP-126: Unify (and separate) Watermark Assigners。水位线的产生之前有两个接口(AssignerWithPunctuatedWatermarksAssignerWithPeriodicWatermarks),改为由WatermarkGenerator接口实现,降低了开发的复杂度,并且脱离了TimestampAssigner

2020年年底,flink1.12发布,流批统一,另外flink将上述时间语义设置方式标记过期:env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

水印生成策略

// version flink-1.12.1
//官方策略-无水印生成
assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks())
//官方策略-单调递增的水印
assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
//官方策略-延迟20秒的水印
assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))

//自定义策略-从消息中提取水印
class MyWatermark implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long timestamp, WatermarkOutput output) {
        LocalDateTime dateTime = LocalDateTime.parse(event, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
        long eventTimestamp = dateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
        log.info("event[{}] timestamp[{}] eventTimestamp[{}]" ,event , timestamp, eventTimestamp);
        output.emitWatermark(new Watermark(eventTimestamp));
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}

void onPeriodicEmit(WatermarkOutput output):周期性触发watermark生成,默认200ms。可以通过env.getConfig().setAutoWatermarkInterval(200)配置。

void onEvent(String event, long eventTimestamp, WatermarkOutput output):事件触发watermark生成。

withIdleness(Duration.ofMinutes(1)):此方法为闲置watermark生成。如果1分钟内没有事件产生,则下发一个watermark。

两条时间线

  • processing time:flink处理消息时的时间,窗口开启的依据时间

  • watermark:消息中的消息产生时间,窗口关闭的依据时间

    note:如果当前watermark大于窗口关闭的时间,你会发现,数据无法在窗口收集

源码分析

    //窗口分配器参数中 timestamp为当前处理事件
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > -9223372036854775808L) {
            if (this.staggerOffset == null) {
                this.staggerOffset = this.windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), this.size);
            }

            long start = TimeWindow.getWindowStartWithOffset(timestamp, (this.globalOffset + this.staggerOffset) % this.size, this.size);
            return Collections.singletonList(new TimeWindow(start, start + this.size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
    //窗口触发器 TriggerResult.FIRE表示窗口关闭,触发计算
    //onElement表示窗口收集动作
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }
    //onEventTime表示窗口计算动作
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

注意事项

  1. 单调递增的watermark的下发一般越贴近数据源越好
  2. 每个算子默认的watermark是Long.MIN_VALUE
  3. 算子的watermark时间不会出现回溯(下一个水印如果小于当前水印则无效)
  4. 如果并行度大于1,则watermark依据木桶原理,同一算子的每个slot的watermark时间都大于关窗时间才触发计算
  5. 上述均为watermark的生成及下发,还有关于执行时间的生成及下发,虽然他很少用。具体可以了解了解BoundedOutOfOrdernessTimestampExtractor这个类

参考资料

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容