前面介绍了很多关于Spark性能的调优手段,今天来介绍一下Spark性能调优的最后一个点,就是关于Spark中常用算子的调优。废话不多说,直接进入正文;
1.使用mapPartitions算子提高性能
mapPartition的优点:使用普通的map操作,假设一个partition中有1万条数据,那么function就要被执行1万次,但是使用mapPartitions操作之后,function仅仅会被执行一次,显然性能得到了很大的提升,这个就没必要在多废话了。
mapPartition的缺点:使用普通的map操作,调用一次function执行一条数据,不会出现内存不够使用的情况;但是使用mapPartitions操作,很显然,如果数据量太过于大的时候,由于内存有限导致发生OOM,内存溢出。
总结:通过以上以上优缺点的对比,我们可以得出一个结论;就是在数据量不是很大的情况下使用mapPartition操作,性能可以得到一定的提升,在使用mapPartition前,我们需要预先估计一下每个partition的量和每个executor可以被分配到的内存资源。然后尝试去运行程序,如果程序没有问题就大可放心的使用即可,下图是一个实际的应用例子,仅供参考。
2.filter操作之后使用coalesce算子提高性能
先看看默认情况下,执行完filter操作以后的各个partition的情况,如下图所示;
问题:从上面的图中可以很明显的看出,经过一次filter操作以后,每个partition的数据量不同程度的变少了,这里就出现了一个问题;由于每个partition的数据量不一样,出现了数据倾斜的问题。比如上图中执行filter之后的第一个partition的数据量还有9000条。
解决方案:针对上述出现的问题,我们可以将filter操作之后的数据进行压缩处理;一方面减少partition的数量,从而减少task的数量;另一方面通过压缩处理之后,尽量让每个partition的数据量差不多,减少数据倾斜情况的出现,从而避免某个task运行速度特别慢。coalesce算子就是针对上述出现的问题的一个解决方案,下图是一个解决案例。
3.使用foreachPartition算子进行
默认的foreach对于每一条数据,都要单独调用一次function并创建一个数据库连接,如果数据量很大,对于spark作业是非常消耗性能的。
而对于foreachPartition来说,对于function函数,只调用一次,只获取一个数据库连接,一次将数据全部写入数据库。但是数据量很大的话,可能会引发OOM的问题。不过在生产环境中一般都是使用foreachPartition(好像说了半天废话)。
4.使用repartition解决SparkSQL低并行度的问题
在spark项目中,如果在某些地方使用了SparkSQL,那么使用了SparkSQL的那个stage的并行度就没有办法通过手动设置了,而是由程序自己决定。那么,我们通过什么样的手段来提高这些stage的并行度呢?其实解决这个问题的办法就是使partition的数量增多,从而间接的提高了task的并发度,要提高partition的数量,该怎么做呢?就是使用repartition算子,对SparkSQL查询出来的数据重新进行分区操作,此时可以增加分区的个数。具体使用如下图所示:
总结:关于RDD算子的优化,就先讲到这里。关于整个Spark调优,基本先告一段落,后面会介绍一些Spark源码分析的知识,欢迎关注。
如需转载,请注明: