combineByKey
官方文档描述:
Generic function to combine the elements for each key using a custom set of aggregation functions.
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C Note that V and C can be different
-- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
Users provide three functions:
- `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
- `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
- `mergeCombiners`, to combine two C's into a single one.
In addition, users can control the partitioning of the output RDD,
and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).
函数原型:
def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C]
def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C], numPartitions: Int): JavaPairRDD[K, C]
def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C]
def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner,
mapSideCombine: Boolean,serializer: Serializer): JavaPairRDD[K, C]
**
该函数是用于将RDD[k,v]转化为RDD[k,c],其中类型v和类型c可以相同也可以不同。
其中的参数如下:
**
- createCombiner:该函数用于将输入参数RDD[k,v]的类型V转化为输出参数RDD[k,c]中类型C;
- mergeValue:合并函数,用于将输入中的类型C的值和类型V的值进行合并,得到类型C,输入参数是(C,V),输出是C;
- mergeCombiners:合并函数,用于将两个类型C的值合并成一个类型C,输入参数是(C,C),输出是C;
- numPartitions:默认HashPartitioner中partition的个数;
- partitioner:分区函数,默认是HashPartitionner;
- mapSideCombine:该函数用于判断是否需要在map进行combine操作,类似于MapReduce中的combine,默认是 true。
源码分析:
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
**
从源码中可以看出,combineByKey()的实现是一边进行aggregate,一边进行compute() 的基础操作。假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner 将第一个 record 的 value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,比如想要对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算;然后依据partitioner进行不同分区合并。
**
实例:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转化为pairRDD
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer,1);
}
});
JavaPairRDD<Integer,String> combineByKeyRDD = javaPairRDD.combineByKey(new Function<Integer, String>() {
@Override
public String call(Integer v1) throws Exception {
return v1 + " :createCombiner: ";
}
}, new Function2<String, Integer, String>() {
@Override
public String call(String v1, Integer v2) throws Exception {
return v1 + " :mergeValue: " + v2;
}
}, new Function2<String, String, String>() {
@Override
public String call(String v1, String v2) throws Exception {
return v1 + " :mergeCombiners: " + v2;
}
});
System.out.println("result~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + combineByKeyRDD.collect());
groupByKey
官方文档描述:
Group the values for each key in the RDD into a single sequence.
Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner.
The ordering of elements within each group is not guaranteed,
and may even differ each time the resulting RDD is evaluated.
Note: This operation may be very expensive.
If you are grouping in order to perform an aggregation (such as a sum or average) over each key,
using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory.
If a key has too many values, it can result in an [[OutOfMemoryError]].
函数原型:
def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]]
def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]]
源码分析:
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
**
从源码中可以看出groupByKey()是基于combineByKey()实现的, 只是将 Key 相同的 records 聚合在一起,一个简单的 shuffle 过程就可以完成。ShuffledRDD 中的 compute() 只负责将属于每个 partition 的数据 fetch 过来,之后使用 mapPartitions() 操作进行 aggregate,生成 MapPartitionsRDD,到这里 groupByKey() 已经结束。最后为了统一返回值接口,将 value 中的 ArrayBuffer[] 数据结构抽象化成 Iterable[]。groupByKey() 没有在 map 端进行 combine(mapSideCombine = false),这样设计是因为map 端 combine 只会省掉 partition 里面重复 key 占用的空间;但是,当重复 key 特别多时,可以考虑开启 combine。
**
实例:
List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转为k,v格式
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<Integer, Integer>(integer,1);
}
});
JavaPairRDD<Integer,Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(2);
System.out.println(groupByKeyRDD.collect());
//自定义partition
JavaPairRDD<Integer,Iterable<Integer>> groupByKeyRDD3 = javaPairRDD.groupByKey(new Partitioner() {
//partition各数
@Override
public int numPartitions() { return 10; }
//partition方式
@Override
public int getPartition(Object o) {
return (o.toString()).hashCode()%numPartitions();
}
});
System.out.println(groupByKeyRDD3.collect());