flink的event time机制

时间概念

Flink在流程序中支持三种时间概念:

  • 处理时间(Processing Time):处理时间是指执行相应操作的机器的系统时间。

当流程序在处理时间上运行时,所有基于时间的操作(如时间窗口)将使用运行各自操作符的机器的系统时间。

处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。但是,在分布式和异步环境中,处理时间不提供确定性,因为它很容易受到记录到达系统的速度,记录在系统内部操作符之间流动的速度以及中断的影响。

  • 事件时间(Event Time):事件时间是每个单独事件在其生成设备上发生的时间。

这个时间通常在记录输入Flink之前嵌入到记录中,并且可以从每个记录中提取事件时间戳。在事件时间中,时间的进展取决于数据,而不是任何挂钟。

事件时间程序必须指定如何生成事件时间Watermarks,这是表示事件时间进度的机制。

  • 摄入时间(Ingestion time):摄入时间是事件进入Flink的时间。

在源操作符中,每个记录以时间戳的形式获取源的当前时间,基于时间的操作(如时间窗口)引用该时间戳。

从概念上讲,摄入时间介于事件时间和处理时间之间。与事件时间相比,摄入时间程序不能处理任何无序事件或延迟数据,但程序不必指定如何生成Watermarks,因为在内部,它自动进行时间戳分配和自动Watermarks生成。

设定时间特征

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

event time和watermarks

设置时间特征为事件时间的流处理器需要一种方法来衡量事件时间的进度,Flink衡量event time 进度的机制是watermarks,watermark 带有一个时间戳,作为数据流的一部分随数据流流动,Watermark(t) 表示event time 小于等于 t 的都已经到达,watermarks可以解决实时系统中最常见的问题:乱序与延迟。

生成watermark的方法:

  • 在source中,直接生成watermark,由source生成的watermark 优先级比较低,可以被另一个方法产生的watermark覆盖掉。
  • 通过时间戳分配器(timestamp assigner)来生成水印(watermark)。时间戳分配器分两种:Periodic: 周期性(一定时间间隔或一定数据量)产生watermark;Punctuated: 间断的 watermark,一般根据event 决定是否产生新watermark。

方法一:直接在source中生成watermark

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

方法二:通过时间戳分配器生成watermark
首先需要指定时间戳分配器,时间戳分配器通常在数据源之后立即指定,但并非严格要求这样做。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

周期性时间戳分配器,如下所示:可以看出,自定义的时间戳分配器需要实现AssignerWithPeriodicWatermarks 接口,其中getCurrentWatermark 产生新的watermark,如果返回非空且大于原来的watermark,则生成了新的watermark;另外,extractTimestamp 用于给数据加上时间戳,这个时间戳在后续所有基于event time的计算中使用。
ExecutionConfig.setAutoWatermarkInterval(...) 定义了watermark产生的时间间隔,单位是毫秒。

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

间断性的时间戳分配器,如下所示:根据event来确定是否需要产生新的watermark,定义Punctuated 分配器需要实现AssignerWithPunctuatedWatermarks接口,包括函数extractTimestamp,checkAndGetNextWatermark,其中extractTimestamp 同Periodic Assigner,首先调用;然后调用checkAndGetNextWatermark ,用于确定是否需要产生新的watermark,当checkAndGetNextWatermark 产生一个非空且大于上一个watermark时就产生了新的watermark。

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

Flink允许程序员分配自己的时间戳并发出自己的watermarks。更具体地说,可以通过实现其中一个AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks接口来实现,具体取决于用例。简而言之,第一个会周期性地发出watermarks,而第二个会根据传入记录的某些属性发出watermarks。为了进一步简化此类任务的编程工作,Flink提供了两种已经实现的时间戳分配器:AscendingTimestampExtractor和BoundedOutOfOrdernessTimestampExtractor。

  • AscendingTimestampExtractor: 这是AssignerWithPeriodicWatermarks 的最简单的情况,数据流是按时间戳升序到达Flink的,这种情况下,数据里的时间戳就可以作为watermark。
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
  • BoundedOutOfOrdernessTimestampExtractor: 这也是一个AssignerWithPeriodicWatermarks 的实现,表示已知数据的最大延迟,在丢弃元素之前允许元素延迟的最长时间,大于最大延迟时长的元素将被丢弃。
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容