Spark Source Code Analysis: Spark Run Modes and Principles
1. Overview of Run Modes
Spark can run in a variety of modes, which fall into the following categories:
- Local mode
- Pseudo-distributed mode
- Cluster mode
  - Standalone
  - Mesos
  - Hadoop YARN
Basic framework:
2. Key Classes
- TaskScheduler / TaskSchedulerImpl
private[spark] trait TaskScheduler {

  private val appId = "spark-application-" + System.currentTimeMillis

  def rootPool: Pool

  def schedulingMode: SchedulingMode

  def start(): Unit

  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registrations, etc.
  def postStartHook() { }

  // Disconnect from the cluster.
  def stop(): Unit

  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  /**
   * Kills a task attempt.
   *
   * @return Whether the task was successfully killed.
   */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean

  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId

  /**
   * Process a lost executor
   */
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit

  /**
   * Get an application's attempt ID associated with the job.
   *
   * @return An application's Attempt ID
   */
  def applicationAttemptId(): Option[String]
}
TaskScheduler interacts with the DAGScheduler and is responsible for the concrete scheduling and execution of tasks. Its core interface methods are submitTasks and cancelTasks, which submit and cancel task sets.
- SchedulerBackend
SchedulerBackend mainly interacts with the underlying resource-scheduling system (YARN, Mesos, etc.).
/**
* A backend interface for scheduling systems that allows plugging in different ones under
* TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
  private val appId = "spark-application-" + System.currentTimeMillis

  def start(): Unit
  def stop(): Unit
  def reviveOffers(): Unit
  def defaultParallelism(): Int

  /**
   * Requests that an executor kills a running task.
   *
   * @param taskId Id of the task.
   * @param executorId Id of the executor the task is running on.
   * @param interruptThread Whether the executor should interrupt the task thread.
   * @param reason The reason for the task kill.
   */
  def killTask(
      taskId: Long,
      executorId: String,
      interruptThread: Boolean,
      reason: String): Unit =
    throw new UnsupportedOperationException

  def isReady(): Boolean = true

  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId

  /**
   * Get the attempt ID for this run, if the cluster manager supports multiple
   * attempts. Applications run in client mode will not have attempt IDs.
   *
   * @return The application attempt id, if available.
   */
  def applicationAttemptId(): Option[String] = None

  /**
   * Get the URLs for the driver logs. These URLs are used to display the links in the UI
   * Executors tab for the driver.
   * @return Map containing the log names and their respective URLs
   */
  def getDriverLogUrls: Option[Map[String, String]] = None
}
- Executor
Tasks are actually executed by the Executor: for each task it creates a TaskRunner and hands it to a thread pool for execution.
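The pattern is essentially "wrap each task in a Runnable and submit it to a thread pool". Below is a minimal, self-contained sketch of that pattern, not Spark's actual Executor code; TinyTask, TinyTaskRunner, and ExecutorSketch are made-up names used only for illustration.

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical stand-ins for Spark's Task / TaskRunner; the names are illustrative only.
case class TinyTask(id: Long, body: () => Unit)

class TinyTaskRunner(task: TinyTask) extends Runnable {
  override def run(): Unit = {
    // A real TaskRunner also deserializes the task, tracks metrics, and
    // reports status back to the driver; here we only run the task body.
    task.body()
  }
}

object ExecutorSketch {
  // Spark's Executor keeps a cached thread pool of task-launch worker threads.
  private val threadPool = Executors.newCachedThreadPool()

  def launchTask(task: TinyTask): Unit = {
    threadPool.submit(new TinyTaskRunner(task))
  }

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(i => launchTask(TinyTask(i, () => println(s"task $i finished"))))
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}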
3. Local Mode
LocalBackend responds to the scheduler's reviveOffers requests: based on the configured number of available CPU cores it generates CPU resources directly and offers them back to the scheduler, then uses the Executor to launch and run, one after another in the thread pool, the tasks in the list the scheduler returns.
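A minimal sketch of how an application selects local mode (LocalModeSketch, the app name, and the core count are arbitrary): the N in local[N] is the number of cores the local backend offers back to the scheduler, which bounds how many tasks run concurrently in its thread pool.

import org.apache.spark.{SparkConf, SparkContext}

object LocalModeSketch {
  def main(args: Array[String]): Unit = {
    // "local[4]": the local backend offers 4 CPU cores, so at most 4 tasks
    // run concurrently in the driver-side thread pool ("local[*]" uses all cores).
    val conf = new SparkConf().setAppName("local-mode-demo").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val sum = sc.parallelize(1 to 100, numSlices = 4).reduce(_ + _)
    println(s"sum = $sum")

    sc.stop()
  }
}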
4. YARN
YARN is a framework for resource management and job scheduling. It consists of three main modules: the ResourceManager (RM), the NodeManager (NM), and the ApplicationMaster (AM).
The ResourceManager monitors, allocates, and manages all resources; the ApplicationMaster handles the scheduling and coordination of a single application; the NodeManager maintains an individual node.
The RM has full control over all applications and over resource allocation, while each AM negotiates resources with the RM and communicates with NodeManagers to run and monitor tasks. The relationship between these modules is shown in the figure.
YARN Cluster Mode
The flow of Spark's YARN cluster mode is as follows:
- The local YARN client submits the application to the YARN ResourceManager.
- The ResourceManager picks a YARN NodeManager and has it create an ApplicationMaster. The SparkContext acts as the application managed by this ApplicationMaster, which creates a YarnClusterScheduler and a YarnClusterSchedulerBackend and selects containers in the cluster to launch CoarseGrainedExecutorBackend processes, which in turn start the Spark executors.
- The ApplicationMaster and the CoarseGrainedExecutorBackend processes communicate through remote calls.
YARN Client Mode
The flow of Spark's YARN client mode is as follows (a configuration sketch follows the list):
- The SparkContext is started locally and creates a YarnClientClusterScheduler and a YarnClientClusterSchedulerBackend.
- The YarnClientClusterSchedulerBackend starts a yarn.Client and uses it to submit the application to the YARN ResourceManager.
- The ResourceManager picks a YARN NodeManager, which selects containers in the cluster to launch CoarseGrainedExecutorBackend processes that start the Spark executors.
- The YarnClientClusterSchedulerBackend and the CoarseGrainedExecutorBackend processes communicate through remote calls.
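As a rough illustration of client mode from the application side (a sketch only, assuming HADOOP_CONF_DIR/YARN_CONF_DIR point at a working YARN cluster and Spark's YARN support is on the classpath; YarnClientModeSketch and the resource values are made up):

import org.apache.spark.{SparkConf, SparkContext}

object YarnClientModeSketch {
  def main(args: Array[String]): Unit = {
    // "yarn" as the master URL plus deployMode "client" keeps the driver (and
    // the SparkContext) in the local JVM, while the executors run inside YARN
    // containers launched through CoarseGrainedExecutorBackend.
    val conf = new SparkConf()
      .setAppName("yarn-client-demo")
      .setMaster("yarn")
      .set("spark.submit.deployMode", "client")
      .set("spark.executor.instances", "2")  // number of executor containers to request
      .set("spark.executor.cores", "2")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}

In cluster mode the same resource settings are normally passed to spark-submit instead, since the driver itself runs inside the ApplicationMaster's container rather than in the local JVM.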
5. Standalone
- When the application starts, SparkContext initialization first creates the DAGScheduler and TaskScheduler, then initializes the SparkDeploySchedulerBackend, which internally starts a DriverEndpoint and a ClientEndpoint.
- The ClientEndpoint registers the application with the Master. The Master adds it to the list of applications waiting to run and later assigns Workers to it.
- Once Workers are assigned, the Master tells each Worker's WorkerEndpoint to create a CoarseGrainedExecutorBackend process, inside which the execution container, the Executor, is created.
- When the Executor is ready, the CoarseGrainedExecutorBackend notifies the Master and the DriverEndpoint, registers with the SparkContext, and waits for the DriverEndpoint to send task-execution messages.
- The SparkContext hands TaskSets to the CoarseGrainedExecutorBackend, which runs them on the Executor according to the scheduling policy.
- While processing tasks, the CoarseGrainedExecutorBackend reports task status to the DriverEndpoint, and Spark reacts to the different outcomes; when a TaskSet completes, the next one is sent.
- After the application finishes, the SparkContext reclaims resources, destroying the Worker's CoarseGrainedExecutorBackend processes, and then deregisters itself (a configuration sketch for connecting to a standalone cluster follows this list).
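A sketch of how an application attaches to a standalone cluster (StandaloneModeSketch, the host name, and the resource values are placeholders; 7077 is the Master's default RPC port, as the MasterArguments code below shows):

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneModeSketch {
  def main(args: Array[String]): Unit = {
    // spark://<master-host>:7077 registers this application with the Master,
    // which then assigns Workers; each Worker launches a
    // CoarseGrainedExecutorBackend process for the application.
    val conf = new SparkConf()
      .setAppName("standalone-demo")
      .setMaster("spark://master-host:7077")  // placeholder host name
      .set("spark.executor.memory", "1g")
      .set("spark.cores.max", "4")            // cap on total cores for this application
    val sc = new SparkContext(conf)

    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}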
Starting the Master
private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    // parse command-line arguments
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}
Parsing the Master's arguments
/**
* Command-line parser for the master.
*/
private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
var propertiesFile: String = null
// Check for settings in environment variables
if (System.getenv("SPARK_MASTER_IP") != null) {
logWarning("SPARK_MASTER_IP is deprecated, please use SPARK_MASTER_HOST")
host = System.getenv("SPARK_MASTER_IP")
}
if (System.getenv("SPARK_MASTER_HOST") != null) {
host = System.getenv("SPARK_MASTER_HOST")
}
if (System.getenv("SPARK_MASTER_PORT") != null) {
port = System.getenv("SPARK_MASTER_PORT").toInt
}
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
parse(args.toList)
// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
if (conf.contains("spark.master.ui.port")) {
webUiPort = conf.get("spark.master.ui.port").toInt
}
@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
host = value
parse(tail)
case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)
host = value
parse(tail)
case ("--port" | "-p") :: IntParam(value) :: tail =>
port = value
parse(tail)
case "--webui-port" :: IntParam(value) :: tail =>
webUiPort = value
parse(tail)
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)
case ("--help") :: tail =>
printUsageAndExit(0)
case Nil => // No-op
case _ =>
printUsageAndExit(1)
}
/**
* Print usage and exit JVM with the given exit code.
*/
private def printUsageAndExit(exitCode: Int) {
// scalastyle:off println
System.err.println(
"Usage: Master [options]\n" +
"\n" +
"Options:\n" +
" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
// scalastyle:on println
System.exit(exitCode)
}
}
Configuration priority, from lowest to highest: system environment variables < properties in spark-defaults.conf < command-line arguments < settings made in application code.
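A small application-side sketch of that priority order (ConfPrecedenceSketch and the port value are arbitrary): a key set explicitly on the SparkConf wins over the same key supplied through spark-defaults.conf or the environment.

import org.apache.spark.SparkConf

object ConfPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    // Values from spark-defaults.conf (loaded by spark-submit into system
    // properties) and from the environment only fill in keys the application
    // has not set; an explicit set() always wins.
    val conf = new SparkConf()
      .setAppName("conf-precedence-demo")
      .set("spark.master.ui.port", "9090")  // arbitrary example value

    println(conf.get("spark.master.ui.port"))  // prints 9090 regardless of the defaults file
  }
}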
Starting the Worker
private[deploy] object Worker extends Logging {
val SYSTEM_NAME = "sparkWorker"
val ENDPOINT_NAME = "Worker"
def main(argStrings: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
def isUseLocalNodeSSLConfig(cmd: Command): Boolean = {
val pattern = """\-Dspark\.ssl\.useNodeLocalConf\=(.+)""".r
val result = cmd.javaOpts.collectFirst {
case pattern(_result) => _result.toBoolean
}
result.getOrElse(false)
}
def maybeUpdateSSLSettings(cmd: Command, conf: SparkConf): Command = {
val prefix = "spark.ssl."
val useNLC = "spark.ssl.useNodeLocalConf"
if (isUseLocalNodeSSLConfig(cmd)) {
val newJavaOpts = cmd.javaOpts
.filter(opt => !opt.startsWith(s"-D$prefix")) ++
conf.getAll.collect { case (key, value) if key.startsWith(prefix) => s"-D$key=$value" } :+
s"-D$useNLC=true"
cmd.copy(javaOpts = newJavaOpts)
} else {
cmd
}
}
}
The rest of the argument parsing is similar to the Master's.
Resource Reclamation
In the overview above we said that "after the application finishes, the SparkContext reclaims resources, destroys the Worker's CoarseGrainedExecutorBackend processes, and then deregisters itself." Let's now look at how the Master and the Executors learn that the application has exited.
The call stack is as follows:
- SparkContext.stop
  - DAGScheduler.stop
    - TaskSchedulerImpl.stop
      - CoarseGrainedSchedulerBackend.stop
        - CoarseGrainedSchedulerBackend.stopExecutors
          - CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
            - CoarseGrainedExecutorBackend.receive
              - Executor.stop
        - CoarseGrainedSchedulerBackend.DriverEndpoint.receiveAndReply
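From the application's point of view the whole chain is kicked off by a single call to SparkContext.stop(). A minimal sketch follows (ShutdownSketch is a made-up name; it uses local mode, where the backend class differs, but the stop path through DAGScheduler and TaskSchedulerImpl to the scheduler backend is the same idea):

import org.apache.spark.{SparkConf, SparkContext}

object ShutdownSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("shutdown-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).sum())
    } finally {
      // Kicks off DAGScheduler.stop -> TaskSchedulerImpl.stop -> backend stop;
      // on a standalone or YARN cluster this is the point where the executors
      // are asked to shut down and the application deregisters itself.
      sc.stop()
    }
  }
}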