前言
本文前半部分的内容在很久之前讲过,但是最近又有交接到团队内的历史任务出现这方面导致的性能问题,故有必要再讲一次,并扩展一部分新内容。先通过两个例子来引入Java类型擦除。
Java类型擦除的表现
- 例一
这段代码无法通过编译,提示两个方法签名冲突,因为擦除类型相同。如果去掉其中一个方法,反编译之后的代码如下。
public void foo(List list) { }
- 例二
这段代码会返回true
。并且Java只允许List.class
的写法,不允许List<T>.class
的写法。
认识类型擦除
我们知道,泛型是高级语言中比较令人头疼的问题,一般来讲要实现泛型有两种方式:
- Code Sharing:对同一个原始类型下的泛型类型只生成同一份目标代码(在Java中就是字节码)。
- Code Specialization:对每一个泛型类型都生成不同的目标代码。
Java使用的泛型实现是前者,而C++和C#使用的是后者,它们也分别称为“假”泛型和“真”泛型。Code Sharing通过类型擦除来保证只生成一份目标代码,但也导致程序在运行时对泛型类型没有感知,所以上述例子一的代码反编译后只剩下了List
,例子二中的类型比较实际上都是Class<? extends ArrayList>
的比较。如果Java也采用Code Specialization机制(想一想C++ Template)的话,所有List<T>
就都是显式不同的类型了。
为什么Java要采用Code Sharing和类型擦除呢?主要有两点原因:一是Java泛型是到1.5版本才出现的特性,在此之前Java已经在无泛型的条件下经历了较长时间的发展,如果采用Code Specialization,就得对Java类型系统做伤筋动骨的改动,并且无法保证向前兼容性;二是Code Specialization对每个泛型类型都生成不同的目标代码,如果有10个不同泛型的List
,就要生成10份字节码,加重解释和执行负担。
由此可见,类型擦除让JVM省了不少事,但是加重了编译器的工作量。编译器必须在运行期之前就进行检查,禁止模糊的或者不合法的泛型使用方式。再举一个例子。
这种用法也是不允许的,换句话说,里氏替换原则不适用于Java的泛型类型参数。这并不难理解:对于一个List<Object>
而言,向其中添加字符串是完全合法的,但是如果方法传入的参数为List<Integer>
的话就会直接造成ClassCastException
,因此编译器会提前block掉这种可能性。
还没完,如果把traverse()
方法参数中的List<Object>
换成用通配符表示的List<?>
,那么traverse()
方法调用就没问题,但list.add()
语句就会编译不通过。这是因为list.add()
方法无法判断具体要加入列表的是Object
的哪个子类实例,因此会用最简单粗暴的方法来处理,即全部拒绝。相对地,list.get()
则是可以编译通过的,因为编译器能够通过<? extends T>
与<? super T>
得知泛型类型的上下界限。
如果泛型类型有界限,在类型擦除时会根据最左侧的泛型参数来替换,例如下面的泛型类。
class Test<T extends Comparable & Serializable> {
private T value;
public T getValue() { return value; }
public void setValue(T value) { this.value = value; }
}
类型擦除后就会变成:
class Test {
private Comparable value;
public Comparable getValue() { return value; }
public void setValue(Comparable value) { this.value = value; }
}
同理,如果没有规定T是哪个类的子类或者超类,就会替换为Object。
下面来看类型擦除为Flink类型体系带来的问题,并介绍Flink规避此问题的类型提示(Type Hint)机制。
Flink的类型提示机制
以Flink自带示例中的SocketWindowWordCount
为例,如果我们将它的主逻辑改写成Lambda表达式,如下:
DataStream<WordWithCount> windowCounts = text
.flatMap((String value, Collector<WordWithCount> out) -> {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((a, b) ->
new WordWithCount(a.word, a.count + b.count)
);
执行时会抛出如下异常。
这说明程序无法在运行时推断出flatMap()
算子的返回类型。为什么之前采用匿名内部类就没有问题?因为匿名内部类会被真正地编译为.class
文件,而Lambda表达式是在运行时调用invokedynamic
指令,亦即在第一次执行其逻辑时才会确定。因此Lambda表达式比起匿名内部类,会丢失更多的类型信息。看一下flatMap()
算子的签名:
void flatMap(T value, Collector<O> out);
经过类型擦除,Collector
的泛型参数被抹掉了(参看报错The generic type parameters of Collector are missing
),自然就会抛出无法确定返回类型的异常。如果我们采用的不是flatMap()
算子而是map()
,就不会出现这种问题,因为map()
的返回类型可以自动推断。
为了克服类型擦除带来的问题,Flink类型系统中内置了类型提示机制,即用户在调用此类算子之后,手动指明返回的类型信息。在flatMap()
之后调用returns()
方法,就可以指定返回类型了。
text.flatMap((String value, Collector<WordWithCount> out) -> {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
})
.returns(TypeInformation.of(WordWithCount.class));
但是,如果返回类型本身就有泛型,比如在Flink中常用的元组(TupleX
),就得另外换一种写法,即通过继承TypeHint
的匿名内部类保留泛型信息。
.returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() { }))
下面再来看一个比较隐匿的可能会引发性能问题的场景。
POJO序列化fallback问题与类型注入
我们知道,Flink对标准的Java POJO类型有专门的PojoSerializer
序列化器支持,性能相当好。但是也有例外,考虑以下包含容器成员的POJO:
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class MyPojo {
private String myId;
private List<Integer> myTags;
private Map<String, Integer> myFlags;
}
在Flink应用执行时,会提示myTags
和myFlags
两个字段是Generic Types,也就是要fallback到Kryo Serializer做序列化,而非Flink原生的ListSerializer
和MapSerializer
。如果我们调用ExecutionConfig#disableGenericTypes()
方法来禁用fallback,则应用无法执行,并有异常提示Generic types have been disabled and type java.util.Map is treated as a generic type
。
可见,由于类型擦除的存在,POJO中的所有泛型参数都无法识别,无法通过原生序列化器操作,故序列化性能会有数倍的下降,在涉及状态操作和网络传输时尤其明显。本文开头提到的出现问题的历史Flink任务,就是因为单个POJO中包含了十几个Map
,且均有状态读写,近期因新业务上线,流量增大,导致大量时间耗费在序列化层面,任务严重反压。
由此可见,尽量让POJO内不包含泛型类型(多数情况下就是避免使用Java容器)是最好的,但如果必须使用的话,如何解决这个问题呢?答案是自行实现对应类型的TypeInfoFactory
,并通过@TypeInfo
注解对泛型字段做类型注入。例如,我们实现一个无嵌套的Map
对应的TypeInfoFactory
。
@SuppressWarnings("unchecked")
public class SimpleMapTypeInfoFactory<K, V> extends TypeInfoFactory<Map<K, V>> {
@Override
public TypeInformation<Map<K, V>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
return Types.MAP(
(TypeInformation<K>) genericParameters.get("K"),
(TypeInformation<V>) genericParameters.get("V")
);
}
}
然后为myFlags
字段注入此类型,在序列化时,MapSerializer
就会对此字段生效,性能恢复正常。
@TypeInfo(SimpleMapTypeInfoFactory.class)
private Map<String, Integer> myFlags;
当然,对所有其他无嵌套的Map<K, V>
类型字段,都可以复用上面的SimpleMapTypeInfoFactory
类,无需重复编写。至于其他类型与组合,读者举一反三即可。
最后再提醒一句,Set
不是Flink序列化器原生支持的类型,即不存在SetSerializer
,故所有需要用到Set<T>
的场合,都需要用Map<T, Boolean>
代替,并注入上述SimpleMapTypeInfoFactory
,以获得最佳性能。关于误用Set
这档事,笔者之前另有文章说明,不再赘述。