Master 与 Worker
在 Standalone 模式下 Spark 使用 Master 来进行资源的分配与管理,Worker 为具体执行任务(提供计算服务)的节点。
Master 在 SparkCore 的 org.apache.spark.deploy.master 包下,Worker 在 org.apache.spark.deploy.worker 包下。
Master
Master 维护着 Application 和 Worker 的信息,当有新的 Application 注册过来的时候为其找到合适的 Worker 并为其在 Worker 上启动 Executor。
Master 的 main() 方法 (执行启动 Spark 命令):
private[deploy] object Master extends Logging {
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
// RPC 环境
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
// 实例化 Master
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
}
接下来,我们先看几个 Master 的成员变量,看看 Master 维护着哪些信息:
// Workers 信息
val workers = new HashSet[WorkerInfo]
private val idToWorker = new HashMap[String, WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
// Applications 信息
val apps = new HashSet[ApplicationInfo]
val idToApp = new HashMap[String, ApplicationInfo]
private val waitingApps = new ArrayBuffer[ApplicationInfo]
private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
Master 的初始化工作是在 onStart() 方法中完成的:
override def onStart(): Unit = {
// Web UI 配置
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
// 设置定期检查 Worker 是否超时任务
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
// 其它操作
// 启动指标系统
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// 指标系统绑定到 Web UI
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
// 根据模式的不同,选择不同的持久化机制
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
// 使用 Zookeeper
case "ZOOKEEPER" =>
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
// 其它的模式
}
persistenceEngine = persistenceEngine_
// leader 选举
leaderElectionAgent = leaderElectionAgent_
}
Master 在初始化的时候主要做了两件事,一是启动 Web UI,二是持久化和选举模式的选择。
当 Worker 启动的时候,会向 Master 发送一条注册信息,然后 Master 将其加入到 Worker 集合中(成员变量 workers),响应 Worker 注册请求的代码可以在 Master.receive() 方法中找到:
override def receive: PartialFunction[Any, Unit] = {
// 选举机制
case ElectedLeader =>
// 略略略
// Master 主备切换
// 有兴趣的可以看看
case CompleteRecovery => completeRecovery()
// Worker 注册
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
if (state == RecoveryState.STANDBY) {
workerRef.send(MasterInStandby)
} else if (idToWorker.contains(id)) {
workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
} else {
// 主要看这里
// 将 Worker 发送过来的信息进行了封装
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
// 注册 Worker
if (registerWorker(worker)) {
// 持久化 Worker 信息
// 比如使用 Zookeepeer
persistenceEngine.addWorker(worker)
// 向 Worker 发送一条确认信息
workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
// 编制执行计划
// 因为有新 Worker 的到达,所以可以唤醒一些等待执行的 Application
// 等会看内部源码
schedule()
} else {
// 略略略...
}
}
// Application 注册
case RegisterApplication(description, driver) =>
// 等会看
// Executor 状态改变
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
// 略略略...
// Driver 状态改变
case DriverStateChanged(driverId, state, exception) =>
// 略略略...
// 心跳机制
case Heartbeat(workerId, worker) =>
// 略略略...
// Worker 超时检查
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()
}
Master.registerWorker() 的实现细节:
private def registerWorker(worker: WorkerInfo): Boolean = {
// Worker 集合中可能已经注册过这个 Worker (发生了故障)
// 将其移出
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
// Worker RPC Address
// 补充 addressToWorker 信息 (成员变量)
val workerAddress = worker.endpoint.address
if (addressToWorker.contains(workerAddress)) {
// 略略略
}
// 将 Worker 放入到 Worker 集合
workers += worker
// 补充 idToWorker 信息 (成员变量)
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
if (reverseProxy) {
// Web UI 添加
webUi.addProxyTargets(worker.id, worker.webUiAddress)
}
true
}
在收到 Worker 发送过来的注册信息后,Master 会将 Worker 的信息添加到自身的成员变量中,并且还会将 Worker 的信息持久化一份 (当 Master 挂了之后可以恢复)。
接下来我们看看,在 Master 收到 Application 发送过来的注册消息后会做哪些工作,同样可以在 Master.receive() 方法中可以找到:
override def receive: PartialFunction[Any, Unit] = {
// Application 注册
case RegisterApplication(description, driver) =>
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
// 构建 Application
val app = createApplication(description, driver)
// 注册 App
// 主要是添加到等待队列,并记录 App 信息,上文提到的成员变量
// 与注册 Worker 类似
registerApplication(app)
// 持久化
persistenceEngine.addApplication(app)
// 向 Driver 发送一条确认信息
driver.send(RegisteredApplication(app.id, self))
// 有新的 App 的加入
// 所以需要重新编排下执行计划
// 如果等待执行队列为空,那么这个 App 可能就直接被 Master 调度
schedule()
}
}
在注册完 Worker 和 Application 后,最后都调用了 Master.schedule() 方法来编排执行计划,接下来我们看看其实现细节:
private def schedule(): Unit = {
// 对活跃的 Worker 进行随机打散
// 尽量负载均衡
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
// 活跃的 Worker 数
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
// 针对 Driver 的集群启动模式
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
// 循环遍历活跃的 Worker,直到某个 Worker 满足 Driver 的资源申请
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
// 当前的 Worker 可以满足 Driver 申请的资源数
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
// 启动 Driver
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
// Application 的 Executor 的分配机制
startExecutorsOnWorkers()
}
我这里关注的重点是 Master 如何对 Application 的进行资源分配的,所以我们继续看下 startExecutorsOnWorkers() 的实现细节:
private def startExecutorsOnWorkers(): Unit = {
// Master 使用先进先出的调度方式
for (app <- waitingApps if app.coresLeft > 0) {
// App 需要每个 Executor CPU 的核数
// 在提交任务的时候由我们指定的
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// 在活跃的 Workers 中找出满足 App 资源需求的 Worker
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
// scheduleExecutorsOnWorkers() 这个方法有点复杂
// 计算在哪些 Worker 上启动 Executor
// 返回可执行的 Worker 数组及内核数
// 内部有两种分配方式:
// 1. 多启动 Worker,让 Worker 均摊提供给 Executor 的 Core.
// 2. 少启动 Worker,让 Worker 提供最大的 Core 给 Executor(最大的 Core 为
// 每个 Executor 需要的Core).
// 有兴趣的可以自己看下
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// 对 Worker 进行资源分配
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
我们还是看点简单的,看看 allocateWorkerResourceToExecutors() 的内部实现细节:
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// 在这个 Worker 上需要启动的 Executor 数
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
// 可分配的 Core 数量
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
// Executor 信息
val exec = app.addExecutor(worker, coresToAssign)
// 在 Worker 上启动 Executor
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
launchExecutor() 的实现细节:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
worker.addExecutor(exec)
// 向 Worker 发送启动 Executor 的消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
// 向 Driver 发送分配完成消息
// Executor 在启动完成后会主动联系 Driver
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
简单的总结一下,Master 会维护着 Application 和 Worker 的信息,当有新的 Application 过来注册时,Master 会从 Worker 集合中找到合适的 Worker 并为其分配并启动 Executor。
Worker
Worker 是具体提供计算节点,当收到 Master 的启动 Executor 启动后,就会启动 Executor 进程。
我们还是先看看 Worker 维护着哪些信息:
private val workerId = generateWorkerId()
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]
接下来我们从 Worker 的 onStart() 方法入手,看看 Worker 做了哪些工作:
override def onStart() {
// 创建工作目录
createWorkDir()
// Web UI
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
// 向 Master 进行注册
// 内部实现会调用 tryRegisterAllMasters(),与 SchedulerBackend 有点像
registerWithMaster()
// 指标信息绑定界面
metricsSystem.registerSource(workerSource)
metricsSystem.start()
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
接下来我们看看,当 Worker 收到 Master 的启动 Executor 请求后会做哪些工作,可以在 Worker.receive() 方法中找到:
override def receive: PartialFunction[Any, Unit] = synchronized {
case msg: RegisterWorkerResponse =>
handleRegisterResponse(msg)
// 发送心跳
case SendHeartbeat =>
if (connected) { sendToMaster(Heartbeat(workerId, self)) }
// 其它的都忽略
// 启动 Executor
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
// 创建 Executor 工作目录
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// 创建本地工作目录
val appLocalDirs = appDirectories.getOrElse(...)
// 创建一个 ExecutorRunner 负责管理
val manager = new ExecutorRunner(...)
// 启动 Executor
// 内部使用 ProcessBuilder 去启动 Executor (通过启动命令)
manager.start()
// 更新自身的资源使用情况
coresUsed += cores_
memoryUsed += memory_
// 给 Master 一条确认信息
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
// 略略略...
}
}
// 这个是针对集群模式 Driver 的
case LaunchDriver(driverId, driverDesc) =>
val driver = new DriverRunner(...)
// 启动 Driver
// 与 Executor 类似,内部使用 ProcessBuilder 去启动
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
}
ExecutorRunner.start() 的实现细节:
private[worker] def start() {
// 启动一个线程去处理
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// 略略略
}
fetchAndRunExecutor() 的实现细节:
// 下载并运行 Executor
private def fetchAndRunExecutor() {
try {
// 启动一个进程
val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
// 略略略
// 启动进程
process = builder.start()
// 略略略
} catch {
// 略略略
}
}
ProcessBuilder.start() 主要就是通过命令启动一个新的进程,内部实现太复杂了,这里就不看了。
简单的总结一下,我们只看了很少的 Worker 源码,也就是接收到 Master 的启动 Executor 请求,然后启动 Executor (新进程)。