Spark篇运行时消息通信

Spark运行时消息通信

这里主要说明一下,当你launch一个Application之后,启动Main方法时,初始化SparkContext到注册Application,注册Executor以及TaskSchedulerImpl分配完Task之后交由Executor来进行执行的这个过程。这里我都是以Standalone形式就行阅读代码的,其他的如Yarn, Mesos, Local需看对应的代码部分。其时序图如下:


消息通信.png

流程

(1)执行应用程序首先执行Main方法,启动SparkContext, 在SparkContext初始化中会先实例化StandaloneScheduleBackend对象
StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with StandaloneAppClientListener,在该对象启动start过程中会继承DriverEndpoint和创建AppClient的ClientEndpoint(实际上是StandaloneAppClient)的两个终端点。SparkContext初始化会先new TaskSchedulerImpl,其会调用backend.start(),实际上 就是调用了CoarseGrainedSchedulerBackend的start方法,再次方法中:

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // TODO (prashant) send conf instead of properties
    driverEndpoint = createDriverEndpointRef(properties)
  }

  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }

  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
    new DriverEndpoint(rpcEnv, properties)
  }

可以知道创建了Driver的终端点并进行了注册。而StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend继承关系导致了StandaloneSchedulerBackend会继承driverEndpoint。
后面执行了

client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

创建了StandaloneAppClient

(2)在StandaloneAppClient通过tryResisterAllMasters来实现Application向Master的注册

(3)当Master收到注册请求之后进行处理, 注册完毕之后会发送注册成功消息给StandaloneApplClient, 然后调用startExecutorsOnWorkers方法运行应用。
(4)Executor注册过程
a)调用startExecutorsOnWorkers会分配资源来运行应用程序, 调用allcateWorkerResourceToExecutors实现在Worker中启动Executor,allcateWorkerResourceToExecutors里面有个lanchExecutor方法,这里面会调用send(LaunchTask)给Worker, Worker收到后会实例化ExecutorRunner对象,在ExecutorRunner创建进程生成器ProcessBuilder,然后此生成器根据ApplicationInfo中的command创建CoarseGrainedExecutorBackend对象,也就是Executor运行的容器, 最后Worker向Master发送ExecutorStateChanged通知Executor容器创建完毕,

b)进程生成器创建CoarseGrainedExecutorBackend对象时,调用了start方法,其半生对象会注册Executor终端点,会触发onStart方法,会发送注册Executor消息RegisterExecutor到Driverpoint,如果注册成功Driverpoint会返回RegisteredExecutor消息给ExecutorEndppoint。当ExecutorEndppoint实际上是CoarseGrainedExecutorBackend收到注册成功, 则会创建Executor对象。

c)DriverEndpoint会创建一个守护线程,监听是否有taskSets过来

private val reviveThread =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }

这时候会去调用makeOffers()

d)makeoffers会调用launchTasks

// Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

进而转向

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

进而CoarseGrainedExecutorBackend终端点(ExecutorEndpoint)接收到LaunchTask

case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

launchTask会初始化TaskRunner(它实际上是个Runnable对象),然后通过threadPool将其加入线程池执行。

def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

TaskRunner的run方法体内就会执行Task, 当执行完毕时会向Driver汇报此Task在Executor上执行完毕了。

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

------------------------------------------------------------------------------------
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容