看到一些同学的Spark代码中包含了很多repartition的操作,有一些不是很合理,非但没有增加处理的效率,反而降低了性能。这里做一个介绍。
repartition 从字面的意思看是是对数据进行重新分区,所以是会对数据进行打散。很多时候大家会觉得任务执行比较慢的时候,就会增加repartition的数量,其实这个时候不一定能提高应用执行的速度。这个操作的使用还要具体问题具体分析。
官方解释
repartition(partitionNums): Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
中文:通过创建更过或更少的分区将数据随机的打散,让数据在不同分区之间相对均匀。这个操作经常是通过网络进行数打散。
原理图
repartition源代码
repartition源码
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
从注释可以看到以下两个点
- 1 repartition可以将分区的并行度增加,也可以将分区的并行度减少
- 2 可以看到repartition调用了coalesce方法,并且传入的shuffle参数是true。换句说话,就是无论分区数是增加还是减少都会执行shuffle操作。
前提
使用repartition 使得任务能够并行执行的话,分配的core的数量一定要略微大于最大的分区数,才能使得所有的
task能够并行执行。
这里举个例子,比如当前RDD有100个分区,如果我分配的是50个core(spark.executor.instances * spark.executor.cores),那么同时执行的任务最多只有50。同一个操作需分成两部分串行执行,。这个时候如果有一个节点有问题(并行度是50 -1*2 = 48),那么就需要再次申请资源,如果资源紧张,需要等待,如果申请的时候申请的是54个core,虽然看起来有点浪费,但是对于任务的稳定性而言,却可能会有很大的提升。当然最好的视情况是分配102-110比较合理。
repartition适用场景
通过上面注释的理解,结合工作中的一些实践经验,repartition一般有如下几个比较常见的场景:
处理能力不够
RDD单个分区数据量比较大,或者单个分区处理比较慢,都可以通过repartition进行操作,这个时候numPartitions自然是要比当前的分区数要大一些。
单个分区数据量比较大,这种case在读取和处理hdfs文件的时候并不常见,因为默认spark读取hdfs文件的分区数和hdfs文件的block的数量是一致的。这个mr的mapper数量类似。但是对于Spark Streaming 任务,如果是window窗口较长,就比较容易出现单个分区数据量较大的情况。
还有一种case,比如每个需要有外部的一些读写操作,这个时候大量数据在一个partition中,会因为外部存储、单机处理能力,网络等的性能瓶颈导致数据每个partition的数据处理变慢,举个例子,往redis里写数据,如果每个partition数据量特别大,如果使用的是redis的非批量的,基本每个数据需要一次数据请求,假设采用的是pipleline的形式,依然会因为数据量很大导致这个分区的写入很慢,从而导致任务执行时间较长。这种问题在Spark Streaming的任务中影响比较大。这个时候通过repartition增加分区数量效果也会很明显。
数据倾斜
数据倾斜,这个自然不不用说了,使用repartition可以将数据进行打散,避免倾斜导致的执行耗时不均匀的问题。虽然这种情况是完全可以减少任务执行的时间,但是对于数据量比价小的分区而言,很快就能处理完,此时如果executor空闲的话,其实是比较浪费资源的。所以如果是通过这个处理数据倾斜的话,建议开启动态资源分配。另外使用reapartition处理数据倾斜的时候,最好提前统计一下key的分布,这个比较常见的方法就是:
1 如果是hive的数据
select count(1) from table group by key
2 如果是rdd中统计
val keyCount =rdd.map((k,v) =>(k,1)).reduceByKey((a,b) =>(a+b)).sortBy(_._2,false).take(100)
print keyCount
需要进行合并场景
本身需要进行分区合并的时候使用coalesce是最合适的,原因就在于coalesce默认shuffle是false,也就是说,如果100个分区 想要变成50个分区,这个时候coalesce并不会进行shuffle操作。进行coalesce的场景,数据分区很多,但是每个分区数较少,这个时候其实并不太影响执行效率,但是对于一些需要外部链接,或者写入文件的场景来说,这是很浪费资源的。
需要使用shuffle进行分区数减少的场景,原始的rdd分区数据分散不是很均匀,比如 当前RDD是100个分区,想要合并成50个分区,但是100个分区总数据分布从10K-200M分配不均,这个时候为了方便下游数据处理,我们可以将数据进行shuffle形式的合并。
总结
repartition是spark开发中使用非常多的一个操作,而且使用的是否合理直接影响到任务执行的快慢以及成功与否。是否需要进行repartition,以及repartition的数量是一个需要开发的同学多方面综合考虑的结果,并没有一成不变的经验。但是我这儿依然给出一个具体的思路以供参考:确定原始数据分区数量,以及分区大小,key的分布式,参考集群资源的是否充足以及资源使用分布(cpu和内存使用比例),然后综合考虑。