HBase - Custom Rowkey Rules

> In Flink we sometimes need to analyze data in a time range, say from 1:00 to 2:00, but a scan that has to pass through every Region is slow. In that case we can customize `TableInputFormat` to meet the requirement: because the rowkey below starts with a two-hex-character MD5 prefix (for write distribution), the custom input format creates one split per prefix and restricts each split to the requested time range. We can read the result with Flink's `DataSet` API, and a `Spark` reading example is included below as well.

## Usage Tutorial

Md5Util.java

```
import org.apache.commons.codec.binary.Hex;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Util {

    public static String md5(byte[] key) {
        return md5(key, 0, key.length);
    }

    public static String md5(byte[] key, int offset, int length) {
        try {
            MessageDigest e = MessageDigest.getInstance("MD5");
            e.update(key, offset, length);
            byte[] digest = e.digest();
            return new String(Hex.encodeHex(digest));
        } catch (NoSuchAlgorithmException var5) {
            throw new RuntimeException("Error computing MD5 hash", var5);
        }
    }

    public static String md5(String str) {
        return md5(str.getBytes());
    }

    public static String md5(String str, int offset, int length) {
        return md5(str.getBytes(), offset, length);
    }
}
```

The data `Split` setup (table creation with pre-split regions):

```
private Connection connection;
private Admin admin;

@Before
public void init() throws Exception {
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    System.setProperty("sun.security.krb5.debug", "false");
    final String user = "hbase/abc.demo.com@DEMO.COM";
    final String keyPath = "/home/dounine/kerberos/lake.keytab";

    Configuration conf = new Configuration();
    conf.addResource("hbase-site.xml");

    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(user, keyPath);

    connection = ConnectionFactory.createConnection(conf);
    admin = connection.getAdmin();
}

@Test
public void createTable() throws IOException {
    TableName table = TableName.valueOf("logTable1");
    TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);
    tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY, KeyPrefixRegionSplitPolicy.class.getName());
    tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY, "2");
    ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();
    ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();
    ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();
    tableDesc.setColumnFamilies(Arrays.asList(extCF, locationCF, deviceCF));
    try {
        byte[][] splitKeys = new byte[4][];
        splitKeys[0] = Bytes.toBytes("00");
        splitKeys[1] = Bytes.toBytes("40");
        splitKeys[2] = Bytes.toBytes("80");
        splitKeys[3] = Bytes.toBytes("c0");
        admin.createTable(tableDesc.build(), splitKeys);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```
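
The four hard-coded split points above divide the 256 possible two-hex-character prefixes ("00" through "ff", lowercase, matching the MD5 output) into four initial regions, and `KeyPrefixRegionSplitPolicy` with a prefix length of 2 keeps all rows sharing a prefix in the same region when further splits happen. If finer pre-splitting is wanted, the split points can also be generated programmatically; the `hexPrefixSplits` helper below is a minimal sketch of that idea and is not part of the original post:

```
import org.apache.hadoop.hbase.util.Bytes;

public class SplitKeyGenerator {

    /**
     * Returns (count - 1) roughly evenly spaced two-hex-char split keys,
     * e.g. count = 16 -> "10", "20", ..., "f0" (lowercase, like the MD5 prefix).
     */
    public static byte[][] hexPrefixSplits(int count) {
        byte[][] splitKeys = new byte[count - 1][];
        int step = 256 / count;
        for (int i = 1; i < count; i++) {
            splitKeys[i - 1] = Bytes.toBytes(String.format("%02x", i * step));
        }
        return splitKeys;
    }
}
```

`admin.createTable(tableDesc.build(), SplitKeyGenerator.hexPrefixSplits(16))` would then start the table with 16 regions instead of 4.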

How data is written to `logTable1`:

```
public class HbaseKerberos {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
    private static final String TABLE_NAME = "logTable1";

    public void insertDataToHbase1(String appKey, List<Log> hasDatas) throws IOException {
        Table table = HbaseUtils.getTable(TABLE_NAME);
        Long sumCount = 0L;

        /**
         * Regular fields
         */
        byte[] extCF = Bytes.toBytes("ext"); // the "ext" column family
        Random random = new Random();
        List<Put> rows = new ArrayList<>();
        for (Log logEntity : hasDatas) {
            JSONObject dataJsonObject = logEntity.getData();
            JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");
            String userId = extJsonObject.getString("userId");
            String timeStr = logEntity.getTime().format(dtf);

            String md5Str = Md5Util.md5(userId);
            String rowKey = new StringBuilder()
                    .append(md5Str.substring(0, 2)) // first two hex chars of the MD5; "00".."ff" gives 256 prefixes, enough room for Regions to grow later
                    .append("|")
                    .append(timeStr)                // time
                    .append("|")
                    .append(CrcUtil.getCrcValue(appKey))
                    .append("|")
                    .append(md5Str.substring(2, 8))
                    .append("|")
                    .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0, 2))
                    .toString();
            Put row = new Put(Bytes.toBytes(rowKey));
            for (String keyName : extJsonObject.keySet()) {
                String value = extJsonObject.getString(keyName);
                if (StringUtils.isNotBlank(value)) {
                    row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));
                }
            }
            row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));

            /**
             * Device info
             */
            putFieldToRow(logEntity.getData(), "device", row);

            /**
             * Location info
             */
            putFieldToRow(logEntity.getData(), "location", row);

            rows.add(row);
        }

        for (Integer[] duration : LimitUtil.getLimits(rows.size(), 1000)) {
            Object[] results = new Object[(duration[1] - duration[0])];
            try {
                table.batch(rows.subList(duration[0], duration[1]), results);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sumCount += (duration[1] - duration[0]);
        }
        LOGGER.info("write data count:" + sumCount);
    }
}
```
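
`HbaseUtils.getTable`, `putFieldToRow`, `CrcUtil` and `LimitUtil` are project-internal helpers that the post does not show. The two sketches below are assumptions inferred from how they are called above: `getCrcValue` is taken to be a CRC32 of the appKey string (which yields values like `2352356512` in the sample data), and `getLimits` is taken to slice the index range `[0, size)` into batches of at most 1000 rows for `table.batch`:

```
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

class CrcUtil {
    /** Assumed behaviour: CRC32 checksum of the key's bytes. */
    static long getCrcValue(String key) {
        CRC32 crc32 = new CRC32();
        crc32.update(key.getBytes());
        return crc32.getValue();
    }
}

class LimitUtil {
    /** Assumed behaviour: splits [0, size) into {from, to} pairs of at most batchSize elements. */
    static List<Integer[]> getLimits(int size, int batchSize) {
        List<Integer[]> limits = new ArrayList<>();
        for (int from = 0; from < size; from += batchSize) {
            limits.add(new Integer[]{from, Math.min(from + batchSize, size)});
        }
        return limits;
    }
}
```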

Sample `logTable1` data:

```
00|20180518203401772|2352356512|4519f3|f1 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203401772|2352356512|4519f3|f1 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-
00|20180518203401772|2352356512|4519f3|f1 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093
00|20180518203401772|2352356512|4519f3|f1 column=ext:retain, timestamp=1533646292389, value=17670
00|20180518203401772|2352356512|4519f3|f1 column=ext:scene, timestamp=1533646292389, value=1007
00|20180518203401772|2352356512|4519f3|f1 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c
00|20180518203401772|2352356512|4519f3|f1 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01
00|20180518203401772|2352356512|4519f3|f1 column=ext:type, timestamp=1533646292389, value=login_in
00|20180518203401772|2352356512|4519f3|f1 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203406167|2352356512|4519f3|54 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203406167|2352356512|4519f3|54 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-
00|20180518203406167|2352356512|4519f3|54 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075
00|20180518203406167|2352356512|4519f3|54 column=ext:retain, timestamp=1533646347725, value=17670
00|20180518203406167|2352356512|4519f3|54 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203406167|2352356512|4519f3|54 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06
00|20180518203406167|2352356512|4519f3|54 column=ext:type, timestamp=1533646347725, value=sharesuccess
00|20180518203406167|2352356512|4519f3|54 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203407144|2352356512|5ca1c4|bc column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203407144|2352356512|5ca1c4|bc column=ext:createDateTime, timestamp=1533646294045, value=1526646849745
00|20180518203407144|2352356512|5ca1c4|bc column=ext:retain, timestamp=1533646294045, value=17670
00|20180518203407144|2352356512|5ca1c4|bc column=ext:scene, timestamp=1533646294045, value=1037
00|20180518203407144|2352356512|5ca1c4|bc column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07
00|20180518203407144|2352356512|5ca1c4|bc column=ext:type, timestamp=1533646294045, value=login_in
```

CustomTableInputFormat.java

```
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.net.DNS;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class CustomTableInputFormat extends TableInputFormat {

    private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();

    private List<String> keys = new ArrayList<>();

    public CustomTableInputFormat() {
        super();
        for (int i = 0; i < 256; i++) {
            keys.add(StringUtils.substring("00" + Integer.toHexString(i), -2));
        }
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        super.initialize(context);
        TableName tableName = super.getTable().getName();
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());
        List<InputSplit> splits = new ArrayList<>();
        for (String key : keys) {
            HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);
            InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
            InetAddress regionAddress = isa.getAddress();
            String regionLocation;
            regionLocation = reverseDNS(regionAddress);

            byte[] regionName = location.getRegion().getRegionName();
            String encodedRegionName = location.getRegion().getEncodedName();
            long regionSize = sizeCalculator.getRegionSize(regionName);

            byte[] splitStart = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStartRow());
            byte[] splitStop = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStopRow());

            TableSplit split = new TableSplit(tableName, this.getScan(),
                    splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
            splits.add(split);
        }
        return splits;
    }

    String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
        String hostName = this.reverseDNSCacheMap.get(ipAddress);
        if (hostName == null) {
            String ipAddressString = null;
            try {
                ipAddressString = DNS.reverseDns(ipAddress, null);
            } catch (Exception e) {
                ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
            }
            if (ipAddressString == null) {
                throw new UnknownHostException("No host found for " + ipAddress);
            }
            hostName = Strings.domainNamePointerToHostName(ipAddressString);
            this.reverseDNSCacheMap.put(ipAddress, hostName);
        }
        return hostName;
    }
}
```
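
Each split produced by `getSplits` simply prepends one of the 256 prefixes plus the `|` separator to the scan's start and stop rows, so a single time-range query is fanned out to every prefix (and therefore every region) in parallel. A small illustrative sketch, not from the original post, of the key ranges this generates for the scan used in the examples below:

```
import org.apache.commons.lang3.StringUtils;

public class SplitRangePreview {
    public static void main(String[] args) {
        String scanStart = "201805182039"; // same start row as the Flink/Spark examples
        String scanStop = "201805182040";
        for (int i = 0; i < 256; i++) {
            String prefix = StringUtils.substring("00" + Integer.toHexString(i), -2);
            // e.g. "00|201805182039 .. 00|201805182040", ..., "ff|201805182039 .. ff|201805182040"
            System.out.println(prefix + "|" + scanStart + " .. " + prefix + "|" + scanStop);
        }
    }
}
```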

## Flink Example

```
static Configuration conf;

static {
    HadoopKrbLogin.login();
    conf = new Configuration();
    String tableName = "logTable1";
    conf.addResource("hbase-site.xml");

    Scan scan = new Scan();
    scan.setCaching(1000);
    scan.withStartRow("201805182039".getBytes());
    scan.withStopRow("201805182040".getBytes());
    scan.setCacheBlocks(false);

    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);
    ClientProtos.Scan proto = null;
    try {
        proto = ProtobufUtil.toScan(scan);
    } catch (IOException e) {
        e.printStackTrace();
    }
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString);
}

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(
            HadoopInputs.createHadoopInput(
                    new CustomTableInputFormat(),
                    ImmutableBytesWritable.class,
                    Result.class,
                    Job.getInstance(conf)
            )
    );
    DataSet<LogEntity> toTuple = hbase.map(
            new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {
                public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {
                    Result result = record.f1;
                    return result2Entity(result);
                }
            });
}

private static LogEntity result2Entity(Result result) {
    JSONObject root = new JSONObject();
    JSONObject ext = new JSONObject();
    JSONObject device = new JSONObject();
    JSONObject location = new JSONObject();
    for (Cell cell : result.rawCells()) {
        byte[] family = CellUtil.cloneFamily(cell);
        byte[] column = CellUtil.cloneQualifier(cell);
        byte[] value = CellUtil.cloneValue(cell);
        String columnName = Bytes.toString(column);
        if ("ext".equals(Bytes.toString(family))) {
            if ("durationTime".equals(columnName)) {
                ext.put(columnName, Bytes.toLong(value));
            } else if ("time".equals(columnName)) {
                root.put(columnName, Bytes.toString(value));
                root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
            } else {
                ext.put(columnName, Bytes.toString(value));
            }
        } else if ("device".equals(Bytes.toString(family))) {
            device.put(columnName, Bytes.toString(value));
        } else if ("location".equals(Bytes.toString(family))) {
            location.put(columnName, Bytes.toString(value));
        }
    }
    JSONObject data = new JSONObject();
    if (device.keySet().size() > 0) {
        data.put("device", device);
    }
    if (location.keySet().size() > 0) {
        data.put("location", location);
    }
    data.put("ext", ext);
    root.put("data", data);
    return JSON.parseObject(root.toString(), LogEntity.class);
}
```
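
`HadoopKrbLogin.login()` is used by both the Flink and the Spark example but is not shown in the post. A minimal sketch, assuming it performs the same keytab login as the `init()` method of the table-creation test above; the principal and keytab path are placeholders:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopKrbLogin {
    public static void login() {
        try {
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
            Configuration conf = new Configuration();
            conf.addResource("hbase-site.xml");
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(
                    "hbase/abc.demo.com@DEMO.COM",         // placeholder principal
                    "/home/dounine/kerberos/lake.keytab"); // placeholder keytab path
        } catch (Exception e) {
            throw new RuntimeException("Kerberos login failed", e);
        }
    }
}
```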
("durationTime".equals(columnName)) { ext.put(columnName, Bytes.toLong(value)); } else if ("time".equals(columnName)) { root.put(columnName, Bytes.toString(value)); root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value)))); } else { ext.put(columnName, Bytes.toString(value)); } } else if ("device".equals(Bytes.toString(family))) { device.put(columnName, Bytes.toString(value)); } else if ("location".equals(Bytes.toString(family))) { location.put(columnName, Bytes.toString(value)); } } JSONObject data = new JSONObject(); if (device.keySet().size() > 0) { data.put("device", device); } if (location.keySet().size() > 0) { data.put("location", location); } data.put("ext", ext); root.put("data", data); return JSON.parseObject(root.toString(), LogEntity.class); } ``` --- ![](https://upload-images.jianshu.io/upload_images/9028759-07315bb8dadcd082.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)w