Spark Task
Definition of Task
A Task is the basic unit of execution in Spark for carrying out a concrete piece of computation. Tasks come in two kinds: ShuffleMapTask and ResultTask. The last stage of a Spark job consists of a set of ResultTasks, while every other stage consists of ShuffleMapTasks. A ResultTask sends its result back to the driver once it finishes, whereas a ShuffleMapTask splits its output into buckets according to the RDD's Partitioner for the downstream stage.
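For example, a word-count job with one shuffle runs two stages: the stage before the shuffle as ShuffleMapTasks and the final stage as ResultTasks. A minimal runnable sketch using the standard RDD API (the local master and sample data are chosen purely for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object TaskDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("task-demo").setMaster("local[2]"))

    // Stage 0 runs ShuffleMapTasks: each task writes its map output into
    // shuffle buckets according to the HashPartitioner installed by reduceByKey.
    val counts = sc.parallelize(Seq("a b a", "b c"), numSlices = 2)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Stage 1 runs ResultTasks: each task computes one partition of `counts`
    // and returns its piece of the collected result to the driver.
    counts.collect().foreach(println)

    sc.stop()
  }
}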
private[spark] abstract class Task[T](
    val stageId: Int,          // ID of the stage this task belongs to
    val stageAttemptId: Int,   // attempt id of the stage this task belongs to
    val partitionId: Int,      // index of the partition in the RDD that this task computes
    // copy of thread-local properties set by the user on the driver side
    @transient var localProperties: Properties = new Properties,
    // The default value is only used in tests.
    serializedTaskMetrics: Array[Byte] =
      SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
    val jobId: Option[Int] = None,
    val appId: Option[String] = None,
    val appAttemptId: Option[String] = None) extends Serializable
TaskSet
/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}
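A TaskSet is built and handed to the TaskScheduler by the DAGScheduler when it launches a stage's missing partitions; the priority field is the job id, which the FIFO pool uses to order TaskSets. A condensed sketch of the kind of call made in DAGScheduler.submitMissingTasks (variable names here are illustrative, and the real code carries a little more state):

// tasks, stage, stageAttemptId, jobId and properties come from the
// stage being launched (illustrative names, not the actual source).
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray,     // one task per missing partition of the stage
  stage.id,          // stageId
  stageAttemptId,    // attempt number of this launch of the stage
  jobId,             // priority: the FIFO pool orders TaskSets by job id
  properties))       // thread-local properties set on the driver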
If a task in a TaskSet fails maxTaskFailures times, the whole TaskSet is marked as failed and the stage has to be rescheduled.
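The threshold comes from the spark.task.maxFailures setting (default 4) when running against a cluster master; in local[N] mode it is fixed at 1 unless the master string local[N, maxFailures] is used. A small configuration sketch:

import org.apache.spark.SparkConf

// Allow each task up to 8 attempts before its TaskSet (and hence the
// stage and job) is marked as failed. The default is 4 on cluster masters.
val conf = new SparkConf()
  .setAppName("retry-demo")
  .set("spark.task.maxFailures", "8")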
Task States
A task in Spark has six states, which fall roughly into three groups: launching, running, and the terminal states (finished, failed, killed, lost). They are defined in the Spark source as follows:
private[spark] object TaskState extends Enumeration {

  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)

  type TaskState = Value

  def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state)

  def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains(state)
}
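The two predicates capture how a status update is handled: isFinished means the task will never report again (so its slot can be freed), while isFailed marks the attempts that count towards maxTaskFailures. A small usage sketch (TaskState is private[spark], so code like this would have to live inside the org.apache.spark package; the handler name is made up for the example):

import org.apache.spark.TaskState
import org.apache.spark.TaskState.TaskState

// Illustrative status-update handler, not actual scheduler code.
def onStatusUpdate(taskId: Long, state: TaskState): Unit = {
  if (TaskState.isFinished(state)) {
    println(s"task $taskId reached terminal state $state, freeing its slot")
  }
  if (TaskState.isFailed(state)) {
    println(s"task $taskId failed, counting towards maxTaskFailures")
  }
}

onStatusUpdate(1L, TaskState.FINISHED)  // finished, not failed
onStatusUpdate(2L, TaskState.LOST)      // finished and failed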