I. Introduction
The job and task scheduling system is the core of Spark. Job scheduling in Spark builds a job out of a series of operations on RDDs and then executes it on Executors. These operators fall into two categories, transformations and actions: transformations are lazy, meaning their computation is deferred, and only an action triggers the submission of a job. The two most important schedulers are DAGScheduler and TaskScheduler: DAGScheduler performs the logical scheduling, splitting a job into Stages (task sets linked by dependencies), while TaskScheduler schedules the concrete tasks for execution.
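As a quick illustration (a minimal, hypothetical example; the file name and app name are made up), the transformations below only build the RDD lineage; nothing runs until the action at the end submits a job through the scheduler:

import org.apache.spark.{SparkConf, SparkContext}
// Hypothetical sketch: transformations are lazy, the action triggers job submission.
val sc = new SparkContext(new SparkConf().setAppName("lazy-demo").setMaster("local[*]"))
val lines    = sc.textFile("input.txt")   // transformation: nothing is computed yet
val nonEmpty = lines.filter(_.nonEmpty)   // transformation: still lazy
val lengths  = nonEmpty.map(_.length)     // transformation: still lazy
val total    = lengths.count()            // action: calls SparkContext.runJob -> DAGScheduler.runJob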
Before diving in, here are some related terms:
- 1. Job: one or more Stages generated when an action operator is applied to an RDD.
- 2. Stage: a job is split into groups of task sets according to the dependencies between its RDDs; each group is called a Stage, also known as a TaskSet. Stages are created by the DAGScheduler and come in two kinds, ShuffleMapStage and ResultStage.
- 3. DAGScheduler: the Stage-oriented scheduler. For a submitted job it splits the RDD dependency graph into Stages, determines the number of tasks from the partitions of the last RDD in each Stage, computes the preferred location of every task, packages the tasks into a TaskSet, and hands it to the TaskScheduler.
- 4. TaskScheduler: receives the Stages submitted by the DAGScheduler and dispatches their tasks to the Executors on worker nodes for execution.
SparkContext contains three important components: DAGScheduler, TaskScheduler, and SchedulerBackend.
1.DAGScheduler
The DAGScheduler is responsible for splitting the DAG of the user application into Stages. Each Stage consists of a group of tasks that can run concurrently; these tasks share exactly the same execution logic and differ only in the data they operate on.
2.TaskScheduler
The TaskScheduler is responsible for scheduling the concrete tasks. It receives the tasks of each Stage from the DAGScheduler and, following its scheduling algorithm, assigns them to the Executors allocated to the application; it also launches speculative (backup) copies of tasks that run particularly slowly.
3.SchedulerBackend
The SchedulerBackend allocates the currently available resources: it assigns compute resources (Executors) to the tasks waiting for them and launches the tasks on the assigned Executors, completing the scheduling of the computation. It drives this process through reviveOffers.
Next we walk through the job and task scheduling flow based on the source code.
II. Job and Task Scheduling Flow Analysis
1. Stage splitting
We start from an action operator to trace the flow of a whole job.
For example, foreach in the RDD class calls SparkContext's runJob method; following the call chain down layer by layer, we eventually reach DAGScheduler's runJob method.
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
In DAGScheduler's runJob method, submitJob is called; runJob then blocks until the job completes or fails.
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
Then, inside submitJob, a JobWaiter object is created that encapsulates the job information, and it is posted through the internal event loop to DAGScheduler's inner class DAGSchedulerEventProcessLoop for processing.
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//Submit the job. eventProcessLoop is a DAGSchedulerEventProcessLoop instance
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
Finally, when the message arrives at DAGSchedulerEventProcessLoop, its onReceive method calls doOnReceive, which pattern-matches the received JobSubmitted case class and calls DAGScheduler's handleJobSubmitted method to submit the job; Stage splitting happens inside this method.
The source of handleJobSubmitted is as follows:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// ... (remainder omitted)
Here the final stage is obtained by calling createResultStage.
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
createResultStage calls getOrCreateParentStages. Starting from the final RDD, it checks whether any ancestor RDD involves a shuffle. If there is no shuffle, the job consists of a single ResultStage with no parent stage; if there is a shuffle, the job consists of one ResultStage and at least one ShuffleMapStage. The detection is implemented with two HashSets and a stack, and it only returns the direct parent stages of the current stage rather than tracing all the way back. The resulting ResultStage is called the finalStage.
The core getShuffleDependencies code is as follows:
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
The process above can be summarized as follows:
The DAGScheduler starts from the last RDD and walks back through the whole dependency tree (using an explicit stack of RDDs to visit) in order to split Stages. The split criterion is the wide dependency (ShuffleDependency): whenever a shuffle occurs, it becomes the boundary between two Stages.
Once all Stages are split, they form a dependency graph among themselves; the parent Stages of each Stage are stored in a List called parents, through which all parent Stages of the current Stage can be obtained. Stages are the basic units into which a Spark job is divided for execution.
At this point the Stage splitting phase is complete.
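To make the splitting concrete, here is a small hypothetical lineage (reusing the sc from the earlier sketch; the file name is made up): the single reduceByKey introduces a ShuffleDependency, so the job is split into one ShuffleMapStage and one ResultStage.

// Hypothetical sketch: one shuffle => one ShuffleMapStage plus one ResultStage.
val counts = sc.textFile("input.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)   // ShuffleDependency: everything upstream forms the ShuffleMapStage
counts.collect()        // the final RDD and the action form the ResultStage
// toDebugString prints the lineage; the indentation step marks the shuffle boundary.
println(counts.toDebugString)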
2. Stage submission
Using the finalStage produced in the previous phase, an ActiveJob instance is created; while it runs, the listener bus is used to track the progress of the job and its Stages.
The relevant part of handleJobSubmitted is as follows:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
....
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
submitStage then calls getMissingParentStages to find the missing parent stages of finalStage. If there are none, the stage is submitted for execution through submitMissingTasks; if parent stages are still missing, the current stage is put into the waitingStages list.
The source of submitStage is as follows:
//Recursively find the stages to submit
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing: List[Stage] = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
//Eventually submitMissingTasks is executed
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
For every missing parent, submitStage is called recursively: stages whose parents are not yet ready are parked in waitingStages, while stages without missing parents become the entry points of the job. The recursion bottoms out at the first stages of the stage tree reachable from finalStage.
The source of getMissingParentStages is as follows:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
}
This algorithm implements a "big recursion, small loop" scheduling pattern: the stack-driven loop inside getMissingParentStages avoids the StackOverflowError that deep recursion back to the first stage could otherwise cause. As a result, the stages eventually passed to submitMissingTasks are always root stages of the stage dependency tree, i.e. stages with no missing parents.
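The same "explicit stack instead of deep recursion" idea can be sketched outside Spark (a simplified illustration, not the DAGScheduler code): visiting an arbitrarily deep parent chain with a loop keeps the call stack shallow.

import scala.collection.mutable
// Simplified illustration of the pattern used by getMissingParentStages:
// a manual stack plus a visited set replaces deep recursion.
case class Node(id: Int, parents: Seq[Node])
def collectAncestors(start: Node): Set[Int] = {
  val visited = mutable.HashSet[Int]()
  val waitingForVisit = mutable.Stack[Node](start)
  while (waitingForVisit.nonEmpty) {
    val node = waitingForVisit.pop()
    if (visited.add(node.id)) {
      node.parents.foreach(waitingForVisit.push)   // loop instead of recursion
    }
  }
  visited.toSet - start.id
}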
3. Task submission
Once Stage splitting is finished, DAGScheduler's submitMissingTasks method creates one task per partition of the last RDD in the Stage.
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
....
}
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
submitMissingTasks then computes the preferred locations of each task by calling getPreferredLocs:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
getPreferredLocsInternal first checks whether the partition is cached; if so, it returns the cached locations. Otherwise it returns the RDD's own preferred-location list, obtained from that RDD's implementation of getPreferredLocations. If the RDD does not implement the method, rddPrefs is empty; in that case the method walks the RDD's narrow dependencies and recursively computes preferred locations from the parent partitions, returning the first non-empty result.
The source of getPreferredLocsInternal is as follows:
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
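For context, rdd.preferredLocations delegates to each RDD's getPreferredLocations. A data-source RDD can override it to report where each partition's data lives; the class below is a made-up sketch for illustration, not a real Spark RDD.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class SimplePartition(val index: Int) extends Partition

// Hypothetical RDD whose partitions know which hosts hold their data;
// the DAGScheduler picks these locations up through getPreferredLocsInternal.
class HostAwareRDD(sc: SparkContext, hostsPerPartition: Array[Seq[String]])
  extends RDD[Int](sc, Nil) {

  override def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hostsPerPartition.length)(new SimplePartition(_))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator.single(split.index)

  override def getPreferredLocations(split: Partition): Seq[String] =
    hostsPerPartition(split.index)
}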
Next, submitMissingTasks builds the tasks. Depending on the type of Stage, different tasks are created: a ResultStage produces ResultTasks and a ShuffleMapStage produces ShuffleMapTasks. These tasks are then grouped into a TaskSet and submitted to the TaskScheduler.
A TaskSet contains all tasks of the Stage; the tasks share the same processing logic and differ only in the data partition they work on. Note that the task binary (the Stage's RDD together with its closure or dependency) is serialized and broadcast to the executors in binary form.
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
//Submit the tasks as a TaskSet
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
The DAGScheduler hands the task set over to the TaskScheduler. When the TaskScheduler receives it, submitTasks builds a TaskSetManager instance to manage the lifecycle of this task set and adds it to the scheduling pool via schedulableBuilder.addTaskSetManager, from which the system schedules it. This scheduler operates at the application level and supports two scheduling modes, FIFO and FAIR.
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}
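As a side note, the FIFO/FAIR choice mentioned above is controlled by spark.scheduler.mode. A hedged configuration sketch (the pool name and file path are made up):

import org.apache.spark.{SparkConf, SparkContext}
// Hypothetical sketch: enable FAIR scheduling and route jobs from this thread into a pool.
val fairConf = new SparkConf()
  .setAppName("fair-demo")
  .set("spark.scheduler.mode", "FAIR")                                    // default is FIFO
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")   // optional pool definitions
val fairSc = new SparkContext(fairConf)
fairSc.setLocalProperty("spark.scheduler.pool", "reporting")              // made-up pool name
fairSc.textFile("input.txt").count()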
Afterwards the scheduler backend's reviveOffers method allocates resources and launches the tasks. The schedulerBackend here is the coarse-grained CoarseGrainedSchedulerBackend, and reviveOffers sends a message to the driverEndpoint.
override def reviveOffers() {
//Submit the tasks to the driver; the DriverEndpoint inner class of this class receives the message in its receive method
driverEndpoint.send(ReviveOffers)
}
DriverEndpoint is an inner class of CoarseGrainedSchedulerBackend. Instead of calling it directly as an inner class, the code follows the message-passing convention and goes through the rpcEnv; that is, messages are handled by the dispatcher's producer-consumer loop, which keeps message processing uniform.
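The dispatcher's producer-consumer idea can be illustrated with a generic sketch (a simplified model, not Spark's RpcEnv code): senders post messages to a queue, and a single consumer thread processes them in order.

import java.util.concurrent.LinkedBlockingQueue
// Simplified model of an event loop: post() is the producer side and the daemon
// thread is the single consumer, so message handling stays serialized.
sealed trait Message
case object ReviveOffersMsg extends Message
case class StatusUpdateMsg(taskId: Long) extends Message

class SimpleEventLoop(handle: Message => Unit) {
  private val queue = new LinkedBlockingQueue[Message]()
  private val worker = new Thread(() => while (true) handle(queue.take()))
  worker.setDaemon(true)
  worker.start()
  def post(msg: Message): Unit = queue.put(msg)
}

// Usage: the driver-side endpoint would pattern-match on the message type.
val loop = new SimpleEventLoop({
  case ReviveOffersMsg      => println("make offers to the executors")
  case StatusUpdateMsg(tid) => println(s"task $tid changed state")
})
loop.post(ReviveOffersMsg)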
Eventually DriverEndpoint's receive method is invoked; it matches ReviveOffers and calls makeOffers.
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
//Launch the tasks on the Executors
launchTasks(taskDescs)
}
}
DriverEndpoint's makeOffers first collects the Executors currently usable in the cluster and passes them to TaskSchedulerImpl, which allocates resources to the tasks of the task sets. During allocation, resourceOffers sorts the TaskSetManagers according to the scheduling policy and then offers resources to each of them by data locality, in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
....
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
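The locality preference is not absolute: if a task cannot be launched at its best level, the scheduler waits for the configured locality timeout before falling back to the next level. A hedged tuning sketch (values shown are the usual defaults):

import org.apache.spark.SparkConf
// Hypothetical sketch: how long to wait before degrading a task's data locality level.
val localityConf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before falling back one level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY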
The tasks with allocated resources are finally handed to launchTasks, which sends each task to the CoarseGrainedExecutorBackend on a worker node, where the embedded Executor runs it.
4. Task execution
When CoarseGrainedExecutorBackend receives a LaunchTask message, it calls Executor's launchTask method, which wraps the task in a TaskRunner that manages the runtime details, and then submits the TaskRunner to the thread pool for execution.
//Launch the task
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
//The Executor launches the task
executor.launchTask(this, taskDesc)
}
TaskRunner's run method first deserializes the task itself and the jars and files it depends on, and then calls the task's runTask method. Task is an abstract class, and runTask is implemented by its two subclasses, ShuffleMapTask and ResultTask.
For ShuffleMapTask:
runTask first deserializes the RDD and its dependency from the broadcast variable, then uses the shuffleManager (in version 2.3.1 the only implementation is SortShuffleManager) to obtain the ShuffleHandle from the dependency and calls getWriter to create the corresponding ShuffleWriter.
getWriter decides, based on the handle type, which shuffle write path to use; Spark has three shuffle writers: BypassMergeSortShuffleWriter, UnsafeShuffleWriter and SortShuffleWriter.
They correspond to the ShuffleHandle types as follows (a selection sketch follows the list):
- UnsafeShuffleWriter:SerializedShuffleHandle
- BypassMergeSortShuffleWriter:BypassMergeSortShuffleHandle
- SortShuffleWriter:BaseShuffleHandle
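Roughly speaking, SortShuffleManager's registerShuffle chooses the handle, and therefore the writer, along the lines sketched below. This is a paraphrase of the decision, not the literal source; the threshold refers to spark.shuffle.sort.bypassMergeThreshold (default 200).

// Paraphrased decision logic (not the literal Spark source): which ShuffleHandle,
// and hence which ShuffleWriter, a ShuffleDependency ends up with.
def chooseHandle(mapSideCombine: Boolean,
                 hasAggregator: Boolean,
                 numPartitions: Int,
                 serializerSupportsRelocation: Boolean): String = {
  val bypassMergeThreshold = 200  // spark.shuffle.sort.bypassMergeThreshold default
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold)
    "BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter"
  else if (serializerSupportsRelocation && !hasAggregator && numPartitions <= (1 << 24))
    "SerializedShuffleHandle -> UnsafeShuffleWriter"
  else
    "BaseShuffleHandle -> SortShuffleWriter"
}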
ShuffleMapTask's runTask method:
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
//Deserialize the current RDD and its shuffle dependency from the broadcast variable
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
/**
* Get the ShuffleHandle from the dependency and call getWriter to create the corresponding ShuffleWriter
*/
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//Write to disk
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
The ShuffleWriter's write method then pulls data from the source through the iterator of the last RDD in the Stage; chaining these iterators forms a pipeline. The computed output is stored in the local BlockManager, and what is ultimately returned to the DAGScheduler is a MapStatus object. MapStatus describes where the ShuffleMapTask's output is stored in the BlockManager rather than the output itself; the next stage's tasks use this information to locate their input data.
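The pipelining can be sketched without Spark (a simplified illustration, not MapPartitionsRDD itself): each narrow transformation merely wraps its parent's iterator, so asking the last iterator for records pulls data all the way from the source, one element at a time.

// Simplified illustration of pipelining: composing iterators defers all work
// until the shuffle writer (or the action) starts pulling records.
val source: Iterator[String] = Iterator("a b", "c", "d e f")   // stands in for the data source
val flatMapped = source.flatMap(_.split(" "))                  // wraps the source iterator
val mapped = flatMapped.map(word => (word, 1))                 // wraps the previous iterator
// Nothing has been computed yet; a call like writer.write(mapped) would now
// pull records through the whole chain one at a time.
mapped.foreach(println)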
Taking BypassMergeSortShuffleWriter as an example, we can see that the RDD's iterator is invoked here, forming the pipeline that pulls data from upstream.
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new DiskBlockObjectWriter[numPartitions];
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
//partitioner.getPartition(key) returns the partition this record is written to
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
for (int i = 0; i < numPartitions; i++) {
final DiskBlockObjectWriter writer = partitionWriters[i];
partitionWriterSegments[i] = writer.commitAndGet();
writer.close();
}
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
try {
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
BypassMergeSortShuffleWriter's write method first obtains a serializer instance and, via the BlockManager, a DiskBlockObjectWriter (a disk writer whose buffer defaults to 32 KB) for every reduce partition. For each record it calls partitioner.getPartition(key) to determine the partition id and writes the record with the corresponding writer. The per-partition files are finally merged into one data file, ordered by partition, together with an index file.
This path needs no large in-memory buffer, no repeated serialization and deserialization, and no costly record comparisons.
For ResultTask:
Its final result is simply the return value of applying the user function func.
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
func(context, rdd.iterator(partition, context))
}
5. Fetching execution results
The Executor handles the computed result differently depending on its size:
1. If the result is larger than 1 GB, it is dropped; the threshold is configured through maxResultSize.
2. If the result size is between maxDirectResultSize and 1 GB, the result is stored in the BlockManager under a block id derived from the task id, and that id is sent to the driverEndpoint over Netty; maxDirectResultSize is the minimum of the Netty frame size (128 MB) and the configured direct-result limit (1 MB by default).
3. If the result is smaller than maxDirectResultSize (1 MB by default), it is sent directly to the driverEndpoint over Netty.
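The thresholds above correspond to configuration keys; a hedged sketch with what I understand to be the default values:

import org.apache.spark.SparkConf
// Hypothetical sketch of the result-size thresholds described above.
val resultConf = new SparkConf()
  .set("spark.driver.maxResultSize", "1g")      // above this the result is dropped
  .set("spark.task.maxDirectResultSize", "1m")  // below min(this, RPC frame size) the result is sent directly
  .set("spark.rpc.message.maxSize", "128")      // Netty RPC frame size, in MB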
At the end of the task's run method, the result is handled as follows:
// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
// ... (middle part omitted)
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
After the task finishes, the TaskRunner sends the execution result to the driverEndpoint, which forwards it to the TaskScheduler's statusUpdate method; there, different task states are handled differently.
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
if (TaskState.isFinished(state)) {
cleanupTaskState(tid)
taskSet.removeRunningTask(tid)
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
statusUpdate handles the task states as follows:
1. If the state is TaskState.FINISHED, TaskResultGetter's enqueueSuccessfulTask method is called. It processes the result according to how it was sent: for an IndirectTaskResult the result is fetched from the BlockManager by its block id; for a DirectTaskResult no remote fetch is needed, because the result was sent straight to the driverEndpoint.
2. If the state is TaskState.FAILED, TaskState.KILLED or TaskState.LOST, TaskResultGetter's enqueueFailedTask is called; for TaskState.LOST the executor the task ran on is additionally marked as failed, and scheduling is redone based on the updated executor set.
enqueueSuccessfulTask eventually calls the TaskScheduler's handleSuccessfulTask method, which ends up in DAGScheduler's handleTaskCompletion.
Recall that in the shuffle-write phase the output is written to the BlockManager and a MapStatus object is returned to the DAGScheduler; this object is serialized inside the IndirectTaskResult or DirectTaskResult. handleTaskCompletion extracts it and registers the MapStatus with the MapOutputTrackerMaster, which completes the handling of the ShuffleMapTask.
The ShuffleMapTask branch of DAGScheduler's handleTaskCompletion looks like this:
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
shuffleStage.pendingPartitions -= task.partitionId
}
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
shuffleStage.pendingPartitions -= task.partitionId
}
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
mapOutputTracker.incrementEpoch()
clearCacheLocs()
if (!shuffleStage.isAvailable) {
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
markMapStageJobsAsFinished(shuffleStage)
submitWaitingChildStages(shuffleStage)
}
}
}
If the task is a ResultTask, the method checks whether the whole job has finished; if so, the job is marked as finished, the resources it depends on are released, and a message is posted to the listener bus announcing the job's completion.