简介
spark作为实时及离线合为一体的大数据计算框架,由于其出色的批处理能力及高效的流计算能力,因此在大数据中依旧有着不可撼动的地位。这篇文章来记录一下spark Job作业是如何被调度起来的。
RDD
众所周知RDD是spark的基础,是Spark数据管理单元的高级抽象,RDD之间的流转变化形成了了我们的数据计算模式及过程,首先来看一下RDD的组成:
一个RDD主要由四大部分组成:
- dependencies:当前RDD所依赖的上一个RDD,是RDD计算流转的关键,从一个RDD transform到另一个RDD,也就有了依赖关系,任何一个RDD可以通过其dependencies及compute算子计算得出。 (计算)
- computing:算子,用于描述从一个或多个RDD到另外一个RDD的计算过程。(计算)
- partition(Data):一个RDD被分成多个partition(由partitioner决定),每个partition存储着一个RDD的实际数据,而这些partition数据分散在不同的executor,这些数据既可以存储在内存中,也可以存储在磁盘中。(存储)
- checkpoint:在大数据框架中最重要的特性就是容错性,大数据节点中的executor会因各种原因down掉,所以就必须有一个好的机制处理这种问题,而checkpoint就是用于提高容错性的关键所在,一个Executor down机后,会优先选择从checkpoint恢复,如果没有checkpoint则开始从dependencies递归重算。(容错)
有了Dependencies及Computing这两个概念之后,RDD之间的数据DAG流转有了经济基础,但是想要基于此要搞出一些新花样,仅仅有经济基础是不够的,还得要有丰富的上层建筑,所以Spark又提出了Transformation及Action两个上层建筑的概念,伴随着的是stage及Job两种模式。如果让用户直接去操作Dependencies的话未免有点太过于复杂,所以Spark内置了很多通用的基础算子,诸如map, flatMap, reduce, aggegrate等等,这些算子函数会生成对应的RDD之类,其主要定义了dependencies及computing算子;为了更好的描述spark作业的实际运行顺序,又将这些算子分为两大类(Narrow Transformation & Wide Transformation),其中Narrow都是一对一依赖,不需要shuffle,两个RDD中的partition数是一样的,对应的映射关系也是固定不变的,RDD的计算能在同一个executor中运算,因此他们都属于同一个stage;而对于Wide Transformation,由于一个RDD会依赖多个RDD,所以不能再同一个executor中通过计算dependencies得到当前RDD,所以需要产生一个新的stage。有了这些算子之后,我们就有了一张完整的数据计算DAG图,但是Spark是懒加载的,虽然通过这些Transformation算子描绘好了一幅DAG图,但是并不会立刻执行,而需要一个Action操作来触发整个DAG的执行。
我们以rdd.foreach执行代码流程来简单分析。
// RDD.scala
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
// SparkContext.scala
def runJob[T, U: ClassTag](
rdd: RDD[T], // result rdd
func: (TaskContext, Iterator[T]) => U, // 对RDD各个partition中的数据一次执行该函数
partitions: Seq[Int], // 哪些partition需要计算
resultHandler: (Int, U) => Unit): Unit = { // 获取到最终结果后调用该handler (partition, result)
// ... some checks
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) // 启动作业,会post 一个 JobSubmitted Event
progressBar.foreach(_.finishAll())
rdd.doCheckpoint() // 生成checkpoint
}
// DagScheduler.scala
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
...
}
private[scheduler] def handleJobSubmitted(...) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
... // Exception handle
}
... // some logs and status update
submitStage(finalStage)
}
/**
* Create a ResultStage associated with the provided jobId.
*/
private def createResultStage(...): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId) // DFS生成所有子stage,并返回当前result stage依赖的所有shuffle dependencies。
val id = nextStageId.getAndIncrement() // 生成stage id
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 生成stage
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
private[scheduler] def getShuffleDependencies( // 通过BFS的方式找到Wide Transformation的源头dependencies
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
// getOrCreateShuffleMapStage => createShuffleMapStage => getOrCreateShuffleMapStage ... // DFS生成所有Stage
/** Submits stage, but first recursively submits any missing parents. */
// 会跑起来第一个没有父依赖的stage,然后通过 completion event来执行后续的stage
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get) // 没有missing parent 该stage可以跑
} else {
for (parent <- missing) { // 依赖的父stage由于节点失败而missing,需要重跑
submitStage(parent)
}
waitingStages += stage // 将stage 加入waitingStages
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
// 当event handler收到task结束的event时,从waitingStages获取一个stage执行
如此一个spark job就被执行起来了