WordCount的代码
主要是从HDFS读取文件后进行单词切割,然后进行计数,如果不懂RDD算子可以看RDD详解
WordCount的各个算子
SparkRDD的运行流程
SparkRDD宽依赖和窄依赖
SparkRDD之间的依赖主要有:
1.宽依赖
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition
总结:窄依赖我们形象的比喻为超生
2.窄依赖
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用
总结:窄依赖我们形象的比喻为独生子女
结合WordCount的源码分析
WordCount算子内部解析
在WordCount程序中,第一个使用的Spark方法是textFile()方法,主要的源码是
这个方法的主要作用是从HDFS中读取数据, 这里创建一个HadoopRDD,在这个方法内部还创建一个MapPartitionRDD,接下里的几个 RDD同样是MapPartitionRDD,最主要的是看saveAsTextFile()方法。 下面是saveAsTextFile()方法,代码在RDD类的1272行,具体内容如下:
这个方法的主要作用是产生一个RDD,MapPartitionsRDD;然后将RDD转化为PairRDDFuctions,接下来是saveAsHadoopFile()方法: 主要的代码如下:
继续查看saveAsHadoopDataset()方法源码,主要的代码如下:
代码解析:
1.获取写入HDFS中的文件流
2.一个函数将分区数据迭代的写入到HDFS中
3.开始提交作业,Self表示Final RDD也就是作业最后的RDD在WordCount中也就是MapPartitionsRDD
这里我们将会追踪到runJob()方法中,
这里我们继续追踪到runJob()的重载方法,夏满是这个方法的核心代码:
这里是非常重要的方法,主要做的工作是调用SparkContext类中创建的dagScheduler,使用dagScheduler划分Stage,然后将Stage转化为TaskSet交给TaskScheduler在交个Executor执行
划分Stage
在前面的分析中,我们已经知道了dagScheduler调用了runJob()方法,这个方法的作用是划分stage。
这里主要是划分stage,然后调用submitJob()返回一个调度器,这里我们继续查看submitJob()方法。
上面是submitJob()方法的核心代码,主要的作用是eventProcessLoop对象内部有一个阻塞队列和线程,先将数据封装到Case Class中将事件放入到阻塞队列。
对于JobSubmitted类的模式匹配,主要的代码如下:
这里调用dagScheduler的handleJobSubmitted()方法,这个方法是对stage划分的主要方法,主要的核心代码:
通过newStage()方法,根据这个方法在这里可以看出分区的数量决定Task数量。 通过追踪newStage()方法,主要的代码如下:
这个方法是递归的划分Stage,主要的方法是getParentStages(rdd, jobId),具体的划分代码如下:
stage划分算法如下:
涉及的数据结构:栈、HashSet
1.通过最后的RDD,获取父RDD
2.将finalRDD放入栈中,然后出栈,进行for循环的找到RDD的依赖,需要注意的是RDD可能有多个依赖
3.如果RDD依赖是ShuffleDependency,那么就可以划分成为一个新的Stage,然后通过getShuffleMapStage()获取这个stage的父stage;如果是一般的窄依赖,那么将会入栈
4.通过getShuffleMapStage()递归调用,得到父stage;一直到父stage是null
5.最后返回stage的集合
stage提交算法
在对于最后一个RDD划stage后,进行提交stage,主要的方法是:
这里和划分stage的算法一样,拿到最后的stage然后找到第一个stage开始从第一个stage开始提交。
stage提交
下面的代码是submitMissingTasks(),主要是核心的代码:
这里主要做的工作是根据分区数量决定Task数量,然后根据stage的类型创建Task,这里主要有ShuffleMapTask和ResultTask。
ShuffleMapTask:进行分区局部聚合,从上游拉去数据。
ResultTask:将结果写入持久化介质.比如HDFS等。
这里将Task进行封装成为TaskSet进行提交给taskScheduler。
关于Stage划分流程图
总结
1.textFile()方法会产生两个RDD,HadoopRDD和MapPartitionRDD
2.saveTextAsFile()方法会产生一个RDD,MapPartitionRDD
3.Task数量取决于HDFS分区数量
4.Stage划分是通过最后的RDD,也就是final RDD根据依赖关系进行递归划分
5.stage提交主要是通过递归算法,根据最后一个Stage划分然后递归找到第一个stage开始从第一个stage开始提交。
喜欢小编的文章可以关注哟!