Spark-Core Principles and Source Code

Spark principles and diagrams

Execution flow of a single Job (a usage sketch follows the call tree):

SparkContext.runJob()
    * DAGScheduler.runJob(rdd) - submitJob():  DAGScheduler.handleJobSubmitted(); DAGScheduler.submitMissingTasks(); - MemoryStore.store(), BlockManagerInfo.store(), SparkContext.broadcast()
        * DAGScheduler.submitStage()
            * TaskSchedulerImpl.scheduler()
                * TaskSetManager.submitTask()
                    * Executor.runTask()
                        * RDD.compute()
                    * Executor.finishedTask()
                * TaskSetManager.handleTaskFinished()
            * TaskSchedulerImpl.removeTask()
        * DAGScheduler.markStageAsFinished()
    * DAGScheduler.finishedJob()
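Any RDD action drives this flow by calling SparkContext.runJob. A minimal usage sketch (assumes an existing SparkContext named sc; names are illustrative):

    // each action below ends up in SparkContext.runJob -> DAGScheduler.runJob, as listed above
    val rdd = sc.parallelize(1 to 100, numSlices = 4)
    val doubled = rdd.map(_ * 2)
    val total = doubled.sum()          // triggers one job
    val collected = doubled.collect()  // triggers another job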

Spark core initialization: new SparkContext(conf)

//1. Create the Spark core object: new SparkContext(conf: SparkConf)
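A typical way to construct it from user code (a minimal sketch; the app name and master URL are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("SparkCoreDemo")   // placeholder app name
      .setMaster("local[4]")         // 4 local threads; see createTaskScheduler() below
    val sc = new SparkContext(conf)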

new SparkContext(conf){
    logInfo(s"Running Spark version $SPARK_VERSION")
        * INFO  SparkContext - Running Spark version 2.3.2
    
    // Core initialization: a long chain of object creation and setup inside a try block;
    try{
        _conf = config.clone()
        _conf.validateSettings(){// Validate settings in SparkConf; on success the "Submitted application" log below is printed
            val executorOptsKey = "spark.executor.extraJavaOptions"
            val executorClasspathKey = "spark.executor.extraClassPath"
            val driverOptsKey = "spark.driver.extraJavaOptions"
            val driverClassPathKey = "spark.driver.extraClassPath"
            val driverLibraryPathKey = "spark.driver.extraLibraryPath"
            val sparkExecutorInstances = "spark.executor.instances"
            
            // Validate memory fractions: they must be > 0 and < 1;
            // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
            
            if (contains("spark.submit.deployMode")) {// deployMode can only be "cluster" or "client"
              get("spark.submit.deployMode") match {
                case "cluster" | "client" =>
                case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
                  "\"client\".")
              }
            }

            val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
            val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
            require(executorTimeoutThreshold > executorHeartbeatInterval, "must be no less than the value of executorHeartbeatInterval")
            
        } 
        logInfo(s"Submitted application: $appName")
            * INFO  SparkContext - Submitted application: StructuredStreamingDemo
        
        //  In yarn cluster mode, spark.yarn.app.id must be set (i.e. the app must go through spark-submit); the old yarn-cluster master URL is no longer supported;
        if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
          throw new SparkException("Detected yarn cluster mode, Please use spark-submit.")
        }
        
        // If not configured, default the driver address to the local hostname and the port to 0 (random)
        _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
        _conf.setIfMissing("spark.driver.port", "0")
        _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)// the driver's executor id is set to "driver"; executors set their own ids later (see the MetricsSystem branch below)
        
        /** Parse jar and resource paths from the spark.jars and spark.files settings;
        * 
        */
        _jars = Utils.getUserJars(_conf){
            val sparkJars = conf.getOption("spark.jars")
            sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
        }
        _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
        
        // When the event log is enabled, /tmp/spark-events is the default event log directory;
        val isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
        _eventLogDir =
            if (isEventLogEnabled) {
                val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/")
                Some(Utils.resolveURI(unresolvedDir))
            } else {
                None
            }
        _eventLogCodec = {// Event log compression: toggled by spark.eventLog.compress (default false); the codec comes from spark.io.compression.codec (default lz4)
            val compress = _conf.getBoolean("spark.eventLog.compress", false)
            if (compress && isEventLogEnabled) {
                Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
            }else {
                None
            }
        }
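        /* A hedged example of the event-log settings referenced above (the HDFS path is a placeholder):
        *    conf.set("spark.eventLog.enabled", "true")               // default false
        *    conf.set("spark.eventLog.dir", "hdfs:///spark-events")   // default /tmp/spark-events
        *    conf.set("spark.eventLog.compress", "true")              // default false; codec from spark.io.compression.codec (lz4 by default)
        */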
        
        
        _listenerBus = new LiveListenerBus(_conf){
            private[spark] val metrics = new LiveListenerBusMetrics(conf)
            private val started = new AtomicBoolean(false)
            private val stopped = new AtomicBoolean(false)
            private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
            private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
        }
        
        // In-memory app status store that backs the status API and the web UI
        _statusStore = AppStatusStore.createLiveStore(conf){
            val store = new ElementTrackingStore(new InMemoryStore(), conf)
            val listener = new AppStatusListener(store, conf, true)
            new AppStatusStore(store, listener = Some(listener))
        }
        listenerBus.addToStatusQueue(_statusStore.listener.get){//LiveListenerBus.addToStatusQueue(listener: SparkListenerInterface)
            addToQueue(listener, APP_STATUS_QUEUE){
                queues.asScala.find(_.name == queue) match {
                    case Some(queue) => queue.addListener(listener)
                    
                    case None => // during new SparkContext the queue does not exist yet, so this branch is taken
                        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
                        newQueue.addListener(listener)
                        if (started.get()) {
                          newQueue.start(sparkContext)
                        }
                        queues.add(newQueue)
                }
            }
        }
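        /* Note: user-defined listeners end up on this same LiveListenerBus. A hedged sketch of the two usual
        *  registration paths (MySparkListener is a hypothetical class extending SparkListener):
        *    sc.addSparkListener(new MySparkListener())                        // programmatic registration
        *    conf.set("spark.extraListeners", "com.example.MySparkListener")   // instantiated during SparkContext init
        */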
        
        
        /** Create the SparkEnv object (SparkContext.scala, around line 432); the objects created include:
        *   - securityManager: SecurityManager
        *   - serializer: JavaSerializer
        *   - rpcEnv:NettyRpcEnv
        *   - broadcastManager: BroadcastManager
        *   - shuffleManager: ShuffleManager
        *   - blockManagerMaster: BlockManagerMaster
        *   - blockManager: BlockManager
        *   - metricsSystem: MetricsSystem
        *   - envInstance:SparkEnv
        */
        _env = createSparkEnv(_conf, isLocal, listenerBus){//SparkEnv.createDriverEnv()
            SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)){
                val securityManager = new SecurityManager(conf, ioEncryptionKey)
                    * INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(it-out-allen); groups with view permissions: Set(); users  with modify permissions: Set(it-out-allen); groups with modify permissions: Set()
                
                // Create the serializer:
                val serializer = instantiateClassFromConf[Serializer]("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
                
                // Create the Netty-based RPC environment;
                val rpcEnv:NettyRpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,securityManager, clientMode = !isDriver)
                    * INFO Utils: Successfully started service 'sparkDriver' on port 58014.
                
                // Create the broadcast manager
                val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
                
                // Create the ShuffleManager
                val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
                
                // Create the BlockManagerMaster, which receives BlockManager registrations;
                val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver)
                    * INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
                    
                // The BlockManager is created here, but it cannot be used until initialize() is called later;
                val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,blockTransferService, securityManager, numUsableCores)
                    * INFO DiskBlockManager: Created local directory at C:\Users\it-out-allen\AppData\Local\Temp\blockmgr-da6bb514-0c2f-48d9-8ed2-7c09e134f42d
                    * INFO MemoryStore: MemoryStore started with capacity 1992.0 MB

                // Create the metrics system: MetricsSystem
                val metricsSystem = if (isDriver) {
                    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
                }else{
                    conf.set("spark.executor.id", executorId)
                    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
                    ms.start()
                    ms
                }
                
                val envInstance:SparkEnv = new SparkEnv(executorId,rpcEnv,serializer,closureSerializer,serializerManager,mapOutputTracker,shuffleManager,
                            broadcastManager,blockManager,securityManager,metricsSystem,memoryManager,outputCommitCoordinator,conf)
                
                return envInstance;
            }
        }
            * INFO  SecurityManager - Changing view acls to: 86177      (first log line of this block)
            * INFO  SparkEnv - Registering OutputCommitCoordinator      (last log line of this block)
        SparkEnv.set(_env) // store it as the globally, statically accessible env;
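        /* After SparkEnv.set(_env), other Spark components fetch the same env statically, e.g.:
        *    SparkEnv.get.blockManager     // the BlockManager created above
        *    SparkEnv.get.shuffleManager   // the ShuffleManager created above
        */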
        
        
        /** Create the web server (new SparkUI), enabled by default via spark.ui.enabled; bind() starts it on port 4040;
        *   handlers: ArrayBuffer[ServletContextHandler]    25 handlers in total, each intercepting and serving a group of URLs, mainly:
        *       - /jobs:    /jobs/json; /jobs/job/json; /jobs/job/kill;
        *       - /stages:  /stages/json; /stages/stage; /stages/stage/json; /stages/pool; /stages/pool/json;   
        *       - /storage: /storage/json; /storage/rdd;    /storage/rdd/json;
        *       - /environment: /environment/json
        *       - /executors:   /executors/json; /executors/threadDump; /executors/threadDump/json
        *       - /static:  
        *       - /api: 
        */
        _ui =
          if(conf.getBoolean("spark.ui.enabled", true)) {
            Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime)){
                new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion){
                    val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
                    
                    // Core: attach the UI tabs and request handlers
                    initialize(){
                        val jobsTab = new JobsTab(this, store)
                        attachTab(jobsTab)
                        val stagesTab = new StagesTab(this, store)
                        attachTab(stagesTab)
                        attachTab(new StorageTab(this, store))
                        attachTab(new EnvironmentTab(this, store))
                        attachTab(new ExecutorsTab(this))
                        attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
                        attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
                        attachHandler(ApiRootResource.getServletHandler(this))

                        // These should be POST only, but, the YARN AM proxy won't proxy POSTs
                        attachHandler(createRedirectHandler("/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
                        attachHandler(createRedirectHandler("/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST")))
                        
                    }
                }
            }
          }else{
            // For tests, do not enable the UI
            None
          } 
        _ui.foreach(_.bind()){//WebUI.bind()
            assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
            try {
              val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0")
              serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)){//JettyUtils.startJettyServer()
                    addFilters(handlers, conf)
                    val pool = new QueuedThreadPool
                    pool.setDaemon(true)
                    val server = new Server(pool)
                    val collection = new ContextHandlerCollection
                    server.setHandler(collection)
                    val serverExecutor = new ScheduledExecutorScheduler(s"$serverName-JettyScheduler", true)
                    
                    try{
                        server.start()
                        val securePort = sslOptions.createJettySslContextFactory().map {}
                        val (httpConnector, httpPort) = Utils.startServiceOnPort[ServerConnector](port, httpConnect, conf, serverName)
                        handlers.foreach { h=>
                            h.setVirtualHosts(toVirtualHosts(SPARK_CONNECTOR_NAME))
                            val gzipHandler = new GzipHandler()
                            gzipHandler.setHandler(h)
                            collection.addHandler(gzipHandler)
                            gzipHandler.start()
                        }
                            
                        pool.setMaxThreads(math.max(pool.getMaxThreads, minThreads))
                        ServerInfo(server, httpPort, securePort, conf, collection)
                    }catch {
                        case e: Exception => server.stop();throw e;
                    }
                    
              }
              logInfo(s"Bound $className to $host, and started at $webUrl")
                * INFO  SparkUI - Bound SparkUI to 0.0.0.0, and started at http://ldsver50:4040
                
            } catch {
              case e: Exception =>logError(s"Failed to bind $className", e)
                System.exit(1)
            }
        }
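        /* Related UI settings (a hedged sketch; the port value is an example):
        *    conf.set("spark.ui.enabled", "false")   // skip creating the SparkUI entirely (the None branch above)
        *    conf.set("spark.ui.port", "4050")       // bind to a port other than the default 4040
        */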
        
        _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
        
        
        if (jars != null) {
          jars.foreach(addJar)
        }
        if (files != null) {
          files.foreach(addFile)
        }
        
        // Resolve executor memory in order: spark.executor.memory, then env SPARK_EXECUTOR_MEMORY, then env SPARK_MEM, defaulting to 1024 MB
        _executorMemory = _conf.getOption("spark.executor.memory")
            .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))).orElse(Option(System.getenv("SPARK_MEM"))
            .map(warnSparkMem)).map(Utils.memoryStringToMb).getOrElse(1024)
          
        executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"  
        executorEnvs ++= _conf.getExecutorEnv
        executorEnvs("SPARK_USER") = sparkUser
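        /* Worked example of the resolution order above: with conf.set("spark.executor.memory", "2g") and
        *  SPARK_EXECUTOR_MEMORY=4g both present, the conf entry wins and _executorMemory becomes 2048 (MB);
        *  with neither set, it falls back to 1024.
        */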
        
        
        // Create the heartbeat receiver endpoint
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))


        /** Create the three most important scheduling objects: SchedulerBackend, TaskScheduler, DAGScheduler
        *   - createTaskScheduler() builds the SchedulerBackend and the TaskScheduler
        *
        */
        
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode){
            val MAX_LOCAL_TASK_FAILURES = 1
            master match {
              case "local" =>
                val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
                val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
                scheduler.initialize(backend)
                (backend, scheduler)
            
              case LOCAL_N_REGEX(threads) => {// local[N], e.g. local[4], enters this branch;
                    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
                    // If "*", use all available cores; otherwise use the N from local[N];
                    val threadCount = if (threads == "*") localCpuCount else threads.toInt
                    if (threadCount <= 0) {
                      throw new SparkException(s"Asked to run locally with $threadCount threads")
                    }
                    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true){
                        private val speculationScheduler =ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
                        val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s") // TaskSet starvation timeout
                        val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) // number of CPUs required per task, default 1;
                        private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
                        val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
                        private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString) // scheduling mode, FIFO by default; changed via spark.scheduler.mode;
                        val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
                        
                    }
                    
                    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount){
                        private val launcherBackend = new LauncherBackend() {
                            override def conf: SparkConf = LocalSchedulerBackend.this.conf
                            override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
                        }
                        
                        launcherBackend.connect(){//LauncherBackend.connect()
                            val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
                              .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
                              .map(_.toInt)
                            val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
                              .orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
                            if (port != None && secret != None) {
                              val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
                              connection = new BackendConnection(s)
                              connection.send(new Hello(secret.get, SPARK_VERSION)){
                                  
                              }
                              clientThread = LauncherBackend.threadFactory.newThread(connection)
                              clientThread.start()
                              _isConnected = true
                            }
                                                
                        }
                        
                    }
                    
                    // initialize(): hand the backend implementation (LocalSchedulerBackend) to the scheduler and build the FIFO/FAIR schedulable pools
                    scheduler.initialize(backend){//TaskSchedulerImpl.initialize(backend: SchedulerBackend)
                        this.backend = backend
                        schedulableBuilder = {
                          schedulingMode match {
                            case SchedulingMode.FIFO =>
                              new FIFOSchedulableBuilder(rootPool)
                            case SchedulingMode.FAIR =>
                              new FairSchedulableBuilder(rootPool, conf)
                            case _ =>
                              throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: "+s"$schedulingMode")
                          }
                        }
                        schedulableBuilder.buildPools()
                    }
                    (backend, scheduler)
               }

              case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>{
                    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
                    // local[*, M] means the number of cores on the computer with M failures
                    // local[N, M] means exactly N threads with M failures
                    val threadCount = if (threads == "*") localCpuCount else threads.toInt
                    val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
                    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
                    scheduler.initialize(backend)
                    (backend, scheduler)
               }
              
              case SPARK_REGEX(sparkUrl) =>
                val scheduler = new TaskSchedulerImpl(sc)
                val masterUrls = sparkUrl.split(",").map("spark://" + _)
                val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
                scheduler.initialize(backend)
                (backend, scheduler)

              case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>{
                    // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
                    val memoryPerSlaveInt = memoryPerSlave.toInt
                    if (sc.executorMemory > memoryPerSlaveInt) {
                      throw new SparkException(
                        "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                          memoryPerSlaveInt, sc.executorMemory))
                    }

                    val scheduler = new TaskSchedulerImpl(sc)
                    val localCluster = new LocalSparkCluster(
                      numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
                    val masterUrls = localCluster.start()
                    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
                    scheduler.initialize(backend)
                    backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
                      localCluster.stop()
                    }
                    (backend, scheduler)
                }
                
                
              case masterUrl =>{// YARN and other external cluster managers enter this branch;
                    val cm = getClusterManager(masterUrl) match {
                      case Some(clusterMgr) => clusterMgr
                      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
                    }
                    try {
                      val scheduler = cm.createTaskScheduler(sc, masterUrl)
                      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                      cm.initialize(scheduler, backend){
                        TaskSchedulerImpl.start(){
                            backend[YarnClusterSchedulerBackend].start()
                        }
                      }
                      (backend, scheduler)
                    } catch {
                      case se: SparkException => throw se
                      case NonFatal(e) =>
                        throw new SparkException("External scheduler cannot be instantiated", e)
                    }
                }
              
            }
            
        }
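        /* Summary of the master strings matched above (illustrative values):
        *    "local"                    -> 1 thread, LocalSchedulerBackend
        *    "local[4]" / "local[*]"    -> N threads / all cores, LocalSchedulerBackend
        *    "local[4, 2]"              -> 4 threads with maxFailures = 2
        *    "spark://host:7077"        -> StandaloneSchedulerBackend
        *    "local-cluster[2,1,1024]"  -> LocalSparkCluster: 2 workers, 1 core and 1024 MB each
        *    "yarn"                     -> resolved through an external ClusterManager (the last case)
        */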
        _schedulerBackend = sched
        _taskScheduler = ts
        // Create the DAGScheduler
        _dagScheduler = new DAGScheduler(this){
            private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
            taskScheduler.setDAGScheduler(this)
            
            
        }
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet){//RpcEndpointRef.ask(message: Any)
            ask(message, defaultAskTimeout){//NettyRpcEnv.ask(message: Any, timeout: RpcTimeout)
                nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout){//NettyRpcEnv.ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T]
                    val promise = Promise[Any]()
                    val remoteAddr = message.receiver.address
                    if (remoteAddr == address) {
                        val p = Promise[Any]()
                        p.future.onComplete {
                          case Success(response) => onSuccess(response)
                          case Failure(e) => onFailure(e)
                        }(ThreadUtils.sameThread)
                        dispatcher.postLocalMessage(message, p)
                    } else {
                        val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => onSuccess(deserialize[Any](client, response)))
                        postToOutbox(message.receiver, rpcMessage)
                        promise.future.failed.foreach {
                          case _: TimeoutException => rpcMessage.onTimeout()
                          case _ =>
                        }(ThreadUtils.sameThread)
                    }
                    
                    val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
                        override def run(): Unit = {
                          onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +s"in ${timeout.duration}"))
                        }
                    }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
                      promise.future.onComplete { v => timeoutCancelable.cancel(true)}(ThreadUtils.sameThread)
                    
                    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
                    
                }
            }
        }
        
        // Start the scheduler and its backend
        _taskScheduler.start(){//TaskSchedulerImpl.start()
            backend.start(){
                //1. Local mode: LocalSchedulerBackend.start()
                LocalSchedulerBackend.start(){
                    val rpcEnv = SparkEnv.get.rpcEnv
                    val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
                    localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
                    
                    // 
                    listenerBus.post(SparkListenerExecutorAdded(
                        System.currentTimeMillis, executorEndpoint.localExecutorId,
                        new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))){//LiveListenerBus.post()
                            metrics.numEventsPosted.inc()
                            synchronized {
                                if (!started.get()) {
                                    queuedEvents += event
                                    return
                                }
                            }
                        }
                    launcherBackend.setAppId(appId)
                    launcherBackend.setState(SparkAppHandle.State.RUNNING)
                }
                
                // 2. StandaloneSchedulerBackend.start(){}
                StandaloneSchedulerBackend.start(){}
                
            }
            
            if (!isLocal && conf.getBoolean("spark.speculation", false)) {// speculative execution, disabled by default
                logInfo("Starting speculative execution thread")
                speculationScheduler.scheduleWithFixedDelay(new Runnable {
                    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
                      checkSpeculatableTasks()
                    }
                }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
            }
            
        }
        _applicationId = _taskScheduler.applicationId()
        _applicationAttemptId = taskScheduler.applicationAttemptId()
        _conf.set("spark.app.id", _applicationId)
        _ui.foreach(_.setAppId(_applicationId))

        
        // Initialize the BlockManager
        _env.blockManager.initialize(_applicationId){//BlockManager.initialize()
            blockTransferService.init(this)
                * INFO NettyBlockTransferService: Server created on 192.168.41.1:59715
            shuffleClient.init(appId)
            
            blockReplicationPolicy = {
                  val priorityClass = conf.get("spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
                  val clazz = Utils.classForName(priorityClass)
                  val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
                  logInfo(s"Using $priorityClass for block replication policy")
                  ret
            }
            
            // Register this BlockManager with the driver's BlockManagerMaster
            val id =BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)
            val idFromMaster = master.registerBlockManager(id,maxOnHeapMemory,maxOffHeapMemory,slaveEndpoint);{//BlockManagerMaster.registerBlockManager()
                logInfo(s"Registering BlockManager $blockManagerId")
                    * BlockManagerMaster - Registering BlockManager null
                val updatedId = driverEndpoint.askSync[BlockManagerId](RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)){
                    RpcEndpointRef.askSync(){//askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T 
                        val future = ask[T](message, timeout)
                        timeout.awaitResult(future)
                    }
                }
                
                logInfo(s"Registered BlockManager $updatedId")
                updatedId
            }
            
            blockManagerId = if (idFromMaster != null) idFromMaster else id
            shuffleServerId = if (externalShuffleServiceEnabled) {
                logInfo(s"external shuffle service port = $externalShuffleServicePort")
                BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
            } else {
                blockManagerId
            }
            
            logInfo(s"Initialized BlockManager: $blockManagerId")
                * INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.41.1, 59559, None)
            
        }
            
        _env.metricsSystem.start()
        _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
        
        // Create the event logging listener;
        _eventLogger =
            if (isEventLogEnabled) {// disabled by default; enabled via spark.eventLog.enabled;
                val logger = new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration)
                logger.start(){//EventLoggingListener.start()
                    if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
                      throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
                    }
                    val workingPath = logPath + IN_PROGRESS
                    val path = new Path(workingPath)
                    val uri = path.toUri
                    val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
                    val isDefaultLocal = defaultFs == null || defaultFs == "file"
    
                    val dstream =
                        if ((isDefaultLocal && uri.getScheme == null) || uri.getScheme == "file") {
                            new FileOutputStream(uri.getPath)
                        } else {
                            hadoopDataStream = Some(fileSystem.create(path))
                            hadoopDataStream.get
                        }
                    
                    try {
                      val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
                      val bstream = new BufferedOutputStream(cstream, outputBufferSize)

                      EventLoggingListener.initEventLog(bstream, testing, loggedEvents){//EventLoggingListener.initEventLog(
                            val metadata = SparkListenerLogStart(SPARK_VERSION)
                            val eventJson = JsonProtocol.logStartToJson(metadata)
                            val metadataJson = compact(eventJson) + "\n"
                            logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8))
                            if (testing && loggedEvents != null) {
                                loggedEvents += eventJson
                            }
                      }
                      fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
                      writer = Some(new PrintWriter(bstream))// in local mode the writer is a PrintWriter over the local file
                      logInfo("Logging events to %s".format(logPath))
                        * EventLoggingListener - Logging events to file:/E:/studyAndTest/eventLogDir/local-1586004614114
                    } catch {
                        case e: Exception => dstream.close(); throw e; 
                    }
                    
                }
                listenerBus.addToEventLogQueue(logger)
                Some(logger)
            } else {
                None
            }
        
        // Dynamic executor allocation
        val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) // controlled by spark.dynamicAllocation.enabled, default false;
        _executorAllocationManager =
            if (dynamicAllocationEnabled) {
                schedulerBackend match {
                  case b: ExecutorAllocationClient =>
                    Some(new ExecutorAllocationManager(schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, _env.blockManager.master))
                  case _ => None
                }
            } else {
                None
            }
        _executorAllocationManager.foreach(_.start()){//ExecutorAllocationManager.start()
            listenerBus.addToManagementQueue(listener)
            val scheduleTask = new Runnable() {
              override def run(): Unit = {
                try {
                    schedule(){//ExecutorAllocationManager.schedule()
                        val now = clock.getTimeMillis
                        updateAndSyncNumExecutorsTarget(now)
                        val executorIdsToBeRemoved = ArrayBuffer[String]()
                        removeTimes.retain { case (executorId, expireTime) =>
                            val expired = now >= expireTime
                            if (expired) {
                                initializing = false
                                executorIdsToBeRemoved += executorId
                            }
                            !expired
                        }
                        if (executorIdsToBeRemoved.nonEmpty) {
                            removeExecutors(executorIdsToBeRemoved)
                        }
                    }
                } catch {
                  case ct: ControlThrowable =>
                    throw ct
                  case t: Throwable =>
                    logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
                }
              }
            }
            executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
            client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
            
        }
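        /* A hedged example of enabling dynamic allocation (on YARN this normally also requires the external
        *  shuffle service; the min/max bounds below are placeholders):
        *    conf.set("spark.dynamicAllocation.enabled", "true")
        *        .set("spark.shuffle.service.enabled", "true")
        *        .set("spark.dynamicAllocation.minExecutors", "1")
        *        .set("spark.dynamicAllocation.maxExecutors", "10")
        */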

        
        // Context cleaner: enabled by default; disabled by setting spark.cleaner.referenceTracking to false;
        _cleaner =
            if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
                Some(new ContextCleaner(this))
            } else {
                None
            }
        _cleaner.foreach(_.start()){//ContextCleaner.start()
            cleaningThread.setDaemon(true)
            cleaningThread.setName("Spark Context Cleaner")
            cleaningThread.start()
            
            /* Trigger a full GC every periodicGCInterval seconds;
            *   frequency: periodicGCInterval, set via spark.cleaner.periodicGC.interval, default 30 minutes;
            *   the first run is delayed by the same interval (30 minutes by default)
            */
            periodicGCService.scheduleAtFixedRate(new Runnable {
              override def run(): Unit = System.gc()
            }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
        }
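        /* Related cleaner settings (a hedged sketch):
        *    conf.set("spark.cleaner.referenceTracking", "false")     // disable the ContextCleaner (the None branch above)
        *    conf.set("spark.cleaner.periodicGC.interval", "30min")   // interval of the periodic System.gc() calls
        */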
        
        setupAndStartListenerBus()
        postEnvironmentUpdate()
        postApplicationStart(){
            listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)){
                if (stopped.get()) {
                    return
                }
                metrics.numEventsPosted.inc()
                if (queuedEvents == null) {
                    postToQueues(event){//LiveListenerBus.postToQueues(event: SparkListenerEvent)
                        val it = queues.iterator()
                        while (it.hasNext()) {
                            it.next().post(event){//AsyncEventQueue.post()
                                if (stopped.get()) {
                                    return
                                }
                                eventCount.incrementAndGet()
                                if (eventQueue.offer(event)) {// if the event was enqueued successfully, return;
                                    return
                                }
                                
                                eventCount.decrementAndGet()
                                droppedEvents.inc()
                                droppedEventsCounter.incrementAndGet()
                                val droppedCount = droppedEventsCounter.get
                                if (droppedCount > 0) {
                                  if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
                                    if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
                                      val prevLastReportTimestamp = lastReportTimestamp
                                      lastReportTimestamp = System.currentTimeMillis()
                                      val previous = new java.util.Date(prevLastReportTimestamp)
                                      logWarning(s"Dropped $droppedCount events from $name since $previous.")
                                    }
                                  }
                                }
                                
                            }
                        }
                    }
                  return
                }
                synchronized {
                  if (!started.get()) {
                    queuedEvents += event
                    return
                  }
                }
                postToQueues(event)
    
            }
        }
        
        // Post init: start the remaining hooks and register metrics sources
        _taskScheduler.postStartHook()
        _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
        _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
        _executorAllocationManager.foreach { e =>
          _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
        }
        
        _shutdownHookRef = ShutdownHookManager.addShutdownHook(ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
          logInfo("Invoking stop() from shutdown hook")
          stop()
        }
        
        // End of the long try block in the SparkContext constructor; the last log lines it prints are:
            * BlockManager - Initialized BlockManager: BlockManagerId(driver, ldsver50, 60851, None)
            * ContextHandler - Started o.s.j.s.ServletContextHandler@77f991c{/metrics/json,null,AVAILABLE,@Spark}
    }
    
    private val nextShuffleId = new AtomicInteger(0)
    private val nextRddId = new AtomicInteger(0)
    
    // End of SparkContext construction: record this instance as the active context
    private val allowMultipleContexts: Boolean =config.getBoolean("spark.driver.allowMultipleContexts", false)
    SparkContext.setActiveContext(this, allowMultipleContexts){//SparkContext.setActiveContext(sc: SparkContext, allowMultipleContexts: Boolean)
        SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
            assertNoOtherContextIsRunning(sc, allowMultipleContexts)
            contextBeingConstructed = None
            activeContext.set(sc)// AtomicReference[SparkContext].set(sc)
        }
    }

}


//2. Other entry points for constructing a SparkContext
    //2.1 Create a new SparkContext, or reuse the one that is already active
    SparkContext.getOrCreate(conf:SparkConf){
        SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
          if (activeContext.get() == null) {// create a new context and record it as the active one;
            setActiveContext(new SparkContext(config), allowMultipleContexts = false)
          } else {// reuse the existing active context and return it;
            if (config.getAll.nonEmpty) {
              logWarning("Using an existing SparkContext; some configuration may not take effect.")
            }
          }
          activeContext.get()
        }
    }
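A brief usage sketch of getOrCreate (app name and master are placeholders, imports as above); the second call returns the already-active context and only logs a warning when extra conf entries are passed:

    val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
    val sc1 = SparkContext.getOrCreate(conf)
    val sc2 = SparkContext.getOrCreate(conf) // same instance as sc1
    assert(sc1 eq sc2)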

SparkContext.runJob(rdd): source code of submitting a Job for execution

//2. Submit a Job
sc.runJob(rdd, func, partitions){//SparkContext.runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U ): Array[U]
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions){
        val results = new Array[U](partitions.size)
        
        runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res){//SparkContext.runJob[T, U: ClassTag](rdd: RDD[T],func: (TaskContext, Iterator[T]) => U,partitions: Seq[Int],resultHandler: (Int, U) => Unit)
            if (stopped.get()) {
                throw new IllegalStateException("SparkContext has been shutdown")
            }
            
            val callSite = getCallSite
            val cleanedFunc = clean(func)
            logInfo("Starting job: " + callSite.shortForm)
                * SparkContext - Starting job: start at SSparkHelper.scala:81
            
            if (conf.getBoolean("spark.logLineage", false)) {
                logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
            }
            
            /** Hand the job off to the DAGScheduler, which builds stages and submits tasks
            *
            */
            dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get){//DAGScheduler.runJob
                val start = System.nanoTime
                
                val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties){//DAGScheduler.submitJob(rdd)
                    val maxPartitions = rdd.partitions.length
                    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
                      throw new IllegalArgumentException("Attempting to access a non-existent partition: Total number of partitions: " + maxPartitions)
                    }
                    val jobId = nextJobId.getAndIncrement()
                    if (partitions.size == 0) {
                      // Return immediately if the job is running 0 tasks
                      return new JobWaiter[U](this, jobId, 0, resultHandler)
                    }

                    assert(partitions.size > 0)
                    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
                    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
                    
                    // Post a JobSubmitted event to the eventQueue (a LinkedBlockingQueue) held by EventLoop, the parent class of DAGSchedulerEventProcessLoop;
                    /** Submit the Job (wrapped in a JobSubmitted event) to the eventQueue of DAGSchedulerEventProcessLoop;
                    *   - the eventQueue lives in the parent class EventLoop; its run() method keeps taking events from the queue and processing them;
                    *   - the consumption of the eventQueue is done by a dedicated thread named "dag-scheduler-event-loop"
                    */
                    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties))){//DAGScheudlerEventProcessLoop.post()
                        EventLoop.post(event){//EventLoop.(event: E): Unit 
                            eventQueue[LinkedBlockingQueue].put(event)//event==JobSubmitted
                        }
                        
                        // Logic of the "dag-scheduler-event-loop" thread that consumes events in a loop:
                        {
                            // The DAGSchedulerEventProcessLoop instance is created in the DAGScheduler constructor;
                            private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
                            
                            // At the end of the DAGScheduler constructor, the "dag-scheduler-event-loop" thread that drains the eventQueue is started;
                            eventProcessLoop.start(){
                                onStart()
                                eventThread.start(){//Thread.start()
                                    override def run(){
                                        try {
                                            // Keep taking events from the head of the eventQueue and dispatch each one to the concrete onReceive implementation;
                                            while (!stopped.get) {
                                              val event = eventQueue.take()
                                              try {
                                                onReceive(event){// this abstract method is implemented by DAGSchedulerEventProcessLoop; it handles events such as JobSubmitted, ExecutorAdded, JobCancelled;
                                                    val timerContext = timer.time()
                                                    try {
                                                      doOnReceive(event){// DAGSchedulerEventProcessLoop.doOnReceive(event: DAGSchedulerEvent): Unit
                                                        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
                                                            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties){
                                                                
                                                                
                                                                
                                                            }

                                                        // to do...
                                                      }
                                                    } finally {
                                                      timerContext.stop()
                                                    }
                                                }
                                              } catch {
                                                case NonFatal(e) =>
                                                  try {
                                                        onError(e)
                                                  } catch {
                                                        case NonFatal(e) => logError("Unexpected error in " + name, e)
                                                  }
                                              }
                                            }
                                        } catch {
                                            case ie: InterruptedException => // exit even if eventQueue is not empty
                                            case NonFatal(e) => logError("Unexpected error in " + name, e)
                                        }
                                        
                                    }
                                }
                            }
                    
                        }
                        
                    }
                    
                    waiter
                    
                }
                
                ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
                waiter.completionFuture.value.get match {
                  case scala.util.Success(_) =>
                    logInfo("Job %d finished: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
                        * 
                  case scala.util.Failure(exception) =>
                    logInfo("Job %d failed: %s, took %f s".format(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
                    // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
                    val callerStackTrace = Thread.currentThread().getStackTrace.tail
                    exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
                    throw exception
                }           
            }
            progressBar.foreach(_.finishAll())
            rdd.doCheckpoint()
        }
        results
    }
    
    
}
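For reference, the overload walked through above (runJob[T, U](rdd, func: Iterator[T] => U): Array[U]) can also be called directly from user code. A minimal sketch, assuming an existing SparkContext named sc:

    val rdd = sc.parallelize(1 to 8, numSlices = 4)
    // one result element per partition; each element is that partition's sum
    val partitionSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum)
    // actions such as collect() and count() are thin wrappers over this same runJob path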

