Flink-1.10 源码笔记 process && 调用过程

我们知道flink已经封装了很多高级的api供用户访问使用,但是有时候我们可能根据不同的需求,发现提供的高级api不能满足我们的需求,这个时候flink也为我们提供了low-level层面的api,比如processFunction,通过processFunction函数,我们可以访问state,进行注册process ,event time定时器来帮助我们完成一项复杂的操作。在我们使用process 函数的时候,有一个前提就是要求我们必须使用在keyedStream上,有两个原因,一个是getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的访问权限,所以只能访问keyd state,另外一个是我们在注册定时器的时候,需要有三个维度,namespace,key,time,所以要求我们有key,这就是在ProcessFunction中只能在keyedStream做定时器注册,在flink1.8.0版本中,有ProcessFunction 和KeyedProcessFunction 这个类面向用户的api,但是在ProcessFunction 类我们无法注册定时器,在ProcessOperator源码中我们发现注册是抛出异常

为什么KeyedProcessFunction可以调用RuntimeContext对象,通过源码看一下
KeyedProcessFunction是一个抽象类,继承了AbstractRichFunction抽象类

@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction

进入AbstractRichFunction类,可以看到,该类实现了实现了RichFunction,和Serializable接口
RichFunction中定义了getRuntimeContext方法,在AbstractRichFunction中实现了该方法

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable

我们调用getRuntimeContext方法时,便可以获取RuntimeContext对象,对状态等进行操作

    private transient RuntimeContext runtimeContext;

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

现在开始看process算子的实现

process算子需要传入的值,传入值分为两种processFunc和KeyedProcessFunc,但不建议使用ProcessFunction了,建议使用KeyedProcessFunction,所以主要看KeyedProcessFunction


image.png

数据流在经过keyBy之后会转换成KeyedStream,先看一下KeyStream中的procss方法
KeyedStream是DataStream的实现

public class KeyedStream<T, KEY> extends DataStream<T>

可以看到process需要传入一个keyedProcessFunction (用编写的),如果用户不指定输出类型,会获取默认类型

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

可以看出将函数封装成了一个KeyedProcessOperator类型,这个类继承了AbstractUdfStreamOperator类和实现了OneInputStreamOperato接口和Triggerable接口

public class KeyedProcessOperator<K, IN, OUT>
    extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace>

该类重写了 父类的open方法,实现了AbstractUdfStreamOperator的processElement方法和Triggerable的onEventTime和onProcessingTime方法, 现在看一下实现的逻辑

open方法
在方法中,首先调用父类Open方法进行初始化操作, 然后初始化本类服务,

    @Override
    public void open() throws Exception {
        //调用父类open方法 进行初始化
        super.open();
        //创建一个 timestampedCollector 来给定Flink output             ----英翻  时间戳收集器
        collector = new TimestampedCollector<>(output);

        //定义内部定时服务      
        InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
            //internalTimerService 封装到 TimeService中
                //获取timerSErvice    SimpleTimerService 内部使用了 internalTimerService
        TimerService timerService = new SimpleTimerService(internalTimerService);

        //传入 userFun 和 定时器  返回context对象
        context = new ContextImpl(userFunction, timerService);
        //同上 返回定时器onTimerContext对象
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

这里重要的是这行代码context = new ContextImpl(userFunction, timerService);
现在看下他的内部实现, 这个是内部类,他继承了KeyedProcessFunction的Context类
在该类中实现了Countext对象,对我们提供上下文服务

    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

        private final TimerService timerService;

        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
            function.super();
            this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(element != null);

            if (element.hasTimestamp()) {
                return element.getTimestamp();
            } else {
                return null;
            }
        }
            
        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }

            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        @SuppressWarnings("unchecked")
        public K getCurrentKey() {
            return (K) KeyedProcessOperator.this.getCurrentKey();
        }
    }

在看一下 processElement 方法,主要调用用户逻辑
这里userFunc调用processElement方法,该方法为用户定义的内容

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);

        //赋值element
        context.element = element;
        //将context对象 和collector 传入 userFunc中
        //为用户层级提供了访问时间和注册定时器的入口
        userFunction.processElement(element.getValue(), context, collector);

        //赋值调用完后 清空
        context.element = null;
    }

当用户通过ctx.timerService().registerProcessingTimeTimer(); 设置定时器后,定时器触发会走KeyedProcessOperator的onEventTime或onProcessingTime方法 这里看下onEventTime的实现
在EventTime计时器触发时调用,在方法中 调用了 invokeUserFunction方法

  @Override
  public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
      collector.setAbsoluteTimestamp(timer.getTimestamp());
      invokeUserFunction(TimeDomain.EVENT_TIME, timer);
  }

我们跟随invokeUserFunction进入方法 看下实现,这个方法会调用 用户的onTime方法,执行里面逻辑

    private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        //这个时候也就是调用了我们自定义类K\eyedProcessFunction中的onTimer,
        //调用时传入了OnTimerContextImpl对象,其持有IntervalTimeService服务,也可以注册定时器操作。
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

最终 将用户的Func 包装成KeyedProcessOperator对象 调用transform方法,最终返回转换后的DataStream

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

现在我们追踪进去看,最终调用了doTransform方法,经过一系列的转换,将将operator添加到拓补图中,最终将operator转换成SingleOutputStreamOperator对象,该类继承DataStream,进行返回

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 检查输出类型是否为MissingTypeInfo,如果是抛出异常,
        transformation.getOutputType();

        //创建OneInputTransformation
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            transformation,          //input   --上游的 transformation
                operatorName,
                operatorFactory,     //需要进行转换操作的
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //多个级联的map和filter操作会被transform成为一连串的OneInputTransformation。
        // 后一个transformation的input指向前一个transformation
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

到此整个process算子调用完成

如有错误,欢迎指正!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容