Based on my reading of Spark 1.6, this post walks through the shuffle write path and the places where it can be tuned. The analysis is rough, so suggestions and corrections are welcome.
The default shuffle implementation in Spark 1.6 is sort-based shuffle.
It goes through the write method of the SortShuffleWriter class, which is the entry point for the write:
override def write(records: Iterator[Product2[K, V]]): Unit = {
  /**
   * Choose the sorter (ExternalSorter) in one of two modes:
   * (1) with map-side aggregation
   * (2) without aggregation
   */
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Aggregate and sort the records, spilling to disk when memory runs out
  sorter.insertAll(records)
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  val blockId: ShuffleBlockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  // Merge the in-memory data and all spill files into one partitioned data file
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
(1) First, depending on whether map-side aggregation is required, a different ExternalSorter is constructed; this object aggregates, sorts, and spills the data to disk.
(2) insertAll then consumes the records. If map-side aggregation is needed, records are accumulated into a PartitionedAppendOnlyMap[K, C]; otherwise they are appended to a PartitionedPairBuffer[K, C]. Both structures occupy execution memory, records are added one at a time, and when memory is insufficient the collection is spilled to disk.
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  val shouldCombine = aggregator.isDefined
  // With map-side aggregation
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // Walk the records one by one, adding each to the map (execution memory)
    while (records.hasNext) {
      // Count the records read
      addElementsRead()
      kv = records.next()
      // Aggregate: merge this value into the existing combiner for (partition, key)
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Check whether the in-memory collection must be spilled
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
Records that take part in aggregation are combined at this call:
map.changeValue((getPartition(kv._1), kv._1), update)
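To make the update closure concrete, here is a minimal, self-contained sketch of what map-side combining does for a reduceByKey(_ + _) style aggregator. The mutable Map stands in for PartitionedAppendOnlyMap and change is a simplified changeValue; the names here are illustrative, not from the Spark source:
val createCombiner: Int => Int = v => v              // first value seen for a key
val mergeValue: (Int, Int) => Int = (c, v) => c + v  // fold the next value into the combiner

// Stand-in for PartitionedAppendOnlyMap, keyed by (partition, key)
val combiners = scala.collection.mutable.Map[(Int, String), Int]()

def change(partition: Int, key: String, value: Int): Unit = {
  val k = (partition, key)
  combiners(k) = combiners.get(k) match {
    case Some(old) => mergeValue(old, value)   // hadValue == true
    case None      => createCombiner(value)    // hadValue == false
  }
}

change(0, "spark", 1)
change(0, "spark", 1)
println(combiners((0, "spark")))  // 2: both records collapsed into a single combiner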
(3) maybeSpillCollection decides whether to spill; after a spill, a fresh PartitionedAppendOnlyMap or PartitionedPairBuffer is created:
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    // Estimate the collection's in-memory size, in bytes
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
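estimateSize() comes from the SizeTracker trait: instead of walking the whole object graph on every insert, it periodically resamples the collection with SizeEstimator and extrapolates. As a rough, standalone illustration of the underlying estimator (SizeEstimator is a @DeveloperApi; treat the output as an approximation):
import org.apache.spark.util.SizeEstimator

val m = new java.util.HashMap[Integer, String]()
(1 to 10000).foreach(i => m.put(i, "value-" + i))
// Approximate footprint of the map's object graph, in bytes
println(SizeEstimator.estimate(m))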
(4) The actual decision lives in maybeSpill. A check fires only once every 32 records, and only when the collection's estimated size exceeds the current threshold. When it fires, the sorter tries to acquire 2 * currentMemory - myMemoryThreshold more execution memory. If the request is fully granted, the threshold grows to 2 * currentMemory and nothing is spilled; otherwise the collection is spilled to disk and the threshold is reset to its initial value (5 MB by default, set by spark.shuffle.spill.initialMemoryThreshold).
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // Check only once every 32 records, and only when the in-memory
  // collection has grown past the current threshold
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Request enough from the shuffle memory pool to double our current memory
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If the grant was too small to lift the threshold above us, we must spill
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // Spill if memory could not be obtained, or if the record count hit the limit
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Perform the actual spill
  if (shouldSpill) {
    _spillCount += 1            // count the spills
    logSpillage(currentMemory)
    spill(collection)           // write the collection to disk
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()             // release memory and reset the threshold to its initial value
  }
  shouldSpill
}
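A worked example of the threshold growth, as a runnable sketch; the grant function is a stand-in for acquireExecutionMemory and simply caps what the pool can give:
var myMemoryThreshold = 5L * 1024 * 1024   // initial threshold: 5 MB
var poolFree = 12L * 1024 * 1024           // pretend 12 MB of execution memory is free

def grant(requested: Long): Long = {       // stand-in for acquireExecutionMemory
  val g = math.min(requested, poolFree)
  poolFree -= g
  g
}

def check(currentMemory: Long): Boolean = {
  val amountToRequest = 2 * currentMemory - myMemoryThreshold
  myMemoryThreshold += grant(amountToRequest)
  currentMemory >= myMemoryThreshold       // true => spill
}

println(check(6L * 1024 * 1024))   // requests 7 MB, fully granted: threshold = 12 MB, no spill
println(check(18L * 1024 * 1024))  // requests 24 MB, only 5 MB left: threshold = 17 MB, spill
// (after a real spill, releaseMemory() resets the threshold to the initial 5 MB)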
(5) The actual spill is performed by the spill method:
a. The data is first sorted; this happens in place on the in-memory collection, so no extra memory is needed.
b. A disk writer is then obtained. The writer keeps its own file buffer, 32 KB by default (spark.shuffle.file.buffer), which flushes to the local file as it fills; records are additionally grouped into serialization batches of 10,000 (spark.shuffle.spill.batchSize). This is a tuning point: raising both parameters can reduce the number of spill writes, i.e., the number of spill files produced. A simplified sketch of the batching follows below.
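A simplified, runnable sketch of that batched spill loop, assuming a plain Java object stream in place of Spark's DiskBlockObjectWriter; fileBufferSize mirrors spark.shuffle.file.buffer and batchSize mirrors spark.shuffle.spill.batchSize (the real code also records each batch's byte size so it can be re-read during the merge):
import java.io.{BufferedOutputStream, FileOutputStream, ObjectOutputStream}

val fileBufferSize = 32 * 1024   // spark.shuffle.file.buffer (32 KB default)
val batchSize = 10000            // spark.shuffle.spill.batchSize (10000 default)

val fos = new FileOutputStream("/tmp/spill-sketch.bin")
val bos = new BufferedOutputStream(fos, fileBufferSize)  // flushes to disk as it fills
val out = new ObjectOutputStream(bos)

var objectsWritten = 0
val records = Iterator.tabulate(25000)(i => ("key" + i, i))
records.foreach { rec =>
  out.writeObject(rec)           // goes through the 32 KB file buffer
  objectsWritten += 1
  if (objectsWritten == batchSize) {
    out.flush()                  // end of one serialization batch
    objectsWritten = 0
  }
}
out.close()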
(6) The final data file is resolved from the shuffle id and map id, and a temporary file alongside it is created for writing:
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
(7) The shuffle block id is built from the shuffle id and the map id:
val blockId: ShuffleBlockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
(8) Inside ExternalSorter, the in-memory data and all spilled files are merged into a single file:
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
(9) The index file is generated and the temporary data file is committed:
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
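The index file is essentially a sequence of cumulative byte offsets, one per partition boundary, so a reducer can seek straight to its slice of the data file. A small sketch of the arithmetic (the lengths are made up):
// Hypothetical per-partition lengths returned by writePartitionedFile
val partitionLengths = Array(120L, 0L, 340L, 75L)

// The index file stores the running totals, starting at 0
val offsets = partitionLengths.scanLeft(0L)(_ + _)
// offsets == Array(0, 120, 120, 460, 535)

// Reducer r reads bytes [offsets(r), offsets(r + 1)) of the data file
val r = 2
println(s"reducer $r reads bytes [${offsets(r)}, ${offsets(r + 1)})")  // [120, 460)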
(10) The metadata is recorded in a MapStatus, which downstream tasks use to locate the map output:
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
Summary
1. The shuffle write flow (the original post included a flow diagram here).
2. Two parameters offer simple knobs for tuning the shuffle write (see the SparkConf sketch below):
(1) spark.shuffle.file.buffer
(2) spark.shuffle.spill.batchSize
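For example, via SparkConf (the values here are illustrative, not recommendations; measure before and after changing them):
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-write-tuning")
  .set("spark.shuffle.file.buffer", "64k")        // default 32k: per-writer file buffer
  .set("spark.shuffle.spill.batchSize", "20000")  // default 10000: records per spill batch
// pass conf to SparkContext / spark-submit as usual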
Open question
Can an OOM occur while merging the spill files? I don't have a clear enough grasp of that part of the source, so I'm leaving it as an open question; answers are welcome.