Flink RichFunction题目一则

前言

祝广大女性节日快乐~

快问快答

  • Flink DataStream API中的RichFunction有哪些用途/特点?
  • RichFunction中获取到的RuntimeContext是干什么用的?
  • 所有Function都有对应的RichFunction实现吗?
  • 所有Flink流处理的算子都可以传入RichFunction吗?

前两个问题实际上可以合并成一个问题。RichFunction的特点是比Function多出了生命周期管理(open()close()方法),以及能够获取其运行时上下文RuntimeContext。RuntimeContext与Function的每个并行实例(即一个Sub-task)相关联,通过它还能进一步得到如下信息:

  • 运行时静态信息,如Task的名称、并行度、最大并行度、当前Sub-task的编号、当前类加载器等;
  • 全局数据结构,即累加器(Accumulators)、广播变量(Broadcast variables)和分布式缓存(Distributed cache);
  • 创建各种状态句柄,即我们熟知的get***State(StateDescriptor)方法。

第三个问题,yes;第四个问题,no

RichFunction不适用的场景

简单的开窗聚合场景:

dataStream.keyBy(x -> x.getKey())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .reduce(new MyRichReduceFunction<>())

这段代码能编译通过,但执行时会抛出UnsupportedOperationException,提示ReduceFunction of reduce can not be a RichFunction。如果换成aggregate()方法和RichAggregateFunction会有同样的问题,提示This aggregation function cannot be a RichFunction。在WindowedStream的对应实现中,可以看到此路不通:

    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of reduce can not be a RichFunction. "
                            + "Please use reduce(ReduceFunction, WindowFunction) instead.");
        }

        // clean the closure
        function = input.getExecutionEnvironment().clean(function);
        return reduce(function, new PassThroughWindowFunction<>());
    }

    public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
        checkNotNull(function, "function");

        if (function instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "This aggregation function cannot be a RichFunction.");
        }

        TypeInformation<ACC> accumulatorType =
                TypeExtractor.getAggregateFunctionAccumulatorType(
                        function, input.getType(), null, false);

        TypeInformation<R> resultType =
                TypeExtractor.getAggregateFunctionReturnType(
                        function, input.getType(), null, false);

        return aggregate(function, accumulatorType, resultType);
    }

为什么不能用Rich[Reduce / Aggregate]Function?

答案并不难:与FlatMap、Filter等算子不同,Reduce和Aggregate本身就是自带确定的状态语义的算子,不需要用户手动操作状态(如果用户能干预的话大概率会出问题),也不需要生命期管理的特性(它们的生命期总是始于第一条数据,终于最后一条数据)。

以Reduce逻辑为例(Aggregate同理),不妨进一步看下对应的窗口算子是如何构造的。

    public <R> WindowOperator<K, T, ?, R, W> reduce(
            ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
        Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
        Preconditions.checkNotNull(function, "WindowFunction cannot be null");

        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of apply can not be a RichFunction.");
        }

        if (evictor != null) {
            return buildEvictingWindowOperator(
                    new InternalIterableWindowFunction<>(
                            new ReduceApplyWindowFunction<>(reduceFunction, function)));
        } else {
            ReducingStateDescriptor<T> stateDesc =
                    new ReducingStateDescriptor<>(
                            WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));

            return buildWindowOperator(
                    stateDesc, new InternalSingleValueWindowFunction<>(function));
        }
    }

注意到这里创建了ReducingStateDescriptor(ReduceFunction恰好是它的一个入参),并最终获取了内置的ReducingState句柄。其实就DataStream API用户的日常编程习惯而言,很少会主动用到ReducingState(以及AggregateState)。即使这样,在它们的描述符构造方法中,也加了同样的强制校验,防止传入RichFunction,以保护状态的确定性。

    public ReducingStateDescriptor(
            String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
        super(name, typeClass, null);
        this.reduceFunction = checkNotNull(reduceFunction);

        if (reduceFunction instanceof RichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of ReducingState can not be a RichFunction.");
        }
    }

话说回来,Rich[Reduce / Aggregate]Function在Flink工程内部以及示例中都没有有效的使用过,所以我们大概可以判定这是Flink发展过程中的遗产吧(笑

The End

晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容

  • 一. 应用 二. 抽象 三. 时间与窗口👍 四. 类型与序列化 五. 内存管理👍 六. 状态管理 七. 作业提交 ...
    冰菓_阅读 533评论 0 5
  • 水位线 是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务间传递。 可以实现Watermar...
    just_writer阅读 288评论 0 0
  • @[TOC] [还有视频讲解在我的B站-宝哥chbxw](https://www.bilibili.com/vid...
    宝哥大数据阅读 872评论 0 0
  • 一、Flink 简介   Apache Flink 是一个用于对无边界和有边界数据流进行有状态计算的框架和分布式处...
    小胡_鸭阅读 2,146评论 0 11
  • 前言 在更底层,可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的...
    小波同学阅读 605评论 1 8