1. 前言
有时候需要按照key去做reduce操作时,一般情况下调用reduceByKey就可以完成按照key reduce的任务,reduceByKey的调用就必然意味着shuffle操作。但是有的时候如果我们已经知道相同的key都在同一个partition里面了,这个时候其实没有必要去使用reduceByKey通过一次shuffle将相同的key收集到同一个reducer分区里面,而是可以直接在map端就去完成reduce操作。
比如下面是一个word count在2个分区里面的分布:
------partition 1----------
(failure,1)
(count,1)
(thief,1)
(failure,1)
------partition 2--------
(fortification,1)
(peek,1)
(lepta,1)
(peek,1)
由于相同的word都在同一个分区里面了,没必要去通过reduceByKey去完成word count操作。
2. 解决方法
实现一个RDD,在其compute方法里完成按key聚合,实现如下:
/**
K: key type
V: 上游rdd中value的type
C: V 经过reduce之后的type
参考ShufferedRDD
*/
class MapsideReduceRDD[K:ClassTag, V:ClassTag, C:ClassTag](
// 上游rdd,要求上游的rdd中数据已经转换成(key,value)的形式
@transient var prev : RDD[_ <: Product2[K,V]]
) extends RDD[(K,C)](prev){
// 需要一个aggregator去完成value的聚合, reduceByKey也会创建这个
private var aggregator : Option[Aggregator[K,V,C]] = None
def setAggregator(aggregator: Aggregator[K,V,C]):this.type ={
this.aggregator = Option(aggregator)
this
}
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
/* 创建一个ExternalAppendOnlyMap,这个数据结构是spark中提供的,插入
. (K,V)的数据,然后按照传递给他聚合方法完成(K,V)的聚合,返回(K,C)的数据
*/
val externalMap = createExternalMap
// 这里迭代上游rdd中(K,V)类型的记录
val rddIter = dependencies(0).rdd.asInstanceOf[RDD[Product2[K,V]]].iterator(split, context)
// 插入到externalMap中
externalMap.insertAll(rddIter)
// 返回
new InterruptibleIterator(context,
externalMap.iterator
)
}
override protected def getPartitions: Array[Partition] = firstParent[Product2[K,V]].partitions
private def createExternalMap: ExternalAppendOnlyMap[K,V,C] = {
require(aggregator.nonEmpty, "aggregator should not be empty")
/**
创建ExternalAppendOnlyMap, 它需要一下参数:
- 一个V => C 类型的函数,用于迭代时发现某个key的第一个value,将它转换成C
- 一个(C,V) => C类型的函数,用于将value合并到C上
- 一个(C,C) => C类型的函数,将两个部分聚合的结果合并到一起
*/
new ExternalAppendOnlyMap[K,V,C](aggregator.get.createCombiner, aggregator.get.mergeValue, aggregator.get.mergeCombiners)
}
}
ExternalAppendOnlyMap会在必要时spill到磁盘
2.1 测试
测试类如下:
object MapsideReduceTest {
def main(args: Array[String]): Unit ={
val sc = new SparkContext()
val words = Seq(Seq("failure","count","thief","failure","count"),Seq("fortification","peek","lepta","peek"));
// 分两个分区,第一个分区包含Seq("failure","count","thief","failure","count")
// 这样相同的word只在一个分区里面,然后统计word count
val wordsRDD = sc.parallelize(words, 2)
// flatMap将Seq()展开,然后调用map转换成(failure,1)这种数据
val wordsCount = wordsRDD.flatMap(seq => seq).map(word => (word,1))
val aggregator = createAggregator
val mapsideReduceRDD = new MapsideReduceRDD[String, Int, Int](wordsCount).setAggregator(aggregator)
mapsideReduceRDD.saveAsTextFile("/Users/eric/mapsideReduce")
}
def createAggregator: Aggregator[String,Int,Int] ={
val createCombiner: Int => Int = value => value
val mergeValue : (Int, Int) => Int = (mergedValue, newValue) => {
mergedValue + newValue
}
val mergeCombiner = mergeValue
new Aggregator[String, Int,Int](createCombiner, mergeValue, mergeCombiner)
}
}
提交后测试结果如下:
产生2个输出文件以及内容:
------ part-00000 -------
(failure,2)
(count,2)
(thief,1)
------- part-00001 ------
(lepta,1)
(peek,2)
(fortification,1)