Spark系列(十)TaskSchedule工作原理 - 会飞的纸盒 - 博客园
http://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BTaskSchedule%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html
工作原理图
源码分析:
1、submitTasks
在submitTasks方法中最后调用backend.reviveOffers()进行下一步的task调度分配
1
override def submitTasks(taskSet: TaskSet) {
2
val tasks = taskSet.tasks
3
logInfo("****Adding**** ****task**** ****set**** ****" + taskSet.id + "**** ****with**** ****" + tasks.length + "**** ****tasks****")
4
this.synchronized {
5
// 为taskSet创建TaskSetManager
6
// TaskSetManager用于对TaskSet的执行状况进行管理和监控
7
val manager = createTaskSetManager(taskSet, maxTaskFailures)
8
// 将manager加入activeTaskSets缓存中
9
activeTaskSets(taskSet.id) = manager
10
// 将manager加入schedulableBuilder中
11
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
12
13
if (!isLocal && !hasReceivedTask) {
14
starvationTimer.scheduleAtFixedRate(new TimerTask() {
15
override def run() {
16
if (!hasLaunchedTask) {
17
logWarning("****Initial**** ****job**** ****has**** ****not**** ****accepted**** ****any**** ****resources;**** ****" +
18
"****check**** ****your**** ****cluster**** ****UI**** ****to**** ****ensure**** ****that**** ****workers**** ****are**** ****registered**** ****" +
19
"****and**** ****have**** ****sufficient**** ****resources****")
20
} else {
21
this.cancel()
22
}
23
}
24
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
25
}
26
hasReceivedTask = true
27
}
28
backend.reviveOffers()
29
}
2、makeOffers
调用过程:收到reviveOffers消息后调用makeOffers方法。
所属包:org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
1
def makeOffers() {
2
// resourceOffers方法用于实现任务分配算法,将各个task分配到executor上
3
// launchTasks方法用于将所分配的task发送到对应的executor中执行
4
// WorkerOffer封装了Application可用的资源
5
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
6
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
7
}.toSeq))
8
}
3、resourceOffers
1
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
2
// Mark each slave as alive and remember its hostname
3
// Also track if new executor is added
4
var newExecAvail = false
5
// 遍历Application可用的资源FreeCores获取节点主机信息
6
for (o <- offers) {
7
executorIdToHost(o.executorId) = o.host
8
activeExecutorIds += o.executorId
9
if (!executorsByHost.contains(o.host)) {
10
executorsByHost(o.host) = new HashSetString
11
executorAdded(o.executorId, o.host)
12
newExecAvail = true
13
}
14
for (rack <- getRackForHost(o.host)) {
15
hostsByRack.getOrElseUpdate(rack, new HashSetString) += o.host
16
}
17
}
18
19
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
20
val shuffledOffers = Random.shuffle(offers)
21
// Build a list of tasks to assign to each worker.
22
val tasks = shuffledOffers.map(o => new ArrayBufferTaskDescription)
23
// executor可用的cores序列(每个executor最多可用多少个cores)
24
val availableCpus = shuffledOffers.map(o => o.cores).toArray
25
// rootPool中取出排好序的TaskSet,TaskScheduler初始化时,创建完TaskSchedulerImpl、
26
// SparkDeploySchedulerBackend之后,会执行initialize()方法,在该方法中会创建一个调度池,
27
// 所有提交的TaskSet先会放入该调度池,后面执行task分配分配算法时就从该调度池中取出排好序的TaskSet
28
val sortedTaskSets = rootPool.getSortedTaskSetQueue
29
for (taskSet <- sortedTaskSets) {
30
logDebug("****parentName:**** ****%s,**** ****name:**** ****%s,**** ****runningTasks:**** ****%s****".format(
31
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
32
if (newExecAvail) {
33
taskSet.executorAdded()
34
}
35
}
36
37
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
38
// of locality levels so that it gets a chance to launch local tasks on all of them.
39
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
40
41
// 本地化级别
42
// PROCESS_LOCAL:进程本地化,rdd的partition和task在同一个executor中(最快)
43
// NODE_LOCAL:rdd的partition和task,不在一个同一个executor中,但在同一个节点中
44
// NO_PREF:没有本地化级别
45
// RACK_LOCAL:机架本地化,至少rdd的partition和task在一个机架中
46
// ANY:任意本地化级别
47
48
// 按照从最小本地化级别到最大本地化级别的顺序,尝试把taskSet中的task在executor上启动,
49
// 直到task在某种本地化级别下task全部启动
50
var launchedTask = false
51
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
52
do {
53
launchedTask = resourceOfferSingleTaskSet(
54
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
55
} while (launchedTask)
56
}
57
58
if (tasks.size > 0) {
59
hasLaunchedTask = true
60
}
61
return tasks
62
}
4、resourceOfferSingleTaskSet
1
private def resourceOfferSingleTaskSet(
2
taskSet: TaskSetManager,
3
maxLocality: TaskLocality,
4
shuffledOffers: Seq[WorkerOffer],
5
availableCpus: Array[Int],
6
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
7
var launchedTask = false
8
for (i <- 0 until shuffledOffers.size) {
9
val execId = shuffledOffers(i).executorId
10
val host = shuffledOffers(i).host
11
// 当前executor的可用cpu数量至少大于每个task要使用的cpu数量(默认是1)
12
if (availableCpus(i) >= CPUS_PER_TASK) {
13
try {
14
// 查找在executor上用那种本地化级别启动taskSet中的task
15
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
16
// 给指定的executor加上要启动task
17
tasks(i) += task
18
// 更新分配信息
19
val tid = task.taskId
20
taskIdToTaskSetId(tid) = taskSet.taskSet.id
21
taskIdToExecutorId(tid) = execId
22
executorsByHost(host) += execId
23
availableCpus(i) -= CPUS_PER_TASK
24
assert(availableCpus(i) >= 0)
25
launchedTask = true
26
}
27
} catch {
28
case e: TaskNotSerializableException =>
29
logError(s"****Resource**** ****offer**** ****failed,**** ****task**** ****set**** ****${taskSet.name}**** ****was**** ****not**** ****serializable****")
30
// Do not offer resources for this task, but don't throw an error to allow other
31
// task sets to be submitted.
32
return launchedTask
33
}
34
}
35
}
36
return launchedTask
37
}
5、launchTasks
1
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
2
for (task <- tasks.flatten) {
3
// 将每个executor要执行的task信息进行序列化
4
val ser = SparkEnv.get.closureSerializer.newInstance()
5
val serializedTask = ser.serialize(task)
6
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
7
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
8
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
9
try {
10
var msg = "****Serialized**** ****task**** ****%s:%d**** ****was**** ****%d**** ****bytes,**** ****which**** ****exceeds**** ****max**** ****allowed:**** ****" +
11
"****spark.akka.frameSize**** ****(%d**** ****bytes)**** ****-**** ****reserved**** ****(%d**** ****bytes).**** ****Consider**** ****increasing**** ****" +
12
"****spark.akka.frameSize**** ****or**** ****using**** ****broadcast**** ****variables**** ****for**** ****large**** ****values.****"
13
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
14
AkkaUtils.reservedSizeBytes)
15
taskSet.abort(msg)
16
} catch {
17
case e: Exception => logError("****Exception**** ****in**** ****error**** ****callback****", e)
18
}
19
}
20
}
21
else {
22
val executorData = executorDataMap(task.executorId)
23
// 在对应的executor的资源中减去要使用的cpu资源
24
executorData.freeCores -= scheduler.CPUS_PER_TASK
25
// 向executor发送launchTask消息来启动task
26
executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
27
}
28
}
29
}
说明:
1、resourceOffer方法功能:判断executor本地化级别的等待时间是否在一定范围内,如果在就认为task使用本地化级别可以在executor上启动。
2、TaskSetManager功能:对一个单独的TaskSet的任务进行调度,该类负责追踪每个task,如果task失败会负责重试,知道超过重试次数的限制,且会通过延迟调度为该TaskSet处理本地化调度机制,它主要接口是resourceOffer,在这个接口中,TaskSet会希望在一个节点上运行一个任务,并接受任务的状态改变消息,来知道它负责的task的状态改变了。
3、本地化级别种类: PROCESS_LOCAL:进程本地化,rdd的partition和task在同一个executor中(最快) NODE_LOCAL:rdd的partition和task,不在一个同一个executor中,但在同一个节点中 NO_PREF:没有本地化级别 RACK_LOCAL:机架本地化,至少rdd的partition和task在一个机架中 ANY:任意本地化级别
分类: Spark