0x01 数据序列化调优
在进行RDD缓存和Shuffle过程时,Spark会将数据对象进行序列化,所以选择合适的序列化方法,可以提高spark任务的性能。SPark提供了两个序列化库:
原生Java序列化:默认情况下Spark使用Java序列化库。Java序列化很灵活,但是通常很慢,而且序列化格式很大。
Kryo序列化:与Java序列化相比,kryo速度更快(10倍左右),而且序列化格式更紧凑,序列化后数据包体更小。
使用:conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
,如果对象很大,还需要增加spark.kryoserializer.buffer
的大小。
0x02 内存调优
2.1调整数据结构
1.如果Executor分配的内存少于32G,可以设置JVM标识:-XX:+UseCompressedOops将对象指针设为4字节而不是8字节。
2.2 序列化RDD存储
对于重复使用的RDD,将它缓存起来。
2.3 垃圾收集优化
1.Spark的Storage和Execution内存占用由spark.memory.fraction控制,应该保证JVM的老年代的占用比例(NewRatio/(NewRatio+1))大于spark.memory.fraction。这样能够保证老年代有足够的空间存放RDD缓存和Shuffle缓存的数据,避免频繁Full GC。
2.通过设置-XX:+UseG1GC来使用G1垃圾回收器。在堆内存够大时,需要通过设置-XX:G1HeapRegionSize来增大G1区域大小。
3.如果任务是从HDFS中读取数据,通常每个HDFS数据块(128M)对应一个RDD的Partition(或者一个Task),解压缩块的大小通常是块大小的2~3倍,而且每个Executor通常同时运行3 ~ 4Task,所以我们可以估计Eden的大小为4*3*128MB。
0x03 其他调优
3.1 并行度调优
最理想的情况:Task数量和分配的CPU core数量相同,这样所有的Task一起运行,差不多同一时间运行完毕。
但实际情况,有些task会运行的快一些,有些task会运行的慢一点,如果task数量和cpu core的数量相同,就会导致一部分task运行完后,这部分cpu core就空闲出来了,而且一直空闲到程序结束,就造成了资源浪费。所以官方推荐Task数量设置成CPU core数量的2~3倍。尽量让cpu不要空闲,同时提高Task的并发度,可以减少每个task处理的数据量,从而也可以提升运行性能。
3.2 减少Task的内存使用量
Spark自动根据文件的大小设定了运行在其上的map任务的数量,而对于reduce操作,例如groupByKey和reduceByKey,如果并行度过低,导致Task中的数据集太大,进而导致OutofMemoryError。我们可以增加并行度,让每个人物获取到的数据集更小。而且多任务可以重用一个Executor JVM,任务的启动成本很低,因此可以安全地增加并行度到集群中的core数量。
3.3 广播大变量
当RDD算子中引用了Driver中的一个大的对象,每个Task都会持有这个大对象的序列化副本,导致Task序列化包体太大,可以通过Spark的广播机制,将大对象广播到每个Executor上,这样可以减少Task的序列化包体。一般来说大于20kb的Task有可能需要优化。