1.Flink的TypeInformation类
TypeInformation是flink中所有类型的基类,其作为生产序列化器和比较的一个工具。它包括了类型的一些基本属性,并可以通过它来生产序列化器(serializer),特殊情况下还可以生成类型比较器。(Flink中的比较器不仅仅是定义大小顺序,更是处理keys的基本辅助工具)
- 基本类型:所有Java基本数据类型和对应的装箱类型,加上void,String,Date,BigDecimal和BigInteger
- 基本数组和对象数组
- 复合类型:
- Flink Java Tuples (Flink Java API的一部分): 最多25个成员,不支持null成员
- Scala case 类 (包括 Scala tuples): 最多25个成员, 不支持null成员
- Row: 包含任意多个字段的元组并且支持null成员
- POJOs: 遵循类bean模式的类
- 辅助类型 (Option, Either, Lists, Maps, …)
- 泛型: Flink自身不会序列化泛型,而是借助Kryo进行序列化.
POJO类非常有意思,因为POJO类可以支持复杂类型的创建,并且在定义keys时可以使用成员的名字:dataSet.join(another).where("name").equalTo("personName")。同时,POJO类对于运行时(runtime)是透明的,这使得Flink可以非常高效地处理它们。
1.1 POJO类型的规则
在满足如下条件时,Flink会将这种数据类型识别成POJO类型(并允许以成员名引用字段):
- 该类是public的并且是独立的(即没有非静态的内部类)
- 该类有一个public的无参构造方法
- 该类(及该类的父类)的所有成员要么是public的,要么是拥有按照标准java bean命名规则命名的public getter和 public setter方法。
1.2 创建一个TypeInformation对象或序列化器###
创建一个TypeInformation对象时如下:
在Scala中,Flink使用在编译时运行的宏,在宏可供调用时去捕获所有泛型信息。
// 重要: 为了能够访问'createTypeInformation' 的宏方法,这个import是必须的
import org.apache.flink.streaming.api.scala._
val stringInfo: TypeInformation[String] = createTypeInformation[String]
val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
你也可以在Java使用相同的方法作为备选。
为了创建一个序列化器(TypeSerializer),只需要在TypeInformation 对象上调用typeInfo.createSerializer(config)方法。
config参数的类型是ExecutionConfig,它保留了程序的注册的自定义序列化器的相关信息。在可能用到TypeSerializer的地方,尽量传入程序的ExecutionConfig,你可以调用DataStream 或 DataSet的 getExecutionConfig()方法获取ExecutionConfig。一些内部方法(如:MapFunction)中,你可以通过将该方法变成一个Rich Function,然后调用getRuntimeContext().getExecutionConfig()获取ExecutionConfig.
2 基本类型实现示例
以String为例:
//BasicTypeInfo.java
public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);
StringSerializer如下
//StringSerializer.java
public final class StringSerializer extends TypeSerializerSingleton<String> {
private static final long serialVersionUID = 1L;
public static final StringSerializer INSTANCE = new StringSerializer();
private static final String EMPTY = "";
@Override
public boolean isImmutableType() {
return true;
}
@Override
public String createInstance() {
return EMPTY;
}
@Override
public String copy(String from) {
return from;
}
@Override
public String copy(String from, String reuse) {
return from;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(String record, DataOutputView target) throws IOException {
StringValue.writeString(record, target);
}
@Override
public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
@Override
public String deserialize(String record, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
StringValue.copyString(source, target);
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof StringSerializer;
}
@Override
protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
return super.isCompatibleSerializationFormatIdentifier(identifier)
|| identifier.equals(StringValue.class.getCanonicalName());
}
}
上面代码中出现的StringValue是真正进行input以及output序列化过程操作,基本类型都有相应的方法,后面会单独说明下多字段Record序列化形式。
StringComparator如下
public final class StringComparator extends BasicTypeComparator<String> {
private static final long serialVersionUID = 1L;
private static final int HIGH_BIT = 0x1 << 7;
private static final int HIGH_BIT2 = 0x1 << 13;
private static final int HIGH_BIT2_MASK = 0x3 << 6;
public StringComparator(boolean ascending) {
super(ascending);
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
String s1 = StringValue.readString(firstSource);
String s2 = StringValue.readString(secondSource);
int comp = s1.compareTo(s2);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return true;
}
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public int getNormalizeKeyLen() {
return Integer.MAX_VALUE;
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return true;
}
@Override
public void putNormalizedKey(String record, MemorySegment target, int offset, int len) {;
final int limit = offset + len;
final int end = record.length();
int pos = 0;
while (pos < end && offset < limit) {
char c = record.charAt(pos++);
if (c < HIGH_BIT) {
target.put(offset++, (byte) c);
}
else if (c < HIGH_BIT2) {
target.put(offset++, (byte) ((c >>> 7) | HIGH_BIT));
if (offset < limit) {
target.put(offset++, (byte) c);
}
}
else {
target.put(offset++, (byte) ((c >>> 10) | HIGH_BIT2_MASK));
if (offset < limit) {
target.put(offset++, (byte) (c >>> 2));
}
if (offset < limit) {
target.put(offset++, (byte) c);
}
}
}
while (offset < limit) {
target.put(offset++, (byte) 0);
}
}
@Override
public StringComparator duplicate() {
return new StringComparator(ascendingComparison);
}
}
3 多字段Record示例
在开始这部分原理分析之前可以先看个示例代码
//RecordTest.java
public void testAddField() {
try {
// Add a value to an empty record
Record record = new Record();
assertTrue(record.getNumFields() == 0);
record.addField(this.origVal1);
assertTrue(record.getNumFields() == 1);
assertTrue(origVal1.getValue().equals(record.getField(0, StringValue.class).getValue()));
// Add 100 random integers to the record
record = new Record();
for (int i = 0; i < 100; i++) {
IntValue orig = new IntValue(this.rand.nextInt());
record.addField(orig);
IntValue rec = record.getField(i, IntValue.class);
assertTrue(record.getNumFields() == i + 1);
assertTrue(orig.getValue() == rec.getValue());
}
// Add 3 values of different type to the record
record = new Record(this.origVal1, this.origVal2);
record.addField(this.origVal3);
assertTrue(record.getNumFields() == 3);
StringValue recVal1 = record.getField(0, StringValue.class);
DoubleValue recVal2 = record.getField(1, DoubleValue.class);
IntValue recVal3 = record.getField(2, IntValue.class);
assertTrue("The value of the first field has changed", recVal1.equals(this.origVal1));
assertTrue("The value of the second field changed", recVal2.equals(this.origVal2));
assertTrue("The value of the third field has changed", recVal3.equals(this.origVal3));
} catch (Throwable t) {
Assert.fail("Test failed due to an exception: " + t.getMessage());
}
}
Record代表多个数值的记录,其可以包含多个字段(可空并不体现在该记录中),内部有一个bitmap标记字段是否被赋值。为了数据交换方便,Record中的数据都以bytes方式存储,字段在访问时才被进行反序列化。当字段被修改时首先是放在cache中,并在下次序列化时合入或者显式调用updateBinaryRepresenation()方法。
Notes:
- 该record必须是一个可变的对象,这样才可以被多个自定义方法使用来提升性能(后面单独分析)。该record是一个比较中的对象,为了减少对每个字段的序列化、反序列化操作,其保存了比较大的状态,需要有多个指针以及数组,从而要占用相对比较大的内存空间,在64位的JVM中要占用超过200bytes。
- 该类是非线程安全的
4 存放Record的数据结构
针对上面提出的存放数据结构的疑问,这里继续深入分析下。
- 将record放在一个迭代器中,当前存在一个叫BlockResettableMutableObjectIterator,其包含如下一些方法,读写都是在这个迭代器中进行。
其中以无参数next()方法为示例走读存储或者读取流程,代码如下:
public T next() throws IOException {
// check for the left over element
if (this.readPhase) {
return getNextRecord();
} else {
// writing phase. check for leftover first
T result = null;
if (this.leftOverReturned) {
// get next record
if ((result = this.input.next()) != null) {
if (writeNextRecord(result)) {
return result;
} else {
// did not fit into memory, keep as leftover
this.leftOverRecord = this.serializer.copy(result);
this.leftOverReturned = false;
this.fullWriteBuffer = true;
return null;
}
} else {
this.noMoreBlocks = true;
return null;
}
} else if (this.fullWriteBuffer) {
return null;
} else {
this.leftOverReturned = true;
return this.leftOverRecord;
}
}
}
通过源码可以看出,在方法执行时根据标记判断是读取还是写入流程,同时方法对应getNextRecord和writeNextRecord两个方法,都在抽象类AbstractBlockResettableIterator中,两个方法源码如下:
protected T getNextRecord() throws IOException {
if (this.numRecordsReturned < this.numRecordsInBuffer) {
this.numRecordsReturned++;
return this.serializer.deserialize(this.readView);
} else {
return null;
}
}
protected boolean writeNextRecord(T record) throws IOException {
try {
this.serializer.serialize(record, this.collectingView);
this.numRecordsInBuffer++;
return true;
} catch (EOFException eofex) {
return false;
}
}
其中存放数据是基于Flink内存管理部分进行申请以及维护大小等,相关初始化源码如下:
memoryManager.allocatePages(ownerTask, emptySegments, numPages);
this.collectingView = new SimpleCollectingOutputView(this.fullSegments,
new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());
5 Flink 如何直接操作二进制数据
Flink 提供了如 group、sort、join 等操作,这些操作都需要访问海量数据。这里,我们以sort为例,这是一个在 Flink 中使用非常频繁的操作。
首先,Flink 会从 MemoryManager 中申请一批 MemorySegment,我们把这批 MemorySegment 称作 sort buffer,用来存放排序的数据。
我们会把 sort buffer 分成两块区域。一个区域是用来存放所有对象完整的二进制数据。另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。如果需要序列化的key是个变长类型,如String,则会取其前缀序列化。如上图所示,当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。
将实际的数据和指针加定长key分开存放有两个目的。第一,交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二,这样做是缓存友好的,因为key都是连续存储在内存中的,可以大大减少 cache miss(后面会详细解释)。
排序的关键是比大小和交换。Flink 中,会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象。因为key是定长的,所以如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后,只需要交换key+pointer就可以达到排序的效果,真实的数据不用移动。
最后,访问排序后的数据,可以沿着排好序的key+pointer区域顺序访问,通过pointer找到对应的真实数据,并写到内存或外部(更多细节可以看这篇文章 Joins in Flink)。
5.1 缓存友好的数据结构和算法
随着磁盘IO和网络IO越来越快,CPU逐渐成为了大数据领域的瓶颈。从 L1/L2/L3 缓存读取数据的速度比从主内存读取数据的速度快好几个量级。通过性能分析可以发现,CPU时间中的很大一部分都是浪费在等待数据从主内存过来上。如果这些数据可以从 L1/L2/L3 缓存过来,那么这些等待时间可以极大地降低,并且所有的算法会因此而受益。
在上面讨论中我们谈到的,Flink 通过定制的序列化框架将算法中需要操作的数据(如sort中的key)连续存储,而完整数据存储在其他地方。因为对于完整的数据来说,key+pointer更容易装进缓存,这大大提高了缓存命中率,从而提高了基础算法的效率。这对于上层应用是完全透明的,可以充分享受缓存友好带来的性能提升。