Hive、Spark SQL任务参数调优

动态分区

参数 说明
hive.exec.dynamic.partition 是否开启动态分区,默认是false。如果要开启动态分区,就设置为true
hive.exec.dynamic.partition.mode 动态分区模式,默认是strict。也可以改为nonstrict,表示允许所有的分区字段都可以使用动态分区,需要结合hive.exec.dynamic.partition=true一起使用。不过即使开启动态分区,首个分区字段也必须是静态字段
hive.exec.max.dynamic.partitions 可以创建的最大分区数,如果实际分区超过了就会报错

使用案例:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=5000;

资源申请

参数 说明
spark.executor.cores 每个executor上的槽位个数
spark.executor.memory executor的内存总量,executor上所有的core共享
yarn中container的内存限制为spark.executor.memory+spark.yarn.executor.memoryOverhead<=16G
spark.yarn.executor.memoryOverhead executor堆外内存大小,直接由yarn控制,单位是MB
yarn中container的内存限制为spark.executor.memory+spark.yarn.executor.memoryOverhead<=16G
spark.driver.memory driver的内存总量,主要是存放任务过程中的shuffle元数据,以及任务中collect的数据。Broadcast的小表也会先存放在driver中
yarn中container的内存限制为spark.driver.memory+spark.yarn.driver.memoryOverhead<=16G
spark.yarn.driver.memoryOverhead driver堆外内存大小,直接由yarn控制,单位是MB
yarn中container的内存限制为spark.driver.memory+spark.yarn.driver.memoryOverhead<=16G
spark.memory.fraction storage memory+execution memory占总内存(java heap-reserved memory)的比例
这里说明下,executor jvm中内存分为storage、execution和other内存。storage存放缓存RDD数据,execution存放shuffle过程的中间数据,other存放用户定义的数据结构或spark内部元数据。如果用户自定义数据结构较少,可以将该参数比例适当上调

使用案例:

set spark.executor.cores=2;
set spark.executor.memory=4G;
set spark.yarn.executor.memoryOverhead=1024;
set spark.driver.memory=8G;
set spark.yarn.driver.memoryOverhead=1024;
set spark.memory.fraction=0.7;

Executor动态申请

参数 说明
spark.dynamicAllocation.enabled 是否开启动态资源分配,强烈建议开启
spark.dynamicAllocation.maxExecutors 开启动态资源分配后,同一时刻可以申请的最大executor数
spark.dynamicAllocation.minExecutors 开启动态资源分配后,同一时刻可以申请的最小executor数

使用案例:

set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.maxExecutors=1000;
set spark.dynamicAllocation.minExecutors=400;

ORC文件性能优化

参数 说明
spark.sql.orc.filterPushdown ORC谓词下推,默认是关闭
spark.sql.orc.splits.include.file.footer 开启后,在split划分时会使用footer信息
spark.sql.orc.cache.stripe.details.size 设置每个stripe可以缓存的大小
spark.sql.hive.metatorePartitionPruning 当为true,Spark SQL的谓语将被下推到Hive Metastore中,更早的消除不匹配的分区

使用案例:

set spark.sql.orc.filterPushdown=true;
set spark.sql.orc.splits.include.file.footer=true;
set spark.sql.orc.cache.stripe.details.size=10000;
set spark.sql.hive.metastorePartitionPruning=true;

补充说明:ORC文件存储格式如下表

ORC文件存储格式.png
Postscript是文件描述信息,包括file footer和元数据长度,文件版本,压缩格式等
Footer就是文件的元数据信息,包括数据量、每列的统计信息等
而文件中数据主要是分为stripe,每个stripe包括索引数据、行数据和stripe footer
具体ORC文档可参考ORC Specification v1

文件输入输出

参数 说明
spark.hadoop.hive.exec.orc.split.strategy 控制在读取ORC表时生成split的策略:
 1.BI:以文件尾粒度进行split划分
 2.ETL:将文件进行切分,多个stripe组成一个split
 3.HYBRID:当文件的平均大小大于hadoop最大split值时采用ETL策略,否则采用BI策略
