动态分区
参数 | 说明 |
---|---|
hive.exec.dynamic.partition | 是否开启动态分区,默认是false。如果要开启动态分区,就设置为true |
hive.exec.dynamic.partition.mode | 动态分区模式,默认是strict。也可以改为nonstrict,表示允许所有的分区字段都可以使用动态分区,需要结合hive.exec.dynamic.partition=true一起使用。不过即使开启动态分区,首个分区字段也必须是静态字段 |
hive.exec.max.dynamic.partitions | 可以创建的最大分区数,如果实际分区超过了就会报错 |
使用案例:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=5000;
资源申请
参数 | 说明 |
---|---|
spark.executor.cores | 每个executor上的槽位个数 |
spark.executor.memory | executor的内存总量,executor上所有的core共享 yarn中container的内存限制为spark.executor.memory+spark.yarn.executor.memoryOverhead<=16G |
spark.yarn.executor.memoryOverhead | executor堆外内存大小,直接由yarn控制,单位是MB yarn中container的内存限制为spark.executor.memory+spark.yarn.executor.memoryOverhead<=16G |
spark.driver.memory | driver的内存总量,主要是存放任务过程中的shuffle元数据,以及任务中collect的数据。Broadcast的小表也会先存放在driver中 yarn中container的内存限制为spark.driver.memory+spark.yarn.driver.memoryOverhead<=16G |
spark.yarn.driver.memoryOverhead | driver堆外内存大小,直接由yarn控制,单位是MB yarn中container的内存限制为spark.driver.memory+spark.yarn.driver.memoryOverhead<=16G |
spark.memory.fraction | storage memory+execution memory占总内存(java heap-reserved memory)的比例 这里说明下,executor jvm中内存分为storage、execution和other内存。storage存放缓存RDD数据,execution存放shuffle过程的中间数据,other存放用户定义的数据结构或spark内部元数据。如果用户自定义数据结构较少,可以将该参数比例适当上调 |
使用案例:
set spark.executor.cores=2;
set spark.executor.memory=4G;
set spark.yarn.executor.memoryOverhead=1024;
set spark.driver.memory=8G;
set spark.yarn.driver.memoryOverhead=1024;
set spark.memory.fraction=0.7;
Executor动态申请
参数 | 说明 |
---|---|
spark.dynamicAllocation.enabled | 是否开启动态资源分配,强烈建议开启 |
spark.dynamicAllocation.maxExecutors | 开启动态资源分配后,同一时刻可以申请的最大executor数 |
spark.dynamicAllocation.minExecutors | 开启动态资源分配后,同一时刻可以申请的最小executor数 |
使用案例:
set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.maxExecutors=1000;
set spark.dynamicAllocation.minExecutors=400;
ORC文件性能优化
参数 | 说明 |
---|---|
spark.sql.orc.filterPushdown | ORC谓词下推,默认是关闭 |
spark.sql.orc.splits.include.file.footer | 开启后,在split划分时会使用footer信息 |
spark.sql.orc.cache.stripe.details.size | 设置每个stripe可以缓存的大小 |
spark.sql.hive.metatorePartitionPruning | 当为true,Spark SQL的谓语将被下推到Hive Metastore中,更早的消除不匹配的分区 |
使用案例:
set spark.sql.orc.filterPushdown=true;
set spark.sql.orc.splits.include.file.footer=true;
set spark.sql.orc.cache.stripe.details.size=10000;
set spark.sql.hive.metastorePartitionPruning=true;
补充说明:ORC文件存储格式如下表
Footer就是文件的元数据信息,包括数据量、每列的统计信息等
而文件中数据主要是分为stripe,每个stripe包括索引数据、行数据和stripe footer
具体ORC文档可参考ORC Specification v1
文件输入输出
参数 | 说明 |
---|---|
spark.hadoop.hive.exec.orc.split.strategy | 控制在读取ORC表时生成split的策略: 1.BI:以文件尾粒度进行split划分 2.ETL:将文件进行切分,多个stripe组成一个split 3.HYBRID:当文件的平均大小大于hadoop最大split值时采用ETL策略,否则采用BI策略 较大的ORC表,可能其footer较大,ETL策略可能导致从HDFS拉取大量数据进行split,甚至导致driver端OOM,此时建议使用BI策略;较小的尤其有数据倾斜的表,建议使用ETL策略 |
spark.hadoop.mapreduce.input.fileinputformat.split.minsize | 读ORC表时,设置小文件合并的阈值,低于该值的split会合并在一个task中执行 |
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize | 读ORC表时,设置一个split的最大阈值,大于该值的split会切分成多个split |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version | 文件提交到HDFS上的算法: 1.version=1是按照文件提交 2.version=2是批量按照目录进行提交,可以极大节约文件提交到HDFS的时间,减轻NameNode压力 |
使用案例:
set spark.hadoop.hive.exec.orc.split.strategy=ETL;
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864;
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=268435456;
set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2;
小文件合并
参数 | 说明 |
---|---|
spark.sql.mergeSmallFileSize | 小文件合并阈值,如果生成的文件平均大小低于阈值会额外启动一轮stage进行小文件的合并,默认不合并小文件 |
spark.sql.targetBytesInPartitionWhenMerge | 设置额外的合并job时的map端输入size |
spark.hadoopRDD.targetBytesInPartition | 设置map端输入的合并文件大小 |
使用案例
set spark.sql.mergeSmallFileSize=67108864;
set spark.sql.targetBytesInPartitionWhenMerge=67108864;
set spark.hadoopRDD.targetBytesInPartition=67108864;
补充说明:
在决定一个目录是否需要合并小文件时,会统计目录下的平均大小,然后和spark.sql.mergeSmallFileSize比较
在合并文件时,一个map task读取的数据量取决于下面三者的较大值:
1、spark.sql.mergeSmallFileSize
2、spark.sql.targetBytesInPartitionWhenMerge
3、spark.hadoopRDD.targetBytesInPartition
Shuffle
参数 | 说明 |
---|---|
spark.sql.autoBroadcastJoinThreshold | 小表join自动开启广播机制时小表的阈值,会从Hive Metastore中获取表统计信息。当设置为-1时会禁用广播 |
spark.sql.shuffle.partitions | 设置reduce阶段的分区数.设置过大可能导致很多reducer同时向一个mapper拉取数据,导致mapper由于请求压力过大而挂掉或响应缓慢,从而fetch failed |
spark.reducer.maxSizeInFlight | 同一时刻一个reducer可以同时拉取的数据量大小 |
spark.reducer.maxReqsInFlight | 同一时刻一个reducer可以同时产生的请求数 |
spark.reducer.maxBlocksInFlightPerAddress | 同一时刻一个reducer向同一个上游executor拉取的最多block数 |
spark.reducer.maxReqSizeShuffleToMem | shufle请求的block超过该阈值就会强制落盘,防止一大堆并发请求将内存占满 |
spark.shuffle.io.connectionTimeout | shuffle中连接超时时间,超过该时间会fetch failed |
spark.shuffle.io.maxRetries | shuffle中拉取数据的最大重试次数 |
spark.shuffle.io.retryWait | shuffle重试的等待间隔 |
使用案例:
set spark.sql.autoBroadcastJoinThreshold=33554432;
set spark.sql.shuffle.partitions=5000;
set spark.reducer.maxSizeInFlight=25165824;
set spark.reducer.maxReqsInFlight=10;
set spark.reducer.maxBlocksInFlightPerAddress=1;
set spark.reducer.maxReqSizeShuffleToMem=536870911;
set spark.shuffle.io.connectionTimeout=120;
set spark.shuffle.io.maxRetries=3;
set spark.shuffle.io.retryWait=5;
Adaptive Execution
参数 | 说明 |
---|---|
spark.sql.adaptive.enabled | 开启动态执行 |
spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 设置每个Reducer读取的目标数据量,会将低于该值的partition进行合并 |
spark.sql.adaptive.join.enabled | 开启动态调整Join |
spark.sql.adaptiveBroadcstJoinThreshold | 设置SortMergeJoin转BroadcastJoin的阈值,如果不设置该参数,该阈值和spark.sql.autoBroadcastJoinThreshold值相等 |
spark.sql.adaptive.allowAdditionalShuffle | 是否允许为了优化Join而增加Shuffle,默认是false |
spark.sql.adaptive.skewedJoin.enabled | 开启自动处理Join时的数据倾斜 |
spark.sql.adaptive.skewedPartitionMaxSplits | 控制处理一个倾斜Partition的task个数上限,默认值是5 |
spark.sql.adaptive.skewedPartitionRowCountThreshold | 设置一个Partition被视为倾斜Partition的行数下限,行数低于该值的Partition不会被当做倾斜Partition处理 |
spark.sql.adaptive.skewedPartitionSizeThreshold | 设置一个Partition被视为倾斜Partition的大小下限,大小小于该值的Partition不会被当做倾斜Partition处理 |
spark.sql.adaptive.skewedPartitionFactor | 设置倾斜因子,当一个Partition满足以下两个条件之一,就会被视为倾斜Partition: 1. 大小大于spark.sql.adaptive.skewedPartitionSizeThreshold的同时大于各Partition大小中位数与该因子的乘积 2. 行数大于spark.sql.adaptive.skewedRowCountThreshold的同时大于各Partition行数中位数与该因子的乘积 |
使用案例:
set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=268435456;
set spark.sql.adaptive.join.enabled=true;
set spark.sql.adaptiveBroadcastJoinThreshold=33554432;
set spark.sql.adaptive.allowAddititionalShuffle=false;
set spark.sql.adaptive.skewedJoin.enabled=true;
set spark.sql.adaptive.skewedPartitionMaxSplits=100;
set spark.sql.adaptive.skewedPartitionRowCountThreshold=10000000;
set spark.sql.adaptive.skewedPartitionSizeThreshold=536870912;
set spark.sql.adaptive.skewedPartitionFactor=10;
补充说明:
- 自动设置Shuffle Partition个数功能已经发布,而动态调整执行计划和自动处理数据倾斜 还不确定是否已经发布(Spark Adaptive Execution调研)
- 自动设置Shuffle Partition功能(spark.sql.adaptive.shuffle.targetPostShuffleInputSize)慎用,可能出现极端情况而导致耗时变长。比如,笔者有次在一个任务中使用该功能,当时读取表数据量是千亿级别的,经过shuffle处理(broadcast join)后最终的结果数据量在万级别,大小在几十kb,因此shuffle后的分区数变成了1,但是shuffle前的数据量是很大的,因此分区数也达到了笔者设置的上限5000。也就是说,最后是一个task去拉取5000个task写好的shuffle文件,虽然这些shuffle文件大多很小,甚至是没有,但是整个拉取过程依然是非常耗时。而我之所以用这个功能的原因是只是为了小文件合并,完全没有必要用这个参数,将这个功能参数注释掉,换成小文件合并相关的参数后,任务执行时长就下降了十几分钟
- 自动处理数据倾斜原理:根据分区数据大小和条数来判断该分区是否为倾斜分区,如果是倾斜分区,就会将该分区对应task由1个改为多个,而另一部分的相应partition数据全量传输。比如,表a的partition0有倾斜情况,表b的partition0没有数据倾斜情况,此时会将表a的partition0分为多个部分传到不同的core上处理,同时表b的partition0数据也会全量shuffle到这些core上,因此表b的partition0最好不要数据量过大(Adaptive Execution让Spark SQL更高效更智能)
推测执行
参数 | 说明 |
---|---|
spark.speculation | spark推测执行开关,默认是true |
spark.speculation.interval | 开启推测执行后,每隔该值时间会检测是否有需要推测执行的task |
spark.speculation.quantile spark.speculation.multiplier |
当成功task占总task的比例超过spark.speculation.quantile,统计成功task运行时间中位数乘以spark.speculation.multiplier得到推测执行阈值,当在运行的任务超过这个阈值就会启动推测执行。当资源充足时,可以适当减小这两个值 |
使用案例:
set spark.speculation=true;
set spark.speculation.interval=1000ms;
set spark.speculation.quantile=0.99;
set spark.speculation.multiplier=3;