Spark 源码浅析之 Master 和 Worker 部分

Master 与 Worker

在 Standalone 模式下 Spark 使用 Master 来进行资源的分配与管理,Worker 为具体执行任务(提供计算服务)的节点。

流程概览

Master 在 SparkCore 的 org.apache.spark.deploy.master 包下,Worker 在 org.apache.spark.deploy.worker 包下。

Master

Master 维护着 Application 和 Worker 的信息,当有新的 Application 注册过来的时候为其找到合适的 Worker 并为其在 Worker 上启动 Executor。

流程概览

Master 的 main() 方法 (执行启动 Spark 命令):

private[deploy] object Master extends Logging {

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
   
    // RPC 环境
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      // 实例化 Master
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}

接下来,我们先看几个 Master 的成员变量,看看 Master 维护着哪些信息:

// Workers 信息
val workers = new HashSet[WorkerInfo]
private val idToWorker = new HashMap[String, WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]

// Applications 信息
val apps = new HashSet[ApplicationInfo]
val idToApp = new HashMap[String, ApplicationInfo]
private val waitingApps = new ArrayBuffer[ApplicationInfo]
private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]

Master 的初始化工作是在 onStart() 方法中完成的:

override def onStart(): Unit = {
  // Web UI 配置
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort

  // 设置定期检查 Worker 是否超时任务
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  // 其它操作

  // 启动指标系统
  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // 指标系统绑定到 Web UI
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  // 根据模式的不同,选择不同的持久化机制
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      
    // 使用 Zookeeper
    case "ZOOKEEPER" =>
      val zkFactory =
        new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))

    // 其它的模式
      
  }
  persistenceEngine = persistenceEngine_
  // leader 选举
  leaderElectionAgent = leaderElectionAgent_
}

Master 在初始化的时候主要做了两件事,一是启动 Web UI,二是持久化和选举模式的选择。

当 Worker 启动的时候,会向 Master 发送一条注册信息,然后 Master 将其加入到 Worker 集合中(成员变量 workers),响应 Worker 注册请求的代码可以在 Master.receive() 方法中找到:

