The Spark sort shuffle write path proceeds roughly as follows:
- ShuffleMapTask's runTask() method
```scala
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
```
The task first obtains the shuffleManager from SparkEnv. There are three implementations: SortShuffleManager, HashShuffleManager, and UnsafeShuffleManager; here we focus on SortShuffleManager. From the manager it then gets a SortShuffleWriter and calls its write() method to write the task's output to the shuffle file.
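A simplified sketch of that dispatch, using stub types rather than Spark's real classes (in this version of Spark, SortShuffleManager.getWriter essentially wraps the dependency's shuffle handle in a new SortShuffleWriter; everything below is illustrative):

```scala
// Stub types mirroring the shape of the hand-off: the task passes the
// ShuffleDependency's handle to the manager and gets a writer back.
trait ShuffleHandle
case class BaseShuffleHandle(shuffleId: Int, numMaps: Int) extends ShuffleHandle

trait ShuffleWriter[K, V] { def write(records: Iterator[(K, V)]): Unit }

class SortShuffleManagerSketch {
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int): ShuffleWriter[K, V] = {
    val base = handle.asInstanceOf[BaseShuffleHandle]
    // The real SortShuffleWriter feeds records into an ExternalSorter
    // (see the next snippet); this stub only shows the hand-off.
    new ShuffleWriter[K, V] {
      def write(records: Iterator[(K, V)]): Unit =
        records.foreach { case (k, v) =>
          println(s"shuffle ${base.shuffleId}, map $mapId: ($k, $v)")
        }
    }
  }
}
```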
- SortShuffleWriter's write() method
```scala
override def write(records: Iterator[Product2[K, V]]): Unit = {
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    sorter = new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    sorter.insertAll(records)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
    sorter.insertAll(records)
  }

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
```
write() first builds an ExternalSorter and feeds all of the task's records into it via insertAll(), then writes the result to disk. Note the output layout: each map task produces a single data file, with records grouped by reduce partition, plus one index file recording each partition's offset within it (not one file per reducer, as hash shuffle does).
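The index file is essentially a list of cumulative byte offsets into that single data file, which is what lets a reducer seek straight to its own segment. A minimal sketch of the bookkeeping (plain Scala, independent of Spark's IndexShuffleBlockResolver; the cumulative-sum layout is the point, the rest is illustrative):

```scala
// Per-partition byte lengths produced by one map task
// (as returned by writePartitionedFile above).
val partitionLengths = Array[Long](100, 0, 250, 40)

// Index layout: offset(0) = 0, offset(i + 1) = offset(i) + length(i).
val offsets = partitionLengths.scanLeft(0L)(_ + _)

// A reducer's segment is (start offset, length) within the single data file.
def segmentFor(reduceId: Int): (Long, Long) =
  (offsets(reduceId), partitionLengths(reduceId))

println(segmentFor(2)) // (100,250): reducer 2 reads 250 bytes from offset 100
```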
- ExternalSorter's insertAll() method
```scala
def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else if (bypassMergeSort) {
    // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
    if (records.hasNext) {
      spillToPartitionFiles(
        WritablePartitionedIterator.fromIterator(records.map { kv =>
          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
        })
      )
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
```
When map-side combine is required, ExternalSorter buffers records in a PartitionedAppendOnlyMap keyed by (partition, key); without an aggregator it uses a PartitionedPairBuffer (or, in the bypass case, spills straight to per-partition files). After every record inserted, maybeSpillCollection() is called to check whether the in-memory collection needs to be spilled.
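The update closure passed to changeValue either creates a combiner for a first-seen key or merges the new value into the existing one. A minimal stand-in for those semantics using a plain mutable HashMap (a word-count-style sum; the real PartitionedAppendOnlyMap is an append-only open-addressing hash table over a flat array, so this mirrors the behavior only):

```scala
import scala.collection.mutable

// Stand-in for map-side combine: sum values per (partition, key).
val numPartitions = 2
def getPartition(key: String): Int = math.abs(key.hashCode) % numPartitions

val createCombiner: Int => Int = v => v
val mergeValue: (Int, Int) => Int = _ + _

val map = mutable.HashMap.empty[(Int, String), Int]
val records = Iterator("a" -> 1, "b" -> 1, "a" -> 1)

records.foreach { case (k, v) =>
  val pk = (getPartition(k), k)
  map(pk) = map.get(pk) match {
    case Some(old) => mergeValue(old, v) // key already seen: merge the value in
    case None      => createCombiner(v)  // first value for this (partition, key)
  }
  // In Spark, maybeSpillCollection() would run here after every record.
}
println(map) // "a" has been combined to 2 before anything is written out
```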
- ExternalSorter's maybeSpillCollection() method
```scala
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  if (!spillingEnabled) {
    return
  }

  if (usingMap) {
    if (maybeSpill(map, map.estimateSize())) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    if (maybeSpill(buffer, buffer.estimateSize())) {
      buffer = if (useSerializedPairBuffer) {
        new PartitionedSerializedPairBuffer[K, C](metaInitialRecords, kvChunkSize, serInstance)
      } else {
        new PartitionedPairBuffer[K, C]
      }
    }
  }
}
```
estimateSize() estimates the memory occupied by the in-memory collection. Because a full size measurement is expensive, the collection is only re-measured at exponentially growing intervals between updates, and sizes in between are extrapolated from the last measured growth rate; this keeps the per-record estimation cost negligible.
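A minimal sketch of that backed-off sampling (the 1.1 growth rate follows Spark's SizeTracker; measure() stands in for the real SizeEstimator, and the rest of the scaffolding is illustrative):

```scala
// Track an estimated size while measuring ever less frequently: measure at
// update 1, then at ~1.1x the previous sample point, extrapolating between
// samples from the observed bytes-per-update rate.
class SizeTrackerSketch(measure: () => Long) {
  private val growthRate = 1.1
  private var numUpdates = 0L
  private var nextSampleNum = 1L
  private var bytesPerUpdate = 0.0
  private var lastSample = (0L, 0L) // (updates at sample time, measured bytes)

  // Call after every insert/update of the tracked collection.
  def afterUpdate(): Unit = {
    numUpdates += 1
    if (numUpdates >= nextSampleNum) {
      val size = measure() // the expensive full measurement happens only here
      if (numUpdates > lastSample._1) {
        bytesPerUpdate = (size - lastSample._2).toDouble / (numUpdates - lastSample._1)
      }
      lastSample = (numUpdates, size)
      nextSampleNum = math.ceil(numUpdates * growthRate).toLong
    }
  }

  // Cheap estimate: last measurement plus extrapolated growth since then.
  def estimateSize(): Long =
    (lastSample._2 + bytesPerUpdate * (numUpdates - lastSample._1)).toLong
}
```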
- ExternalSorter's spill() method
It first sorts the buffered records by partition id (TimSort, O(n log n)), then spills them to disk.
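A minimal sketch of that spill ordering (a plain library sort standing in for Spark's in-place TimSort over its internal buffer; assume records were buffered as ((partitionId, key), value)):

```scala
// Records buffered as ((partitionId, key), value).
val buffered = Array(
  ((1, "b"), 1), ((0, "c"), 1), ((1, "a"), 1), ((0, "d"), 1))

// Sort by partition id first, then by key within a partition. Scala's sort
// over reference arrays delegates to java.util.Arrays.sort, which is itself
// a TimSort, matching the O(n log n) cost noted above.
val run = buffered.sortBy { case ((pid, key), _) => (pid, key) }

// Spill: stream the run to disk in this order, so a later merge can read
// each partition back as one contiguous segment.
run.foreach { case ((pid, key), v) => println(s"partition $pid: ($key, $v)") }
// partition 0: (c, 1), (d, 1); partition 1: (a, 1), (b, 1)
```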