一、基本概念
Spark 中最基本的数据抽象是 RDD,弹性分布式数据集 (Resilient Distributed DataSet)。
1.1 三大特性
这三个特性分别为:分区,不可变,并行操作。
RDD 的分区及分区与工作结点(Worker Node)的分布关系:
1.1.1 分区
每一个 RDD 包含的数据被存储在系统的不同节点上。逻辑上我们可以将 RDD 理解成一个大的数组,数组中的每个元素就代表一个分区 (Partition) 。
在物理存储中,每个分区指向一个存储在内存或者硬盘中的数据块 (Block) ,其实这个数据块就是每个 task 计算出的数据块,它们可以分布在不同的节点上。
所以,RDD 只是抽象意义的数据集合,分区内部并不会存储具体的数据,只会存储它在该 RDD 中的 index,通过该 RDD 的 ID 和分区的 index 可以唯一确定对应数据块的编号,然后通过底层存储层的接口提取到数据进行处理。
在集群中,各个节点上的数据块会尽可能的存储在内存中,只有当内存没有空间时才会放入硬盘存储,这样可以最大化的减少硬盘 IO 的开销。
1.1.2 不可变
不可变性是指每个 RDD 都是只读的,它所包含的分区信息是不可变的。由于已有的 RDD 是不可变的,所以我们只有对现有的 RDD 进行转化 (Transformation) 操作,才能得到新的 RDD ,一步一步的计算出我们想要的结果。
这样会带来这样的好处:我们在 RDD 的计算过程中,不需要立刻去存储计算出的数据本身,我们只要记录每个 RDD 是经过哪些转化操作得来的,即:依赖关系,这样一方面可以提高计算效率,一方面是错误恢复会更加容易。如果在计算过程中,第 N 步输出的 RDD 的节点发生故障,数据丢失,那么可以根据依赖关系从第 N-1 步去重新计算出该 RDD,这也是 RDD 叫做"弹性"分布式数据集的一个原因。
1.1.3 并行操作
因为 RDD 的分区特性,所以其天然支持并行处理的特性。即不同节点上的数据可以分别被处理,然后生成一个新的 RDD。
1.2 存储级别
1.3 血缘关系(Lineage )
RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。系统从输入中逻辑上生成了 A 和 C 两个 RDD, 经过一系列转换操作,逻辑上生成了 F 这个 RDD。Spark 记录了 RDD 之间的生成和依赖关系。当 F 进行行动操作时,Spark 才会根据 RDD 的依赖关系生成 DAG,并从起点开始真正的计算。
上述一系列处理称为一个血缘关系(Lineage),即 DAG 拓扑排序的结果。在血缘关系中,下一代的 RDD 依赖于上一代的 RDD。例如,在图 2 中,B 依赖于 A,D 依赖于 C,而 E 依赖于 B 和 D。
根据不同的转换操作,RDD 血缘关系的依赖分为窄依赖和宽依赖。窄依赖是指父 RDD 的每个分区都只被子 RDD 的一个分区所使用。宽依赖是指父 RDD 的每个分区都被多个子 RDD 的分区所依赖。map、filter、union 等操作是窄依赖,而 groupByKey、reduceByKey 等操作是宽依赖。
所以可得出一个结论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖,也就是说,对父 RDD 依赖的 Partition 不会随着 RDD 数据规模的改变而改变。
1. 窄依赖
1)子 RDD 的每个分区依赖于常数个父分区(即与数据规模无关)。
2)输入输出一对一的算子,且结果 RDD 的分区结构不变,如 map、flatMap。
3)输入输出一对一的算子,但结果 RDD 的分区结构发生了变化,如 union。
4)从输入中选择部分元素的算子,如 filter、distinct、subtract、sample。
2. 宽依赖
1)子 RDD 的每个分区依赖于所有父 RDD 分区。
2)对单个 RDD 基于 Key 进行重组和 reduce,如 groupByKey、reduceByKey。
3)对两个 RDD 基于 Key 进行 join 和重组,如 join。
Spark 的这种依赖关系设计,使其具有了天生的容错性,大大加快了 Spark 的执行速度。RDD 通过血缘关系记住了它是如何从其他 RDD 中演变过来的。当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,从而带来性能的提升。
相对而言,窄依赖的失败恢复更为高效,它只需要根据父 RDD 分区重新计算丢失的分区即可,而不需要重新计算父 RDD 的所有分区。而对于宽依赖来讲,单个结点失效,即使只是 RDD 的一个分区失效,也需要重新计算父 RDD 的所有分区,开销较大。
宽依赖操作就像是将父 RDD 中所有分区的记录进行了“洗牌”,数据被打散,然后在子 RDD 中进行重组。
二、Transformation算子
以pyspark为例,已创建一个SparkContext对象。
from pysparkimport SparkContext, SparkConf
import os
os.environ["PYSPARK_PYTHON"]="/usr/local/bin/python3.7"
conf = SparkConf().setMaster("local[1]").setAppName("spark.mapdemo")
sc = SparkContext(conf=conf)
2.1 map
2.2 flatMap
2.3 filter
2.4 groupByKey
2.5 reduceByKey
2.6 reduce
2.7 fold
2.8 sortByKey
2.9 union
2.10 distinct
2.11 sample
2.12 intersection
2.13 join
2.14 leftOutjoin
2.15 rightOutjoin
2.16 fullOutjoin
2.17 subtract
2.18 cogroup
2.19 cartesian
三、Action算子
在transformation中有用到