[第九章]Job触发流程原理剖析

上一节我们是不是讲到,Driver,Application注册到Master上面后,Master中调用scheduler()进行资源调度,在这个里面通过LaunchDriver(),LaunchExecutor(),向Worker发出启动Driver,Exeutor的请求(或者说命令),

Worker接收到发来的请求,通过创建DriverRunner,ExecutorRunner线程来启动我们的Driver与Application.在启动完成后,根据第一章节我们分析的Spark核心原理,Executor会反向注册到Driver上,这样Driver就清楚哪些Executor在执行Application,实际上到了这个时候,我们的SparkContext已经全部初使化完成。

接下来我们就要继续执行我们自己编写的代码程序,其实一个Application包括多个JOB,那么JOB是如何划分的呢? 实际上一个Action操作,会划分一个JOB,就是说多个Action操作就会有多个JOB,JOB执行的顺序是从第一个开始。
下面我们以wordcount实例来详细分析一下JOB的划分:

val lines = sc.textFile(...)

val words = lines.flatMap(line => line.split(" "))

val pairs = words.map(word => (word, 1))

val counts = pairs.reduceByKey(_ + _)

counts.foreach(count => println(count._1 + ": " + count._2))

看到上面的几行代码,大家是不是太熟悉了,这就是我们学习spark的第一个例子。
首先我们先来分析第一行代码:
val lines = sc.textFile()

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

上面的代码是不是很熟悉,

  • 首先,textFile会调用一个hadoopFile(...)的方法,我们看里面的参数,TextInputFormat这是不是很熟悉,这是hadoop里读取文本文件的解析器啊
  • 后面的classOf[LongWritable], classOf[Text]这就是hadoop
    map()函数的k1,v1吧
    这可都是hadoop里的东东。下面我们看hadoopFile方法的代码
 def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions
      ): RDD[(K, V)] = {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    //这个是不是我们把hadoop配置数据序列化后在广播出去,共享广播这个我们在之前讲过,不会忘记吧!
    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

不难看出,这个方法返回了一个HadoopRDD算子
接着我们看后面调用了map(...),map方法具体我们这里就不分析,以前的章节我们分析过了,
.map(pair => pair._2.toString).setName(path)
其中的pair是不是一个tuple,也就是我们Hadoop的k1,v1,那么pair._2.toStrig,是不是我们读取文件的每一行内容。

接下来我们继续执行代码:

val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))

这两行与上面分析map的相似,我们就不重复了。接着我们执行

val counts = pairs.reduceByKey(_ + _)

这行代码我们惊奇的发现在RDD类里没有reduceByKey方法,这是为什么了?没有方法如何调用。
大家是不是想到scala中的隐式转换,在RDD类中找查到了隐式转换方法rddToPairRDDFunctions(),把rdd转换成了rddToPairRDDFunctions,返回这个对象,如下面代码

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }

果然在PairRDDFunction类中找到这个方法:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
   combineByKey[V]((v: V) => v, func, func, partitioner)
 }`

接下来我们执行最后一行代码,

counts.foreach(count => println(count._1 + ": " + count._2))

当我们看foreach时,明白这就是一个action算子。它调用了sparkContext的runJob方法,来划分一个JOB

def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    //执行SparkContext中的runJob
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

经过多个runJob()重载的调用,最后我们找到最终的调用方法:

 def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (stopped) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    //调用sparkContext之前初使化 时创建的DAGScheduler的runJob方法
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()

看到这里大家 是不是明白了,最终其实就是调用了sparkContext之前初使化 时创建的DAGScheduler的runJob方法,里面的第一个参数rdd是不是以前的rdd算子。
这里也说明了JOB的划分都是在DAGScheduler里完成的。接下来我们会在在下一章节会对DAGScheduler如何划分JOB进行深入的分析.

本章中每一个字(包括源码注解)都是作者敲出来的,你感觉有用,帮点击'喜欢'

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容