Spark Internals and Diagrams
Execution flow of a single Job (a minimal sketch that drives this whole chain follows the list):
SparkContext.runJob()
* DAGScheduler.runJob(rdd) - submitJob(): DAGScheduler.handleJobSubmitted(); DAGScheduler.submitMissingTasks(); - MemoryStore.store(), BlockManagerInfo.store(), SparkContext.broadcast()
* DAGScheduler.submitStage()
* TaskSchedulerImpl.submitTasks()
* TaskSetManager.submitTask()
* Executor.runTask()
* RDD.compute()
* Executor.finishedTask()
* TaskSetManager.handleTaskFinish()
* TaskSchedulerImpl.removeTask()
* DAGScheduler.markStageAsFinished()
* DAGScheduler.finishedJob()
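A minimal sketch (not from the Spark source; assumes a local master and the standard RDD API) that triggers this chain end to end:
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2)
rdd.count()   // action -> SparkContext.runJob -> DAGScheduler.runJob -> ... -> Executor.runTask -> RDD.compute
sc.stop()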
Spark core initialization: new SparkContext(conf)
//1. Create the Spark core object: new SparkContext(conf: SparkConf)
new SparkContext(conf){
logInfo(s"Running Spark version $SPARK_VERSION")
* INFO SparkContext - Running Spark version 2.3.2
// Core initialization: a long chain of object creation and setup;
try{
_conf = config.clone()
_conf.validateSettings(){// Validate the settings in SparkConf; if validation passes, the "Submitted application" line below is logged
val executorOptsKey = "spark.executor.extraJavaOptions"
val executorClasspathKey = "spark.executor.extraClassPath"
val driverOptsKey = "spark.driver.extraJavaOptions"
val driverClassPathKey = "spark.driver.extraClassPath"
val driverLibraryPathKey = "spark.driver.extraLibraryPath"
val sparkExecutorInstances = "spark.executor.instances"
// Validate memory fractions: each must be > 0 and < 1;
// Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
if (contains("spark.submit.deployMode")) {// deployMode may only be "cluster" or "client"
get("spark.submit.deployMode") match {
case "cluster" | "client" =>
case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
"\"client\".")
}
}
val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of spark.network.timeout must be no less than the value of spark.executor.heartbeatInterval.")
}
logInfo(s"Submitted application: $appName")
* INFO SparkContext - Submitted application: StructuredStreamingDemo
// In yarn cluster mode, spark.yarn.app.id must be set; launching yarn-cluster directly through SparkContext (the old way) is no longer supported;
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
throw new SparkException("Detected yarn cluster mode, Please use spark-submit.")
}
// If not configured, fall back to the local hostname and port 0 as the driver's host:port
_conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)// executor.Id也设为"driver",难道后面yarn模式时,再重新设置?
/** Parse the jar and resource paths from the spark.jars and spark.files settings. */
_jars = Utils.getUserJars(_conf){
val sparkJars = conf.getOption("spark.jars")
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
// When event logging is enabled, /tmp/spark-events is the default event-log directory;
val isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
_eventLogDir =
if (isEventLogEnabled) {
val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/")
Some(Utils.resolveURI(unresolvedDir))
} else {
None
}
_eventLogCodec = {// Compression for the event log: toggled by spark.eventLog.compress (default false); the codec comes from spark.io.compression.codec (default lz4)
val compress = _conf.getBoolean("spark.eventLog.compress", false)
if (compress && isEventLogEnabled) {
Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
}else {
None
}
}
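For reference, a sketch of how the event-log settings read above can be set (values are examples):
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")                 // default false
  .set("spark.eventLog.dir", "file:/tmp/spark-events")   // default /tmp/spark-events
  .set("spark.eventLog.compress", "true")                // default false; codec taken from spark.io.compression.codec (lz4)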
_listenerBus = new LiveListenerBus(_conf){
private[spark] val metrics = new LiveListenerBusMetrics(conf)
private val started = new AtomicBoolean(false)
private val stopped = new AtomicBoolean(false)
private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
}
// In-memory store that backs the status API and the Spark UI
_statusStore = AppStatusStore.createLiveStore(conf){
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true)
new AppStatusStore(store, listener = Some(listener))
}
listenerBus.addToStatusQueue(_statusStore.listener.get){//LiveListenerBus.addToStatusQueue(listener: SparkListenerInterface)
addToQueue(listener, APP_STATUS_QUEUE){
queues.asScala.find(_.name == queue) match {
case Some(queue) => queue.addListener(listener)
case None => // During new SparkContext the queue does not exist yet, so this branch is taken
val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
newQueue.addListener(listener)
if (started.get()) {
newQueue.start(sparkContext)
}
queues.add(newQueue)
}
}
}
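The status listener registered above rides the same LiveListenerBus that user code can hook into. A minimal sketch of a custom listener (the println body is illustrative only):
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
sc.addSparkListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished with ${jobEnd.jobResult}")
})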
/** Create the SparkEnv object (around line 432 of SparkContext). Objects created include:
* - securityManager: SecurityManager
* - serializer: JavaSerializer
* - rpcEnv:NettyRpcEnv
* - broadcastManager: BroadcastManager
* - shuffleManager: ShuffleManager
* - blockManagerMaster: BlockManagerMaster
* - blockManager: BlockManager
* - metricsSystem: MetricsSystem
* - envInstance:SparkEnv
*/
_env = createSparkEnv(_conf, isLocal, listenerBus){//SparkEnv.createDriverEnv()
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)){
val securityManager = new SecurityManager(conf, ioEncryptionKey)
* INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(it-out-allen); groups with view permissions: Set(); users with modify permissions: Set(it-out-allen); groups with modify permissions: Set()
// Create the serializer:
val serializer = instantiateClassFromConf[Serializer]("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
// Create the Netty-based RPC environment;
val rpcEnv:NettyRpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,securityManager, clientMode = !isDriver)
* INFO Utils: Successfully started service 'sparkDriver' on port 58014.
// Create the broadcast manager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
// Create the ShuffleManager
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
// Create the BlockManagerMaster, which receives BlockManager registrations;
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver)
* INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
// The BlockManager is constructed here, but it cannot be used until initialize() is called later;
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,blockTransferService, securityManager, numUsableCores)
* INFO DiskBlockManager: Created local directory at C:\Users\it-out-allen\AppData\Local\Temp\blockmgr-da6bb514-0c2f-48d9-8ed2-7c09e134f42d
* INFO MemoryStore: MemoryStore started with capacity 1992.0 MB
// Create the metrics system: MetricsSystem
val metricsSystem = if (isDriver) {
  MetricsSystem.createMetricsSystem("driver", conf, securityManager)
}else{
  conf.set("spark.executor.id", executorId)
  val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
  ms.start()
  ms
}
val envInstance:SparkEnv = new SparkEnv(executorId,rpcEnv,serializer,closureSerializer,serializerManager,mapOutputTracker,shuffleManager,
broadcastManager,blockManager,securityManager,metricsSystem,memoryManager,outputCommitCoordinator,conf)
return envInstance;
}
}
* INFO SecurityManager - Changing view acls to: 86177 (first log line of SparkEnv creation)
* INFO SparkEnv - Registering OutputCommitCoordinator (last log line)
SparkEnv.set(_env) // Store it in the global static holder;
/** Create the web UI server (new SparkUI); enabled by spark.ui.enabled (default true); bind() then binds it to port 4040.
* handlers: ArrayBuffer[ServletContextHandler], 25 handlers in total, serving the various URLs; mainly:
* - /jobs: /jobs/json; /jobs/job/json; /jobs/job/kill;
* - /stages: /stages/json; /stages/stage; /stages/stage/json; /stages/pool; /stages/pool/json;
* - /storage: /storage/json; /storage/rdd; /storage/rdd/json;
* - /environment: /environment/json
* - /executors: /executors/json; /executors/threadDump; /executors/threadDump/json
* - /static:
* - /api:
*/
_ui =
if(conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime)){
new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion){
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
// Core:
initialize(){
val jobsTab = new JobsTab(this, store)
attachTab(jobsTab)
val stagesTab = new StagesTab(this, store)
attachTab(stagesTab)
attachTab(new StorageTab(this, store))
attachTab(new EnvironmentTab(this, store))
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
attachHandler(ApiRootResource.getServletHandler(this))
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler("/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
attachHandler(createRedirectHandler("/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST")))
}
}
}
}else{
// For tests, do not enable the UI
None
}
_ui.foreach(_.bind()){//WebUI.bind()
assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
try {
val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)){//JettyUtils.startJettyServer()
addFilters(handlers, conf)
val pool = new QueuedThreadPool
pool.setDaemon(true)
val server = new Server(pool)
val collection = new ContextHandlerCollection
server.setHandler(collection)
val serverExecutor = new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true)
try{
server.start()
val securePort = sslOptions.createJettySslContextFactory().map { /* HTTPS connector setup elided */ }
val (httpConnector, httpPort) = Utils.startServiceOnPort[ServerConnector](port, httpConnect, conf, serverName)
handlers.foreach { h=>
h.setVirtualHosts(toVirtualHosts(SPARK_CONNECTOR_NAME))
val gzipHandler = new GzipHandler()
gzipHandler.setHandler(h)
collection.addHandler(gzipHandler)
gzipHandler.start()
}
pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads))
ServerInfo(server, httpPort, securePort, conf, collection)
}catch {
case e: Exception => server.stop();throw e;
}
}
logInfo(s"Bound $className to $host, and started at $webUrl")
* INFO SparkUI - Bound SparkUI to 0.0.0.0, and started at http://ldsver50:4040
} catch {
case e: Exception =>logError(s"Failed to bind $className", e)
System.exit(1)
}
}
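The UI behavior seen above is configurable; a sketch of the relevant settings (values are examples):
val conf = new SparkConf()
  .set("spark.ui.enabled", "true")       // default true
  .set("spark.ui.port", "4041")          // default 4040; Spark retries higher ports if it is taken
  .set("spark.ui.killEnabled", "false")  // hides the kill links on /jobs and /stages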
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
// Resolve executor memory, in order: spark.executor.memory, then the SPARK_EXECUTOR_MEMORY env var, then SPARK_MEM, defaulting to 1024 MB
_executorMemory = _conf.getOption("spark.executor.memory")
  .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
  .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
  .map(Utils.memoryStringToMb)
  .getOrElse(1024)
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
// Create the heartbeat receiver
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
/** Create the three most important scheduling objects: SchedulerBackend, TaskScheduler, DAGScheduler
 * - createTaskScheduler builds the SchedulerBackend and the TaskScheduler
 */
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode){
val MAX_LOCAL_TASK_FAILURES = 1
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_N_REGEX(threads) => {// masters such as local[4] enter this branch;
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// If "*", use all available cores; otherwise use the N from local[N];
val threadCount = if (threads == "*") localCpuCount else threads.toInt
if (threadCount <= 0) {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true){
private val speculationScheduler =ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") // TaskSet starvation timeout
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) // Number of CPUs required per task, default 1;
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) // Scheduling mode, FIFO by default; configurable via spark.scheduler.mode;
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
}
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount){
private val launcherBackend = new LauncherBackend() {
override def conf: SparkConf = LocalSchedulerBackend.this.conf
override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}
launcherBackend.connect(){//LauncherBackend.connect()
val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
.map(_.toInt)
val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
if (port != None && secret != None) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION)){
}
clientThread = LauncherBackend.threadFactory.newThread(connection)
clientThread.start()
_isConnected = true
}
}
}
// Initialize: hand the SchedulerBackend implementation (here LocalSchedulerBackend) to the scheduler and build the FIFO/FAIR scheduling pools
scheduler.initialize(backend){//TaskSchedulerImpl.initialize(backend: SchedulerBackend)
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: "+s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
}
(backend, scheduler)
}
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>{
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*, M] means the number of cores on the computer with M failures
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
}
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>{
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
if (sc.executorMemory > memoryPerSlaveInt) {
throw new SparkException(
"Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
memoryPerSlaveInt, sc.executorMemory))
}
val scheduler = new TaskSchedulerImpl(sc)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
localCluster.stop()
}
(backend, scheduler)
}
case masterUrl =>{// YARN and other external cluster managers enter this branch;
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend){// e.g. YarnClusterManager.initialize()
  scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  // backend.start() (e.g. YarnClusterSchedulerBackend.start()) happens later, inside _taskScheduler.start()
}
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
}
}
_schedulerBackend = sched
_taskScheduler = ts
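Which branch of the match above is taken depends only on the master string; a sketch of common values (each comment names the branch it hits):
val conf = new SparkConf().setAppName("demo")
  .setMaster("local")            // "local": 1 thread, maxFailures = 1
//.setMaster("local[4]")         // LOCAL_N_REGEX: 4 threads
//.setMaster("local[4, 2]")      // LOCAL_N_FAILURES_REGEX: 4 threads, up to 2 task failures
//.setMaster("spark://m1:7077")  // SPARK_REGEX: standalone, StandaloneSchedulerBackend
//.setMaster("yarn")             // falls through to the external cluster manager branch
val sc = new SparkContext(conf)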
// Create the DAGScheduler
_dagScheduler = new DAGScheduler(this){
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
}
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet){//RpcEndpointRef.ask(message: Any)
ask(message, defaultAskTimeout){//NettyRpcEnv.ask(message: Any, timeout: RpcTimeout)
nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout){//NettyRpcEnv.ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T]
val promise = Promise[Any]()
val remoteAddr = message.receiver.address
if (remoteAddr == address) {
val p = Promise[Any]()
p.future.onComplete {
case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
dispatcher.postLocalMessage(message, p)
} else {
val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => onSuccess(deserialize[Any](client, response)))
postToOutbox(message.receiver, rpcMessage)
promise.future.failed.foreach {
case _: TimeoutException => rpcMessage.onTimeout()
case _ =>
}(ThreadUtils.sameThread)
}
val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
override def run(): Unit = {
onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +s"in ${timeout.duration}"))
}
}, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
promise.future.onComplete { v => timeoutCancelable.cancel(true)}(ThreadUtils.sameThread)
promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
}
}
// Start the TaskScheduler and its backend
_taskScheduler.start(){//TaskSchedulerImpl.start()
backend.start(){
//1. In local mode: LocalSchedulerBackend.start()
LocalSchedulerBackend.start(){
val rpcEnv = SparkEnv.get.rpcEnv
val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
//
listenerBus.post(SparkListenerExecutorAdded(
System.currentTimeMillis, executorEndpoint.localExecutorId,
new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))){//LiveListenerBus.post()
metrics.numEventsPosted.inc()
synchronized {
if (!started.get()) {
queuedEvents += event
return
}
}
}
launcherBackend.setAppId(appId)
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
// 2. StandaloneSchedulerBackend.start(){}
StandaloneSchedulerBackend.start(){}
}
if (!isLocal && conf.getBoolean("spark.speculation", false)) {//
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleWithFixedDelay(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
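The speculation branch above only fires outside local mode and when speculation is enabled; a sketch of the related settings (values are examples):
val conf = new SparkConf()
  .set("spark.speculation", "true")            // default false
  .set("spark.speculation.interval", "100ms")  // how often checkSpeculatableTasks() runs
  .set("spark.speculation.multiplier", "1.5")  // how much slower than the median a task must be
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating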
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
_ui.foreach(_.setAppId(_applicationId))
// Initialize the BlockManager
_env.blockManager.initialize(_applicationId){//BlockManager.initialize()
blockTransferService.init(this)
* INFO NettyBlockTransferService: Server created on 192.168.41.1:59715
shuffleClient.init(appId)
blockReplicationPolicy = {
val priorityClass = conf.get("spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
val clazz = Utils.classForName(priorityClass)
val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
logInfo(s"Using $priorityClass for block replication policy")
ret
}
// Register this BlockManager with the BlockManagerMaster on the driver
val id = BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
val idFromMaster = master.registerBlockManager(id, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint){//BlockManagerMaster.registerBlockManager()
logInfo(s"Registering BlockManager $blockManagerId")
* BlockManagerMaster - Registering BlockManager null
val updatedId = driverEndpoint.askSync[BlockManagerId](RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)){
RpcEndpointRef.askSync(){//askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
}
logInfo(s"Registered BlockManager $updatedId")
updatedId
}
blockManagerId = if (idFromMaster != null) idFromMaster else id
shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}
logInfo(s"Initialized BlockManager: $blockManagerId")
* INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.41.1, 59559, None)
}
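The BlockManager registered above is what actually stores cached RDD blocks; a minimal sketch of exercising it through the public API:
import org.apache.spark.storage.StorageLevel
val cached = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_AND_DISK)
cached.count()     // blocks rdd_<id>_<partition> are written through the BlockManager
cached.unpersist()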
_env.metricsSystem.start()
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
// Create the event-log writer;
_eventLogger =
if (isEventLogEnabled) {// Disabled by default; enabled via spark.eventLog.enabled;
val logger = new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration)
logger.start(){//EventLoggingListener.start()
if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
}
val workingPath = logPath + IN_PROGRESS
val path = new Path(workingPath)
val uri = path.toUri
val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
val isDefaultLocal = defaultFs == null || defaultFs == "file"
val dstream =
if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
new FileOutputStream(uri.getPath)
} else {
hadoopDataStream = Some(fileSystem.create(path))
hadoopDataStream.get
}
try {
val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
val bstream = new BufferedOutputStream(cstream, outputBufferSize)
EventLoggingListener.initEventLog(bstream, testing, loggedEvents){//EventLoggingListener.initEventLog(
val metadata = SparkListenerLogStart(SPARK_VERSION)
val eventJson = JsonProtocol.logStartToJson(metadata)
val metadataJson = compact(eventJson) + "\n"
logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8))
if (testing && loggedEvents != null) {
loggedEvents += eventJson
}
}
fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
writer = Some(new PrintWriter(bstream))// In local mode the writer is simply a PrintWriter
logInfo("Logging events to %s".format(logPath))
* EventLoggingListener - Logging events to file:/E:/studyAndTest/eventLogDir/local-1586004614114
} catch {
case e: Exception => dstream.close(); throw e;
}
}
listenerBus.addToEventLogQueue(logger)
Some(logger)
} else {
None
}
// Dynamic executor allocation
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) // Controlled by spark.dynamicAllocation.enabled, default false;
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, _env.blockManager.master))
case _ => None
}
} else {
None
}
_executorAllocationManager.foreach(_.start()){//ExecutorAllocationManager.start()
listenerBus.addToManagementQueue(listener)
val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule(){//ExecutorAllocationManager.schedule()
val now = clock.getTimeMillis
updateAndSyncNumExecutorsTarget(now)
val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
if (expired) {
initializing = false
executorIdsToBeRemoved += executorId
}
!expired
}
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
}
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
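Dynamic allocation only kicks in when the settings below are enabled (a sketch; values are examples, and the external shuffle service is normally required alongside it):
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")          // default false
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")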
// Automatic cleaner: enabled by default; disable via spark.cleaner.referenceTracking;
_cleaner =
if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start()){//ContextCleaner.start()
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
/* Run a full GC every periodicGCInterval seconds;
 * interval: spark.cleaner.periodicGC.interval, default 30 minutes;
 * the first run is scheduled one interval (30 minutes) after start
 */
periodicGCService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
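The cleaner's behavior is governed by these settings (a sketch; the values shown are the defaults):
val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking", "true")     // set to false to disable the ContextCleaner
  .set("spark.cleaner.periodicGC.interval", "30min")  // drives the System.gc() schedule above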
setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart(){
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)){
if (stopped.get()) {
return
}
metrics.numEventsPosted.inc()
if (queuedEvents == null) {
postToQueues(event){//LiveListenerBus.postToQueues(event: SparkListenerEvent)
val it = queues.iterator()
while (it.hasNext()) {
it.next().post(event){//AsyncEventQueue.post()
if (stopped.get()) {
return
}
eventCount.incrementAndGet()
if (eventQueue.offer(event)) {// If the event was enqueued successfully, return; otherwise fall through and count it as dropped;
return
}
eventCount.decrementAndGet()
droppedEvents.inc()
droppedEventsCounter.incrementAndGet()
val droppedCount = droppedEventsCounter.get
if (droppedCount > 0) {
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
val previous = new java.util.Date(prevLastReportTimestamp)
logWarning(s"Dropped $droppedCount events from $name since $previous.")
}
}
}
}
}
}
return
}
synchronized {
if (!started.get()) {
queuedEvents += event
return
}
}
postToQueues(event)
}
}
// Post-init: run the scheduler's post-start hook, register metrics sources, and add the shutdown hook
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
_shutdownHookRef = ShutdownHookManager.addShutdownHook(ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
logInfo("Invoking stop() from shutdown hook")
stop()
}
// End of the long try block in the SparkContext constructor; the last log lines it prints are:
* BlockManager - Initialized BlockManager: BlockManagerId(driver, ldsver50, 60851, None)
* ContextHandler - Started o.s.j.s.ServletContextHandler@77f991c{/metrics/json,null,AVAILABLE,@Spark}
}
private val nextShuffleId = new AtomicInteger(0)
private val nextRddId = new AtomicInteger(0)
// End of SparkContext construction: record this instance as the active context
private val allowMultipleContexts: Boolean =config.getBoolean("spark.driver.allowMultipleContexts", false)
SparkContext.setActiveContext(this, allowMultipleContexts){//SparkContext.setActiveContext(sc: SparkContext, allowMultipleContexts: Boolean)
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
contextBeingConstructed = None
activeContext.set(sc)// AtomicReference[SparkContext].set(sc)
}
}
}
//2. Other entry points for constructing a SparkContext
//2.1 Create a new context, or reuse the one already active
SparkContext.getOrCreate(conf:SparkConf){
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {// No active context yet: create one and store it in the activeContext AtomicReference;
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
} else {// Reuse the existing context held in activeContext and return it;
if (config.getAll.nonEmpty) {
logWarning("Using an existing SparkContext; some configuration may not take effect.")
}
}
activeContext.get()
}
}
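A usage sketch of getOrCreate: repeated calls in the same JVM return the same active context:
val conf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc1 = SparkContext.getOrCreate(conf)
val sc2 = SparkContext.getOrCreate(conf)  // reuses sc1; extra conf entries may be ignored (see the warning above)
assert(sc1 eq sc2)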
SparkContext.runJob(rdd): source walkthrough of submitting a Job
//2. Submit a Job
sc.runJob(rdd, func, partitions){//SparkContext.runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U ): Array[U]
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions){
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res){//SparkContext.runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit)
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
* SparkContext - Starting job: start at SSparkHelper.scala:81
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
/** Hand the job off to the DAGScheduler, which builds the stage DAG, submits the missing stages as task sets, and waits for the JobWaiter to complete. */
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get){//DAGScheduler.runJob
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties){//DAGScheduler.submitJob(rdd)
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException("Attempting to access a non-existent partition: " + p + ". Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
/** Post the Job (wrapped in a JobSubmitted event) to the eventQueue (a LinkedBlockingQueue) of DAGSchedulerEventProcessLoop;
 * - eventQueue lives in EventLoop, the parent class of DAGSchedulerEventProcessLoop; the parent's run() loops, taking events from eventQueue and processing them;
 * - this consumption of eventQueue is done by a dedicated thread named "dag-scheduler-event-loop"
 */
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties))){//DAGScheudlerEventProcessLoop.post()
EventLoop.post(event){//EventLoop.post(event: E): Unit
eventQueue[LinkedBlockingQueue].put(event)//event==JobSubmitted
}
// Logic of the "dag-scheduler-event-loop" thread that processes jobs in a loop:
{
// The DAGSchedulerEventProcessLoop instance is created in the DAGScheduler constructor;
private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
// At the end of the DAGScheduler constructor, the "dag-scheduler-event-loop" thread that consumes eventQueue is started;
eventProcessLoop.start(){
onStart()
eventThread.start(){//Thread.start()
override def run(){
try {
// Loop: take events from the head of eventQueue and dispatch them to the concrete implementation for further processing;
while (!stopped.get) {
val event = eventQueue.take()
try {
onReceive(event){// This abstract method is implemented by DAGSchedulerEventProcessLoop; it handles JobSubmitted, ExecutorAdded, JobCancelled and other events;
val timerContext = timer.time()
try {
doOnReceive(event){// DAGSchedulerEventProcessLoop.doOnReceive(event: DAGSchedulerEvent): Unit
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties){
}
// to do...
}
} finally {
timerContext.stop()
}
}
} catch {
case NonFatal(e) =>
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
}
}
}
waiter
}
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
*
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
results
}
}
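For completeness, a minimal sketch that drives the runJob path above directly (the per-partition function is just an example):
val rdd = sc.parallelize(1 to 10, numSlices = 2)
val partitionSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum)  // one result per partition
// Higher-level actions such as rdd.collect() and rdd.count() end up in this same SparkContext.runJob path.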