我们平时很容易遇到说排序,并取前N个的状况。
我们根据数据类型可以简单分为重复键和不重复键的topN
MapReduce
对于MR来说,topN代码比较多一些,在这里我只讲讲思路。
当无重复键的时候,
我们有数据("w"->2,"ww"->3,"r"->3)
我们的目的是对值进行排序,如用户点击了几次网页,值记录的就是网页。
map阶段,我们要做的是获取并且处理数据,并完成本地的topN排序。
在排序时我们用的是java自带的treeMap(它是一个基于红黑树的实现)。
为什么要在map阶段就进行排序呢?
因为在数据量巨大的时候,为了减少RPC和reduce的压力。于是我们在map排好序并筛选出前N个。
reduce阶段,我们只需要把map传来的topN再进行一次排序筛选出前N个。
这样我们的目的就达成了。
对于非唯一键,MR显得笨拙一些,它必须先经过一次reduce,把非唯一键变成唯一键后再重复上述操作。
spark
spark具有高层抽象函数。所以排序显得十分简单。在这里主要看看这几个函数。
sortby
def sortBy[S](f: JFunction[T, S], ascending: Boolean, numPartitions: Int): JavaRDD[T]
sortby函数可以完成对指定数据的排序,(k,v)既可以指定k也可以指定v,第二个参数是选择正序还是逆序(默认是true正序,一般要topN的话用逆序),因为这是一个shuffle操作所以可要指定分区。sortbykey
比sortby少一个第一个参数,它是仅对key的排序。sortwith
def sortWith(lt: (A, A) => Boolean): Repr = sorted(Ordering fromLessThan lt)
一种自定义排序的方法takeOrder
take
def take(num: Int): Array[T]
抽取rdd的前n个元素top
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
默认使用降序,并抽取前n个元素tabkeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
默认使用升序,并抽取前n个元素
val arr=Map(1->2,2->3,3->1,4->4,4->5,10->23,12->21,10->2,9->1,0->2,9->3)
val conf=new SparkConf().setAppName("test")
val sc=new SparkContext(conf)
val rdd=sc.parallelize(arr.toList,4)
println(rdd.partitions.size+"======================================")
val rerdd=rdd.coalesce(3)
println((rerdd.partitions.size+"======================================"))
val pairs=rerdd.map(x=>new Tuple2(x._1,x._2))
val result=pairs.reduceByKey(_+_)
println(result.partitions.size+"======================================")
val partitions=result.sortBy(x=>x._2,false)
val res=partitions.take(3)
res.foreach(x=>println(x))
代码的简单实现。
思考:如果大量数据中进行topN有什么优化呢?
个人认为剪枝是必要的,假如对于1-100分布的数服从正态分布,我们自然就可以过滤掉百分之50-70的数。
如果在已知平均值等情况下,更方便进行剪枝。