Apache Flink源码解析 (四)Stream Operator

概述

  • 以Flink算子的视角为入口,解析它们是如何设计和工作的。
  • 重点在AbstractStreamOperator

实现StreamOperator的接口和抽象类

  • 首先是StreamOperator接口,方法1-4是生命周期相关的。方法5-6是容错与状态相关的,方法7-8是负责从StreamRecord中获取Key的信息的,应用需要shuffle的算子中。方法9-10是与flink的Operator Chaining 优化相关设置。方法11是跟监控相关。除此之外,StreamOperator还继承了CheckpointListener接口(notifyCheckpointComplete方法,checkpoint结束之后的回调方法),KeyContext接口(key的getter setter方法)。
StreamOperator
  • 继承了StreamOperator的扩展接口则有OneInputStreamOperator,TwoInputStreamOperator(这两个接口基本上能做到顾名思义)。
    实现了StreamOperator的抽象类有AbstractStreamOperator以及它的子类AbstractStreamUdfOperator。
StreamOperator

OneInputStreamOperator与TwoInputStreamOperator接口

  • 这两个接口非常类似,本质上就是处理流上存在的三种元素StreamRecord,Watermark和LatencyMarker。一个用作单流输入,一个用作双流输入。除了StreamSource以外的所有Stream算子都必须实现并且只能实现其中一个接口。

    • OneInputStreamOperator
    • TwoInputStreamOperator

