Flink POJO类型ValueState演化原理剖析

一、起因

最近使用Flink(1.8.1)遇到一个问题,ValueState中数据结构的一个字段类型在开发的时候定义的是int类型,但是实际上应该是long类型。flink虽然从1.8.0开始支持POJO类型的演化,但只支持字段的增减,不支持字段类型的改变,具体规则如下:


POJO types

Flink supports evolving schema of POJO types, based on the following set of rules:

  1. Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints.
  2. New fields can be added. The new field will be initialized to the default value for its type, as defined by Java.
  3. Declared fields types cannot change.
  4. Class name of the POJO type cannot change, including the namespace of the class.

Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions newer than 1.8.0. When restoring with Flink versions older than 1.8.0, the schema cannot be changed.


为了解决当前遇到的困境和内心的疑问,下文将解答以下几个问题:

  • 如何知道用的ValueState是不是POJO类型的?
  • 为什么POJO 支持字段增减,不支持改类型?
  • 遇到字段类型需要改变的场景如何抢救?

二、如何知道用的ValueState是不是POJO类型的

有几个方法,第一种方法是参考Flink给出的POJO类型规范:


Rules for POJO types

Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:

  • The class is public and standalone (no non-static inner class)
  • The class has a public no-argument constructor
  • All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

Note that when a user-defined data type can’t be recognized as a POJO type, it must be processed as GenericType and serialized with Kryo.


如果没有被识别为POJO类型的,将按照Kryo方式进行操作。注意Kryo目前是不支持演化的。
当第一种方法不能让你觉得放心,还有一种更方便的方法。

/**
     * Create a new {@code StateDescriptor} with the given name and the given type information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param type The class of the type of values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        checkNotNull(type, "type class must not be null");

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(type);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create the type information for '" + type.getName() + "'. " +
                    "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
                    "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
                    "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
                    "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
        }

        this.defaultValue = defaultValue;
    }

上面这个方法是在创建ValueState的时候需要传入的描述类的构造方法。其中

 this.typeInfo = TypeExtractor.createTypeInfo(type);

就是为了识别出是什么类型。可以自己写个main方法单独调用这个方法,type传入自己存入的类。如果执行返回的是PojoTypeInfo,则认为是POJO。如果是GenericTypeInfo则会使用Kryo。

三、为什么POJO 支持字段增减,不支持改类型

Flink通过org.apache.flink.api.java.typeutils.runtime.PojoSerializer实现对ValueState中存储POJO对象的序列化和反序列化操作。
我们假定ValueState存储POJO类:

//这里使用了lombok
@Data 
public class Dog {
    private int type;
    private String name;
    private int age;
    private int sex;
}

下面是截取的序列化方法serialize的代码,fields是用户类的字段的字段反射数组,长度为4,分别对应4个字段,类为java.lang.reflect.Field。fieldSerializers是字段对应的序列化器,同样长度也是4,如果字段类型是int,序列化器是IntSerializer,以此类推。通过反射获取对象字段内容,然后调用各自的序列化器写入到字节流。

for (int i = 0; i < numFields; i++) {
        Object o = (fields[i] != null) ? fields[i].get(value) : null;
    if (o == null) {
                //如果对象该字段为null,则写入true
        target.writeBoolean(true); // null field handling
    } else {
                //如果对象该字段不为null,则写入false,然后写入序列化之后的值内容
        target.writeBoolean(false);
        fieldSerializers[i].serialize(o, target);
    }
}

Flink在做checkpoint时会将ValueState的对象「字段内容」和「字段相应的序列化器」同时进行快照。序列化器的快照操作调用了PojoSerializer的以下方法:

@Override
    public PojoSerializerSnapshot<T> snapshotConfiguration() {
        return buildSnapshot(
                clazz,
                registeredClasses,
                registeredSerializers,
                fields,
                fieldSerializers,
                subclassSerializerCache);
    }

反序列化方法deserialize的代码判断截取如下。

for (int i = 0; i < numFields; i++) {
        //读取对象下一个字段是否为null
    boolean isNull = source.readBoolean();
        
        //下一个字段的反射Field对象是否为null。如果这个字段在新的POJO类中被删除则为null。
    if (fields[i] != null) {
        if (isNull) {
                        fields[i].set(target, null);
        } else {
            Object field = fieldSerializers[i].deserialize(source);
            fields[i].set(target, field);
        }
    } else if (!isNull) {
        // 这条分支表示:该字段在新的POJO结构中删除了,但是在之前序列化时值非null
                // 虽然这个值不需要赋值到新的POJO对象中,但是需要保证流的结构正确,还是需要跳过这部分
        fieldSerializers[i].deserialize(source);
    }
}

字段减少

字段没有变化情况从略,这里对于字段增减的场景进行说明。假定ValueState做了一次checkpoint后去掉了name字段,类结构变为:

//这里使用了lombok
@Data 
public class Dog {
    private int type;
    private int age;
    private int sex;
}

程序在反序列化的时候fields长度和fieldSerializers长度依然都是4。不同的是fields数组中字段name对应数据元素值为null。name字段的反序列化以下分支:

fieldSerializers[i].deserialize(source);

字段增加

假定ValueState做了一次checkpoint后增了address字段,类结构变为:

//这里使用了lombok
@Data 
public class Dog {
    private int type;
    private String name;
    private int age;
    private String address;
    private int sex;
}

程序在反序列化的时候fields长度和fieldSerializers长度仍然都是4。走正常反序列化流程,不同的是对象新增加的字段address值为null。

字段类型变化

字段类型变化有可能从long到int,或者String到int,Flink无法决定如何进行转换,所以不支持字段类型转换情有可原。对字段变化的兼容性检查,在使用PojoSerializer进行反序列化之前。在org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot的getCompatibilityOfPreExistingFields方法中判断兼容性,具体在此不展开说明。

四、遇到字段类型需要改变的场景如何抢救

如果该字段暂时没有下游使用,可以删除掉这个字段,等到checkpoint做完后,再加回来,利用POJO字段可以增删的特性。
如果字段已经有下游使用,需要沟通协调,比如另外增加字段存储这个值。

六、参考内容

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#pojo-types

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345