帮你快速理解 Spark 的分区器

本文基于Spark2.1.0版本

0,引言:

Spark一般是部署在分布式环境中的(有可能是在区域集中的集群上,也有可能跨城市),而在分布式环境中,数据在各节点进行网络的传递代价是很大的。借用Spark源码里对groupByKey算子的描述(@note This operation may be very expensive 。。。 ),可见一斑。

/*** @note This operation may be very expensive. If you are grouping in order to perform anaggregation (such as a sum or average) over each key, using`PairRDDFunctions.aggregateByKey` or`PairRDDFunctions.reduceByKey`will provide much better performance.

@note As currently implemented, groupByKey must be able to hold all the key-value pairs for any key in memory. If a key has too many values, it can result in an[[OutOfMemoryError]]. ***/

def groupByKey(partitioner: Partitioner): RDD[(K,Iterable[V])] = self.withScope {...}

1,Shuffle的作用

有一些场景,节点间通过网络传递的数据是很少的。比如从海量日志中提取出ERROR级别的日志,每个节点计算完成后,可以选择把本地得到的结果发送给Driver程序,也可以直接在本地节点上写入外部系统。

而有些场景,是需要每个节点上的数据合并到一起统筹计算的,势必会产生大量的网络开销。比如pairRDD的键值操作,这就涉及到Shuffle过程:Shuffle中译是“洗牌、混洗”,而和洗牌含义不同的是,它不是把数据洗的越乱越好,而是需要把分布在不同节点的数据按照一定的规则聚集到一起的过程。

产生shuffle的算子操作:groupByKey

2,为什么需要分区器

那么,控制好数据的分布以便获得最少的网络传输,可以极大的提升整体性能,减少网络开销。Spark为pairRDD提供的Paritioner,就是为了帮助我们来合理的进行数据分布。本文不会深入介绍具体场景的Shuffle操作的优化,而是会说一些常常被忽略的概念。

3,分区器的种类

HashPartitioner:原理很简单,代码也很简单。对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。该分区方法可以保证同一组的键出现在同一个节点的分区上。

class HashPartitioner(partitions:Int) extends Partitioner {

require(partitions >=0,s"Number of partitions ($partitions) cannot be negative.")

def numPartitions:Int= partitions

def getPartition(key:Any):Int= key match{

    case null=>0

    case_ => Utils.nonNegativeMod(key.hashCode,numPartitions}

override def equals(other:Any):Boolean= other match{

    caseh: HashPartitioner =>h.numPartitions == numPartitions

    case_ =>false}

override defhashCode:Int= numPartitions}

通过partitionBy(下面第4节会有介绍)主动使用该分区器时,可以通过partitions参数指定想要分区的数量:

scala> val rdd3=rdd2.partitionBy(new org.apache.spark.HashPartitioner(3))

scala> rdd3.partitions.size

res1: Int = 3

通过转换操作使用该分区器时(下面第5节会有介绍),可以继承父RDD的分区数量:

scala> val rdd3=rdd2.groupByKey()

scala> rdd3.partitioner

res2: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@6)

scala> rdd3.partitions.size

res3: Int = 6  (父rdd2的分区数是6)

RangePartitioner:简单的说就是将一定范围内的数映射到某一个分区内。算法比较复杂,代码也比较多,这里就不举例了,可以自行参考Spark源码Partitioner.scala。

这里要说一个需要关注的地方,看源码中对该分区器的一个note:

/*@note The actual number of partitions created by the RangePartitioner might not be the same as the`partitions`parameter, in the case where the number of sampled records is less than* the value of`partitions`.*/

class RangePartitioner[K: Ordering : ClassTag,V]

因为该分区器使用到了Reservoir sampling(水塘抽样)算法,所以不管用户是通过partitionBy(下面第4节会有介绍)主动使用该分区器,或者通过转换操作使用该分区器时,得到的实际分区数可能和想要的设置的不一样,可能会少于预期。

自定义分区方式:Spark允许用户通过自定义的Partitioner对象,灵活的来控制RDD的分区方式。

比如:需要根据域名分区:www.spark.com 和 www.spark.com/sub

使用哈希或者范围分区器,可能无法把上面两个URL划分到相同的分区内,用户就可以自定义一个基于域名的分区器(如下),这个分区器只对URL中的域名求哈希。

class CustomPartitioner(numParts: Int) extends Partitioner {

     def numPartitions: Int = numParts

