Spark Dag Scheduler学习记录

简介

spark作为实时及离线合为一体的大数据计算框架,由于其出色的批处理能力及高效的流计算能力,因此在大数据中依旧有着不可撼动的地位。这篇文章来记录一下spark Job作业是如何被调度起来的。

RDD

众所周知RDD是spark的基础,是Spark数据管理单元的高级抽象,RDD之间的流转变化形成了了我们的数据计算模式及过程,首先来看一下RDD的组成:


RDD composite

一个RDD主要由四大部分组成:

  1. dependencies:当前RDD所依赖的上一个RDD,是RDD计算流转的关键,从一个RDD transform到另一个RDD,也就有了依赖关系,任何一个RDD可以通过其dependencies及compute算子计算得出。 (计算)
  2. computing:算子,用于描述从一个或多个RDD到另外一个RDD的计算过程。(计算
  3. partition(Data):一个RDD被分成多个partition(由partitioner决定),每个partition存储着一个RDD的实际数据,而这些partition数据分散在不同的executor,这些数据既可以存储在内存中,也可以存储在磁盘中。(存储
  4. checkpoint:在大数据框架中最重要的特性就是容错性,大数据节点中的executor会因各种原因down掉,所以就必须有一个好的机制处理这种问题,而checkpoint就是用于提高容错性的关键所在,一个Executor down机后,会优先选择从checkpoint恢复,如果没有checkpoint则开始从dependencies递归重算。(容错
images.png

有了Dependencies及Computing这两个概念之后,RDD之间的数据DAG流转有了经济基础,但是想要基于此要搞出一些新花样,仅仅有经济基础是不够的,还得要有丰富的上层建筑,所以Spark又提出了Transformation及Action两个上层建筑的概念,伴随着的是stage及Job两种模式。如果让用户直接去操作Dependencies的话未免有点太过于复杂,所以Spark内置了很多通用的基础算子,诸如map, flatMap, reduce, aggegrate等等,这些算子函数会生成对应的RDD之类,其主要定义了dependencies及computing算子;为了更好的描述spark作业的实际运行顺序,又将这些算子分为两大类(Narrow Transformation & Wide Transformation),其中Narrow都是一对一依赖,不需要shuffle,两个RDD中的partition数是一样的,对应的映射关系也是固定不变的,RDD的计算能在同一个executor中运算,因此他们都属于同一个stage;而对于Wide Transformation,由于一个RDD会依赖多个RDD,所以不能再同一个executor中通过计算dependencies得到当前RDD,所以需要产生一个新的stage。有了这些算子之后,我们就有了一张完整的数据计算DAG图,但是Spark是懒加载的,虽然通过这些Transformation算子描绘好了一幅DAG图,但是并不会立刻执行,而需要一个Action操作来触发整个DAG的执行。
我们以rdd.foreach执行代码流程来简单分析。

  // RDD.scala
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }
  // SparkContext.scala
  def runJob[T, U: ClassTag](
      rdd: RDD[T], // result rdd
      func: (TaskContext, Iterator[T]) => U, // 对RDD各个partition中的数据一次执行该函数
      partitions: Seq[Int], // 哪些partition需要计算
      resultHandler: (Int, U) => Unit): Unit = { // 获取到最终结果后调用该handler (partition, result)
     // ... some checks
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) // 启动作业,会post 一个 JobSubmitted Event
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint() // 生成checkpoint
  }

  // DagScheduler.scala
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    ...
  }
  private[scheduler] def handleJobSubmitted(...) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
         ... // Exception handle
    }
    ... // some logs and status update
    submitStage(finalStage)
  }
  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(...): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId) // DFS生成所有子stage,并返回当前result stage依赖的所有shuffle dependencies。
    val id = nextStageId.getAndIncrement() // 生成stage id
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite) // 生成stage
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

  private[scheduler] def getShuffleDependencies( // 通过BFS的方式找到Wide Transformation的源头dependencies
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }
  // getOrCreateShuffleMapStage => createShuffleMapStage => getOrCreateShuffleMapStage ... // DFS生成所有Stage

  /** Submits stage, but first recursively submits any missing parents. */
  // 会跑起来第一个没有父依赖的stage,然后通过 completion event来执行后续的stage
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)  // 没有missing parent 该stage可以跑
        } else {
          for (parent <- missing) { // 依赖的父stage由于节点失败而missing,需要重跑
            submitStage(parent)
          }
          waitingStages += stage // 将stage 加入waitingStages
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

  // 当event handler收到task结束的event时,从waitingStages获取一个stage执行

如此一个spark job就被执行起来了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容