AbstractStreamOperator

  • AbstractStreamOperator抽象类

    • 在AbstractStreamOperator中有一些重要的成员变量,总体来说可以分为几类,一类是运行时相关的,一类是状态相关的,一类是配置相关的,一类是时间相关的,还有一类是监控相关的。
      • 配置相关的成员变量有chainingStrategy,决定了能否在生成JobGraph时对算子进行Chaining优化。
      • 状态相关的成员变量:对于stateKeySelector1, stateKeySelector2这两个变量,它们是用于将Key从StreamRecord中提取出来,keySelector是否为null取决于这个算子有几个流有key。keyedStateBackend就是用户指定的state backend,是将状态放在Heap里还是RocksDB里。keyedStateStore是在state backend上做一层抽象,使其能通过StateDescriptor直接获取State。operatorStateBackend是算子相关的状态。
      • 运行时相关的成员变量有container,该算子在运行时所属的StreamTask,以及任务配置信息StreamConfig,算子的输出句柄output,还有运行的上下文runtimeContext。
      • 时间相关的成员变量有timeServiceManager,提供了定时触发的服务,在状态清理上有很重要的作用。
      • 监控相关的成员变量有metrics,注册的一些关于算子的监控信息,还有latencyStats,通过LatencyMarker来获取延时信息。

    • 在AbstractStreamOperator实现的方法中,有一部分是上述变量的getter, setter,还有就是对StreamOperator的实现。

    • 首先是算子生命周期相关的方法:
      • setup的调用链是invoke(StreamTask) -> constructor(OperatorChain) -> setup。在调用setup方法时,StreamTask已经在各个TaskManager节点上,StreamTask在启动是会创建OperatorChain,OperatorChain会一一调用所包含的算子的setup方法。而在setup方法中,通过StreamTask的Environment和UserCodeClassLoader将上述的大部分成员变量进行初始化。
      • open和close方法是空方法,由继承的类实现
      • dispose方法是算子生命周期的最后一环,当StreamTask被取消或者出现异常时调用,负责释放与operator状态相关的资源。

    • 之后是状态与容错相关的方法
      • snapshotState的简化的调用链是triggerCheckpoint(CheckpointCoordinater) -> triggerCheckpoint(Execution) -(AkkaMsg: TriggerCheckpoint)-> (triggerCheckpointBarrier)TaskManager->checkpointStreamOperator(StreamTask) -> snapshotState,从这条链路可以看到checkpoint是由Master节点发起,通过akka触发TaskManager对所有StreamTask做checkpoint,然后最后由算子执行snapshot,在这一层将TimerService做备份,防止触发器丢失,并且调用operatorStateBackend和keyedStateBackend的snapshot方法将stateBackend的备份到用户指定的文件系统。
      • initializeState是空方法,留给子类去覆盖,作用是从checkpoint中恢复状态。
      • getPartitionedState(无namespace)方法创建了一个partitioned state的句柄,使得聚合类的方法可以操作状态,并且这些状态会在snapshotState被调用的时候被checkpoint(AbstractKeyedStateBackend的注释里提到)。在该方法中,调用了重载方法(参数有namespace和相应Serializer)并且传入VoidNamespace相应参数。
      • getOrCreateKeyedState,该方法会被WindowOperator调用,由于同一个key不同的window会对应不同的值,所以每个Window就是一个namespace,在处理具体Element前不仅要切换Key,还要切换namespace。本质上与上一个方法的区别就在于取得的state是否有namespace。
      • notifyCheckpointComplete方法调用了keyStateBackend的同名方法,通知其执行checkpoint完成之后的逻辑。

    • 两个setKeyConextElement方法,借助对应的成员变量stateKeySelector将Key从StreamRecord里面提取出来. 这两个方法的调用链为processInput(StreamInputProcessor || StreamTwoInputProcessor) -> streamOperator.setKeyContextElement1. 值得注意的是,在这个方法调用后紧跟着的就是processElement方法。所以这个方法的作用就是在处理Record之前将stateBackend切换到相应的Key的状态。

    • 最后,虽然没有实现OneInputStreamOperator或TwoInputStreamOperator接口,但是对processWatermark和ProcessLatencyMarker的方法都做了实现。启动对watermark的处理是通过InternalTimeServiceManager,具体不作展开。
      • AbstractStreamOperator实现类(没有继承AbstractUdfStreamOperator)
        • StreamProject实现了投影的功能,覆盖了了open方法,创建了一个输出Tuple的实例,processElement方法中将需要的field从输入Tuple中提取出来加入输出Tuple,将StreamRecord中的元素替换成输出Tuple,通过AbstractStreamOperator中定义的output发送到下游。
          public void processElement(StreamRecord<IN> element) throws Exception {
              for (int i = 0; i < this.numFields; i++) {
                  outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
              }
              output.collect(element.replace(outTuple));
          }
        

  • AbstractUdfStreamOperator抽象类

    • AbstractUdfStreamOperator继承了AbstractStreamOperator,对其部分方法做了增强,多了一个成员变量UserFunction,并提供get方法。此外还实现了OutputTypeConfigurable接口的setOutputType方法对输出数据的类型做了设置。
      • 还是算子生命周期相关的方法:首先调用了前文提到的AbstractStreamOperator相应方法,然后为userFunction提供runtimeContext,Configuration,并且调用function的open和close方法。
      • 状态和checkpoint相关的方法: 首先调用父类相应方法,完成算子状态的别分与恢复,然后调用实现了Checkpoint相关的接口的userFunction的相关方法。

    • AbstractStreamOperator实现类
      • OneInputStreamOperator
        • StreamFilter,StreamMap与StreamFlatMap算子在实现的processElement分别调用传入的FilterFunction,MapFunction, FlatMapFunction的udf将element传到下游。其中StreamFlatMap用到了TimestampedCollector,它是output的一层封装,将timestamp加入到StreamRecord中发送到下游。
        • StreamGroupedReduce与StreamGroupedFold算子相似的点是都涉及到了操作状态, 所以在覆盖open方法时通过创建一个状态的描述符以及调用AbstractStreamOperator实现的getPartitionedState方法获取了一个stateBackend的操作句柄。在processElement方法中借助这个句柄获取当前状态值,在用UDF将新的元素聚合进去并更新状态值,最后输出到下游。不同的是Fold的输出类型可能不一样(所以实现了OutputTypeConfigurable接口的setOutputType方法),并且有初始值。
          ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
          values = getPartitionedState(stateId);
        
        • ProcessOperator, LegacyKeyedProcessOperator提供了对ProcessFunction的支持, KeyedProcessOperator提供了对KeyedProcessFunction的支持。ProcessFunction是比较灵活的UDF,允许用户通过在processElement的时候可以通过传入的Conext(这个Conext是由算子实现的)操作TimerService,注册ProcessingTimeTimer和EventTimeTimer,并且通过实现方法onTimer就可以在Timer被触发的时候执行回调的逻辑。
          • 其中ProcessOperator对ProcessFunction的实现是只支持获取currentProcessingTime和wartermark,不支持Timer的注册。
          • LegacyKeyedProcessOperator对ProcessFunction的实现是支持Timer的注册的。同时它实现了Triggerable接口中的onEventTime和onProcessingTime方法,这两个方法是由InternalTimerService调用的回调方法,在这两个方法中,会去调用用户在ProcessFunction中实现的onTimer方法来实现定时触发的。LegacyKeyedProcessOperator现在被标记为Deprecated,由KeyedProcessOperator类替代,但是由于很多Function是基于ProcessFunction实现的,而KeyedProcessFunction在1.5.2中还没有什么实现类,所以还是会经常被用到。
          • KeyedProcessOperator实现与LegacyKeyedProcessOperator非常类似,不过UDF是KeyedProcessFunction。
        • StreamSink除了在processElement方法中调用SinkFunction方法外,还提供了SimpleContext,可以获取processingTime,watermark和element的时间戳。
        • GenericWriteAheadSink提供了一个可以被实现为Exactly once的sink的抽象类,这边不展开研究。
        • AsyncWaitOperator提供了异步处理的能力,是一个比较特殊的算子,对元素的处理和备份恢复都比较特殊。element的输出通过一个Emitter对象来实现。有机会单独针对这个算子写一篇文章。
        • TimestampsAndPeriodicWatermarksOperator与TimestampsAndPunctuatedWatermarksOperator通过TimestampAssigner提取timestamp并生按照规则生成watermark。
        • windowOperator可能是flink实现的最复杂的operator,可以参考vinoyang的Flink流处理之窗口算子分析。后续也有可能针对这个operator写一篇解析。

      • TwoInputStreamOperator的实现类相对较少,作用于双流的各种操作。
        • TwoInputStreamOperatorChildren.jpg
        • CoStreamMap, CoStreamFlatMap基本与单流的逻辑没什么区别,只是针对两个流的Function做类似的处理。
        • IntervalJoinOperator对双流的元素根据提供的ProcessJoinFunction做内连接,并且每个元素都有失效时间。在processElement方法中,每当一个流的元素到达,会将它加入对应流的buffer,并且遍历另一个流的buffer找到所有join的选项。最后再根据失效时间注册一个状态清理的Timer防止buffer无限增长。
          • 此外它还实现了ProcessJoinFunction的Conext抽象类,提供了获取左右两个流元素timestamp的功能。关于BufferEntry的实现这里略过。
        • CoProcessOperator和KeyedCoProcessOperator本质上与单流的处理也没有什么区别,但是提供了双流之间共享状态的可能。CoProcessOperator也被用来实现NonWindowJoin。
        • CoBroadcastWithKeyedOperator和CoBroadcastWithNonKeyedOperator提供了对(Keyed)BroadcastProcessFunction的支持,和CoProcess有一些类似,只是Broadcast的Stream只有读权限,没有写权限。并且可以通过context直接获得BroadcastState。

      • StreamSource因为没有输入,所以没有实现InputStreamOperator的接口。比较特殊的是ChainingStrategy初始化为HEAD。在运行时由SourceStreamTask调用run方法。在run方法中,他首先出事话LatencyMarksEmitter用来产生延迟监控的LatencyMarker,接着根据用户选择的时间模式(EventTime,IngestionTime和ProcessingTime)来生成相应的SourceConext(包含了产生element关联的timestamp的方法和生成watermark的方法)。最后调用SourceFunction的run方法来启动source。

  • 总结

    • 本文从StreamOperator接口开始一层一层往上解析,重点关注了AbstractStreamOperator和AbstractUdfStreamOperator这两个抽象类,覆盖了大部分的算子的功能与设计。
    • 了解算子如何工作可以对运行的任务有更深的了解,借助AbstractUdfStreamOperator,也可以较低成本的定制出特殊的算子。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容