较大的ORC表,可能其footer较大,ETL策略可能导致从HDFS拉取大量数据进行split,甚至导致driver端OOM,此时建议使用BI策略;较小的尤其有数据倾斜的表,建议使用ETL策略
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 读ORC表时,设置小文件合并的阈值,低于该值的split会合并在一个task中执行
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 读ORC表时,设置一个split的最大阈值,大于该值的split会切分成多个split
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 文件提交到HDFS上的算法:
 1.version=1是按照文件提交
 2.version=2是批量按照目录进行提交,可以极大节约文件提交到HDFS的时间,减轻NameNode压力

使用案例:

set spark.hadoop.hive.exec.orc.split.strategy=ETL;
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864;
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=268435456;
set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2;

小文件合并

参数 说明
spark.sql.mergeSmallFileSize 小文件合并阈值,如果生成的文件平均大小低于阈值会额外启动一轮stage进行小文件的合并,默认不合并小文件
spark.sql.targetBytesInPartitionWhenMerge 设置额外的合并job时的map端输入size
spark.hadoopRDD.targetBytesInPartition 设置map端输入的合并文件大小

使用案例

set spark.sql.mergeSmallFileSize=67108864;
set spark.sql.targetBytesInPartitionWhenMerge=67108864;
set spark.hadoopRDD.targetBytesInPartition=67108864;

补充说明:

在决定一个目录是否需要合并小文件时,会统计目录下的平均大小,然后和spark.sql.mergeSmallFileSize比较
在合并文件时,一个map task读取的数据量取决于下面三者的较大值:
1、spark.sql.mergeSmallFileSize
2、spark.sql.targetBytesInPartitionWhenMerge
3、spark.hadoopRDD.targetBytesInPartition

Shuffle

参数 说明
spark.sql.autoBroadcastJoinThreshold 小表join自动开启广播机制时小表的阈值,会从Hive Metastore中获取表统计信息。当设置为-1时会禁用广播
spark.sql.shuffle.partitions 设置reduce阶段的分区数.设置过大可能导致很多reducer同时向一个mapper拉取数据,导致mapper由于请求压力过大而挂掉或响应缓慢,从而fetch failed
spark.reducer.maxSizeInFlight 同一时刻一个reducer可以同时拉取的数据量大小
spark.reducer.maxReqsInFlight 同一时刻一个reducer可以同时产生的请求数
spark.reducer.maxBlocksInFlightPerAddress 同一时刻一个reducer向同一个上游executor拉取的最多block数
spark.reducer.maxReqSizeShuffleToMem shufle请求的block超过该阈值就会强制落盘,防止一大堆并发请求将内存占满
spark.shuffle.io.connectionTimeout shuffle中连接超时时间,超过该时间会fetch failed
spark.shuffle.io.maxRetries shuffle中拉取数据的最大重试次数
spark.shuffle.io.retryWait shuffle重试的等待间隔

使用案例:

set spark.sql.autoBroadcastJoinThreshold=33554432;
set spark.sql.shuffle.partitions=5000;
set spark.reducer.maxSizeInFlight=25165824;
set spark.reducer.maxReqsInFlight=10;
set spark.reducer.maxBlocksInFlightPerAddress=1;
set spark.reducer.maxReqSizeShuffleToMem=536870911;
set spark.shuffle.io.connectionTimeout=120;
set spark.shuffle.io.maxRetries=3;
set spark.shuffle.io.retryWait=5;

Adaptive Execution

