UnsafeShuffleWriter 对应SortShuffle的tungsten-sort方式
实现方式参考图:
UnsafeShuffleWriter内部使用了和BytesToBytesMap基本相同的数据结构处理map端的输出,不过将其细化为ShuffleExternalSorter和ShuffleInMemorySorter两部分,功能如下
ShuffleExternalSorter 使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair
ShuffleInMemorySorter 使用long数组存储每条记录对应的位置信息(page number + offset),以及其对应的PartitionId,共8 bytes
排序-Sort
排序的规则是一句partitionId,partitionId这是依据数据hash(Key)得到的。
写文件或溢写前(spill到disk前),根据数据的PartitionId信息,使用TimSort算法对ShuffleInMemorySorter的long数组排序,排序的结果为,PartitionId相同的聚集在一起,且PartitionId较小的排在前面,ShuffleExternalSorter中的数据不需要处理,如下图所示:
写文件-溢出-Spill
依次读取ShuffleInMemorySorter中long数组的元素,再根据page number和offset信息去ShuffleExternalSorter中读取K-V Pair写入文件,如下图所示:
合并-Merge
内存不足时,溢写bucket缓存数据到磁盘,每次溢写会生成上图中的一个filteSegemtn,如果多次溢写产生多个fileSegment(图中的tmp dataFile),会在map端数据处理结束后进行merge合并为一个dataFile,如下图所示:
至此,UnsafeShuffleWriter的实现就介绍完了。
优势
SPARK-7081中简述了UnsafeShuffleManager的优势,如下介绍:
ShuffleExternalSorter使用UnSafe API操作序列化数据,而不是Java对象,减少了内存占用及因此导致的GC耗时(参考Spark 内存管理之Tungsten),这个优化需要Serializer支持relocation。
ShuffleExternalSorter存原始数据,ShuffleInMemorySorter使用压缩指针存储元数据,每条记录仅占8 bytes,并且排序时不需要处理原始数据,效率高。
溢写 & 合并这一步操作的是同一Partition的数据,因为使用UnSafe API直接操作序列化数据,合并时不需要反序列化数据。
溢写 & 合并可以使用fastMerge提升效率(调用NIO的transferTo方法),设置spark.shuffle.unsafe.fastMergeEnabled为true,并且如果使用了压缩,需要压缩算法支持SerializedStreams的连接,各默认值如下
使用
Spark Shuffle之Sort Shuffle中讨论了使用UnsafeShuffleWriter需满足的前提条件,如下
接下来分析下为什么要满足这三个要求
map-side aggregation:从上面的实现也可以看出,UnsafeShuffleWriter不是类似HashMap的数据结构,无法聚合key对应的value,所以无法支持map端的aggregation。
Partition数小于16777216:参考第一幅图,存储PartitionId信息使用24bit,能表示的最大值为 (1 << 24) = 16777215,因此Partition数要小于16777216。
Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。
总结
本文介绍tungsten-sort(UnsafeShuffleWriter)的实现、优势及何种情况下被Spark使用。