hadoop上的数据仓库很多,这里为什么说一定要导入到impala中呢?因为impala和hive之间并不是完全互通的,hive支持的某些压缩格式impala并不支持。因为这个问题,笔者还踩了个坑。一个很简单的任务花了1周才完成,在此记录一下。
开始接到的需求是:“把ip 200的机子(以下简称数据源)上的本地日志文件导入到 专门存放数据的ip为 1 2 3的集群(以下简称数据集群)上,要求数据实时、不丢失、不重复”。
实现这个需求有很多方法,在此仅列举三种思路:
- 使用flume将数据导入到hive,然后在impala中直接读取
- 用shell脚本定期拉整个本地文件到spark上,再用hql执行整个文件到指定表的覆盖导入
- 使用flume + sparkSteaming + hbase,每接到1条数据就执行一次插入
第一种思路看起来最好,但是实际上完全不可行,我是完成了整个流程才发现,flume导入到hive的数据只能有一种压缩格式---orc格式,偏偏这种格式impala是不支持的。中间还有个两边的hadoop版本不一致(数据源2.5,数据集群2.6)的问题,导入了缺少的jar包才把数据源机子上的flume跑起来。
进而想不导入到hive而是导入到hdfs上hive表对应的目录下,结果发现flume导入中的数据后缀名为.tmp, hive无法直接识别,即使去掉了后缀名也不行。更蛋疼的是flume进程如果中途被杀,并不会把上一次的.tmp文件自动更改成hive可识别的格式。也就是不能满足我们数据不丢失的要求,需要我们自己来处理这个问题,比较麻烦。更何况flume在hdfs上产生的小文件非常多,查询起来也不是那么快。
第二种思路就比较简单,就是在idea中写spark程序,在maven中添加 spark flume 和spark hive的引用,用spark把数据源机器上的本地文件导入到内存中转化成dataframe,然后调用hive的api把数据存入到数据集群上,这里spark需要配置一下与hive的连接方式。打出jar包放到数据源机器上,然后用crontab每隔5分钟执行一次这个jar就可以了。
这里遇到了scala编译版本不一致的问题等,依赖的jar包也要打入到目标jar中,idea导出jar包时总是报xxx is not acceptable(找不到main方法)等问题。
这里还有个问题,如果是导入到hive中,为什么不直接用hdfs的api直接put进去呢?因为数据在数据源机子上,这个机子是2.5版本的hdfs的一个节点,与数据集群不是一个集群。
那么hive的beeline也可以直接执行bin/beeline -u "jdbc:hive://destination:10000" -e "load local data inpath '/xx/xxx/xxx.txt' into table xx partition(xx='xx')" 这样的指令,为什么不用呢?因为beeline一旦执行了远程连接,在load local data的时候,指定的本地目录是数据集群上destination所在的机子的本地路径了,并不是数据源本地路径!这也是个坑。
第三种思路能满足实时性的要求,但是需要引入新的框架hbase,也不知道impala和hbase的兼容性如何,暂时不予考虑。等以后对实时性的要求高的话再考虑这种思路。
先挖坑,后面贴上flume的配置、agent的各种参数写法、spark与hive的连接、项目的maven依赖、遇到的问题的解决办法等。
这是我从android转到大数据方向做的第一个需求,感觉问题还是比较多的,主要是对公司的hadoop这一堆软件不够熟悉,遇到的大部分是环境问题,解决这些问题不难,难的是怎么定位。这一套流程走下来感觉心里就有底了,原来对数据组的工作不太有自信,怕无法应付工作,这个问题的解决是让自己恢复了在搞android时一样的信心,只要时间够,还是都能解决的。