参数 说明
spark.sql.adaptive.enabled 开启动态执行
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 设置每个Reducer读取的目标数据量,会将低于该值的partition进行合并
spark.sql.adaptive.join.enabled 开启动态调整Join
spark.sql.adaptiveBroadcstJoinThreshold 设置SortMergeJoin转BroadcastJoin的阈值,如果不设置该参数,该阈值和spark.sql.autoBroadcastJoinThreshold值相等
spark.sql.adaptive.allowAdditionalShuffle 是否允许为了优化Join而增加Shuffle,默认是false
spark.sql.adaptive.skewedJoin.enabled 开启自动处理Join时的数据倾斜
spark.sql.adaptive.skewedPartitionMaxSplits 控制处理一个倾斜Partition的task个数上限,默认值是5
spark.sql.adaptive.skewedPartitionRowCountThreshold 设置一个Partition被视为倾斜Partition的行数下限,行数低于该值的Partition不会被当做倾斜Partition处理
spark.sql.adaptive.skewedPartitionSizeThreshold 设置一个Partition被视为倾斜Partition的大小下限,大小小于该值的Partition不会被当做倾斜Partition处理
spark.sql.adaptive.skewedPartitionFactor 设置倾斜因子,当一个Partition满足以下两个条件之一,就会被视为倾斜Partition:
1. 大小大于spark.sql.adaptive.skewedPartitionSizeThreshold的同时大于各Partition大小中位数与该因子的乘积
2. 行数大于spark.sql.adaptive.skewedRowCountThreshold的同时大于各Partition行数中位数与该因子的乘积

使用案例:

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=268435456;
set spark.sql.adaptive.join.enabled=true;
set spark.sql.adaptiveBroadcastJoinThreshold=33554432;
set spark.sql.adaptive.allowAddititionalShuffle=false;
set spark.sql.adaptive.skewedJoin.enabled=true;
set spark.sql.adaptive.skewedPartitionMaxSplits=100;
set spark.sql.adaptive.skewedPartitionRowCountThreshold=10000000;
set spark.sql.adaptive.skewedPartitionSizeThreshold=536870912;
set spark.sql.adaptive.skewedPartitionFactor=10;

补充说明:

  1. 自动设置Shuffle Partition个数功能已经发布,而动态调整执行计划和自动处理数据倾斜 还不确定是否已经发布(Spark Adaptive Execution调研
  2. 自动设置Shuffle Partition功能(spark.sql.adaptive.shuffle.targetPostShuffleInputSize)慎用,可能出现极端情况而导致耗时变长。比如,笔者有次在一个任务中使用该功能,当时读取表数据量是千亿级别的,经过shuffle处理(broadcast join)后最终的结果数据量在万级别,大小在几十kb,因此shuffle后的分区数变成了1,但是shuffle前的数据量是很大的,因此分区数也达到了笔者设置的上限5000。也就是说,最后是一个task去拉取5000个task写好的shuffle文件,虽然这些shuffle文件大多很小,甚至是没有,但是整个拉取过程依然是非常耗时。而我之所以用这个功能的原因是只是为了小文件合并,完全没有必要用这个参数,将这个功能参数注释掉,换成小文件合并相关的参数后,任务执行时长就下降了十几分钟
  3. 自动处理数据倾斜原理:根据分区数据大小和条数来判断该分区是否为倾斜分区,如果是倾斜分区,就会将该分区对应task由1个改为多个,而另一部分的相应partition数据全量传输。比如,表a的partition0有倾斜情况,表b的partition0没有数据倾斜情况,此时会将表a的partition0分为多个部分传到不同的core上处理,同时表b的partition0数据也会全量shuffle到这些core上,因此表b的partition0最好不要数据量过大(Adaptive Execution让Spark SQL更高效更智能

推测执行

参数 说明
spark.speculation spark推测执行开关,默认是true
spark.speculation.interval 开启推测执行后,每隔该值时间会检测是否有需要推测执行的task
spark.speculation.quantile
spark.speculation.multiplier
当成功task占总task的比例超过spark.speculation.quantile,统计成功task运行时间中位数乘以spark.speculation.multiplier得到推测执行阈值,当在运行的任务超过这个阈值就会启动推测执行。当资源充足时,可以适当减小这两个值

使用案例:

set spark.speculation=true;
set spark.speculation.interval=1000ms;
set spark.speculation.quantile=0.99;
set spark.speculation.multiplier=3;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容