Spark Job 详细执行流程(一)

本文以Spark 1.6 Standalone模式为例,介绍用户提交Spark Job后的Job的执行流程。大体流程如下图所示


Spark集群运行架构

用户提交Job后会生成SparkContext对象,SparkContext向Cluster Manager(在Standalone模式下是Spark Master)申请Executor资源,并将Job分解成一系列可并行处理的task,然后将task分发到不同的Executor上运行,Executor在task执行完后将结果返回到SparkContext。

下面首先介绍SparkContext申请Executor资源的过程,整个过程如下图所示。

Paste_Image.png

整个过程分为8步:

  1. SparkContext创建TaskSchedulerImpl,SparkDeploySchedulerBackend和DAGScheduler
// Create and start the scheduler  
val (sched, ts) = SparkContext.createTaskScheduler(this, master)  
_schedulerBackend = sched  
_taskScheduler = ts  
_dagScheduler = new DAGScheduler(this)  

DAGScheduler负责将Job划分为不同的Stage,并在每个Stage内化为出一系列可并行处理的task,然后将task递交给TaskSchedulerImpl调度。此过程之后详谈。
TaskSchedulerImpl负责通过SparkDeploySchedulerBackend来调度任务(task),目前实现了FIFO调度和Fair调度。注意如果是Yarn模式,则是通过YarnSchedulerBackend来进行调度。

  1. SparkDeploySchedulerBackend创建AppClient,并通过一些回调函数来得到Executor信息
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)  
client.start()  

SparkDeploySchedulerBackend与AppClient间的回调函数如下:

private[spark] trait AppClientListener {  
  def connected(appId: String): Unit  
  
  /** Disconnection may be a temporary state, as we fail over to a new Master. */  
  def disconnected(): Unit  
  
  /** An application death is an unrecoverable failure condition. */  
  def dead(reason: String): Unit  
  
  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)  
  
  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit  
}  
  1. AppClient向Master注册Application
try {  
        registerWithMaster()  
      } catch {  
        case e: Exception =>  
          logWarning("Failed to connect to master", e)  
          markDisconnected()  
          context.stop(self)  
      }  

Applicent通过Akka与Master进行交互得到Executor和Master的信息,然后通过回调SparkDeploySchedulerBackend的函数。

  1. Master向Woker发送LaunchExecutor消息,同时向AppClient发送ExecutorAdded消息
    Master收到RegisterApplication信息后,开始分配Executor资源。目前有两种策略(摘抄原话):There are two modes of launching executors. The first attempts to spread out an application's executors on as many workers as possible, while the second does the opposite (i.e. launch them on as few workers as possible). The former is usually better for data locality purposes and is the default.
case RegisterApplication(description) => {  
      if (state == RecoveryState.STANDBY) {  
        // ignore, don't send response  
      } else {  
        logInfo("Registering app " + description.name)  
        val app = createApplication(description, sender)  
        registerApplication(app)  
        logInfo("Registered app " + description.name + " with ID " + app.id)  
        persistenceEngine.addApplication(app)  
        sender ! RegisteredApplication(app.id, masterUrl)  
        schedule() //分配Executor资源  
      }  
    }  

然后向Worker发送LauchExecutor消息,

  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

注意:The number of cores assigned to each executor is configurable. When this is explicitly set, multiple executors from the same application may be launched on the same worker if the worker has enough cores and memory. Otherwise, each executor grabs all the cores available on the worker by default, in which case only one executor may be launched on each worker.

  1. Worker创建ExecutorRunner,并向Master发送ExecutorStateChanged的消息
val manager = new ExecutorRunner(  
  appId,  
  execId,  
  appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),  
  cores_,  
  memory_,  
  self,  
  workerId,  
  host,  
  webUi.boundPort,  
  publicAddress,  
  sparkHome,  
  executorDir,  
  akkaUrl,  
  conf,  
  appLocalDirs, ExecutorState.LOADING)  
executors(appId + "/" + execId) = manager  
manager.start()  
coresUsed += cores_  
memoryUsed += memory_  
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)  
  1. ExecutorRunner创建CoarseGrainedSchedulerBackend
    在函数fetchAndRunExecutor中,
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
        sparkHome.getAbsolutePath, substituteVariables)

其中appDesc.command是(在SparkDeploySchedulerBackend中定义)

val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  1. CoarseGrainedExecutorBackend向SparkDeploySchedulerBackend发送RegisterExecutor消息
  override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[RegisteredExecutor.type](
        RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) => Utils.tryLogNonFatalError {
        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
      }
      case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
    }(ThreadUtils.sameThread)
  }
  1. CoarseGrainedExecutorBackend在接收到SparkDeploySchedulerBackend发送的RegisteredExecutor消息后,创建Executor
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

至此Executor创建成功,:)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容