Spark Repartition 使用

看到一些同学的Spark代码中包含了很多repartition的操作,有一些不是很合理,非但没有增加处理的效率,反而降低了性能。这里做一个介绍。

repartition 从字面的意思看是是对数据进行重新分区,所以是会对数据进行打散。很多时候大家会觉得任务执行比较慢的时候,就会增加repartition的数量,其实这个时候不一定能提高应用执行的速度。这个操作的使用还要具体问题具体分析。

官方解释

repartition(partitionNums): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
中文:通过创建更过或更少的分区将数据随机的打散,让数据在不同分区之间相对均匀。这个操作经常是通过网络进行数打散。

原理图

image.png

repartition源代码

repartition源码

 /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

从注释可以看到以下两个点

  • 1 repartition可以将分区的并行度增加,也可以将分区的并行度减少
  • 2 可以看到repartition调用了coalesce方法,并且传入的shuffle参数是true。换句说话,就是无论分区数是增加还是减少都会执行shuffle操作。

前提

使用repartition 使得任务能够并行执行的话,分配的core的数量一定要略微大于最大的分区数,才能使得所有的
task能够并行执行。
这里举个例子,比如当前RDD有100个分区,如果我分配的是50个core(spark.executor.instances * spark.executor.cores),那么同时执行的任务最多只有50。同一个操作需分成两部分串行执行,。这个时候如果有一个节点有问题(并行度是50 -1*2 = 48),那么就需要再次申请资源,如果资源紧张,需要等待,如果申请的时候申请的是54个core,虽然看起来有点浪费,但是对于任务的稳定性而言,却可能会有很大的提升。当然最好的视情况是分配102-110比较合理。

repartition适用场景

通过上面注释的理解,结合工作中的一些实践经验,repartition一般有如下几个比较常见的场景:

处理能力不够

RDD单个分区数据量比较大,或者单个分区处理比较慢,都可以通过repartition进行操作,这个时候numPartitions自然是要比当前的分区数要大一些。
单个分区数据量比较大,这种case在读取和处理hdfs文件的时候并不常见,因为默认spark读取hdfs文件的分区数和hdfs文件的block的数量是一致的。这个mr的mapper数量类似。但是对于Spark Streaming 任务,如果是window窗口较长,就比较容易出现单个分区数据量较大的情况。
还有一种case,比如每个需要有外部的一些读写操作,这个时候大量数据在一个partition中,会因为外部存储、单机处理能力,网络等的性能瓶颈导致数据每个partition的数据处理变慢,举个例子,往redis里写数据,如果每个partition数据量特别大,如果使用的是redis的非批量的,基本每个数据需要一次数据请求,假设采用的是pipleline的形式,依然会因为数据量很大导致这个分区的写入很慢,从而导致任务执行时间较长。这种问题在Spark Streaming的任务中影响比较大。这个时候通过repartition增加分区数量效果也会很明显。

数据倾斜

数据倾斜,这个自然不不用说了,使用repartition可以将数据进行打散,避免倾斜导致的执行耗时不均匀的问题。虽然这种情况是完全可以减少任务执行的时间,但是对于数据量比价小的分区而言,很快就能处理完,此时如果executor空闲的话,其实是比较浪费资源的。所以如果是通过这个处理数据倾斜的话,建议开启动态资源分配。另外使用reapartition处理数据倾斜的时候,最好提前统计一下key的分布,这个比较常见的方法就是:
1 如果是hive的数据

select  count(1)  from table group by key

2 如果是rdd中统计

val keyCount =rdd.map((k,v) =>(k,1)).reduceByKey((a,b) =>(a+b)).sortBy(_._2,false).take(100)
print keyCount

需要进行合并场景

本身需要进行分区合并的时候使用coalesce是最合适的,原因就在于coalesce默认shuffle是false,也就是说,如果100个分区 想要变成50个分区,这个时候coalesce并不会进行shuffle操作。进行coalesce的场景,数据分区很多,但是每个分区数较少,这个时候其实并不太影响执行效率,但是对于一些需要外部链接,或者写入文件的场景来说,这是很浪费资源的。
需要使用shuffle进行分区数减少的场景,原始的rdd分区数据分散不是很均匀,比如 当前RDD是100个分区,想要合并成50个分区,但是100个分区总数据分布从10K-200M分配不均,这个时候为了方便下游数据处理,我们可以将数据进行shuffle形式的合并。

总结

repartition是spark开发中使用非常多的一个操作,而且使用的是否合理直接影响到任务执行的快慢以及成功与否。是否需要进行repartition,以及repartition的数量是一个需要开发的同学多方面综合考虑的结果,并没有一成不变的经验。但是我这儿依然给出一个具体的思路以供参考:确定原始数据分区数量,以及分区大小,key的分布式,参考集群资源的是否充足以及资源使用分布(cpu和内存使用比例),然后综合考虑。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容