一直好奇这个所谓的“分布式、弹性、数据集”的理念对应到代码层面,是怎么落地的?比如它里面都有什么?如何实现惰性求值?如何实现分布式等等。
最近终~于~开始扒Spark的代码了,有一些感悟和猜想,记录在这里。
RDD不是集合类
这是这篇博客的核心点:要理解RDD,不应该用Java/Scala的Collection的方式去类比,而应该是以DSL或者Fluent API的方式去类比。
虽然名字里面带了“数据集”几个字,但实际上在.filter()
、.map()
这些动作的时候,甚至连存储的元素都还没有加载到内存中取。
而如果把它看成一个DSL的话,你在调用这些方法的时候,就是在进行声明式编程,告诉Spark你要做什么而不是怎么做,然后由Spark把这个DSL翻译成可执行的Scala代码。比如下面这个样子:
/**
* RDD提供的API
*/
1. lines = ... // 从分布式文件系统读取
2. count = lines.filter(...).count() // 过滤并统计行数
/**
* 对应Scala的版本
*/
3. lines : List[String] = ... // 从分布式文件系统读取
4. count = lines.filter(...).size() // 过滤并统计行数
当然,上面只是提了Executor部分的逻辑,实际上还要包括Driver汇总的逻辑才是完整的。所以说RDD就是提供了这样的一个编程的抽象:自动的拆分任务、合并任务结果。
惰性求值
所以RDD为什么是惰性求值(Lazy Evaluation)。我的理解是,为了能生成底层可执行的Scala代码,因此只有当触发action动作的时候,才动手构造出这样一个执行链、才把这些逻辑打包放在task里面分发给各个Executor去执行。
直白点说,就是在action的时候,去构造了一个Runable。如果不这么做,反而更难实现。
为了能实现惰性求值,需要几个要素。比如我要知道“完整”的调用链是什么样子的。这里面包括了2个部分,一个是调用链什么时候结束的,也就是action动作的发生,另一个是结束之前的调用链都有哪些动作,也就是每个RDD都要记住自己的血缘关系。
说道血缘关系,为什么说DAG会从action开始,倒着一直找到最初的那个RDD?因为它要找到所有“有关系”的RDD才能把filter啊、map啊之类的算子里面的函数给扒拉出来,然后重新转成Scala的集合类能接受的形式啊。
又因为是倒着找的,所以如果不做特殊处理(比如persist)下面这两个DAG,C <- B <- A,D <- B <- A,其中A和B这俩RDD肯定是不能复用的。
至此我感觉我明白了“为什么说你理解了什么是Scala的集合操作,基本就理解了什么是Spark”这句话。
分区是什么
一开始把RDD作为集合的时候,觉得分区是个特别抽象的概念(尤其是repartition)。实际上RDD的分区只是一个数组,最终Job会被拆分成一个个Task,而Task的数量和分区数有关。
persist是什么
未完待续...