先只做一个静态的分析,即上图最左面部份,动态调度执行稍后分析
概念理解
做为新手,RDD看的我头痛,Resilient Distributed Dataset, 弹性分布式数据集,有哪些特点呢?
1. 首先他是 Dataset, 俗称数据集。可以类比 Redis 里的 ZSET, HSET, SET, 保存数据的一种组织结构而已。区别就在于 RDD 是分布式,粗粒度
2. 关于分布式,大家所熟知的就是将数据分片。类比 MySQL 分库分表,可以有按 ID 做 Range 分怎,也可以按 Hash。RDD 也同样,具体取决于 partitioner 如何实现。
3. 具有 fault tolerance 特性,做个对比,关系数据库一般都是对分区 partition 做多副本来做到容灾和高可用。但是RDD 完全另外一个思路,他有一个 lineage(血统??) 的概念,每一个 partition 都可以回溯来重建。
4. 由于 RDD 只读,每一个 RDD 都由父 RDD 和做用之上的操作生成,父子 partition 涉及一一对应(窄依赖)和一对多(宽依赖),而这种依赖关系就是构成 fault tolerance 回溯的基础。
上面只是对于 RDD 静态的分析,至于动态的调度执行和 stage 划分下次再分享。
RDD 五要素
1. A list of partitions 数据分区
2. A function for computing each split 每个片的处理函数
3. A list of dependencies on other RDDs 依赖
4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 分区函数很重要,对于join group 优化
5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
例子: 行数统计
来看一个官网 quick start 例子,打开 spark-shell
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at:27
textFile 是一个 MapPartitionsRDD, 他没有 Dependency, 并且也不会读取数据,不做任何操作
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at:29
linesWithSpark 同样也是一个 MapPartitionsRDD, 他有 Dependency, 就是上文的 textFile, 并且附加一个 filter 操作,返回包含 “Spark” 的记录
scala> linesWithSpark.count
res0: Long = 17
最后执行 count 函数,得到记录 17
transformation 和 action
RDD 只读,所以每次 filter, map, flatMap 等操作都是生成一个新的 RDD, 多个 RDD 为链式关系,由 Denpendency 和 compute 联系在一起。下图是主要的两类函数。
transformation: 惰性的操作,只生成新的 RDD, 描述执行逻辑
action: 会提交 job, 交给 worker 去执行,流式依次计算所有 transformation 操作,流式的精髓在于 Iterator
对应文中例子,textFile和filter 都是 transformation 操作,只负责生成 RDD, 只有最后一个 count 才发起 job 执行。
filter 函数简单明了,直接生成一个新的 MapPartitionsRDD, 重点在于两个参数 this 和 iter 操作的闭包
MapPartitionsRDD 要重写几个关键成员函数,partitioner 决定如何做分区,getPartitions 决定如何从父 RDD 中获取自已使用的 partition,这块是简单的 one to one依赖,即窄依赖,后续会涉及到 shuffle 宽依赖再分析。compute 封装传弟的 f 闭包,直接作用于 partition, 这里都是针对 Iterator, 不到最后触发是不会执行。
count不具有代表现,贴一个 reduce 源码, 最常见的就是 reduce(_+_),如果大家了解单机的,肯定知道原理,对于分布式的RDD也一样。 jobResult 是一个 Option[T] 结果,mergeResult 决定如何对各个分区的结果进行操作,就是调用 reduce(f: (T,T) => T): T 里面的闭包f,对于各分区也执行 f。
RDD 的初探,DAGSchdule 后文分析,如有理解有误的请大家指证