浅谈Flink对象重用(object reuse)

前言

今天是大年初一,祝各位虎年大吉大利~

近期受工作变动影响,博客又荒废了许久。今天难得有空,就前段时间内部技术分享里提到的一个小知识点来写几笔。

对象重用(object reuse)在Flink文档的Execution Configuration一节中并不起眼,并且关于它的说明也语焉不详,如下:

enableObjectReuse() / disableObjectReuse() By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

那么,"reuse"的具体操作是什么?为什么可能会造成bug?什么时候可以安全地启用它呢?本文来简单聊一聊。

算子链与DataStream API对象重用

笔者之前讲过,算子链(operator chaining)是StreamGraph向JobGraph转化过程中的主要优化措施。经过此优化,所有chain在一起的sub-task都会在同一个TaskManager slot中执行,能够减少不必要的数据交换、序列化(注意这点)和上下文切换,从而提高作业的执行效率。

算子链内部的简单示意图如下。

但是,将chained operators连接在一起的ChainingOutput实际上有两种,即ChainingOutputCopyingChainingOutput。查看OperatorChain类中对应的代码:

if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
    currentOperatorOutput = new ChainingOutput<>(operator, outputTag);
} else {
    TypeSerializer<IN> inSerializer =
            operatorConfig.getTypeSerializerIn1(userCodeClassloader);
    currentOperatorOutput = new CopyingChainingOutput<>(operator, inSerializer, outputTag);
}

也就是说,如果启用了对象重用,构造算子链时采用的是ChainingOutput,反之则是CopyingChainingOutput。它们唯一的不同点就是将StreamRecord推到下游算子时的处理方式,做个对比:

// ChainingOutput#pushToOperator()
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        input.setKeyContextElement(castRecord);
        input.processElement(castRecord);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

// CopyingChainingOutput#pushToOperator()
protected <X> void pushToOperator(StreamRecord<X> record) {
    try {
        // we know that the given outputTag matches our OutputTag so the record
        // must be of the type that our operator (and Serializer) expects.
        @SuppressWarnings("unchecked")
        StreamRecord<T> castRecord = (StreamRecord<T>) record;
        numRecordsIn.inc();
        StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
        input.setKeyContextElement(copy);
        input.processElement(copy);
    } catch (ClassCastException e) {
        if (outputTag != null) {
            // Enrich error message
            ClassCastException replace =
                    new ClassCastException(
                            String.format(
                                    "%s. Failed to push OutputTag with id '%s' to operator. "
                                            + "This can occur when multiple OutputTags with different types "
                                            + "but identical names are being used.",
                                    e.getMessage(), outputTag.getId()));
            throw new ExceptionInChainedOperatorException(replace);
        } else {
            throw new ExceptionInChainedOperatorException(e);
        }
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}

可见,对象重用的本质就是在算子链中的下游算子使用上游对象的浅拷贝。若关闭对象重用,则必须经过一轮序列化和反序列化,相当于深拷贝,所以就不能100%地发挥算子链的优化效果。

但正所谓鱼与熊掌不可兼得,若启用了对象重用,那么我们的业务代码中必然不能出现以下两种情况,以免造成混乱:

  • 在下游修改上游发射的对象,或者上游存入其State中的对象;
  • 同一条流直接对接多个处理逻辑(如stream.map(new AFunc())的同时还有stream.map(new BFunc()))。

总之,在enableObjectReuse()之前,需要谨慎评估业务代码是否会带来副作用。社区大佬David Anderson曾在Stack Overflow上给出了一个简单明晰的回答,可参见这里

Flink SQL中的对象重用

另一位社区大佬Nico Kruber曾经写过一篇名为<<A Journey to Beating Flink's SQL Performance>>的文章,其中说启用对象重用可以为Blink Planner带来可观的性能收益,并且还相当安全。为什么?

我们知道,Flink SQL的类型系统与DataStream Runtime原生的类型系统有一定区别,故某些基础数据类型的序列化器的实现也有不同。以最常见的字符串类型为例,DataStream原生的StringSerializercopy()方法如下。

@Override
public String copy(String from) {
    return from;
}

可见是能够利用String类型本身的不可变性(immutability)来避免真正的复制。所以,若DataStream API程序中的复杂数据类型越少,序列化成本就越低,打开对象重用的收益也就越小。前述的文章也说明了这一点。

Flink SQL体系中的StringDataSerializer#copy()方法则完全不一样,如下(实际上是BinaryStringData#copy())。

public BinaryStringData copy() {
    ensureMaterialized();
    byte[] copy =
            BinarySegmentUtils.copyToBytes(
                    binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
    return new BinaryStringData(
            new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
            0,
            binarySection.sizeInBytes,
            javaObject);
}

可见是要实打实地复制底层的MemorySegment,此时对象重用的优点就很明显了。

如何保证这边不会有像DataStream API同样的隐患?答案在(之前讲过的)代码生成阶段。例如,在查询维表的CommonExecLookupJoin执行节点中,生成访问输入字段的代码时,会判断是否要强制深拷贝(当允许对象重用时,deepCopy就为true):

  def generateFieldAccess(
    ctx: CodeGeneratorContext,
    inputType: LogicalType,
    inputTerm: String,
    index: Int,
    deepCopy: Boolean): GeneratedExpression = {
    val expr = generateFieldAccess(ctx, inputType, inputTerm, index)
    if (deepCopy) {    // 
      expr.deepCopy(ctx)
    } else {
      expr
    }
  }

如果结果类型是可变(mutable)类型的话,就会生成新的拷贝代码,防止出问题。

def deepCopy(ctx: CodeGeneratorContext): GeneratedExpression = {
  // only copy when type is mutable
  if (TypeCheckUtils.isMutable(resultType)) {
    // if the type need copy, it must be a boxed type
    val typeTerm = boxedTypeTermForType(resultType)
    val serTerm = ctx.addReusableTypeSerializer(resultType)
    val newResultTerm = ctx.addReusableLocalVariable(typeTerm, "field")
    val newCode =
      s"""
         |$code
         |$newResultTerm = $resultTerm;
         |if (!$nullTerm) {
         |  $newResultTerm = ($typeTerm) ($serTerm.copy($newResultTerm));
         |}
      """.stripMargin
    GeneratedExpression(newResultTerm, nullTerm, newCode, resultType, literalValue)
  } else {
    this
  }
}

The End

边看《开端》边写的这一篇,三心二意,有错误请批评指正(

京东物流人工智能与大数据部持续招人中,各位有意年后换工作的大佬尽管丢简历过来,JDL欢迎你~

民那晚安(

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容