Hive 优化策略

排序选择

  • cluster by:
    对同一字段分桶并排序,不能和 sort by 连用
  • distribute by + sort by:
    分桶,保证同一字段值只存在一个结果文件当中,结合 sort by 保证 每个 reduceTask 结果有序
  • sort by:
    单机排序,单个 reduce 结果有序
  • order by:
    全局排序,缺陷是只能使用一个 reduce
1. order by

Hive中的order by 会对查询的结果做一次全局排序,所以所有的数据都会到同一个reducer进行处理。
缺点:对于大量数据将会消耗很长的时间。
跟传统的sql还有一点区别:如果指定了hive.mapred.mode=strict(默认值是nonstrict), 所有的数据都会在同一个reducer端进行,就必须指定 limit 来限制输出条数。

2. sort by

Hive中指定了sort by,那么在每个reducer端都会做排序,也就是说保证了局部有序(每个reducer出来的数据是有序的,但是不能保证所有的数据是有序的,除非只有一个reducer),好处是:执行了局部排序之后可以为接下去的全局排序提高不少的效率(其实就是做一次归并排序就可以做到全局排序了)。

3. distribute by和sort by一起使用

ditribute by是控制 map 的输出在 reducer 是如何划分的。
ditribute by year:表示所有相同的年份划分到同一个 reducer 。

distribute by必须要写在sort by之前。

4. cluster by

cluster by 的功能就是 distribute by 和 sort by 相结合

left semi join

LEFT SEMI JOIN 是 IN/EXISTS 子查询的一种更高效的实现。

先看 SQL中 IN 和 EXISTS 用法的区别
1. in
select * from A
where A.id in (select B.id from B)

它查出B表中的所有id字段并缓存起来.之后,检查A表的id是否与B表中的id相等,如果相等则将A表的记录加入结果集中,直到遍历完A表的所有记录.

可以看出,当B表数据较大时不适合使用in(),因为它会B表数据全部遍历一次.
如: A表有 10000 条记录, B表有 1000000 条记录,那么最多有可能遍历 10000 * 1000000 次, 效率很差.
再如:A表有10000条记录,B表有100条记录,那么最多有可能遍历10000*100次,遍历次数大大减少,效率大大提升.

结论:in()适合B表比A表数据小的情况。

2. exists
select A.* from A
where exists (select B.id from B where A.id = B.id)

当B表比A表数据大时适合使用exists(),因为它没有那么遍历操作,只需要再执行一次查询就行.

如:A表有10000条记录,B表有1000000条记录,那么exists()会执行10000次去判断A表中的id是否与B表中的id相等.

再如:A表有10000条记录,B表有100条记录,那么exists()还是执行10000次,还不如使用in()遍历10000*100次,因为in()是在内存里遍历比较,而exists()需要查询数据库,我们都知道查询数据库所消耗的性能更高,而内存比较很快.

结论:exists()适合B表比A表数据大的情况

left semi join 使用场景

上面的例子 应该转换成:

select A.id from A left semi join B on A.id = B.id;

更多关于 left semi join 的,请参考:https://blog.csdn.net/djy37010/article/details/72780827

怎样做笛卡尔积

什么是 笛卡尔积?

join 操作都会产生 笛卡尔积。

尽量使用 Map Join。

设置合理的 maptask 数量

  • Map 数过大
    Map 阶段输出文件太小,产生大量小文件
    初始化和创建 Map 的开销很大
  • Map 数太小
    文件处理或查询并发度小,Job 执行时间过长
    大量作业时,容易堵塞集群

在 MapReduce 的编程案例中,我们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:


输入分片大小的计算是这么计算出来的:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启 用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处 理效率

两种经典的控制 MapTask 的个数方案:减少 MapTask 数或者增加 MapTask 数

1、 减少 MapTask 数是通过合并小文件来实现,这一点主要是针对数据源
2、 增加 MapTask 数可以通过控制上一个 job 的 reduceTask 个数

默认情况下, MapReduce 中一个 MapTask 或者一个 ReduceTask 就会启动一个 JVM 进程,一个 Task 执行完毕后, JVM 进程就退出。这样要多次启动 JVM ,启动时间会变成一个比较大的消耗, 这个时候,就可以通过重用 JVM 来解决:

set mapred.job.reuse.jvm.num.tasks=5

小文件合并

文件数目过多,会给 HDFS 带来压力,并且会影响处理效率,可以通过合并 Map 和 Reduce 的 结果文件来消除这样的影响:

set hive.merge.mapfiles = true ##在 map only 的任务结束时合并小文件
set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件
set hive.merge.size.per.task = 256*1000*1000 ##合并文件的大小
set mapred.max.split.size=256000000; ##每个 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行 Map 前进行小文件合并

设置合理的 reduceTask 的数量

依据 Hadoop 的经验,可以将参数 2 设定为 0.95 * (集群中 datanode 个数).

1. Hive自己如何确定reduce数:

reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个参数设定:
1.hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)

  1. hive.exec.reducers.max(每个任务最大的reduce数,默认为999)

计算reducer数的公式很简单:N = min(参数2,总输入数据量 / 参数1)
即,如果reduce的输入(map的输出)总大小不超过1G, 那么只会有一个reduce任务;总大小为9G多,因此有10个reduce.

2. 调整reduce个数方法一:

调整 hive.exec.reducers.bytes.per.reducer 参数的值;
set hive.exec.reducers.bytes.per.reducer = 500000000; (500M)
总大小为9G多, 这次有20个reduce

同时,还要注意 hive.exec.reducers.max 的值,最终的 ReduceTask 数量都要结合上面的公式。

3. 调整reduce个数方法二;

