我们写spark应用程序的时候,第一步都是创建一个SparkContext,SparkContext初始化时主要是初始化TaskScheduler、DAGScheduler、SparkUI
TaskScheduler初始化流程
以下是对应上图的源码
一、入口是createTaskScheduler,从代码注释可以看出,主要是根据SparkConf中设置的"master"来创建TaskSchedulerImpl和SparkDeploySchedulerBackend;
这个是stand-alone模式,当然还有其他模式
TaskSchedulerImpl:
1、底层通过操作一个SchedulerBackend,针对不同种类的cluster(standalone,yarn,mesos),调度task
2、可以通过使用一个LocalBackend,并且将isLocal参数设置为true,来在本地模式下工作
3、负责处理一些通用的逻辑,比如说,决定多个job的调度顺序,启动推测任务执行
4、客户端应用首先调用它的initialize()方法和start(),然后通过runTasks()方法提交task sets
通过TaskSchelulerImpl的initialize()初始化创建SchedulerPool
二:TaskSchedulerImpl.start() -> SparkDeploySchedulerBackend.start()
三:通过ApplicationDescription创建AppClient
ApplicationDescription:
1、描述当前执行的application的一些情况,包括applicaiton最大需要多少cpu core,每个slave上需要多少内存
AppClient:
1、是一个接口
2、负责为application与spark集群进行同行
3、会接收spark master的url,以及一个applicationDescription,和一个集群事件的监听器,以及各种事件发生时的监听器回调
DAGScheduler
DAGScheduler:
1、实现了面向stage的调度机制的高层次的调度层,它会为每个job计算一个stage的DAG(有向无环图),追踪RDD和stage的输出是否被物化了(物化是指,写入磁盘或者内存等地方),并且寻找一个最少消耗(最优)调度机制来运行job;
2、会将stage作为tasksets提交到底层的TaskSchedulerImpl上,并在集群上运行他们(task)
3、负责决定运行每个task的最佳位置,基于当前的缓存状态,将这些最佳位置提交给底层的TaskSchedulerImpl。
4、处理由于shuffle输出文件丢失导致的失败,在这种情况下,旧的stage可能就会被重新提交,一个stage内部的失败,如果不是由于shuffle文件丢失所导致的,会被TaskScheduler处理,它会多次重试每一个task,直到最后,实在不行了,才会去取消整个stage
SparkUI
注:本文的分析是基于以下spark-core版本
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.0</version>
</dependency>