spark-streaming消费kafka数据,采用buffermutator写hbase踩坑记
场景
otter同步mysql的数据到kafka,spark-streaming消费kafka,采用mutator异步写hbase。mysql的insert,update,delete特别频繁,每秒都有大量的insert,delete,update
以下都是基于同一条数据操作
1. 坑 Delete(byte [] row), Put(byte[] row),
bufferedmutator写hbase的时候 同一条数据先 delete 再put ,
遇到的问题:
hbase显示没这个数据;就是bufferedmutator处理数据乱序了(每次测试都是hbase找不到数据)
解决的途径:
遇坑后,百度找到https://developer.aliyun.com/ask/129312?spm=a2c6h.13159736这篇文章。意思就是new 对象 加上时间戳
2.坑 Delete(byte [] row, long timestamp) ,Put(byte[] row, long ts)
代码里时间戳使用System.currentTimeMillis()
遇到的问题:
spark-streaming处理数据的时候,一个批次rdd遍历起来很快,难免两个System.currentTimeMillis()一样,此时就和方法1一样,达不到先delete,后insert的效果,依然没有解决(反复测试,确实有两个时间戳一样)
解决的途径:
上边的时间戳加1操作,使两个时间戳不一样
参照https://issues.apache.org/jira/browse/HBASE-8626?focusedCommentId=13669455&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13669455 hbase的issue就是上边mutator踩坑实录
longnow =System.currentTimeMillis();
Delete delete =newDelete(row);
delete.deleteFamily(cf1, now);
Put put1 =newPut(row);
put1.add(cf1,col1,now+1);