-
- RDD的创建和保存
- 1.1 textFile
从HDFS中读取一个文本文件 - 1.2 makeRDD、parallelize
都会创建一个新的ParallelCollectionRDD对象。如果makeRDD中的数据是Seq[T]结构,就会调用parallelize方法,不过makeRDD还可以将Seq[(T, Seq[String])]结构的数据转变成RDD。
- 1.3 saveAsTextFile
将RDD保存为一个压缩文本文件。
-
- 转换算子
- 2.1 不会shuffle的
- 2.1.1 map、flatMap、mapPartitions
new一个新的MapPartitionsRDD对象,并调用Scala相应集合类的map或flatMap方法。
- 2.1.2 mapValues、flatMapValues
mapValues和flatMapValues是在PairRDDFunctions类中定义的,而map和flatMap是在RDD类中定义的,说明mapValues和flatMapValues使用场景相对更小一点,其处理的RDD形式必须是键值对。从源码来看,mapValues和flatMapValues只对键值对中的值进行处理,而且一定保证分区不变。
-
2.1.3 coalesce
对RDD进行合并分区。从源码解释上来看,coalesce是窄依赖关系,不需要shuffle(参数shuffle默认是false)。但源码中也提到,如果合并后分区数很小,很可能导致计算在较少的节点上进行,此时可以将shuffle设置为true。
接着看源码,可见coalesce具体进行过程受到shuffle参数的影响。当shuffle为false时,coalesce会创建新的CoalescedRDD类,CoalescedRDD类要求参数numPartitions大于0。coalesce主要是将父RDD合并成更少的分区,如果numPartitions大于原RDD分区数,那么新RDD分区数还是和原RDD分区数一样;如果numPartitions小于原RDD分区数,就会对partitions进行分组(group),具体实现可见PartitionCoalescer类及其throwBalls方法。
- 2.1.1 map、flatMap、mapPartitions
- 2.2 会shuffle的
- 2.2.1 intersection
intersection的实现主要是利用了cogroup方法。
不过这里还有一点有些疑惑,那就是构建CogroupRDD时是如何确定结果RDD的分区数?源码中与partition相关的代码如下,但是我不太理解,。
- 2.2.2 subtract
subtract的实现是利用了SubtractedRDD类,调用subtractByKey方法,得到新的SubtractedRDD对象。subtract并没有用cogroup算子,而是为之新创造了一个类SubtractedRDD。原因是cogroup会将两个原RDD的key全部保留在内存中,但是SubtractedRDD中只会保留RDD_A的key,RDD_B的key会流式传过来。当RDD_A小于RDD_B时该优化可以让系统使用RDD_A的分区,而不用担心RDD_B的大小导致内存不足。源码中解释如下:
- 2.2.3 join、leftOuterJoin、fullOuterJoin
join类型算子都是基于cogroup算子计算的。前面已经写了,cogroup算子会将两个RDD相同的key和相应value组合在一起,最后的结果形式是(key, (iterable(v1), interable(v2)))。最后用嵌套for循环组合(iterable(v1)和(iterable(v2)就可以得到join结果。不同的join实现的不同点在于两个iterable组合时对空iterable的处理方式。 -
2.2.4 groupByKey
将RDD中相同key的元素的value全部放到一个sequence中。这个操作是很耗费资源的,因为聚集过程中并没有value的聚集,仅仅是收集起来放到一个sequence中。源码中还提到groupByKey会将所有键值对都保存在内存中,所以可能会导致OOM。
- 2.2.5 reduceByKey、aggregateByKey
如果是要对RDD中元素按key分组后对values进行聚合,那么就可以将groupByKey换成reduceByKey或aggregateByKey。
reduceByKey算子和aggregateByKey算子都是基于combineByKeyWithClassTag实现的
补充:aggregateByKey还有个优势在于combOp的输入参数类型不需要一致,也就是说,使用reduceByKey的话,map聚合后的value得和没聚合前的value类型一样,但是aggregateByKey就没这样的要求。比如,如果要同时计算用户的最大收入和最小收入,用aggregateByKey的话就可以直接对<用户, 收入>进行操作,最后得到<用户, <最大收入, 最小收入>>,但是用reduceByKey就得先将原数据形式改为<用户, <收入, 收入>>,得保证和最终想得到的格式<用户, <最大收入, 最小收入>>一样。
至于combineByKeyWithClassTag,我个人觉得就是对MapReduce中的combine函数的实现。
- 2.2.1 intersection
-
- 行动算子
-
3.1 collect
collect会调用sc.runJob将数据全部拉取到driver端存在一个Array中。
- 3.2 count、countByKey、countByValue
count调用sc.runJob将数据拉回driver端并求和
countByKey会将原PairRDD的值变成1L,然后利用reduceByKey和collect得到结果Map
countByValue会将原RDD变成(value, null),然后基于countByKey得到结果Map
-
3.3 foreach、foreachPartition
先贴上源码:
从源码中可以看出,二者最大的区别在于对传入方法f使用。前者是方法f作用于单个RDD元素,后者是方法f作用于整个RDD分区
- 3.4 first、take、top
take(num)是取回RDD的前num个元素,工作方式是先扫描一个工作分区,然后根据该分区返回结果评估是否还需要扫描其他分区以满足num数量。注意,如果RDD是"Nothing"或"Null",则会返回异常。
-
3.5 sortBy
调用sortByKey方法,创建一个新的排好序的ShuffledRDD。这里根据名字猜测一下,既然新的RDD是ShuffledRDD,说明该RDD的创建应该是进行了shuffle,所以最后的结果应该是全局排序的。
常用Spark算子总结
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- spark性能优化:数据倾斜调优 - LW_ICE - 博客频道 - CSDN.NEThttp://blog.cs...