     def getPartition(key: Any): Int = {

     val domain =newjava.net.URL(key.toString).getHost()

     val code = (domain.hashCode % numPartitions)

     if (code <0) {code + numPartitions} else {code}

     override def equals(other: Any): Boolean = other match {

         case    Custom: CustomPartitioner =>Custom.numPartitions == numPartitions

         case    _ =>false }

     def hashCode: Int = numPartitions

}

4,主动使用分区器

用户有的时候想在某些操作前提前对pairRDD按照某种方式进行分区,而不是被动的通过某些转换算子的shuffle过程。这样可以提前对key根据某种规则来分配到相同的分区,减少后续操作的网络传输。(当然,用户可以根据场景灵活的使用第3点说到的3种分区方式)

scala> val rdd1=sc.parallelize(1 to 10)  -- 通过scala集合生成ParallelCollectionRDD

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at:24

scala> val rdd2=rdd1.map(x=>(x,1))    -- 通过map算子,转换为pairRDD

rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at:26

-- 通过下面的命令,给rdd3指定HashPartitioner分区器并分10个区

scala> val rdd3=rdd2.partitionBy(new org.apache.spark.HashPartitioner(10))

rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at:28

scala> rdd3.partitioner  -- 可以看到rdd3的分区器是HashPartitioner

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)

scala> rdd3.partitions.size -- rdd3的分区数是10

res11: Int = 10

5,使用某些转换算子会自动为结果RDD生成分区信息

scala> val rdd1=sc.parallelize(1 to 10)  -- 通过scala集合生成ParallelCollectionRDD

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at:24

scala> val rdd2=rdd1.map(x=>(x,1))    -- 通过map算子,转换为pairRDD

rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at:26

scala> rdd2.partitioner   -- 此时rdd2并没有分区器

res9: Option[org.apache.spark.Partitioner] = None

scala> val rdd3=rdd2.groupByKey()  -- 通过groupByKey算子生成rdd3

org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[4] at groupByKey at:28

scala> rdd3.partitioner  -- 可以看到rdd3自动生成了HashPartitioner分区器

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@6)

scala> val rdd4=rdd2.sortByKey() -- 通过sortByKey算子生成rdd4

rdd4: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[7] at sortByKey at:28

scala> rdd4.partitioner -- 可以看到rdd4自动生成了RangePartitioner分区器Option[org.apache.spark.Partitioner]=Some(org.apache.spark.RangePartitioner@53df81fe)

这种机制的好处是,Spark知道rdd3是用哈希分区的,那么后面再对rdd3进行键分区相关的操作时(比如reduceByKey)速度就会快很多(rdd4同理)。

6,什么操作会导致子RDD失去父RDD的分区方式?

比如,使用map()算子生成的RDD,由于该转换操作理论上可能会改变元素的键(Spark并不会去判断是否真的改变了键),所以不再继承父RDD的分区器,如下:

scala> val rdd1=sc.parallelize(1 to 10)  -- 通过scala集合生成ParallelCollectionRDD

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at:24

scala> val rdd2=rdd1.map(x=>(x,1))    -- 通过map算子,转换为pairRDD

rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at:26

-- 通过下面的命令,给rdd3指定HashPartitioner分区器并分10个区

scala> val rdd3=rdd2.partitionBy(new org.apache.spark.HashPartitioner(10))

rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at:28

scala> rdd3.partitioner  -- 可以看到rdd3的分区器是HashPartitioner

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)

scala> val rdd4=rdd3.mapValues(x=>x*2)  -- 如果仅仅对pairRDD的value操作,则子RDD会继承父RDD的分区器及分区数

scala> rdd4.partitioner -- 可以看到rdd4的分区器也是HashPartitioner

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@a)

scala> rdd4.partitions.size -- rdd4的分区数也是10

res11: Int = 10

scala> val rdd5=rdd3.map(x=>(x,1)) -- 如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承

rdd5: org.apache.spark.rdd.RDD[((Int, Int), Int)] = MapPartitionsRDD[5] at map at:30

scala> rdd5.partitioner  -- rdd5的分区器是None

res7: Option[org.apache.spark.Partitioner] = None

scala> rdd5.partitions.size  -- rdd5的分区数也是10

res12: Int = 10

7,多元RDD的分区操作后,子RDD如何继承分区信息?

对于两个或多个RDD的操作,生成的新的RDD,其分区方式,取决于父RDD的分区方式。如果两个父RDD都设置过分区方式,则会选择第一个父RDD的分区方式。

scala> a2.partitioner 

 Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@6)

scala> b2.partitioner

Option[org.apache.spark.Partitioner]=Some(org.apache.spark.RangePartitioner@4a2cc441)

scala> val c=a2.cogroup(b2)  -- 通过父b2、父a2 分组操作生成c

scala> c.partitioner                  -- c继承第一个父b2的分区方式

Option[org.apache.spark.Partitioner]=Some(org.apache.spark.RangePartitioner@4a2cc441)

scala> val c=b2.cogroup(a2)  -- 通过父a2、父b2 分组操作生成c

scala> c.partitioner                  -- c继承第一个父a2的分区方式

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@6)

好了,本章就说这么多吧。

欢迎指正,转载请标明作者和出处,谢谢。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容