spark源码阅读之executor模块①

本文基于Spark 1.6.3源码,采用一步一步深入的方式来展开阅读,本文是为了纪录自己在阅读源码时候的思路,看完一遍真的很容易忘记,写一篇文章梳理一遍可以加深印象。

SparkContext:Spark应用的入口

SparkContext是用户应用于Spark集群交互的主要接口,所以把SparkContext作为入口来展开executor的源码阅读,主要针对standaone模式下的executor模块。

SparkContext通过调用createTaskScheduler()方法来创建两个重要的类:TaskScheduler和SchedulerBackend

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    // 在DAGScheduler的构造中持有TaskScheduler的引用之后,开始TaskScheduler
    _taskScheduler.start()

两个重要的类

TaskScheduler类:低级的task调度接口,仅有一个实现类为:TaskSchedulerImpl,这个类的作用是为高级task调度接口DAGScheduler划分好的stage分配TaskSet,然后提交给Spark集群,处理Task的运行消息,并将event返回给DAGScheduler,这里可以看出DAGScheduler实例化后持有了TaskSchedulerImpl的引用,有关DAGScheduler与TaskSchedulerImpl配合的调度机制,在后面的文章中展开。

SchedulerBackend类:调度的后台接口,实现类有很多,根据传入的master url采用模式匹配的方式来确定需要什么实现类,主要的作用是当有新的task或者资源变动时找到合适的executor来分配资源,或者是处理从TaskSchedulerImpl发出杀掉Task请求。

在standalone模式中,SchedulerBackend的具体实现类为:SparkDeploySchedulerBackend,通过以下createTaskScheduler()方法中的截选代码可以了解这个过程:

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

scheduler.initialize(backend)表明了TaskSchedulerImpl持有backend的引用,且在这个方法里初始化了用于FIFO和FAIR两种调度模式的容器和池,这部分放到调度模块展开。

至此为止,两个重要的类的实例已经构造完毕:TaskSchedulerImpl和SparkDeploySchedulerBackend

driverEndpoint和appClient的初始化

紧接着,调用了TaskSchedulerImpl的start()方法,在start()方法中首先调用了backend的start()方法

override def start() {
    backend.start()   //调用SchedulerBackend的start()方法

    // 如果开启了推测执行功能的话,就开启一条speculation线程来计算,参数是通过配置文件的参数来传入,或者使用默认值
    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }

SparkDeploySchedulerBackend的start()方法首先调用了super的start()方法,这里需要说明的是SparkDeploySchedulerBackend并不是直接继承自SchedulerBackend,而是继承自CoarseGrainedSchedulerBackend,CoarseGrainedSchedulerBackend继承自SchedulerBackend
这样的话,最后其实调用的是CoarseGrainedSchedulerBackend的start()方法,代码如下:

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

start方法中注册了DriverEndpoint,调用createDriverEndpoint方法创建了一个DriverEndpoint的实例,至此DriverEndpoint创建完成,DriverEndpoint在实例化的过程中,会去调用生命周期中onstart方法,在onStart方法中会周期性的执行以下代码:Option(self).foreach(_.send(ReviveOffers))
即自己给自己发送ReviveOffers的消息,收到ReviveOffers消息后会调用makeOffers方法选出合适executor然后分配资源。

SparkDeploySchedulerBackend在start方法中,还创建了AppClient实例:

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

AppClient实例封装了关于application的一些信息ApplicationDescription,如appName,maxCores,executorMemory等
client.start()方法中注册了AppClient中的通信端ClientEndpoint

def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    // 注册appClient的rpcEndpoint
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

在注册ClientEndpoint的过程中,调用其生命周期中的onstart方法

override def onStart(): Unit = {
      try {
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }

至此,DriverEndpoint和AppClient都已经实例化完成
DriverEndpoint已经准备好了,一旦有新的application提交或是集群的资源发生了变化,即调用makeoffers方法去分配资源;
AppClient在注册ClientEndpoint的过程中,将要调用registerWithMaster将application注册请求提交给Master。

registerWithMaster之后的剖析将会放在下一篇文章里继续深入。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容