先上代码:
以standalone模式
任务提交脚本:
spark-submit \
--master spark://node01:7077,node02:7077 \
--class com.leox.wordcount \
--executor-memory 1g\
--total-executor-cores 4 \
XXXX.jar \
/test.txt /out
object wordcount{
def main(args:Array[String]):Unit{
//构建sparkcontext
val sparkConf = new SparkConf().setAppName("wordcount")
val sc = new SparkContext(sparkConf)
//数据处理
val textfile = sc.TextFile(args(0))
val words = textfile.flatmap(X=>X.split(",")).map(X=>(X,1)).reduceByKey((A,B)=>A+B)
//保存到hdfs
words.saveAsTextFile(args(1))
}
}
执行流程:standalone模式
如图所示
yarn模式:
Stage的划分
如图所示
val textfile = sc.TextFile(args(0))这句语句,sparkcore内部其实生成两个RDD,类似MR的读取模式(K,V)其中K是数据偏移量,之后的两个Transformation算子,实际上都是在各自节点的同一个task内完成的。
由于图片太长只能分割分析
遇到reduceByKey宽依赖的时候,需要执行shuffle操作,生成的RDD的task数改变(粗粒度改变)也是Stage划分的标志
总体图
总结:从尾巴往前,当遇到宽依赖(shuffle操作),划分stage。
在同一个stage中,不发生task的网络传输,里面的操作只需要在当前节点的当前task(线程)就能完成。
注意:一个application里会有多个job,一个action算子触发一个job操作,一个application有多个stage。