【Spark Java API】Transformation(10)—combineByKey、groupByKey

combineByKey


官方文档描述:

Generic function to combine the elements for each key using a custom set of aggregation functions. 
Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C Note that V and C can be different 
-- for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). 

Users provide three functions:
 - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 - `mergeCombiners`, to combine two C's into a single one.

 In addition, users can control the partitioning of the output RDD, 
and whether to perform map-side aggregation (if a mapper can produce multiple items with the same key).

函数原型:

def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C],  
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C]

def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], 
mergeCombiners: JFunction2[C, C, C], numPartitions: Int): JavaPairRDD[K, C]

def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C],    
mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner): JavaPairRDD[K, C]

def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], 
mergeCombiners: JFunction2[C, C, C], partitioner: Partitioner, 
mapSideCombine: Boolean,serializer: Serializer): JavaPairRDD[K, C]

**
该函数是用于将RDD[k,v]转化为RDD[k,c],其中类型v和类型c可以相同也可以不同。
其中的参数如下:
**

  • createCombiner:该函数用于将输入参数RDD[k,v]的类型V转化为输出参数RDD[k,c]中类型C;
  • mergeValue:合并函数,用于将输入中的类型C的值和类型V的值进行合并,得到类型C,输入参数是(C,V),输出是C;
  • mergeCombiners:合并函数,用于将两个类型C的值合并成一个类型C,输入参数是(C,C),输出是C;
  • numPartitions:默认HashPartitioner中partition的个数;
  • partitioner:分区函数,默认是HashPartitionner;
  • mapSideCombine:该函数用于判断是否需要在map进行combine操作,类似于MapReduce中的combine,默认是 true。

源码分析:

def combineByKey[C](createCombiner: V => C, 
mergeValue: (C, V) => C, 
mergeCombiners: (C, C) => C, 
partitioner: Partitioner, 
mapSideCombine: Boolean = true, 
serializer: Serializer = null): RDD[(K, C)] = self.withScope {  
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0  
if (keyClass.isArray) {    
  if (mapSideCombine) {      
    throw new SparkException("Cannot use map-side combining with array keys.")    
  }    
  if (partitioner.isInstanceOf[HashPartitioner]) {          
    throw new SparkException("Default partitioner cannot partition array keys.")    
  }  
}  
val aggregator = new Aggregator[K, V, C](    
  self.context.clean(createCombiner),    
  self.context.clean(mergeValue),    
  self.context.clean(mergeCombiners))  
if (self.partitioner == Some(partitioner)) {    
  self.mapPartitions(iter => {      
    val context = TaskContext.get()      
    new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))    
  }, preservesPartitioning = true)  
} else {    
  new ShuffledRDD[K, V, C](self, partitioner)      
    .setSerializer(serializer)      
    .setAggregator(aggregator)        
    .setMapSideCombine(mapSideCombine)  
  }
}

**
从源码中可以看出,combineByKey()的实现是一边进行aggregate,一边进行compute() 的基础操作。假设一组具有相同 K 的 <K, V> records 正在一个个流向 combineByKey(),createCombiner 将第一个 record 的 value 初始化为 c (比如,c = value),然后从第二个 record 开始,来一个 record 就使用 mergeValue(c, record.value) 来更新 c,比如想要对这些 records 的所有 values 做 sum,那么使用 c = c + record.value。等到 records 全部被 mergeValue(),得到结果 c。假设还有一组 records(key 与前面那组的 key 均相同)一个个到来,combineByKey() 使用前面的方法不断计算得到 c'。现在如果要求这两组 records 总的 combineByKey() 后的结果,那么可以使用 final c = mergeCombiners(c, c') 来计算;然后依据partitioner进行不同分区合并。
**

实例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转化为pairRDD
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {    
    @Override    
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {  
    return new Tuple2<Integer, Integer>(integer,1);   
  }
});

JavaPairRDD<Integer,String> combineByKeyRDD = javaPairRDD.combineByKey(new Function<Integer, String>() {    
    @Override    
    public String call(Integer v1) throws Exception {  
      return v1 + " :createCombiner: ";    
  }
  }, new Function2<String, Integer, String>() {    
    @Override    
    public String call(String v1, Integer v2) throws Exception {        
      return v1 + " :mergeValue: " + v2;    
  }
}, new Function2<String, String, String>() {    
    @Override    
    public String call(String v1, String v2) throws Exception {        
      return v1 + " :mergeCombiners: " + v2;    
  }
});
System.out.println("result~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + combineByKeyRDD.collect());

groupByKey


官方文档描述:

Group the values for each key in the RDD into a single sequence. 
Allows controlling the partitioning of the resulting key-value pair RDD by passing a Partitioner. 
The ordering of elements within each group is not guaranteed, 
and may even differ each time the resulting RDD is evaluated.
Note: This operation may be very expensive. 
If you are grouping in order to perform an aggregation (such as a sum or average) over each key, 
using [[PairRDDFunctions.aggregateByKey]] or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. 
If a key has too many values, it can result in an [[OutOfMemoryError]].

函数原型:

def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]]

def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]]

源码分析:

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {  
  // groupByKey shouldn't use map side combine because map side combine does not  
  // reduce the amount of data shuffled and requires all map side data be inserted  
  // into a hash table, leading to more objects in the old gen.  
  val createCombiner = (v: V) => CompactBuffer(v)  
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v  
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2  
  val bufs = combineByKey[CompactBuffer[V]]( createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)  
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

**
从源码中可以看出groupByKey()是基于combineByKey()实现的, 只是将 Key 相同的 records 聚合在一起,一个简单的 shuffle 过程就可以完成。ShuffledRDD 中的 compute() 只负责将属于每个 partition 的数据 fetch 过来,之后使用 mapPartitions() 操作进行 aggregate,生成 MapPartitionsRDD,到这里 groupByKey() 已经结束。最后为了统一返回值接口,将 value 中的 ArrayBuffer[] 数据结构抽象化成 Iterable[]。groupByKey() 没有在 map 端进行 combine(mapSideCombine = false),这样设计是因为map 端 combine 只会省掉 partition 里面重复 key 占用的空间;但是,当重复 key 特别多时,可以考虑开启 combine。
**

实例:

List<Integer> data = Arrays.asList(1, 2, 4, 3, 5, 6, 7);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data);
//转为k,v格式
JavaPairRDD<Integer,Integer> javaPairRDD = javaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {    
    @Override      
    public Tuple2<Integer, Integer> call(Integer integer) throws Exception {        
    return new Tuple2<Integer, Integer>(integer,1);    
  }
});

JavaPairRDD<Integer,Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey(2);
System.out.println(groupByKeyRDD.collect());

//自定义partition
JavaPairRDD<Integer,Iterable<Integer>> groupByKeyRDD3 = javaPairRDD.groupByKey(new Partitioner() {    
    //partition各数    
    @Override    
    public int numPartitions() {        return 10;    }    
    //partition方式    
    @Override    
    public int getPartition(Object o) {          
      return (o.toString()).hashCode()%numPartitions();    
  }
});
System.out.println(groupByKeyRDD3.collect());
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345

推荐阅读更多精彩内容