override def receive: PartialFunction[Any, Unit] = {
  // 选举机制
  case ElectedLeader =>
    // 略略略

  // Master 主备切换
  // 有兴趣的可以看看
  case CompleteRecovery => completeRecovery()

  // Worker 注册
  case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
    
    if (state == RecoveryState.STANDBY) {
      workerRef.send(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      // 主要看这里
      // 将 Worker 发送过来的信息进行了封装
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      // 注册 Worker
      if (registerWorker(worker)) {
        // 持久化 Worker 信息
        // 比如使用 Zookeepeer
        persistenceEngine.addWorker(worker)
        // 向 Worker 发送一条确认信息
        workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
        // 编制执行计划
        // 因为有新 Worker 的到达,所以可以唤醒一些等待执行的 Application
        // 等会看内部源码
        schedule()
      } else {
          // 略略略...
      }
    }

  // Application 注册
  case RegisterApplication(description, driver) =>
    // 等会看

  // Executor 状态改变
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    // 略略略...

  // Driver 状态改变
  case DriverStateChanged(driverId, state, exception) =>
    // 略略略...

  // 心跳机制
  case Heartbeat(workerId, worker) =>
    // 略略略...

  // Worker 超时检查
  case CheckForWorkerTimeOut =>
    timeOutDeadWorkers()

}

Master.registerWorker() 的实现细节:

private def registerWorker(worker: WorkerInfo): Boolean = {

  // Worker 集合中可能已经注册过这个 Worker (发生了故障)
  // 将其移出
  workers.filter { w =>
    (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
  }.foreach { w =>
    workers -= w
  }

  // Worker RPC Address
  // 补充 addressToWorker 信息 (成员变量)
  val workerAddress = worker.endpoint.address
  if (addressToWorker.contains(workerAddress)) {
    // 略略略
  }
    
  // 将 Worker 放入到 Worker 集合
  workers += worker
  
  // 补充 idToWorker 信息 (成员变量)
  idToWorker(worker.id) = worker
  addressToWorker(workerAddress) = worker
  if (reverseProxy) {
     // Web UI 添加
     webUi.addProxyTargets(worker.id, worker.webUiAddress)
  }
  true
}

在收到 Worker 发送过来的注册信息后,Master 会将 Worker 的信息添加到自身的成员变量中,并且还会将 Worker 的信息持久化一份 (当 Master 挂了之后可以恢复)。

接下来我们看看,在 Master 收到 Application 发送过来的注册消息后会做哪些工作,同样可以在 Master.receive() 方法中可以找到:

override def receive: PartialFunction[Any, Unit] = {

  // Application 注册
  case RegisterApplication(description, driver) =>

    if (state == RecoveryState.STANDBY) {
      // ignore, don't send response
    } else {
      // 构建 Application
      val app = createApplication(description, driver)
      // 注册 App
      // 主要是添加到等待队列,并记录 App 信息,上文提到的成员变量
      // 与注册 Worker 类似
      registerApplication(app)
      // 持久化
      persistenceEngine.addApplication(app)
      // 向 Driver 发送一条确认信息
      driver.send(RegisteredApplication(app.id, self))
      // 有新的 App 的加入
      // 所以需要重新编排下执行计划
      // 如果等待执行队列为空,那么这个 App 可能就直接被 Master 调度
      schedule()
    }

}

在注册完 Worker 和 Application 后,最后都调用了 Master.schedule() 方法来编排执行计划,接下来我们看看其实现细节:

private def schedule(): Unit = {
    
  // 对活跃的 Worker 进行随机打散
  // 尽量负载均衡
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // 活跃的 Worker 数
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
    
  // 针对 Driver 的集群启动模式
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    // 循环遍历活跃的 Worker,直到某个 Worker 满足 Driver 的资源申请
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // 当前的 Worker 可以满足 Driver 申请的资源数
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // 启动 Driver
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
    
  // Application 的 Executor 的分配机制
  startExecutorsOnWorkers()
}

我这里关注的重点是 Master 如何对 Application 的进行资源分配的,所以我们继续看下 startExecutorsOnWorkers() 的实现细节:

private def startExecutorsOnWorkers(): Unit = {

  // Master 使用先进先出的调度方式
  for (app <- waitingApps if app.coresLeft > 0) {
    // App 需要每个 Executor CPU 的核数
    // 在提交任务的时候由我们指定的
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // 在活跃的 Workers 中找出满足 App 资源需求的 Worker
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
      
    // scheduleExecutorsOnWorkers() 这个方法有点复杂
    // 计算在哪些 Worker 上启动 Executor
    // 返回可执行的 Worker 数组及内核数
    // 内部有两种分配方式:
    //   1. 多启动 Worker,让 Worker 均摊提供给 Executor 的 Core.
    //   2. 少启动 Worker,让 Worker 提供最大的 Core 给 Executor(最大的 Core 为
    //      每个 Executor 需要的Core).
    // 有兴趣的可以自己看下
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // 对 Worker 进行资源分配
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

我们还是看点简单的,看看 allocateWorkerResourceToExecutors() 的内部实现细节:

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  
  // 在这个 Worker 上需要启动的 Executor 数
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  // 可分配的 Core 数量
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    // Executor 信息
    val exec = app.addExecutor(worker, coresToAssign)
    // 在 Worker 上启动 Executor
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}

launchExecutor() 的实现细节:

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  worker.addExecutor(exec)
  // 向 Worker 发送启动 Executor 的消息
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // 向 Driver 发送分配完成消息
  // Executor 在启动完成后会主动联系 Driver
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

简单的总结一下,Master 会维护着 Application 和 Worker 的信息,当有新的 Application 过来注册时,Master 会从 Worker 集合中找到合适的 Worker 并为其分配并启动 Executor。

Worker

Worker 是具体提供计算节点,当收到 Master 的启动 Executor 启动后,就会启动 Executor 进程。

流程概览

我们还是先看看 Worker 维护着哪些信息:

private val workerId = generateWorkerId()
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]

接下来我们从 Worker 的 onStart() 方法入手,看看 Worker 做了哪些工作:

override def onStart() {
  // 创建工作目录
  createWorkDir()
  // Web UI
  shuffleService.startIfEnabled()
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()

  workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
    
  // 向 Master 进行注册
  // 内部实现会调用 tryRegisterAllMasters(),与 SchedulerBackend 有点像
  registerWithMaster()

  // 指标信息绑定界面
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

接下来我们看看,当 Worker 收到 Master 的启动 Executor 请求后会做哪些工作,可以在 Worker.receive() 方法中找到:

override def receive: PartialFunction[Any, Unit] = synchronized {
  case msg: RegisterWorkerResponse =>
    handleRegisterResponse(msg)

  // 发送心跳
  case SendHeartbeat =>
    if (connected) { sendToMaster(Heartbeat(workerId, self)) }

  // 其它的都忽略

  // 启动 Executor
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    if (masterUrl != activeMasterUrl) {
      logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
      try {
        // 创建 Executor 工作目录
        val executorDir = new File(workDir, appId + "/" + execId)
        if (!executorDir.mkdirs()) {
          throw new IOException("Failed to create directory " + executorDir)
        }

        // 创建本地工作目录
        val appLocalDirs = appDirectories.getOrElse(...)
        
        // 创建一个 ExecutorRunner 负责管理
        val manager = new ExecutorRunner(...)
        
        // 启动 Executor
        // 内部使用 ProcessBuilder 去启动 Executor (通过启动命令)
        manager.start()
        
        // 更新自身的资源使用情况
        coresUsed += cores_
        memoryUsed += memory_
        
        // 给 Master 一条确认信息
        sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
      } catch {
        // 略略略...
      }
    }


  // 这个是针对集群模式 Driver 的
  case LaunchDriver(driverId, driverDesc) =>
 
    val driver = new DriverRunner(...)
    // 启动 Driver
    // 与 Executor 类似,内部使用 ProcessBuilder 去启动
    driver.start()

    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem

}

ExecutorRunner.start() 的实现细节:

private[worker] def start() {
  // 启动一个线程去处理
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()

  // 略略略
}

fetchAndRunExecutor() 的实现细节:

// 下载并运行 Executor
private def fetchAndRunExecutor() {
  try {
    // 启动一个进程
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    
    // 略略略
    
    // 启动进程
    process = builder.start()

    // 略略略
      
  } catch {
    // 略略略
  }
}

ProcessBuilder.start() 主要就是通过命令启动一个新的进程,内部实现太复杂了,这里就不看了。

简单的总结一下,我们只看了很少的 Worker 源码,也就是接收到 Master 的启动 Executor 请求,然后启动 Executor (新进程)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容