Shuffle Write
TaskRunner 在启动 Driver 端发送过来的 Task 的时候,调用的是 Task.run() 方法,Task.run() 方法会调用 runTask() 方法来进行计算,runTask() 是由 Task 的子类来进行具体实现的:
final def run(
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem): T = {
// 想 Executor 端 BlockManager 注册 Task
SparkEnv.get.blockManager.registerTask(taskAttemptId)
context = new TaskContextImpl(...)
TaskContext.setTaskContext(context)
taskThread = Thread.currentThread()
new CallerContext(...).setCurrentContext()
try {
// 进行计算
runTask(context)
} // ...
}
更多细节,参考上篇文章:Executor 和 Task 部分
ShuffleMapTask
Task 有 ShuffleMapTask 和 ResultTask 两个子类,这篇文章,我们讨论的是 Shuffle -Write 部分,需要用到的是ShuffleMapTask。
ShuffleMapTask.runTask() 的实现细节:
override def runTask(context: TaskContext): MapStatus = {
// ...
var writer: ShuffleWriter[Any, Any] = null
try {
// 获取 Shuffle Manager
val manager = SparkEnv.get.shuffleManager
// 获取 Writer
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 将计算结果写出
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
// ...
}
}
由于后文需要,我们得先看下 manager.getWriter() 中传入的 dep.shuffleHandle 的实现细节:
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
Spark 2.2.3 默认使用SortShuffleManager 作为 Shuffle 的管理端,SortShuffleManager.registerShuffle() 的实现细节:
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// 根据spark.shuffle.sort.bypassMergeThreshold的值(默认是200)
// 判断是否需要进行 Map 的合操作
// 如果 partitions 的个数小于 200 就不进行该操作
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// 看看能不能使用序列化映射的形式映射
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// 其它
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
SortShuffleManager 会根据配置和依赖的不同,实例化不同的 ShuffleHandle。
接下来,我们就看看 manager.getWriter() 方法的实现细节,也就是 SortShuffleManager.getWriter():
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
// 匹配 dep.shuffleHandle,选择实例化 Writer 对象
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
SortShuffleManager.getWriter() 会根据 ShuffleHandle 的不同,实例化不同的 ShuffleWriter。
ShuffleMapTask.runTask() 获取到 ShuffleWriter 后,就会调用 ShuffleWriter.write() 方法将计算结果写出,这里我以 SortShuffleWriter 为例:
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 根据是否进行 Map 端的合并,创建不同类型的 ExternalSorter
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// 向 ExternalSorter 插入全部数据
sorter.insertAll(records)
// 在 Shuffle 写入时,不会打开多个文件,只打开一个文件
// 也就是所有的分区都写入到一个文件中
// 获取数据写出的临时文件的输出流
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// 将 ExternalSorter 中的所有数据写入到临时文件中
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// 创建一个索引文件,记录每个块的偏移量和结束位置
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
// 输出结果的信息
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
我们先看下 ExternalSorter 的使用说明,在 ExternalSorter 类的注释中:
Users interact with this class in the following way:
1. Instantiate an ExternalSorter.
2. Call insertAll() with a set of records.
// 调用 iterator() 方法获取排序/聚集后的结果
// 或者调用 writePartitionedFile() 方法,将排序/聚集后的数据写出
3. Request an iterator() back to traverse sorted/aggregated records.
- or -
Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs
that can be used in Spark's sort shuffle.
我们先看下 ExternalSorter.insertAll() 的实现细节:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
// 在 Map 端进行合并
if (shouldCombine) {
// 首先使用 AppendOnlyMap 在内存中合并
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
// update 函数
val update = (hadValue: Boolean, oldValue: C) => {
// 如果 key 已经存在,就使用 mergeValue 行聚合操作
// 如果不存在,就使用 createCombiner() 方法进行初始化操作
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
// 如果记录中有值
while (records.hasNext) {
addElementsRead()
// 获取 key-value
kv = records.next()
// map 是 PartitionedAppendOnlyMap 的实例化对象
map.changeValue((getPartition(kv._1), kv._1), update)
// 是否进行 Spill 操作
maybeSpillCollection(usingMap = true)
}
} else {
// 不进行 Map 端合并
while (records.hasNext) {
addElementsRead()
// 获取 key-value
val kv = records.next()
// 向缓存区中写入
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
// 是否进行 Spill 操作
maybeSpillCollection(usingMap = false)
}
}
}
ExternalSorter.insertAll() 将任务分为需要 map 端聚合和不需要 map 端聚合两种情况,分别进行处理。
在 map 端进行聚合是由 map.changeValue() 方法来完成的,map 是 PartitionedAppendOnlyMap 的实例化对象。
changeValue() 方法是在 AppendOnlyMap 中进行的实现:
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
// ...
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
// data 是一个存储 key 和 value 的数组,格式为:key0, value0, key1, value1...
// key = 2 * pos,value = 2 * pos + 1
val curKey = data(2 * pos)
if (curKey.eq(null)) { // key 不存在
// 进行 createCombiner 操作
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
incrementSize()
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) { // key 存在
// 进行 mergeValue 操作
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
不在 map 端进行聚合就会通过 buffer.insert() 方法,将数据插入到缓存去中,buffer 是 PartitionedPairBuffer 的实例化对象。
PartitionedPairBuffer.insert():
def insert(partition: Int, key: K, value: V): Unit = {
if (curSize == capacity) { // 达到容量
// 扩容
growArray()
}
// 以 (partition, key) 元组的形式存储
data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
// 存储 value
data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
curSize += 1
afterUpdate()
}
在执行完 map.changeValue() 和 buffer.insert() 操作后都调用了 maybeSpillCollection() 方法来判断是否执行溢写操作:
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) { // 进行 map 端的合并
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) { // 是否溢写
// 发生溢写后,就会创建一个新的 map
map = new PartitionedAppendOnlyMap[K, C]
}
} else { // 不进行 map 端的合并
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) { // 是否溢写
// 进行溢写后,就会创建一个新的 buffer
buffer = new PartitionedPairBuffer[K, C]
}
}
// ...
}
maybeSpill() 的实现细节:
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // 已经操作阈值了
// 从 Shuffle 内存池中,最多申请 currentMemory 的两倍
// myMemoryThreshold 为内存阈值
// 通过设置 spark.shuffle.spill.initialMemoryThreshold 参数来指定
// 默认为 5 * 1024 * 1024
val amountToRequest = 2 * currentMemory - myMemoryThreshold
// 通过 TaskMemoryManager 来申请更多的内存
val granted = acquireMemory(amountToRequest)
// 提升阈值
myMemoryThreshold += granted
// 还是超过阈值,就需要溢写
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// 进行溢写
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
// 真正的溢写操作
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
spill():
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
// 在内存对 key 进行排序
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
// 溢写到磁盘
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
// 将溢写文件加到溢写磁盘中
spills += spillFile
}
到接力 ExternalSorter.insertAll() 方法就算简单的剖析完了,接下来,我们看看 ExternalSorter.writePartitionedFile() 方法:
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
// 获取 Writer
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
if (spills.isEmpty) {
// 溢写文件集合为空,也就是没有溢写过
val collection = if (aggregator.isDefined) map else buffer
// 按分区和 key 排序后的数据迭代器
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
// 遍历分区 并写入
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// 发生过溢写操作,需要将溢写文件进行合并
// this.partitionedIterator 会将磁盘和内存中的数据合并到一起
for ((id, elements) <- this.partitionedIterator ) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
// ...
lengths
}
this.partitionedIterator 实现细节:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
if (spills.isEmpty) {
// 这种情况我们不考虑
} else {
// 合并数据
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
merge() 的实现细节:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
: Iterator[(Int, Iterator[Product2[K, C]])] = {
// 溢写文件的 reader
val readers = spills.map(new SpillReader(_))
val inMemBuffered = inMemory.buffered
// 根据分区数进行迭代
(0 until numPartitions).iterator.map { p =>
// 当前分区的迭代器
val inMemIterator = new IteratorForPartition(p, inMemBuffered)
// 合并操作
val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
if (aggregator.isDefined) {
// 需要 map 端聚合
(p, mergeWithAggregation(
iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
} else if (ordering.isDefined) {
// No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
// sort the elements without trying to merge them
(p, mergeSort(iterators, ordering.get))
} else {
(p, iterators.iterator.flatten)
}
}
}
合并操作我就不深入的剖析了。
简单的总结一下:
ShuffleMapTask 会根据不同的运行情况,选择不同的 Shuffle 写方式,我们以 SortShuffleWriter 为例。
SortShuffleWriter 在收到计算结果后,会根据是否进行 map 端聚合来实例化 ExternalSorter,然后,将计算结果通过 ExternalSorter.insertAll() 方法传递给 ExternalSorter 实例化对象中。
ExternalSorter 会根据是否需要进行 map 端聚合,对传递过来的数据进行不同形式的处理。如果在 map 端聚合,就在内存中进行聚合;如果不需要,就直接写入到内存缓存区中。
没插入完一条数据,都会进行是否需要溢写判断。如果达到阈值,ExternalSorter 并不会直接进行溢写操作,而是向 TaskMemoryManager 申请更多的内存,然后调整阈值。如果 TaskMemoryManager 无法分配足够多的内存时,才会进行溢写,每次溢写都会产生一个新文件,并重新创建容器(map / buffer)。
在完成 insertAll() 操作后,就会将各个分区数据写入到临时文件(就一个)中了。
写入的时候也分两种情况,如果没有产生过溢写文件,也就是数据全在内存中,就会将数据按照分区和 key 进行排序,然后依次写入到临时文件中;如果产生过溢写文件,就会按照分区进行迭代,然后遍历溢写文件,合并这个分区的数据,最后写入到临时文件中。
最后一步,就是创建索引文件,记录各个块的偏移量和结束的位置,Shuffle Reader 进行读的时候,可以直接调用 getBlockData() 方法进行读取。