上一节我们是不是讲到,Driver,Application注册到Master上面后,Master中调用scheduler()进行资源调度,在这个里面通过LaunchDriver(),LaunchExecutor(),向Worker发出启动Driver,Exeutor的请求(或者说命令),
Worker接收到发来的请求,通过创建DriverRunner,ExecutorRunner线程来启动我们的Driver与Application.在启动完成后,根据第一章节我们分析的Spark核心原理,Executor会反向注册到Driver上,这样Driver就清楚哪些Executor在执行Application,实际上到了这个时候,我们的SparkContext已经全部初使化完成。
接下来我们就要继续执行我们自己编写的代码程序,其实一个Application包括多个JOB,那么JOB是如何划分的呢? 实际上一个Action操作,会划分一个JOB,就是说多个Action操作就会有多个JOB,JOB执行的顺序是从第一个开始。
下面我们以wordcount实例来详细分析一下JOB的划分:
val lines = sc.textFile(...)
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
val counts = pairs.reduceByKey(_ + _)
counts.foreach(count => println(count._1 + ": " + count._2))
看到上面的几行代码,大家是不是太熟悉了,这就是我们学习spark的第一个例子。
首先我们先来分析第一行代码:
val lines = sc.textFile()
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
上面的代码是不是很熟悉,
- 首先,textFile会调用一个hadoopFile(...)的方法,我们看里面的参数,TextInputFormat这是不是很熟悉,这是hadoop里读取文本文件的解析器啊
- 后面的classOf[LongWritable], classOf[Text]这就是hadoop
map()函数的k1,v1吧
这可都是hadoop里的东东。下面我们看hadoopFile方法的代码
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)] = {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
//这个是不是我们把hadoop配置数据序列化后在广播出去,共享广播这个我们在之前讲过,不会忘记吧!
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
不难看出,这个方法返回了一个HadoopRDD算子
接着我们看后面调用了map(...),map方法具体我们这里就不分析,以前的章节我们分析过了,
.map(pair => pair._2.toString).setName(path)
其中的pair是不是一个tuple,也就是我们Hadoop的k1,v1,那么pair._2.toStrig,是不是我们读取文件的每一行内容。
接下来我们继续执行代码:
val words = lines.flatMap(line => line.split(" "))
val pairs = words.map(word => (word, 1))
这两行与上面分析map的相似,我们就不重复了。接着我们执行
val counts = pairs.reduceByKey(_ + _)
这行代码我们惊奇的发现在RDD类里没有reduceByKey方法,这是为什么了?没有方法如何调用。
大家是不是想到scala中的隐式转换,在RDD类中找查到了隐式转换方法rddToPairRDDFunctions(),把rdd转换成了rddToPairRDDFunctions,返回这个对象,如下面代码
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
果然在PairRDDFunction类中找到这个方法:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}`
接下来我们执行最后一行代码,
counts.foreach(count => println(count._1 + ": " + count._2))
当我们看foreach时,明白这就是一个action算子。它调用了sparkContext的runJob方法,来划分一个JOB
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
//执行SparkContext中的runJob
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
经过多个runJob()重载的调用,最后我们找到最终的调用方法:
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
//调用sparkContext之前初使化 时创建的DAGScheduler的runJob方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
看到这里大家 是不是明白了,最终其实就是调用了sparkContext之前初使化 时创建的DAGScheduler的runJob方法,里面的第一个参数rdd是不是以前的rdd算子。
这里也说明了JOB的划分都是在DAGScheduler里完成的。接下来我们会在在下一章节会对DAGScheduler如何划分JOB进行深入的分析.
本章中每一个字(包括源码注解)都是作者敲出来的,你感觉有用,帮点击'喜欢'