set mapred.reduce.tasks = 15;
这次有15个reduce

4. reduce个数并不是越多越好;

同map一样,启动和初始化reduce也会消耗时间和资源;
另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

合理利用分桶 :Bucketing 和 Sampling

Bucket (桶)

对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。Hive也是 针对某一列进行桶的组织。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
把表(或者分区)组织成桶(Bucket)有两个理由:

  1. 获得更高的查询处理效率。桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构。具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现。比如JOIN操作。对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作。那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量。

  2. 使取样(sampling)更高效。在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。

创建带桶的 table :

create table bucketed_user(id int,name string) clustered by (id) 
into 4 buckets row format delimited fields terminated by '\t' stored as textfile; 

我们来看如何告诉Hive—个表应该被划分成桶。我们使用CLUSTERED BY 子句来指定划分桶 所用的列 和 要划分的桶的个数。

我们使用 用户ID 来确定如何划分桶 (Hive使用对值进行哈希并将结果除 以桶的个数取余数,所有余数相同的 ID,被划分到一个桶中。

对桶中的数据进行采样

select * from bucketed_user tablesample(bucket 1 out of 4 on id);

注:tablesample 是抽样语句,语法:tablesample(bucket x out of y)

  • y 必须是 table 总 bucket 数的倍数或者因子。
    hive 根据 y 的大小,决定抽样的比例。
    例如,table 总共分了 64 份,当 y=32 时,抽取 (64/32=2) 个 bucket 的数据;
    当 y=128 时,抽取 (64/128=1/2) 个bucket的数据。

  • x表示从哪个bucket开始抽取。
    例如,table 总 bucket 数为 32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket 和第(3+16=)19个bucket 的数据。

合理利用分区:Partition

Partition 就是分区。
分区通过在创建表时启用 partitioned by 实现。
要查询某一分区的内容时 可以采用 where 语句,形似 where tablename.partition_column = a 来实现。

Join 优化

总体原则:

  1. 优先过滤后再进行 Join 操作,最大限度的减少参与 join 的数据量
  2. 小表 join 大表,最好启动 map join
  3. join 表的排列顺序 按照表的数据量从小到大

在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作 符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加 载进内存,这样可以有效减少发生 OOM 错误的几率。

在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:

  • set hive.optimize.skewjoin = true;
    是否开启数据倾斜的 join 优化,默认不开启 false
  • set hive.skewjoin.key = 100000;
    判断数据倾斜的阈值,如果在 join 中发现同样的 key 超过该值则认为是该 key 是倾斜的join key,默认是100000;

Group By 优化

1、Map 端部分聚合

并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进 行部分聚合,最后在 Reduce 端得出最终结果。

MapReduce 的 combiner 组件参数包括:

  • set hive.map.aggr = true
    map 端聚合是否开启,默认开启
  • set hive.groupby.mapaggr.checkinterval = 100000
    map端做聚合时,group by 的 key 所允许的数据行数,超过该值则进行分拆,默认是100000;
2、使用 Group By 有数据倾斜的时候进行负载均衡

set hive.groupby.skewindata = true

当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总

  • 阶段一:
    Map的输出结果随机分配到 reduce 做预汇总, 减少某些key值条数过多 某些key条数过小 造成的数据倾斜问题,达到负载均衡的目的。

  • 阶段二:
    再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。

但如果要在查询语句中对 多列进行去重统计时 会报错

合理利用文件存储格式

创建表时,尽量使用 parquet 列式存储格式。

创建 parquet 格式的数据表:
hive> CREATE TABLE parquet_table(age INT, name STRING)STORED AS PARQUET;

本地模式执行 MapReduce

当 Hive 查询处理的数据量比较小时,可以只使用本地模式来执行 mapreduce job,只在一台机器上 执行,速度会很快。启动本地模式涉及到三个参数:

set hive.exec.mode.local.auto=true 是打开 hive 自动判断是否启动本地模式的开关,但是只 是打开这个参数并不能保证启动本地模式,要当 map 任务数不超过 第二个参数 的个数并且 map 输入文件大小不超过 第三个参数 所指定的大小时,才能启动本地模式。

并行化处理

一个 hive sql 语句可能会转为多个 mapreduce Job,每一个 job 就是一个 stage,这些 job 顺序 执行。

但是有时候这些任务之间并不是是相互依赖的, 如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行,这样就节约了时间,提 高了执行速度.

但是如果集群资源匮乏时,启用并行化反倒是会导致各个 job 相互抢占资源 而导致整体执行性能的下降。

启用并行化:

set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; //同一个 sql 允许并行任务的最大线程数

设置压缩存储

1、压缩的原因

Hive 最终是转为 MapReduce 程序来执行的,而 MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗 CPU 的,但是在 Hadoop 中, 往往性能瓶颈不 在于 CPU,CPU 压力并不大,所以压缩充分利用了比较空闲的 CPU

2、常用压缩方法对比

可以看到,压缩率越高,速度越慢。

3、压缩方式的选择

压缩比率
压缩解压缩速度
是否支持 Split

4、压缩使用

Job 输出文件按照 block 以 GZip 的方式进行压缩:

set mapreduce.output.fileoutputformat.compress=true // 默认值是 false

set mapreduce.output.fileoutputformat.compress.type=BLOCK // 默认值是 Record

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec

Map 输出结果也以 Gzip 进行压缩:

set mapred.map.output.compress=true

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec

对 Hive 输出结果和中间都进行压缩:

set hive.exec.compress.output=true // 默认值是 false,不压缩

set hive.exec.compress.intermediate=true // 默认值是 false,为 true 时 MR 设置的压缩才启用

参考

http://www.cnblogs.com/qingyunzong/p/8847775.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容