一、起因
最近使用Flink(1.8.1)遇到一个问题,ValueState中数据结构的一个字段类型在开发的时候定义的是int类型,但是实际上应该是long类型。flink虽然从1.8.0开始支持POJO类型的演化,但只支持字段的增减,不支持字段类型的改变,具体规则如下:
POJO types
Flink supports evolving schema of POJO types, based on the following set of rules:
- Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints.
- New fields can be added. The new field will be initialized to the default value for its type, as defined by Java.
- Declared fields types cannot change.
- Class name of the POJO type cannot change, including the namespace of the class.
Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions newer than 1.8.0. When restoring with Flink versions older than 1.8.0, the schema cannot be changed.
为了解决当前遇到的困境和内心的疑问,下文将解答以下几个问题:
- 如何知道用的ValueState是不是POJO类型的?
- 为什么POJO 支持字段增减,不支持改类型?
- 遇到字段类型需要改变的场景如何抢救?
二、如何知道用的ValueState是不是POJO类型的
有几个方法,第一种方法是参考Flink给出的POJO类型规范:
Rules for POJO types
Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:
- The class is public and standalone (no non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.
Note that when a user-defined data type can’t be recognized as a POJO type, it must be processed as GenericType and serialized with Kryo.
如果没有被识别为POJO类型的,将按照Kryo方式进行操作。注意Kryo目前是不支持演化的。
当第一种方法不能让你觉得放心,还有一种更方便的方法。
/**
* Create a new {@code StateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
*
* @param name The name of the {@code StateDescriptor}.
* @param type The class of the type of values in the state.
* @param defaultValue The default value that will be set when requesting state without setting
* a value before.
*/
protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
this.name = checkNotNull(name, "name must not be null");
checkNotNull(type, "type class must not be null");
try {
this.typeInfo = TypeExtractor.createTypeInfo(type);
} catch (Exception e) {
throw new RuntimeException(
"Could not create the type information for '" + type.getName() + "'. " +
"The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
"In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
"For example, to describe 'Tuple2<String, String>' as a generic type, use " +
"'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
}
this.defaultValue = defaultValue;
}
上面这个方法是在创建ValueState的时候需要传入的描述类的构造方法。其中
this.typeInfo = TypeExtractor.createTypeInfo(type);
就是为了识别出是什么类型。可以自己写个main方法单独调用这个方法,type传入自己存入的类。如果执行返回的是PojoTypeInfo,则认为是POJO。如果是GenericTypeInfo则会使用Kryo。
三、为什么POJO 支持字段增减,不支持改类型
Flink通过org.apache.flink.api.java.typeutils.runtime.PojoSerializer实现对ValueState中存储POJO对象的序列化和反序列化操作。
我们假定ValueState存储POJO类:
//这里使用了lombok
@Data
public class Dog {
private int type;
private String name;
private int age;
private int sex;
}
下面是截取的序列化方法serialize的代码,fields是用户类的字段的字段反射数组,长度为4,分别对应4个字段,类为java.lang.reflect.Field。fieldSerializers是字段对应的序列化器,同样长度也是4,如果字段类型是int,序列化器是IntSerializer,以此类推。通过反射获取对象字段内容,然后调用各自的序列化器写入到字节流。
for (int i = 0; i < numFields; i++) {
Object o = (fields[i] != null) ? fields[i].get(value) : null;
if (o == null) {
//如果对象该字段为null,则写入true
target.writeBoolean(true); // null field handling
} else {
//如果对象该字段不为null,则写入false,然后写入序列化之后的值内容
target.writeBoolean(false);
fieldSerializers[i].serialize(o, target);
}
}
Flink在做checkpoint时会将ValueState的对象「字段内容」和「字段相应的序列化器」同时进行快照。序列化器的快照操作调用了PojoSerializer的以下方法:
@Override
public PojoSerializerSnapshot<T> snapshotConfiguration() {
return buildSnapshot(
clazz,
registeredClasses,
registeredSerializers,
fields,
fieldSerializers,
subclassSerializerCache);
}
反序列化方法deserialize的代码判断截取如下。
for (int i = 0; i < numFields; i++) {
//读取对象下一个字段是否为null
boolean isNull = source.readBoolean();
//下一个字段的反射Field对象是否为null。如果这个字段在新的POJO类中被删除则为null。
if (fields[i] != null) {
if (isNull) {
fields[i].set(target, null);
} else {
Object field = fieldSerializers[i].deserialize(source);
fields[i].set(target, field);
}
} else if (!isNull) {
// 这条分支表示:该字段在新的POJO结构中删除了,但是在之前序列化时值非null
// 虽然这个值不需要赋值到新的POJO对象中,但是需要保证流的结构正确,还是需要跳过这部分
fieldSerializers[i].deserialize(source);
}
}
字段减少
字段没有变化情况从略,这里对于字段增减的场景进行说明。假定ValueState做了一次checkpoint后去掉了name字段,类结构变为:
//这里使用了lombok
@Data
public class Dog {
private int type;
private int age;
private int sex;
}
程序在反序列化的时候fields长度和fieldSerializers长度依然都是4。不同的是fields数组中字段name对应数据元素值为null。name字段的反序列化以下分支:
fieldSerializers[i].deserialize(source);
字段增加
假定ValueState做了一次checkpoint后增了address字段,类结构变为:
//这里使用了lombok
@Data
public class Dog {
private int type;
private String name;
private int age;
private String address;
private int sex;
}
程序在反序列化的时候fields长度和fieldSerializers长度仍然都是4。走正常反序列化流程,不同的是对象新增加的字段address值为null。
字段类型变化
字段类型变化有可能从long到int,或者String到int,Flink无法决定如何进行转换,所以不支持字段类型转换情有可原。对字段变化的兼容性检查,在使用PojoSerializer进行反序列化之前。在org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot的getCompatibilityOfPreExistingFields方法中判断兼容性,具体在此不展开说明。
四、遇到字段类型需要改变的场景如何抢救
如果该字段暂时没有下游使用,可以删除掉这个字段,等到checkpoint做完后,再加回来,利用POJO字段可以增删的特性。
如果字段已经有下游使用,需要沟通协调,比如另外增加字段存储这个值。
六、参考内容
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#pojo-types