hive运行一个查询,可能会由于各种原因失败,但不应该出现执行成功,但数据结果不正确。同样的sql查询,同样的数据,却出现了某一次查询,没有报错,但数据异常,且只此一次,再也无法重现了。经过几天的排查,终于找到了原因。
经过:
2018-04-02凌晨,数仓同学收到数据质量报警,某个字段的唯一性检查没有通过。一般情况下,这种问题是由脏数据引起的。然而这一次排查发现上游数据没有问题,于是数仓同学尝试直接通过hive命令进行操作,这次结果正确了,能通过数据质量检查。之后我又重试了很多次,再也无法重现数据错误的那种情况。
调研过程:
1.排除了数据源的数据变化,以及查询语句的改动。
2.看那一次查询的运行日志,找到其中的job的日志,与正常情况下的job日志对比
循着explain出来的执行计划往前追溯,找到异常最早出现在stage1的job。对比如下:
在错误的case下,reduce input records 小于map output records。这里combine 的records为0,且有相同数据的查询做为对比,不会是因为某些key被过滤而引起的。所以这里这是一个异常情况。
看一下执行计划,这是一个普通的join的job
再看一下这个job的执行情况
maps和reduces各有三次被kill
由于speculation execution 被kill是常见的,但是可见有一个map 和一个reduce是由于NodeX:8800 不可用引起的。
到这里变成了两个问题:1. NodeX:8800 为什么会不可用;2.为什么这个不可用会造成数据结果有问题
对于第一个问题,首先找到task的日志中不可用的具体时间,在00:33左右,然后查看node manager的日志,但是没找到线索。又想到我们集群设置了如果某个datanode的磁盘使用率如果大于90%的话,node manager会通知resource manager当前这个节点不可用。查看NodeX的当前状态,果然当前的使用率在87%,已经很接近90%。凌晨同时加上map和reduce任务,超90%也在情理之中,基本上就是这个原因了。那么为什么这个节点负载这么高,平时我们hdfs也会做均衡。经过排查,发现有一个job已经运行了3天还没跑完,应该是数据倾斜了,其有一个reduce task正是运行在NodeX上,kill之后负载就下来了。
继续看第二个问题,由于是reduce的input records不对,首先怀疑reduce阶段的问题。把四个成功reduce的日志放到一起,统计reduce的读写数据量,并与正确情况对比。可惜对比两次正确的执行,发现其中的读写数据量也会有少量差异,可能与中间结果的压缩有关。然后考虑对比每个reduce的output records,找到具体哪个reduce出的错,然而对比两次正确的执行,其中每个reduce 的output records量仍然不同,也就是说每次执行其分区结果并不完全一样。
从这些log中得到信息只有:某一个reduce读取的map out put 文件是新的map task生成的,而其他三个reduce读取的是由于NodeX不可用被kill的task生成的。但是即使如此,也不应该有问题,因为两个task生成的map output应该是一样的。
到这里没线索了,干脆来理一下当时的场景:
1.这个job正在执行,共19个map task
2.map task 都正确输出了output file,4 个 reduce task 启动了
3.其中三个reduce task处理的数据量较小,已经读取了NodeX上的map task的输出: attempt_1496659744394_1860537_m_000003_0
4.NodeX由于磁盘负载问题不可用,RM通知AM,AM收到通知后,kill了NodeX上的map task 和reduce task
5.AM启动了新的map task,执行结束后生成output file: attempt_1496659744394_1860537_m_000003_1
6.AM启动了新的reduce task,读取attempt_1496659744394_1860537_m_000003_1
其中还有几个因为speculation execution被kill的task,以及被map task 抢占的reduce task
在这个过程中,问题只可能出在5,6步中新建的map和reduce task中,怀疑是新的map和reduce task中的分区方式不一样。关于分区方式其实有一个疑问,就是上文加粗部分,为什么每次执行,分区结果都是不同的?把map和reduce的日志级别改到TRACE,在日志中寻找关于分区的线索,没找到; 又仔细看了hive的代码,join的时候会以join columns的值生成 partition key,然后生成hashcode ,最后在collect的时候对hashcode取余进行分区,这其中每一步都是deterministic的,固定的输入最后的分区结果一定是一样的。问题到底出在哪里?
似乎又没线索了,于是把从输入查询语句到执行结束整个过程再梳理一遍。分析了查询语句之后,终于搞清楚了。查询语句中有如下用法:
...
from (select
...
if(x_id>0,cast(concat(substr(x_id,0,2),'0000') as int),cast(ceiling(rand()*-65535) as bigint)) AS y_id,
...
from db.tbl) a
left join(select id,name from db.tbl2)l on a.y_id=l.id
...
为了解决数据倾斜问题,y_id中的一部分数据是随机生成的,而y_id又是join key,所以一个split中会有一部分数据是随机分区的。在上述这种不幸的场景下,示意图如下,一共4个reduce task,三个从attempt_1496659744394_1860537_m_000003_0获取分区数据,一个从attempt_1496659744394_1860537_m_000003_1获取分区数据,由于部分数据key的随机性,这两个map output的文件中的分区是不一致的。结果就是有的数据会重复取,有的数据没取到。
那么怎么解决这个问题呢?几种办法
1.改MR框架,在这种两次map的output 中分区不一致的情况,当某个map task被kill,执行了新的map task以后,所以的reduce task都需要重新执行,即使是已经成功的reduce task
2.改hive,在hive层确保某个map task即使被执行多次,每次执行的分区结果都要一致
上述两种改法代价都是比较大的,这种有随机join key生成的job本身就比较少,而恰好只有部分reduce task成功的间隙,某个map task所在的data node不可用的概率也是比较小的。
3.workaround:改查询语句,在生成随机的列值之后先插入某个中间表,然后再取出来做join,这时map task的分区就是确定的了。