前言
Spark数据本地化即移动计算而不是移动数据,而现实又是残酷的,不是想要在数据块的地方计算就有足够的资源提供,为了让task能尽可能的以最优本地化级别(Locality Levels)来启动,Spark的延迟调度应运而生,资源不够可在该Locality Levels对应的限制时间内重试,超过限制时间后还无法启动则降低Locality Levels再尝试启动……
本地化级别(Locality Levels)
- PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中,性能最好
- NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块在节点上,而task在节点上某个executor中运行;或者是数据和task在一个节点上的不同executor中,数据需要在进程间进行传输
- NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分,比如说SparkSQL读取MySql中的数据
- RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上,数据需要通过网络在节点之间进行传输
- ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差
这些Task的本地化级别其实描述的就是计算与数据的位置关系,这个最终的关系是如何产生的呢?接下来对其来龙去脉进行详细的讲解。
DAGScheduler提交tasks
DAGScheduler对job进行stage划分完后,会通过submitMissingTasks方法将Stage以TaskSet的形式提交给TaskScheduler,看看该方法关于位置优先的一些代码:
...
// 获取还未执行或未成功执行分区的id
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
// 通过getPreferredLocs方法获取rdd该分区的优先位置
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
val job = s.activeJob.get
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
}
...
//通过最优位置等信息构建Task
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
} catch {
}
...
//将所有task以TaskSet的形式提交给TaskScheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
注意这里提交的TaskSet里面的Task已经包含了该Task的优先位置,而该优先位置是通过getPreferredLocs方法获取,可以简单看看其实现:
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
...
// 从缓存中获取
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// 直接通过rdd的preferredLocations方法获取
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// 递归从parent Rdd获取(窄依赖)
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
无论是通过哪种方式获取RDD分区的优先位置,第一次计算的数据来源肯定都是通过RDD的preferredLocations方法获取的,不同的RDD有不同的preferredLocations实现,但是数据无非就是在三个地方存在,被cache到内存、HDFS、磁盘,而这三种方式的TaskLocation都有具体的实现:
//数据在内存中
private [spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
extends TaskLocation {
override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}
//数据在磁盘上(非HDFS上)
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = host
}
//数据在HDFS上
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = TaskLocation.inMemoryLocationTag + host
}
所以,在实例化Task的时候传的优先位置就是这三种的其中一种。
Locality levels生成
DAGScheduler将TaskSet提交给TaskScheduler后,TaskScheduler会为每个TaskSet创建一个TaskSetMagager来对其Task进行管理,在初始化TaskSetMagager的时候就会通过computeValidLocalityLevels计算该TaskSet包含的locality levels:
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
levels += ANY
logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}
程序会依次判断该TaskSetMagager是否包含各个级别,逻辑都类似,我们就细看第一个,pendingTasksForExecutor的定义与添加:
// key为executorId,value为在该executor上有缓存的数据块对应的taskid数组
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
...
//遍历所有该TaskSet的所有task进行添加
for (i <- (0 until numTasks).reverse) {
addPendingTask(i)
}
...
private def addPendingTask(index: Int) {
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
}
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
}
case _ =>
}
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
}
allPendingTasks += index // No point scanning this whole list to find the old task there
}
注意这里的addPendingTask方法,会遍历该TaskSetMagager管理的所有Task的优先位置(上文已解析),若是ExecutorCacheTaskLocation (缓存在内存中)则添加对应的executorId和taskId到pendingTasksForExecutor,同时还会添加到低级别需要的pendingTasksForHost、pendingTasksForRack中,说明假设一个 task 的最优本地性为 X,那么该 task 同时也具有其他所有本地性比X差的本地性。
回到上面的本地性级别判断:
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
只要是看第三个判断 pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive())),其中,pendingTasksForExecutor.keySet就是上面说明的存在有与task对应的数据块被缓存在executor中的executorId,sched.isExecutorAlive()就是判断参数中的 executor id 当前是否 active。所以整行代码意思是存在有与task对应的数据块被缓存在executor中的executors是否有active的,若有则添加PROCESS_LOCAL级别到该TaskSet的LocalityLevels中。
后面的其他本地性级别是同样的逻辑就不细讲了,区别是如判断存在有与task对应的数据块在某些节点中的hosts是否有Alive的等……
至此,TaskSet包含的LocalityLevels就已经计算完。
延迟调度策略
若spark跑在yarn上,也有两层延迟调度,第一层就是yarn尽量将spark的executor分配到有数据的nodemanager上,这一层没有做到data locality,到spark阶段,data locality更不可能了。
延迟调度的目的是为了较小网络及IO开销,在数据量大而计算逻辑简单(task执行时间小于数据传输时间)的情况下表现明显。
Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动。
TaskSetMagager会以某一种TaskSet包含的本地性级别遍历每个可用executor资源尝试在该executor上启动当前管理的tasks,那么是如何决定某个task能否在该executor上启动呢?首先都会通过getAllowedLocalityLevel(curTime)方法计算当前TaskSetMagager中未执行的tasks的最高本地级别:
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
// Remove the scheduled or finished tasks lazily
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
pendingTaskIds.remove(indexOffset)
}
}
false
}
// Walk through the list of tasks that can be scheduled at each location and returns true
// if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
// already been scheduled.
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
// The key could be executorId, host or rackId
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
if (!moreTasks) {
// This is a performance optimization: if there are no more tasks that can
// be scheduled at a particular locality level, there is no point in waiting
// for the locality wait timeout (SPARK-4939).
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLaunchTime so that the next locality
// wait timer doesn't immediately expire
lastLaunchTime += localityWaits(currentLocalityIndex)
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
currentLocalityIndex += 1
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
myLocalityLevels(currentLocalityIndex)
}
循环条件里的currentLocalityIndex是getAllowedLocalityLevel 前一次被调用返回的LocalityIndex在 myLocalityLevels 中的索引,初始值为0,myLocalityLevels则是TaskSetMagager所有tasks包含的本地性级别。
- 若myLocalityLevels(currentLocalityIndex)对应的level是否还有未执行的task,通过moreTasksToRunIn方法获取(逻辑很简单:执行完及正在执行的task都从对应列表中移除,有未执行过的task直接返回true)
- 若没有,则currentLocalityIndex 加一继续循环(降级)
- 若有,则先判断当前时间与上次以该级别启动时间之差是否超过了该级别能容忍的时间限制,若未超过,则直接返回对应的LocalityLevel,若超过,则currentLocalityIndex 加一继续循环(降级)
至此,就取出了该TaskSetMagager中未执行的tasks的最高本地性级别(取和maxLocality中级别高的作为最终的allowedLocality)。
最终决定是否在某个executor上启动某个task的是方法dequeueTask(execId, host, allowedLocality)
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}
...
}
通过TaskLocality.isAllowed方法来保证只以比allowedLocality级别高(可相等)的locality来启动task,因为一个 task 拥有比最优本地性 差的其他所有本地性。这样就保证了能尽可能的以高本地性级别来启动一个task。
优化建议
可用过Spark UI来查看某个job的task的locality level,根据实际情况调整数据本地化的等待时长:
- spark.locality.wait 全局的,适用于每个locality level,默认为3s
- spark.locality.wait.process
- spark.locality.wait.node
- spark.locality.wait.rack