在数据仓库建设,处理日常业务需求的过程中,经常会遇到各种各样的问题。在处理更新操作数据时,遇到数据重复问题。前期因为没考虑到关系型数据库update操作,把数据同步到hive时有数据重复问题,在业务日常报表分析时,带来很大困扰。今天就简单来说一下,遇到这样的场景该如何处理。其实只要用到hive的窗口函数即可解决。
业务背景
首先,要来谈谈为啥会遇到这个问题。如何不结合业务背景讨论这个问题,那么是毫无意义的。我们公司在建设数仓过程中,使用的是第三方平台,类似阿里云这样的平台。公司购买的第三方产品,省去了底层的开发工作。很多东西,都是已经封装完成的,拿来使用即可。
数据仓库建设,使用传统关系型数据库进行数据存储,显然是不现实的。我们需要把关系型数据库,如mysql,oracle等库里的数据同步到分布式文件系统(HDFS)上,我们公司使用的是hive这种非关系型数据库。注意:hive的元数据管理使用的是mysql数据库,但是业务数据都是存储在HDFS上的。
从关系型数据库同步数据的时候,第一次选择全量同步。因为数据量非常大,单个表的数据就几百G不等。但是后续继续使用全量同步显然是不显示的,只能考虑使用增量同步,来同步增量数据,使用 修改时间 这个字段进行判断数据是否为新增数据。
在日常生产中,使用关系型数据库,大家知道有update更新操作。比如某个用户的状态改变以后,只需要在数据库中更改这条已经存在的记录即可。
但是这种更新我们需要实时的同步到hive中,可是hive并没有update的更新操作。把关系型数据库中更新的数据同步到hive中,就会存在数据重复的问题。(因为以前可能已经同步过这条数据)。那么怎么解决这个问题呢?
曲线救国,选择出最新的记录
hive没有update的更新操作。但是我们可以通过hive的序列函数和窗口函数来解决这个问题,非常简单。使用row_number函数即可。
hive的row_number函数
row_number函数:会对所有数值,输出不同的序号,序号唯一且连续,如:1、2、3、4、5、6、7。利用row_number函数的这种特性选择出其中一条我们需要的记录即可。
造一些简单的数据进行验证
id update state password username
001 2020-08-03 15:33:24 1 123456 用户1
002 2020-08-03 12:31:21 1 abcd12 用户2
003 2020-08-04 09:05:44 1 223wer 用户3
001 2020-08-04 16:01:35 1 123abc 用户1
在关系型数据库,直接对001这条记录更新,修改对应的密码即可。但是这条更新记录会被同步记录到hive中,这样就存在两条记录。显然我们要的是最新时间的记录。使用row_number选择出最新的记录。
使用下面的函数即可选择出最新的一条记录
SELECT
id ,
update ,
state ,
password ,
username
FROM
(
SELECT
id ,
update ,
state ,
password ,
username ,
row_muber over (partition by id order by update desc) num
FROM
table
) t
where t.num = 1
#其中,子查询生成的中间临时结果是这样的,结果如下:
id update state password username num
001 2020-08-04 16:01:35 1 123abc 用户1 1
001 2020-08-03 15:33:24 1 123456 用户1 2
002 2020-08-03 12:31:21 1 abcd12 用户2 1
003 2020-08-04 09:05:44 1 223wer 用户3 1
#最后最终生成的结果如下:
id update state password username
001 2020-08-04 16:01:35 1 123abc 用户1
002 2020-08-03 12:31:21 1 abcd12 用户2
003 2020-08-04 09:05:44 1 223wer 用户3
通过上面的方法即可选择出最新的一条记录。在hive中,窗口函数是经常使用的一种函数,用好了窗口函数,平时开发写sql中会带来很多方便。
小问题?如果一个查询语句,有两个row_number(),而且两个分组一样,但排序不一样,那么查询出来的数据会跟着哪个走呢?
比如下面的查询语句,最后的结果是按照id还是aga进行排行返回呢?
select
sex,
name,
id,
age,
row_number() over(partition by sex order by id desc) as id,
row_number() over(partition by sex order by age desc) as age
from
student;
答案:最后的结果是根据age来进行排行的,也就是根据最外围的row_number() 进行排行,有兴趣的可以自己验证一下。