Spark startup in yarn-cluster mode

Spark supports several deployment modes, including standalone, YARN, and Mesos. Standalone mode is mainly used for offline testing, while production clusters usually rely on YARN or Mesos for resource management and fault tolerance. This article walks through the full startup flow of Spark in yarn-cluster mode, focusing on the Spark-side logic; YARN internals are covered in other chapters.


What is YARN

YARN (Yet Another Resource Negotiator) is the framework in the Hadoop ecosystem responsible for resource management, coordination, and task isolation. Other computation frameworks can be built on top of YARN to have their tasks scheduled and executed; see the official YARN documentation for more details.

YARN architecture diagram

Spark on YARN

Message flow diagram

  1. The client submits the Spark application to the ResourceManager (a minimal API-level sketch of this step follows the list).
  2. Based on the requested ApplicationMaster parameters and the current state of the cluster, the ResourceManager sends the request to start the AM process to a suitable NodeManager.
  3. That NodeManager launches the AM process locally using the AM start command.
  4. The ApplicationMaster asks the ResourceManager for resources by sending it ContainerRequests for the executors.
  5. The ResourceManager likewise forwards the executor launch requests to the corresponding NodeManagers, which start the executor processes with the executor launch command.
  6. Each executor process registers itself with the DriverEndpoint inside the ApplicationMaster; when the ApplicationMaster later has tasks to run, it schedules them onto the registered executors.
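At the API level, step 1 means the client talks to the ResourceManager through Hadoop's YarnClient interface; Spark's Client class wraps the same calls. A minimal sketch, not the actual Spark code:

  import org.apache.hadoop.yarn.client.api.YarnClient
  import org.apache.hadoop.yarn.conf.YarnConfiguration

  // Open a connection to the ResourceManager using the configuration found via
  // HADOOP_CONF_DIR / YARN_CONF_DIR, then ask it for a new application.
  val conf = new YarnConfiguration()
  val yarnClient = YarnClient.createYarnClient()
  yarnClient.init(conf)
  yarnClient.start()

  val newApp = yarnClient.createApplication()              // obtains a new ApplicationId from the RM
  val appContext = newApp.getApplicationSubmissionContext  // filled in with the AM launch command later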

Class interaction diagram

  1. Set the environment variable HADOOP_CONF_DIR or YARN_CONF_DIR to point to the Hadoop/YARN configuration files such as yarn-site.xml and core-site.xml;
  2. Run sh bin/spark-submit --master yarn --deploy-mode cluster --files xxx --class xxx --jars xxx to submit the Spark application to YARN;
  3. The command line ultimately runs the main function of SparkSubmit, which parses, validates, and transforms the arguments and then invokes the main function of the resolved class; in yarn-cluster mode that class is org.apache.spark.deploy.yarn.Client;
def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

Once the arguments have been parsed, the corresponding childMainClass is run; in yarn-cluster mode this is the main method of the Client class, which runMain loads and invokes reflectively (a sketch of that call follows the code below).

private def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              // scalastyle:off println
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              // scalastyle:on println
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }
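runMain, which submit delegates to, loads childMainClass through the class loader and calls its static main method via reflection. A minimal sketch of that pattern, simplified rather than the full SparkSubmit.runMain:

  // Simplified sketch of the reflective invocation performed by runMain.
  def invokeMainClass(childMainClass: String, childArgs: Seq[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    val mainClass = Class.forName(childMainClass, true, loader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // In yarn-cluster mode childMainClass is org.apache.spark.deploy.yarn.Client.
    mainMethod.invoke(null, childArgs.toArray)
  }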
  4. The Client first requests an ApplicationId from YARN and uploads the required files, jars, and configuration to HDFS; it then submits the application to the ResourceManager. If spark.yarn.submit.waitAppCompletion is set to true, the launcher process keeps polling the application status and only exits once the state becomes FINISHED, KILLED, or FAILED; otherwise it exits immediately.
 private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")
    val appId = newAppResponse.getApplicationId
    val appStagingDir = getAppStagingDir(appId)
    val pySparkArchives =
      if (sparkConf.getBoolean("spark.yarn.isPython", false)) {
        findPySparkArchives()
      } else {
        Nil
      }
    val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
     // Upload spark.yarn.jar to /user/{user.home}/.sparkStaging/{applicationid}/__spark__.jar
     // Upload the user jar to /user/{user.home}/.sparkStaging/{applicationid}/__app__.jar
     // Upload the jars listed in --jars to /user/{user.home}/.sparkStaging/{applicationid}/{#linkname}.jar
     // Upload the files listed in --files to /user/{user.home}/.sparkStaging/{applicationid}/{#linkname}
     // Upload __spark_conf__.zip to /user/{user.home}/.sparkStaging/{applicationid}/__spark_conf__.zip; the archive contains all files under HADOOP_CONF_DIR and YARN_CONF_DIR plus the __spark_conf__.properties file generated from SparkConf
    val localResources = prepareLocalResources(appStagingDir, pySparkArchives)

    // Set the environment variables to be passed on to the executors.
    distCacheMgr.setDistFilesEnv(launchEnv)
    distCacheMgr.setDistArchivesEnv(launchEnv)

    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setLocalResources(localResources.asJava)
    amContainer.setEnvironment(launchEnv.asJava)

    val javaOpts = ListBuffer[String]()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None
  5. Initialize the ApplicationSubmissionContext with the queue, the AM launch command, the required environment variables, and so on:
 def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(args.appName)
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")
    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
      .map(StringUtils.getTrimmedStringCollection(_))
      .filter(!_.isEmpty())
      .foreach { tagCollection =>
        try {
          // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
          // reflection to set it, printing a warning if a tag was specified but the YARN version
          // doesn't support it.
          val method = appContext.getClass().getMethod(
            "setApplicationTags", classOf[java.util.Set[String]])
          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
        } catch {
          case e: NoSuchMethodException =>
            logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
              "YARN does not support it")
        }
      }
    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
      case Some(v) => appContext.setMaxAppAttempts(v)
      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
          "Cluster's default value will be used.")
    }

    if (sparkConf.contains("spark.yarn.am.attemptFailuresValidityInterval")) {
      try {
        val interval = sparkConf.getTimeAsMs("spark.yarn.am.attemptFailuresValidityInterval")
        val method = appContext.getClass().getMethod(
          "setAttemptFailuresValidityInterval", classOf[Long])
        method.invoke(appContext, interval: java.lang.Long)
      } catch {
        case e: NoSuchMethodException =>
          logWarning("Ignoring spark.yarn.am.attemptFailuresValidityInterval because the version " +
            "of YARN does not support it")
      }
    }

    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(args.amMemory + amMemoryOverhead)
    capability.setVirtualCores(args.amCores)

    if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
      try {
        val amRequest = Records.newRecord(classOf[ResourceRequest])
        amRequest.setResourceName(ResourceRequest.ANY)
        amRequest.setPriority(Priority.newInstance(0))
        amRequest.setCapability(capability)
        amRequest.setNumContainers(1)
        val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
        val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
        method.invoke(amRequest, amLabelExpression)

        val setResourceRequestMethod =
          appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
        setResourceRequestMethod.invoke(appContext, amRequest)
      } catch {
        case e: NoSuchMethodException =>
          logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
            "of YARN does not support it")
          appContext.setResource(capability)
      }
    } else {
      appContext.setResource(capability)
    }

    appContext
  }
  6. Submit the application context to the ResourceManager's ClientRMService interface to complete the submission.
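Continuing the YarnClient sketch from earlier, the submission and the optional wait controlled by spark.yarn.submit.waitAppCompletion can be pictured roughly as follows; this is a simplified sketch (the real Client also logs the application report and uses a configurable polling interval), and yarnClient, appContext, and sparkConf are assumed to be set up as above:

  import org.apache.hadoop.yarn.api.records.YarnApplicationState

  // Hand the prepared context to the ResourceManager (ClientRMService) and get back the ApplicationId.
  val appId = yarnClient.submitApplication(appContext)

  if (sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)) {
    // Poll the application report until a terminal state is reached.
    var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    while (state != YarnApplicationState.FINISHED &&
           state != YarnApplicationState.KILLED &&
           state != YarnApplicationState.FAILED) {
      Thread.sleep(1000)
      state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    }
  }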

ApplicationMaster startup

  1. The NodeManager launches the ApplicationMaster and runs its main function, which installs the AmIpFilter so that the web UI can only be reached from the expected source IP; other requests are redirected to the proxy URL;
  2. A driver thread is started to run the main function of the user class and initialize the SparkContext, while the main thread registers the AM with the ResourceManager and reports the Spark UI address to the RM.
 private def runDriver(securityMgr: SecurityManager): Unit = {
    // Add AmIpFilter so that the Spark UI can only be accessed from specific IPs;
    // other requests are redirected to the configured URL
    addAmIpFilter()
    // Run the main function of the user class (the class passed to spark-submit via --class)
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    val sc = waitForSparkContextInitialized()

    // If there is no SparkContext at this point, just fail the app.
    if (sc == null) {
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
    } else {
      rpcEnv = sc.env.rpcEnv
      val driverRef = runAMEndpoint(
        sc.getConf.get("spark.driver.host"),
        sc.getConf.get("spark.driver.port"),
        isClusterMode = true)
      // Register the successfully started AM with the ResourceManager and report its UI address;
      // when the application exits, the history UI address is reported during unregistration
      registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
      // Wait for the user application thread to finish
      userClassThread.join()
    }
  }
  3. A reporter thread is started as well; it periodically requests resources from the ResourceManager (the core logic lives in YarnAllocator) and monitors executor failures. When the number of failed executors exceeds spark.yarn.max.executor.failures, the AM is stopped;
// Start the reporter thread, which monitors the application state and periodically requests resources from the RM
 private def launchReporterThread(): Thread = {
    // The number of failures in a row until Reporter thread give up
    val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)

    val t = new Thread {
      override def run() {
        var failureCount = 0
        while (!finished) {
          try {
            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                s"Max number of executor failures ($maxNumExecutorFailures) reached")
            } else {
              logDebug("Sending progress")
              // Trigger the resource request flow
              allocator.allocateResources()
            }
            failureCount = 0
          } catch {
            case i: InterruptedException =>
            case e: Throwable => {
              failureCount += 1
              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
                finish(FinalApplicationStatus.FAILED,
                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                    s"$failureCount time(s) from Reporter thread.")
              } else {
                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
              }
            }
          }
          try {
            val numPendingAllocate = allocator.getPendingAllocate.size
            allocatorLock.synchronized {
              val sleepInterval =
                if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
                  val currentAllocationInterval =
                    math.min(heartbeatInterval, nextAllocationInterval)
                  nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
                  currentAllocationInterval
                } else {
                  nextAllocationInterval = initialAllocationInterval
                  heartbeatInterval
                }
              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
                       s"Sleeping for $sleepInterval.")
              allocatorLock.wait(sleepInterval)
            }
          } catch {
            case e: InterruptedException =>
          }
        }
      }
    }
    // setting to daemon status, though this is usually not a good idea.
    t.setDaemon(true)
    t.setName("Reporter")
    t.start()
    logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
            s"initial allocation : $initialAllocationInterval) intervals")
    t
  }
  4. If dynamic allocation is not enabled, the number of executors requested at startup is given by spark.executor.instances; with dynamic allocation enabled, the initial number comes from spark.dynamicAllocation.initialExecutors (a sketch of how this initial target is derived follows the YarnAllocator code below). The request logic lives in YarnAllocator:
 def allocateResources(): Unit = synchronized {
    // Based on the total number of executors required, the number already allocated, and task locality,
    // work out how many more executors to request and their preferred locations
    updateResourceRequests()

    val progressIndicator = 0.1f
    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container requests.
    // Collect the pending and released requests tracked by AMRMClientImpl, assemble an AllocateRequest,
    // and call ApplicationMasterProtocol.allocate to request resources from the RM
    val allocateResponse = amClient.allocate(progressIndicator)

    val allocatedContainers = allocateResponse.getAllocatedContainers()

    if (allocatedContainers.size > 0) {
      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
        .format(
          allocatedContainers.size,
          numExecutorsRunning,
          allocateResponse.getAvailableResources))

      handleAllocatedContainers(allocatedContainers.asScala)
    }
def updateResourceRequests(): Unit = {
    val pendingAllocate = getPendingAllocate
    val numPendingAllocate = pendingAllocate.size
    // Work out how many more containers still need to be requested
    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning

    if (missing > 0) {
      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")

      // Split the pending container request into three groups: locality matched list, locality
      // unmatched list and non-locality list. Take the locality matched container request into
      // consideration of container placement, treat as allocated containers.
      // For locality unmatched and locality free container requests, cancel these container
      // requests, since required locality preference has been changed, recalculating using
      // container placement strategy.
      val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
        hostToLocalTaskCounts, pendingAllocate)

      // Remove the outdated container request and recalculate the requested container number
      localityUnMatched.foreach(amClient.removeContainerRequest)
      localityFree.foreach(amClient.removeContainerRequest)
      val updatedNumContainer = missing + localityUnMatched.size + localityFree.size

      // Compute the locality preferences for the containers, i.e. which hosts or racks to place them on
      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
        updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
          allocatedHostToContainersMap, localityMatched)

      for (locality <- containerLocalityPreferences) {
        val request = createContainerRequest(resource, locality.nodes, locality.racks)
        // This only stores the request in the AM-RM client's local table; the actual ask is sent to the RM on the next allocate call
        amClient.addContainerRequest(request)
        val nodes = request.getNodes
        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
        logInfo(s"Container request (host: $hostStr, capability: $resource)")
      }
    } else if (missing < 0) {
      val numToCancel = math.min(numPendingAllocate, -missing)
      logInfo(s"Canceling requests for $numToCancel executor containers")

      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
      if (!matchingRequests.isEmpty) {
        matchingRequests.iterator().next().asScala
          .take(numToCancel).foreach(amClient.removeContainerRequest)
      } else {
        logWarning("Expected to find pending requests, but found none.")
      }
    }
  }
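The initial value of targetNumExecutors used above follows the rule described in step 4; a simplified reconstruction (the helper name and the default of 2 are assumptions, not the exact Spark code):

  import org.apache.spark.SparkConf

  // Simplified reconstruction of how the initial executor target is chosen (names assumed).
  def initialTargetNumExecutors(conf: SparkConf): Int = {
    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      // With dynamic allocation: start from spark.dynamicAllocation.initialExecutors,
      // falling back to spark.dynamicAllocation.minExecutors when it is not set.
      val minExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
      conf.getInt("spark.dynamicAllocation.initialExecutors", minExecutors)
    } else {
      // Without dynamic allocation: spark.executor.instances drives the request.
      conf.getInt("spark.executor.instances", 2)
    }
  }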
  5. When executors fail while the application is running, YarnSchedulerBackend pushes the new required total to YarnAllocator, which triggers replacement containers to be requested (a toy sketch of this hand-off follows).
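Conceptually, this hand-off just updates the allocator's target; the reporter thread picks up the new value on its next heartbeat and turns the difference into container requests or cancellations. A self-contained toy illustration of that pattern (purely illustrative class, not Spark's API):

  // Toy illustration: one side updates the target, the periodic heartbeat reconciles it.
  class ToyAllocator {
    @volatile private var targetNumExecutors = 0
    @volatile var numExecutorsRunning = 0

    // Called from the scheduler backend when executors die or the demand changes.
    def requestTotalExecutors(total: Int): Unit = { targetNumExecutors = total }

    // Called periodically from the reporter thread.
    def allocateResources(): Unit = {
      val missing = targetNumExecutors - numExecutorsRunning
      if (missing > 0) println(s"would add $missing container requests")
      else if (missing < 0) println(s"would cancel ${-missing} pending requests")
    }
  }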

Elastic resource scaling

Spark on YARN allows users to enable an elastic scaling policy: the system adds or removes executors based on the current load, where load is measured by the number of running and pending tasks and determines how many executors are needed; see ExecutorAllocationManager for details.

  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    // tasksPerExecutor = spark.executor.cores / spark.task.cpus, i.e. how many tasks one executor can run concurrently
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }
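As a concrete illustration (hypothetical numbers): with 100 running plus pending tasks and executors that can each run 4 tasks concurrently, the ceiling division gives 25 executors:

  // Worked example with hypothetical numbers
  val numRunningOrPendingTasks = 100
  val tasksPerExecutor = 4
  val maxNeeded = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor  // 25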

Dynamic scaling policy

  • Scale-up policy: when tasks have been backlogged longer than a threshold, executors are added, with the count growing exponentially in a way similar to TCP slow start (see the sketch below).
  • Scale-down policy: when an executor has been idle longer than a threshold, its resources are released.
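The exponential ramp-up mentioned in the scale-up policy can be sketched as follows; this is a simplified reconstruction of the idea behind ExecutorAllocationManager, with assumed variable names rather than the exact code:

  // Simplified reconstruction of the slow-start style ramp-up (names assumed).
  object RampUpSketch {
    var numExecutorsToAdd = 1      // batch size; reset to 1 when the backlog clears
    var numExecutorsTarget = 1     // total executors currently requested

    // Called each time the backlog timeout expires; returns how many executors were added.
    def addExecutors(maxNumExecutorsNeeded: Int, maxExecutors: Int): Int = {
      val oldTarget = numExecutorsTarget
      numExecutorsTarget = math.min(
        math.min(numExecutorsTarget + numExecutorsToAdd, maxNumExecutorsNeeded),
        maxExecutors)
      val delta = numExecutorsTarget - oldTarget
      // Double the batch only if the full batch was granted; otherwise fall back to 1.
      numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
      delta
    }
  }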
Parameter details

  • spark.dynamicAllocation.minExecutors: minimum number of executors
  • spark.dynamicAllocation.maxExecutors: maximum number of executors
  • spark.dynamicAllocation.enabled: whether dynamic allocation is enabled; the prerequisite is that spark.executor.instances is 0 (i.e. not explicitly set)
  • spark.executor.instances: fixed number of executors
  • spark.dynamicAllocation.initialExecutors: number of executors requested initially
  • spark.dynamicAllocation.schedulerBacklogTimeout: backlog duration after which scale-up starts
  • spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: interval between further scale-up rounds once schedulerBacklogTimeout has been exceeded
  • spark.dynamicAllocation.executorIdleTimeout: idle time after which an executor is removed
  • spark.dynamicAllocation.cachedExecutorIdleTimeout: idle timeout for executors holding cached blocks