小文件问题的影响
1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。
————————————————
小文件问题的解决方案
从小文件产生的途经就可以从源头上控制小文件数量,方法如下:
1.使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件。
2.减少reduce的数量(可以使用参数进行控制)。
3.少用动态分区,用时记得按distribute by分区。
————————————————
对于已有的小文件,我们可以通过以下几种方案解决:
1.使用hadoop archive命令把小文件进行归档。
2.重建表,建表时减少reduce数量。
3.通过参数进行调节,设置map/reduce端的相关参数,如下:
设置map输入合并小文件的相关参数:
[java] view plain copy
//每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;
//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
//执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
设置map输出和reduce输出进行合并的相关参数:
[java] view plain copy
//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 25610001000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000
————————————————
3.Write good SQL :
说道sql优化很惭愧,自己sql很烂,不多比比了,但是sql优化确实很关键。。。
4.存储格式:
可以使用列裁剪,分区裁剪,orc,parquet等存储格式。
Hive支持ORCfile,这是一种新的表格存储格式,通过诸如谓词下推,压缩等技术来提高执行速度提升。
对于每个HIVE表使用ORCFile应该是一件容易的事情,并且对于获得HIVE查询的快速响应时间非常有益。
作为一个例子,考虑两个大表A和B(作为文本文件存储,其中一些列未在此处指定,即行试存储的缺点)以及一个简单的查询,如:
SELECT A.customerID, A.name, A.age, A.address join
B.role, B.department, B.salary
ON A.customerID=B.customerID;
此查询可能需要很长时间才能执行,因为表A和B都以TEXT形式存储,进行全表扫描。
将这些表格转换为ORCFile格式通常会显着减少查询时间:
ORC支持压缩存储(使用ZLIB或如上所示使用SNAPPY),但也支持未压缩的存储。
CREATE TABLE A_ORC (
customerID int, name string, age int, address string
) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);
INSERT INTO TABLE A_ORC SELECT * FROM A;
CREATE TABLE B_ORC (
customerID int, role string, salary float, department string
) STORED AS ORC tblproperties (“orc.compress" = “SNAPPY”);
INSERT INTO TABLE B_ORC SELECT * FROM B;
SELECT A_ORC.customerID, A_ORC.name,
A_ORC.age, A_ORC.address join
B_ORC.role, B_ORC.department, B_ORC.salary
ON A_ORC.customerID=B_ORC.customerID;
5.压缩格式:
压缩格式 | UNIX工具 | 算 法 | 文件扩展名 | 可分割 |
---|---|---|---|---|
DEFLATE | 无 | DEFLATE | .deflate | No |
gzip | gzip | DEFLATE | .gz | No |
LZ4 | 无 | LZ4 | .LZ4 | NO |
bzip | bzip | bzip | .bz2 | YES |
LZO | lzop | LZO | .lzo | YES if indexed |
Snappy | 无 | Snappy | .snappy | NO |
大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式,那么首先就了解这些东西,作者以前博客已经进行了详细的说明,
可以看出压缩比越高,压缩时间越长,压缩比:Snappy < LZ4 < LZO < GZIP < BZIP2
gzip:
优点:压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便。
缺点:不支持split。
lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便。
缺点:压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)。
snappy压缩
优点:压缩速度快;支持hadoop native库。
缺点:不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令。
bzip2压缩
优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便。
缺点:压缩/解压速度慢;不支持native。
————————————————
6.MAP JOIN
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。
(1)通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
(2)MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
也就是在map端进行join避免了shuffle。
————————————————
7.引擎的选择
Hive可以使用ApacheTez执行引擎而不是古老的Map-Reduce引擎。
我不会详细讨论在这里提到的使用Tez的许多好处; 相反,我想提出一个简单的建议:
如果它没有在您的环境中默认打开,请在您的Hive查询的开头将以下内容设置为'true'来使用Tez:
设置hive.execution.engine = tez;
通过上述设置,您执行的每个HIVE查询都将利用Tez。
目前Hive On Spark还处于试验阶段,慎用。。
8.Use Vectorization
向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提高扫描,聚合,筛选器和连接等操作的性能。
在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用:
设置hive.vectorized.execution.enabled = true;
设置hive.vectorized.execution.reduce.enabled = true;
9.cost based query optimization
Hive 自0.14.0开始,加入了一项”Cost based Optimizer”来对HQL执行计划进行优化,这个功能通
过”hive.cbo.enable”来开启。在Hive 1.1.0之后,这个feature是默认开启的,它可以自动优化HQL中多个JOIN的顺序,并
选择合适的JOIN算法.
Hive在提交最终执行前,优化每个查询的执行逻辑和物理执行计划。这些优化工作是交给底层来完成。
根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等。
要使用基于成本的优化(也称为CBO),请在查询开始处设置以下参数:
设置hive.cbo.enable = true;
设置hive.compute.query.using.stats = true;
设置hive.stats.fetch.column.stats = true;
设置hive.stats.fetch.partition.stats = true;
10.模式选择
本地模式
对于大多数情况,Hive可以通过本地模式在单台机器上处理所有任务。
对于小数据,执行时间可以明显被缩短。通过set hive.exec.mode.local.auto=true(默认为false)设置本地模式。
hive> set hive.exec.mode.local.auto;
hive.exec.mode.local.auto=false
并行模式
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。
默认情况下,Hive一次只会执行一个阶段,由于job包含多个阶段,而这些阶段并非完全互相依赖,
即:这些阶段可以并行执行,可以缩短整个job的执行时间。设置参数:set hive.exec.parallel=true,或者通过配置文件来完成。
hive> set hive.exec.parallel;
hive.exec.parallel=false
严格模式
Hive提供一个严格模式,可以防止用户执行那些可能产生意想不到的影响查询,通过设置
Hive.mapred.modestrict来完成
set Hive.mapred.modestrict;
Hive.mapred.modestrict is undefined
11.JVM重用
Hadoop通常是使用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销,
尤其是执行的job包含偶成百上千的task任务的情况。JVM重用可以使得JVM示例在同一个job中时候使用N此。
通过参数mapred.job.reuse.jvm.num.tasks来设置。
12.推测执行
Hadoop推测执行可以触发执行一些重复的任务,尽管因对重复的数据进行计算而导致消耗更多的计算资源,
不过这个功能的目标是通过加快获取单个task的结果以侦测执行慢的TaskTracker加入到没名单的方式来提高整体的任务执行效率。
Hadoop的推测执行功能由2个配置控制着,通过mapred-site.xml中配置
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
————————————————
Hive日常调优参数汇总
--压缩配置:
-- map/reduce 输出压缩(一般采用序列化文件存储)
set hive.exec.compress.output=true;
set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapred.output.compression.type=BLOCK;
--任务中间压缩
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
--优化
set hive.exec.dynamic.partition.mode=nonstrict;--设置非严格模式
set hive.exec.dynamic.partition=true;--设置动态分区
set hive.exec.max.dynamic.partitions.pernode=1000;--设置动态分区每个节点最多可划分为多少个分区
set hive.exec.max.dynamic.partitions=2000;--设置动态分区时的分区最大数量
set mapred.reduce.tasks = 20;--设置reduce的任务数量,可用于优化插入分区表时的执行效率
set hive.exec.reducers.max=100;--设置reduce最大数量
set spark.executor.cores=4;--设置每个executor用的core
set spark.executor.memory=8g;--设置每个executor的内存大小
set mapreduce.map.memory.mb=8192;--设置map任务的内存大小(container大小)
set mapreduce.reduce.memory.mb=8192;--设置reduce任务使用内存大小
set mapred.reduce.child.java.opts=-server -Xmx4000m -Djava.net.preferIPv4Stack=true;
--map端内存溢出可以参考下面两个参数
set mapred.map.child.java.opts=-server -Xmx2048m -Djava.net.preferIPv4Stack=true;
set mapreduce.map.child.java.opts="-Xmx3072m"
set hive.execution.engine=mr;--设置执行hive引擎为mr
set hive.merge.mapredfiles= true;--合并小文件
set hive.merge.mapfiles = true;
set hive.merge.size.per.task = 256000000;
set hive.merge.smallfiles.avgsize = 256000000;
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set hive.optimize.cp = true;
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
set mapreduce.job.running.map.limit=10;--限制map运行数量
set hive.mapjoin.smalltable.filesize=26214400;--默认是25M
set hive.exec.max.created.files = 200000;--增大hive文件创建数量
set yarn.app.mapreduce.am.resource.mb=4096;
set hive.tez.java.opts=-Xmx8192m -XX:MaxPermSize=256m;
SET hive.tez.container.size=10240;
set mapreduce.input.fileinputformat.split.maxsize=256000000;
set mapreduce.input.fileinputformat.split.minsize.per.node=256000000;
set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000;
set hive.exec.reducers.bytes.per.reducer=5120000000;--设置每个reducer处理的数据
--总共有三种策略{"HYBRID", "BI", "ETL"}), 默认是"HYBRID","This is not a user level config.
-- BI strategy is used when the requirement is to spend less time in split generation as opposed to query execution (split generation does not read or cache file footers).
-- ETL strategy is used when spending little more time in split generation is acceptable (split generation reads and caches file footers).
-- HYBRID chooses between the above strategies based on heuristics."),
set hive.exec.orc.split.strategy=BI;(ORC split generation failed with exception: java.lang.OutOfMemoryError)
set hive.mapjoin.localtask.max.memory.usage=0.999;--本地任务可以使用内存的百分比 默认值:0.90
-- map join做group by操作时,可使用多大的内存来存储数据。若数据太大则不会保存在内存里 默认值:0.55
set hive.mapjoin.followby.gby.localtask.max.memory.usage;
--本地mr设置
set hive.exec.mode.local.auto=true; --开启本地mr
--设置local mr的最大输入数据量,当输入数据量小于这个值的时候会采用local mr的方式
set hive.exec.mode.local.auto.inputbytes.max=50000000;
--设置local mr的最大输入文件个数,当输入文件个数小于这个值的时候会采用local mr的方式
set hive.exec.mode.local.auto.tasks.max=10;
--当这三个参数同时成立时候,才会采用本地mr
set mapreduce.map.java.opts=-Xmx4096m -XX:-UseGCOverheadLimit -- GC overhead limit exceeded
set io.sort.mb=1024;
--采样
set hive.limit.optimize.enable=true --- 开启对数据源进行采样的功能
set hive.limit.row.max.size --- 设置最小的采样容量
set hive.limit.optimize.limit.file --- 设置最大的采样样本数
set mapred.max.split.size=134217728; --决定每个map处理的最大的文件大小,可以根据总文件大小以及这个参数的设置调整map的数量,动态调整,当map数量比较小且执行非常慢时,可以将此参数调小
set mapred.min.split.size.per.node=1024000000;--每个节点,动态调整,当map数量比较小且执行非常慢时,可以将此参数调小
set mapred.min.split.size.per.rack=1024000000;--每个机架
--mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack
set hive.auto.convert.join=true; --hive自动识别小表,小表自动加载到内存,reduce端Common Join 转化为map join,可解决数据倾斜问题,map端jpoin
--不产生shuffle
set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.optimize.skewjoin = true;--如果是join 过程出现倾斜 应该设置为true,hive 在运行的时候没有办法判断哪个key 会产生多大的倾斜,所以使用这个参数控制倾斜的阈值,如果超过这个值,新的值会发送给那些还没有达到的reduce, 一般可以设置成(处理的总记录数/reduce个数)的2-4倍都可以接受
set hive.groupby.mapaggr.checkinterval=100000;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置(map端聚合操作的记录条数)
set hive.map.aggr.hash.min.reduction=0.5;--解释:预先取100000条数据聚合,如果聚合后的条数小于100000*0.5,则不再聚合。
set hive.auto.convert.join.noconditionaltask=True;--将多个map join合并为一个,Hive在基于输入文件大小的前提下将普通JOIN转换成MapJoin,并是否将多个MJ合并成一个
set hive.auto.convert.join.noconditionaltask.size=100000000;--多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。
set hive.groupby.skewindata=false;--当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,
--Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key
--有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到
--Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
set hive.optimize.index.filter=true;--自动使用索引,使用聚合索引优化group by操作,如果是orc表,可以使用orc的索引,加快读取hive表的数据
set mapreduce.job.reduce.slowstart.completedmaps=0.8;--当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05,需要特别注意的是,
--在JobImpl中,如果处于Uber模式下,会将mapreduce.job.reduce.slowstart.completedmaps参数设置为1,这很好理解,因为不管Map Task,还是Reduce Task,均是串行执行的,所以当Map Task完成的比例达到多少值后才会为Reduce Task申请资源,这个值百分百应该是1
set hive.new.job.grouping.set.cardinality = 30;--grouping sets 数量较多时即cube维度过多,这条设置的意义在于告知解释器,group by之前,每条数据复制量在30份以内。
-- 关闭hive推测执行
set hive.mapred.reduce.tasks.speculative.execution = false;
set mapreduce.map.speculative = false;
set mapreduce.reduce.speculative = false;
--hive on spark
set spark.executor.memory=4g;
set spark.executor.cores=2;
set spark.executor.instances=50;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;
set spark.default.parallelism = 300;
set spark.locality.wait = 6;
set spark.locality.wait.process=6;
set spark.locality.wait.node=6;
set spark.locality.wait.rack=6;
set spark.shuffle.consolidateFiles=true;--map端文件合并
set spark.shuffle.memoryFraction=0.5;
set mapreduce.map.java.opts=-Xmx2000m -XX:-UseGCOverheadLimit
--map倾斜(数据量大且map分配数据量不合理)
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=2;
set hive.groupby.skewindata=true;
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=256000000;
set mapred.min.split.size.per.rack=256000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- 当mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize > dfs.blockSize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.minsize参数决定。
-- 当mapreduce.input.fileinputformat.split.maxsize > dfs.blockSize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由dfs.blockSize配置决定。(第二次优化符合此种情况)
-- 当dfs.blockSize > mapreduce.input.fileinputformat.split.maxsize > mapreduce.input.fileinputformat.split.minsize的情况下,此时的splitSize 将由mapreduce.input.fileinputformat.split.maxsize参数决定。
--maxSplitSize > minSplitSizeNode > minSplitSizeRack
set mapreduce.input.fileinputformat.split.maxsize=256000000;--map端文件切片大小
set mapreduce.input.fileinputformat.split.minsize.per.node=256000000;--同一节点的数据块形成切片时,切片大小的最小值
set mapreduce.input.fileinputformat.split.minsize.per.rack=256000000;--同一机架的数据块形成切片时,切片大小的最小值
----reduce端小文件合并(即MR任务结束后进行merge)
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=256000000;
set dfs.namenode.handler.count=20;--设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20logN,N为集群大小。 https://blog.csdn.net/turk/article/details/79723963
set mapreduce.task.timeout=36000000;--MapReduce设置参数防止超时
--COST BASED QUERY OPTIMIZATION(CBO) cbo可以优化hive的每次查询,使用CBO,需要开启下面四个选项。
set hive.cbo.enable=true;--如果数据已经根据相同的key做好聚合,则去除多余的map/reduce作业
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
set mapreduce.reduce.shuffle.parallelcopies=10; shuffle开启的fetcher线程数 apache默认5,choudera默认10
-- shuffle使用的内存比例。
mapreduce.reduce.shuffle.input.buffer.percent=0.6:--可以从0.2开始向上调。当只有20%的heap size分配给shuffle buffer的时候不容易出现OOM。
-- 单个shuffle任务能使用的内存限额,设置为0.15,即为 Shuffle内存 * 0.15。
-- 低于此值可以输出到内存,否则输出到磁盘。
mapreduce.reduce.shuffle.memory.limit.percent:
-- shuffle的数据量到Shuffle内存 ** 0.9的时候,启动合并。
mapreduce.reduce.shuffle.merge.percent:设置为0.9。
set mapreduce.reduce.shuffle.memory.limit.percent=0.1;
set hive.map.aggr.hash.percentmemory = 0.25;--Hive Map 端聚合的哈稀存储所占用虚拟机的内存比例。 当内存的Map大小,占到JVM配置的Map进程的25%的时候(默认是50%),就将这个数据flush到reducer去,以释放内存Map的空间。
set hive.map.aggr.hash.force.flush.memory.threshold=0.9 --map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush
set hive.ignore.mapjoin.hint=false; --(默认值:true;是否忽略mapjoin hint 即HQL 语句中的 mapjoin 标记)
set hive.auto.convert.join.noconditionaltask=true; --(默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin)
set hive.auto.convert.join.noconditionaltask.size=60000000;--(将多个mapjoin转化为一个mapjoin时,其表的最大值)
set hive.stats.autogather=false;--即插入数据时会优化统计,如此在大的动态分区时load数据后会有一段很长时间的统计,且操作hive元数据表,例如每个分区的文件数,行数等等。耗时比较长时可能会timeout,需要将其设成false。
-- Hadoop任务可能引起OOM错误的原因有很多。一般情况下,首先检查是否重设了hadoop参数:mapred.child.java.opts,一般设为-Xmx2000m,即使用2G的最大堆内存。
-- Hive中可能引起OOM的原因及相关的修复设定如下表所示:
-- 原因:map aggregation
-- map aggregation使用哈希表存储group by/distinct key和他们的aggregation结果。
-- aggregate结果字段过多,或group by/distinct key的散度过大,可能导致内存占用过多。
-- 修复:
-- 减小hive.map.aggr.hash.percentmemory设定(默认为0.5,即使用50%的child堆内存)。
-- 原因:join
-- join需要cache所有相同join key的非驱动表的记录
-- 修复:
-- 检查是否把大表设定为驱动表(大表写在join的最右边)。
-- 如果已经设定正确的驱动表,减小hive.join.emit.interval设定(默认为1000,即每1000行的join结果集输出一次)。
-- 原因:map join
-- map join需要cache全部小表的所有数据
-- 修复:
-- 检查小表是否足够小。如果小表超过1G,考虑不要使用map join。