转自:https://yq.aliyun.com/articles/461770?spm=a2c4e.11163080.searchblog.129.49792ec1bgg2MF
目录
4.4.2 将reduce join 转化为map join
Apache Spark广泛用于大规模数据处理方面,凭借支持交互查询、流式计算的性能快速崭露头角。Spark需要快速的处理海量数据,由此针对Spark进行性能调优显得十分必要。Spark调优包含众多方面,本文中仅针对于:资源、程序开发、数据倾斜三方面的调优进行阐述。资源优化方面,针对特定生产环境设置集群配置参数;程序开发优化方面,对于RDD的使用、算子的使用、数据及数据结构等方面进行了阐述;数据倾斜优化方面,在简单了解数据倾斜之后,提出了解决数据倾斜的三类方法:优化数据源、优化并行度、优化数据结构。
关键字: Spark调优、数据倾斜、开发调优
Spark调优综述
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎,在非常短的时间内崭露头角,它的API更丰富,且支持交互式查询、流式计算、机器学习等。与曾经引爆大数据产业革命的Hadoop Mapreduce相比它具有更快的速度。然而,想要让Spark作业拥有更好的性能需要一定的技巧。如果没有对于Spark作业进行合理的优化,那么Spark作业的执行速度可能大大降低,这样Spark的速度优势就不能完全体现。由此可见,对于Spark作业进行优化十分有必要。
想要对Spark作业进行优化,了解Spark作业的执行原理十分有必要。通过对于Spark作业的执行流程的分析,有助于做出适合的优化操作。同时,Spark的表现实际上是由很多方面决定的,对于Spark性能调优也是由很多部分组成,不是调节几个参数就可以大幅度提升作业性能的。我们需要结合实际应用场景对Spark作业进行综合分析,调节多个方面,才能获得更好的性能。在本文中我们将对资源、开发、数据倾斜三个方面进行原理介绍及优化。
2.1 优化资源参数
了解了Spark作业的基本原理之后,对于资源相关的参数进行调优就比较容易理解了。对资源参数进行调优就是对Spark作业运行过程中的需要使用到资源的地方,通过调节各种参数,来优化资源的使用率,从而提升Spark作业的执行性能。
各个参数对应于作业运行原理中的某个部分。在Spark中我们有三种方式来设置资源参数,按照优先级排序依是:(1)用户代码中显示调用set()方法设置;(2)通过Spark-submit传递参数;(3)配置文件。当以上三种方法均没有设置参数的值时,Spark将使用系统默认值,下面对主要参数的配置进行产阐述。
(1)num-executors。该参数用于设计Spark作业总的Executor的个数。YARN集群管理器尽可能根据num-executor设置在工作节点上启动Executor。这个参数十分重要,Spark默认只会启动很少的进程,这时并行度不够,任务执行速度十分缓慢。一般为每个Spark作业设置50~100个Executor,设置Executor太多大部分队列无法给予充分的资源;设置Executor太少无法充分利用集群性能。
(2)executor-memory。这个参数设置每个Executor进程的内存,Executor内存的大小,很多程度上直接决定了Spark作业的性能,而且跟很常见的java虚拟机内存溢出异常也有关系。这里executor-memory最大可以设置为式(3.1)所示,其中rMemory表示资源队列的最大内存限制。
executor-memory=rMemory/num-executor 公式(3.1)
(3)executor-core,该参数用于设置每个Executor进程的CPU core数量。因为每个CPU core同一时间只能执行一个task线程,所以executor-core的个数决定了Executor进程的并发线程能力。该参数设置为2-4比较合适。
(4)driver-memory,该参数用于设置Driver进程的内存。这个参数通常不设置,但是要注意的一点是,使用collect算子时,一定要保证Driver内存足够大,否则会出现内存溢出的错误。
(5)Spark.default.parallelism,该参数用于设置每个Stage默认的task数量。该参数使用默认值时,Spark会根据底层HDFS的block数量设置task数量,通常一个block对应一个task,这样task的数量通常是偏少的。由于task是真正执行Spark作业的线程,如果task数量太少,那么Executor中将面临有足够资源却没有偶task执行的窘境,针对Executor所做的优化也将前功尽弃。这里Spark.default.parallelism的大小可以用式(3.2)计算。
Spark.default.parallelism=[2,3]*num-executors * executor-cores 公式(3.2)
(6)Spark.storage.memoryFraction,该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。当Spark作业中有较多RDD需要进行持久化操作时,可以将该参数值调高;当Spark作业中有较少RDD需要进行持久化操作时,可以将该参数值调低。
(7)Spark.Shuffle.memoryFraction,该参数用于设置Shuffle过程中一个task拉取到上个Stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。如果Spark作业中RDD持久化操作较少,Shuffle操作较多时,可以将该参数调高;如果Spark作业中RDD持久化操作较多,Shuffle操作较少时,可以将该参数调低。
在编写Spark程序之初我们就要注意性能优化。实现同一个目的的Spark程序往往因为:使用的算子不同、RDD的复用程度不同、序列化方式不同等表现出性能方面的差异。开发调优就是优化RDD、优化算子、优化数据的过程,通过遵循开发调优的原则并将这些原则根据具体的业务应用到实际中,可能很好的实现Spark作业的性能提升。
3.1 优化RDD
3.1.1避免创建重复的RDD
通常来说,一个Spark作业就是对某个数据源创建RDD,然后对这个RDD进行转化和行为操作。通过转化操作,得到下一个RDD;通过行为操作,得到处理结果。在开发过程中需要注意,对于同一份数据,只应该创建一个RDD,不能创建多个RDD代表同一份数据。使用多个RDD代表同一份数据源时常常会增加作业的性能开销,这些开销包括:(1)创建新RDD的开销;(2)从外部系统中加载数据到RDD中的开销;(3)重复计算的开销。
3.1.2 尽可能复用一个RDD
在对不同的数据执行算子操作时应该尽量复用一个RDD。例如,当RDD A的数据格式是key-value类型的,RDD B的数据格式是value类型的,但是这两个RDD的value数据完全相同;那么,RDD A包含了RDD B中的所有信息,理论上来说RDD B可以被替代,而实际开发中也应该尽量减少多个RDD数据有重复或者包含的情况,这样可以尽可能减少RDD的数量从而减少算子执行的次数。
3.1.3 对多次使用的RDD进行持久化
Spark使用懒惰计算,执行转化操作时并不马上执行命令而是维护一张记录了执行RDD转化关系的谱系图,如图3.1所示。每次同一个RDD执行多个算子运算时都会重新从源头处计算一次,得到该RDD后,在对这个RDD执行算子操作,这种方式极大的损耗了Spark集群的资源。对于这种情况,应该对于多次使用的RDD进行持久化操作,持久化操作会将RDD数据保存到内存或者磁盘中,以后每次使用这个RDD时都不必重新计算,而是从内存或磁盘中直接取出该持久化RDD。
图3.1 RDD转化谱系图
RDD的持有化有几种不同的级别,分别是:MEMORY_ONLY、MEMORY_AND_DISK、MEMORY_ONLY_SER、MEMORY_AND_DISK_SER、DISK_ONLY、MEMORY_ONLY_2等,表3.1中对各种级别的含义进行了简单的介绍。这几种持久化级别使用的优先级排序如下:
(1)MEMORY_ONLY性能最高,直接将RDD存储在内存中,省区了序列化及反序列化、从磁盘读取的时间,从但是对于内存的容量有较高的要求;
(2)MEMORY_ONLY_SER会将数据序列化后保存在内存中,通过序列化压缩了RDD的大小,但是相较于MEMORY_ONLY多出了序列化及反序列化的时间;
(3)MEMORY_AND_DISK_SER优先将RDD缓存在内存中,内存缓存不下时才会存在磁盘中;
(4)DISK_ONLY和后缀为_2的级别通常不建议使用,完全基于磁盘文件的读写会导致性能的极具降低;后缀为2的级别会将所有数据都复制一份副本到其他节点上,数据复制及网络传输会导致较大的性能开销。
表3.1 RDD持久化级别
持久化级别含义
MEMORY_ONLY使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
MEMORY_AND_DISK使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER基本含义同MEMORY_ONLY。区别是会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER基本含义同MEMORY_AND_DISK。区别是会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2,
MEMORY_AND_DISK_2
对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。
3.2 优化算子
3.2.1 尽量避免使用Shuffle算子
Spark作业最消耗性能的部分就是Shuffle过程,应尽量避免使用Shuffle算子。Shuffle过程就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或者join操作,在操作过程中可能会因为一个节点上处理的key过多导致数据溢出到磁盘。由此可见,Shuffle过程可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作,Shuffle过程如图3.2所示。Shuffle类算子有:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等,编写Spark作业程序时,应该尽量使用map类算子替代Shuffle算子。
图3.2 Shuffle过程
3.2.1 使用高性能算子
Spark提供了几十种算子,使用这些算子可以让Spark作业不同的性能,在编写Spark程序时应尽量使用性能高的算子替换性能低的算子。这里给出几种算子的替换方案:
(1)使用mapPartitions替代普通map。mapPartition类的算子每次会处理一个分区的数据,而普通map每次只会处理一条数据。一次处理一个分区的数据节省了多次建立连接、多次打开数据流的时间;但是,mapPartitions也有可能会出现内存溢出问题,需要谨慎使用。
(2)foreachPartitions替代foreach。原理类似于“使用mapPartitions替代普通map”,foreachPartitions函数也是每次处理一个分区,而foreach函数每次只处理一条数据。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。
(3)使用filter之后进行coalesce操作。对于一个RDD使用filter进行过滤后,分区中的数据量可能会大为缩减,每个task任务处理的数据量大为减少,有些浪费资源,这时考虑将RDD的分区缩减。coalesce函数可以重新划分分区,但是要注意使用该函数会引起Shuffle。
(4)使用repartitionAndSortWithinPartitions替代repartition与sort类操作。repartitionAndSortWithinPartitions函数是Spark官方网站推荐的一个函数,如果需要现对RDD进行分区操作然后排序,那么不必使用repartition与sort的组合,直接使用repartitionAndSortWithinPartitions函数性能会更好。因为该算子可以在分区的同时进行排序操作,Shuffle操作与sort操作同时进行。
3.3 优化数据
3.3.1 使用Kryo优化序列化性能
在一个Spark作业中,有三处涉及到了序列化:(1)在算子函数中使用到外部变量时,该变量将会被序列化后进行网络传输;(2)将自定的类型作为RDD的泛型类型是,所有自定义类型对象都会进行序列化。此时,自定义的类必须要实现Serializable接口;(3)使用可序列化的持久化策略时,RDD的每个分区都会被序列化成为一个大的字节数组。
对于这三种涉及到序列化的地方,可以使用java提供的序列化机制,这也是Spark作业默认的序列化机制,但是这种序列化机制要保存全类名,效率较低。这里提供一种性能更好的序列化方法:Kryo序列化类库,这种方法较java序列化方法速度快了10倍作业。但是使用Kryo时需要自行注册需要序列化的自定义类,比较有难度。
3.3.2 优化数据结构
在java中有三种类型比较耗费内存:(1)对象;(2)字符串;(3)集合类型。因此Spark编码时应尽量不要使用以上三种数据结构。尽量使用字符串代替对象;使用原始类型代替字符串;使用数组代替集合,这样可以减少内存的占用,降低垃圾回收的频率提高性能。
4.1走近数据倾斜
4.1.1 数据倾斜的原因
数据倾斜是在进行数据计算时,数据分散度不够大量数据集中到少数机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢。发生数据清晰时常有以下现象:(1)Executor lost,OOM,Shuffle过程出错;(2)Driver OOM;(3)单个Executor执行时间特别久,整体任务卡在某个阶段不能结束;(4)正常运行的任务突然失败。
数据倾斜的产生的原理十分简单:在Spark作业进行Shuffle时会将个个节点上相同的key拉取到某个节点的一个task上进行操作,此时,如果某一个key对应的数据量特别大时,该key对应的task就要处理非常庞大的数据量,这就发生了数据倾斜。通过图4.1可以很好的理解这一过程:在三个节点上,hello对应7条数据,world对应1条数据,you对应1条数据,执行Shuffle操作时,第一个task需要处理7条数,其运行时间远大于其他两个task。由于木桶效应,整个Stage的运行速度是由运行最慢的task决定的。
图4.1 Shuffle过程中数据倾斜的产生
4.1.2 定位数据倾斜的位置
数据倾斜现象只会发生在Shuffle过程中,当出现数据倾斜时应检查代码中可能会触发Shuffle操作的算子。对于不同的数据倾斜现象,有不同的定位方法:
(1)某个task执行特别慢的情况
对于这种情况,首先要确定数据倾斜发生在第几个Stage中。如果是用yarn-client模式提交,那么本地是直接可以看到log的,可以在log中找到当前运行到了第几个Stage;如果是用yarn-cluster模式提交,则可以通过Spark Web UI来查看当前运行到了第几个Stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个Stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
知道数据倾斜发生在哪一个Stage之后,接着根据Stage划分原理,推算出来发生倾斜的那个Stage对应代码中的哪一部分。精准推算Stage与代码的对应关系,需要对Spark的源码有深入的理解,这里有一个相对简单实用的推算方法:只要看到Spark代码中出现了一个Shuffle类算子或者是Spark SQL的SQL语句中出现了会导致Shuffle的语句,那么就可以判定,以此为界限划分出了前后两个Stage。
(2)程序异常报错
这种情况比较容易定位有问题的代码,可以直接查看yarn-client模式下本地log的异常信息,或通过yarn-cluster模式下的log中的异常信息。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有Shuffle类算子,此时很可能就是这个算子导致了数据倾斜。
要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为代码的bug、偶然出现的数据异常等,也可能会导致内存溢出。通过Spark Web UI查看报错的那个Stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。
4.2 优化数据源
4.2.1 使用Hive ETL预处理数据
当Spark作业的数据来自Hive,且Hive表中数据不均匀,即少量的key对应了大多数的数据时,对于Hive中的数据进行处理是解决数据倾斜的一种办法。
该方法的思路是在Spark作业之前先对Hive中的数据进行聚合、join等处理,然后Spark作业处理的数据就是解决了数据倾斜问题的数据。该方法从根本上解决了数据倾斜,但是这也是一种危机转移方案,虽然Spark作业避免了数据倾斜,但是在Hive中的预操作中依旧存在数据倾斜问题。当对于Spark作业的实时性要求很高时可以采用这种方案,将数据倾斜提前在上游的Hive ETL中完成,周期内仅执行一次,周期内其他时间的操作都会提速。
4.2.2 过滤少数导致倾斜的key
当导致数据倾斜的key很少,且少量的key对作业结果影响并不大,那么过滤掉少数导致倾斜的key是一种不错的处理方法。
如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。这种方法实现简单,效果也很好,可以完全规避掉数据倾斜。但是在实际应用场景中,导致数据倾斜的key往往较多,所以该方法适用范围较小。
4.3 优化集群并行度
4.3.1 提高Shuffle操作的并行度
当必须要正面解决数据倾斜问题时,该方案较为适合,这也是处理数据倾斜最简单的方法之一。
通过增加Shuffle read task的数量,可以让原本分配给一个task的key分配给多个task,从而让每个task处理比原来更少的数据,原理如图4.2所示。这种方案实现起来比较简单,可以有效的缓解数据倾斜的影响。但是这种方法通常无法从根本解决问题,因为如果有一些极端情况出现,如:某个key对应的数据量占整个数据集的90%,那么该key所对应的90%的数据还是会分配到一个task中,这是数据倾斜现象还是是产生。
图4.2 提高Shuffle操作的并行度
4.4 优化算法
4.4.1 两阶段聚合
对RDD执行reduceByKey等聚合类Shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。
这种方案的核心思想是将会产生数据倾斜的一次聚合作业,分为两个阶段进行聚合。第一次聚合,先为每个key标记一个随机数,随机数范围为[0,n],此时一个key被分为n份,对标记后的key进行局部聚合;第二个阶段,将局部聚合后的key所标记的随机数去除,然后在对key进行聚合。最终,Spark聚合作业就完成了,具体原理如图4.3所示。这种方案对于聚合类的Shuffle操作导致的数据倾斜效果很不错,但是仅仅适用于聚合类的操作。
图4.3 两阶段聚合原理图
4.4.2 将reduce join 转化为map join
本方案适合于以下情况使用:RDD中使用join类型的操做或Spark SQL中使用join语句,且join操作中的一个RDD表数据量较小。普通的join是会走Shuffle过程的,而一旦Shuffle,就相当于会将相同key的数据拉取到一个Shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生Shuffle操作,也就不会发生数据倾斜。
本方案使用广播变量与map类算子代替了join操作,从而完全规避掉Shuffle操作,彻底避免了数据倾斜的发生和出现。首先,将较小的RDD中的数据通过collect算子拉去到Driver端内存中,然后将该RDD的数据通过Broadcast变量广播出去;然后,对另一个RDD执行map操作,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来,具体原理如图4.4所示。本方案对于join操作导致的数据倾斜十分有效,但是本方案仅仅局限于执行join操作的两个RDD中有一个数据量较小时。
图4.4 将reduce join 转化为map join
4.4.3 采样倾斜key并分拆join操作
本方案适合于以下情况使用:RDD中使用join类型的操做或Spark SQL中使用join语句,且两个RDD数据集的数据量都比较大,且出现数据倾斜的原因是一个RDD中少数几个key的数据量过大,另一个RDD的key分布均匀。对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join。
本方案的操作过程如下:
(1)对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key;
(2)将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD A,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD B;
(3)将需要join的另一个RDD也过滤出倾斜key对应的数据并形成一个单独的RDD C,将每条数据膨胀成n条数据,这n条数据按顺序附加一个0~n的前缀,不会导致倾斜的大部分key形成另外一个RDD D;
(4)将RDD A与RDD C进行join,此时导致数据倾斜的key分成n份,分散到多个task中去进行join,得到RDD E;
(5)将RDD B和RDD D进行join操作,得到RDD F;
(6)对RDD E与RDD F执行union操作,得到 RDD G,RDD G即为最终结果。
以上操作步骤可以用图4.5来表示,该方案针对于某几个key导致的数据倾斜十分有效,只需要针对少数导致数据倾斜的key进行扩容n倍,不需要对全量数据进行扩容,避免了占用过多内存;但对于导致数据倾斜的key的数量特别多时,这种方案是无能为力的。
图4.5 采样倾斜key并
分拆join操作
4.4.4 使用随机前缀和扩容RDD进行join
本方案适合于以下情况使用:RDD中使用join类型的操做或Spark SQL中使用join语句,且两个RDD数据集的数据量都比较大,且出现数据倾斜的原因是一个RDD中有大量的key导致数据倾斜。该方案将导致数据倾斜的RDD中的所有key均附加随机前缀,然后将处理后的key分散到不同的key中进行处理,而不是让一个task处理大量的数据。该方案与4.4.3小节方案类似,但是本方案需要更多的内存资源。
本方案的操作过程如下:
(1)查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表;
(2)将该RDD的每条数据都打上一个n以内的随机前缀;
(3)对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀;
(4)将两个处理后的RDD进行join。
该方案对于所有join类型的数据倾斜都可以处理,效果较好,同时本方案更注重与缓解数据倾斜,而不是彻底避免数据倾斜,对于内存资源的要求很高。
Spark性能优化是一项繁复的任务,需要结合实际生产情况对于多个方面级进行优化,仅仅对于某一方面的调整很难取得集群性能上巨大的提升。本文对于Spark调优主要的三个方面:资源优化、开发程序优化、数据倾斜优化进行了阐述,并在每个部分给出了常见的调优方法。在撰写本文的过程中,我对于Spark整体有了更深入的理解,希望能够砥砺前行。