Hbase - 自定义Rowkey规则

> 在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制`TableInputFormat`来实现我们的需求了,我们还可以采用Flink的`DataSet`的方式读取,另外下面还有`Spark`读取的例子。

## 使用教程

Md5Util.java

```

import org.apache.commons.codec.binary.Hex;

import java.security.MessageDigest;

import java.security.NoSuchAlgorithmException;

public class Md5Util {

    public static String md5(byte[] key) {

        return md5(key, 0, key.length);

    }

    public static String md5(byte[] key, int offset, int length) {

        try {

            MessageDigest e = MessageDigest.getInstance("MD5");

            e.update(key, offset, length);

            byte[] digest = e.digest();

            return new String(Hex.encodeHex(digest));

        } catch (NoSuchAlgorithmException var5) {

            throw new RuntimeException("Error computing MD5 hash", var5);

        }

    }

    public static String md5(String str) {

        return md5(str.getBytes());

    }

    public static String md5(String str,int offset, int length) {

        return md5(str.getBytes(),offset,length);

    }

}

```

数据`Split`方式

```

private Connection connection;

    private Admin admin;

    @Before

    public void init() throws Exception {

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");

        System.setProperty("sun.security.krb5.debug", "false");

        final String user = "hbase/abc.demo.com@DEMO.COM";

        final String keyPath = "/home/dounine/kerberos/lake.keytab";

        Configuration conf = new Configuration();

        conf.addResource("hbase-site.xml");

        UserGroupInformation.setConfiguration(conf);

        UserGroupInformation.loginUserFromKeytab(user, keyPath);

        connection = ConnectionFactory.createConnection(conf);

        admin = connection.getAdmin();

    }

@Test

    public void createTable() throws IOException {

        TableName table = TableName.valueOf("logTable1");

        TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);

        tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY,KeyPrefixRegionSplitPolicy.class.getName());

        tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY,"2");

        ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();

        ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();

        ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();

        tableDesc.setColumnFamilies(Arrays.asList(extCF,locationCF,deviceCF));

        try {

            byte[][] splitKeys = new byte[4][];

            splitKeys[0] = Bytes.toBytes("00");

            splitKeys[1] = Bytes.toBytes("40");

            splitKeys[2] = Bytes.toBytes("80");

            splitKeys[3] = Bytes.toBytes("c0");

            admin.createTable(tableDesc.build(),splitKeys);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

```

`logTable1`数据写入方式

```

public class HbaseKerberos{

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    private static final String TABLE_NAME = "logTable1";

    public void insertDataToHbase1(String appKey,List<Log> hasDatas) throws IOException {

        Table table = HbaseUtils.getTable(TABLE_NAME);

        Long sumCount = 0L;

        /**

        * 常规值

        */

        byte[] extCF = Bytes.toBytes("ext");//CF列族

        Random random = new Random();

        List<Put> rows = new ArrayList<>();

        for (Log logEntity : hasDatas) {

            JSONObject dataJsonObject = logEntity.getData();

            JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");

            String userId = extJsonObject.getString("userId");

            String timeStr = logEntity.getTime().format(dtf);

            String md5Str = Md5Util.md5(userId);

            String rowKey = new StringBuilder()

                    .append(md5Str.substring(0,2))//md5出来的前两位最高为ff,00~ff为256位,后期Region可以增加那么多,足够使用了。

                    .append("|")

                    .append(timeStr)//时间

                    .append("|")

                    .append(CrcUtil.getCrcValue(appKey))

                    .append("|")

                    .append(md5Str.substring(2,8))

                    .append("|")

                    .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0,2))

                    .toString();

            Put row = new Put(Bytes.toBytes(rowKey));

            for(String keyName : extJsonObject.keySet()){

                String value = extJsonObject.getString(keyName);

                if(StringUtils.isNotBlank(value)){

                    row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));

                }

            }

            row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));

            /**

            * 设备信息

            */

            putFieldToRow(logEntity.getData(),"device",row);

            /**

            * 位置信息

            */

            putFieldToRow(logEntity.getData(),"location",row);

            rows.add(row);

        }

        for(Integer[] durtation : LimitUtil.getLimits(rows.size(),1000)){

            Object[] results = new Object[(durtation[1]-durtation[0])];

            try {

                table.batch(rows.subList(durtation[0], durtation[1]),results);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            sumCount += (durtation[1]-durtation[0]);

        }

        LOGGER.info("write data count:" + sumCount);

    }

}

```

`logTable1`数据

```

00|20180518203401772|2352356512|4519 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e                     

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-                           

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093                                   

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:retain, timestamp=1533646292389, value=17670                                                   

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:scene, timestamp=1533646292389, value=1007                                                     

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c                           

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01                                       

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:type, timestamp=1533646292389, value=login_in                                                   

f3|f1                                                                                                                                           

00|20180518203401772|2352356512|4519 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ                             

f3|f1                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e                     

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-                               

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075                                   

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:retain, timestamp=1533646347725, value=17670                                                   

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ                           

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06                                       

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:type, timestamp=1533646347725, value=sharesuccess                                               

f3|54                                                                                                                                           

00|20180518203406167|2352356512|4519 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ                             

f3|54                                                                                                                                           

00|20180518203407144|2352356512|5ca1 column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e                     

c4|bc                                                                                                                                           

00|20180518203407144|2352356512|5ca1 column=ext:createDateTime, timestamp=1533646294045, value=1526646849745                                   

c4|bc                                                                                                                                           

00|20180518203407144|2352356512|5ca1 column=ext:retain, timestamp=1533646294045, value=17670                                                   

c4|bc                                                                                                                                           

00|20180518203407144|2352356512|5ca1 column=ext:scene, timestamp=1533646294045, value=1037                                                     

c4|bc                                                                                                                                           

00|20180518203407144|2352356512|5ca1 column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07                                       

c4|bc                                                                                                                                           

00|20180518203407144|2352356512|5ca1 column=ext:type, timestamp=1533646294045, value=login_in 

```

CustomTableInputFormat.java

```

import org.apache.commons.lang3.StringUtils;

import org.apache.hadoop.hbase.HRegionLocation;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.hadoop.hbase.mapreduce.TableSplit;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.hbase.util.Strings;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.hadoop.net.DNS;

import java.io.IOException;

import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.net.UnknownHostException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

public class CustomTableInputFormat extends TableInputFormat {

    private HashMap<InetAddress, String> reverseDNSCacheMap =

            new HashMap<>();

    private List<String> keys = new ArrayList<>();

    public CustomTableInputFormat(){

        super();

        for(int i =0;i<256;i++){

            keys.add(StringUtils.substring("00"+Integer.toHexString(i),-2));

        }

    }

    @Override

    public List<InputSplit> getSplits(JobContext context) throws IOException {

        super.initialize(context);

        TableName tableName = super.getTable().getName();

        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());

        List<InputSplit> splits = new ArrayList<>();

        for (String key : keys) {

            HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);

            InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());

            InetAddress regionAddress = isa.getAddress();

            String regionLocation;

            regionLocation = reverseDNS(regionAddress);

            byte[] regionName = location.getRegion().getRegionName();

            String encodedRegionName = location.getRegion().getEncodedName();

            long regionSize = sizeCalculator.getRegionSize(regionName);

            byte[] splitStart = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStartRow());

            byte[] splitStop = Bytes.add(Bytes.toBytes(key+"|"),this.getScan().getStopRow());

            TableSplit split = new TableSplit(tableName, this.getScan(),

                    splitStart, splitStop, regionLocation, encodedRegionName, regionSize);

            splits.add(split);

        }

        return splits;

    }

    String reverseDNS(InetAddress ipAddress) throws UnknownHostException {

        String hostName = this.reverseDNSCacheMap.get(ipAddress);

        if (hostName == null) {

            String ipAddressString = null;

            try {

                ipAddressString = DNS.reverseDns(ipAddress, null);

            } catch (Exception e) {

                ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();

            }

            if (ipAddressString == null) throw new UnknownHostException("No host found for " + ipAddress);

            hostName = Strings.domainNamePointerToHostName(ipAddressString);

            this.reverseDNSCacheMap.put(ipAddress, hostName);

        }

        return hostName;

    }

}

```

## Flink例子

```

static Configuration conf;

    static {

        HadoopKrbLogin.login();

        conf = new Configuration();

        String tableName = "logTable1";

        conf.addResource("hbase-site.xml");

        Scan scan = new Scan();

        scan.setCaching(1000);

        scan.withStartRow("201805182039".getBytes());

        scan.withStopRow("201805182040".getBytes());

        scan.setCacheBlocks(false);

        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);

        ClientProtos.Scan proto = null;

        try {

            proto = ProtobufUtil.toScan(scan);

        } catch (IOException e) {

            e.printStackTrace();

        }

        String ScanToString = Base64.encodeBytes(proto.toByteArray());

        conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, ScanToString);

    }

    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(

                HadoopInputs.createHadoopInput(

                        new CustomTableInputFormat(),

                        ImmutableBytesWritable.class,

                        Result.class,

                        Job.getInstance(conf)

                )

        );

        DataSet<LogEntity> toTuple = hbase.map(

                new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {

                    public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {

                        Result result = record.f1;

                        return result2Entity(result);

                    }

                });

}

private static LogEntity result2Entity(Result result) {

        JSONObject root = new JSONObject();

        JSONObject ext = new JSONObject();

        JSONObject device = new JSONObject();

        JSONObject location = new JSONObject();

        for (Cell cell : result.rawCells()) {

            byte[] family = CellUtil.cloneFamily(cell);

            byte[] column = CellUtil.cloneQualifier(cell);

            byte[] value = CellUtil.cloneValue(cell);

            String columnName = Bytes.toString(column);

            if ("ext".equals(Bytes.toString(family))) {

                if ("durationTime".equals(columnName)) {

                    ext.put(columnName, Bytes.toLong(value));

                } else if ("time".equals(columnName)) {

                    root.put(columnName, Bytes.toString(value));

                    root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));

                } else {

                    ext.put(columnName, Bytes.toString(value));

                }

            } else if ("device".equals(Bytes.toString(family))) {

                device.put(columnName, Bytes.toString(value));

            } else if ("location".equals(Bytes.toString(family))) {

                location.put(columnName, Bytes.toString(value));

            }

        }

        JSONObject data = new JSONObject();

        if (device.keySet().size() > 0) {

            data.put("device", device);

        }

        if (location.keySet().size() > 0) {

            data.put("location", location);

        }

        data.put("ext", ext);

        root.put("data", data);

        return JSON.parseObject(root.toString(), LogEntity.class);

    }

```

## Spark 例子

```

public class SimpleApp implements Serializable {

 static Configuration cfg = null;

    static {

        HadoopKrbLogin.login();

        cfg = new Configuration();

        String tableName = "logTable1";

        cfg.addResource("hbase-site.xml");

        Scan scan = new Scan();

        scan.setCaching(1000);

        scan.withStartRow("201805182039".getBytes());

        scan.withStopRow("201805182040".getBytes());

        scan.setCacheBlocks(false);

        cfg.set(TableInputFormat.INPUT_TABLE, tableName);

        ClientProtos.Scan proto = null;

        try {

            proto = ProtobufUtil.toScan(scan);

        } catch (IOException e) {

            e.printStackTrace();

        }

        String ScanToString = Base64.encodeBytes(proto.toByteArray());

        cfg.set(TableInputFormat.SCAN, ScanToString);

    }

public static void main(String[] args) {

SparkConf sparkConf = new SparkConf()

                .setMaster("local")

                .setAppName("HbaseDemo");

        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =

                jsc.newAPIHadoopRDD(cfg, CustomTableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // do some transformation

        JavaRDD<LogEntity> rdd1 = hBaseRDD.mapPartitions((FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, LogEntity>)

                tuple2Iterator -> {

                    List<LogEntity> logEntities = new ArrayList<>();

                    while (tuple2Iterator.hasNext()) {

                        Tuple2<ImmutableBytesWritable, Result> tuple = tuple2Iterator.next();

                        Result result = tuple._2;

                        String rowKey = Bytes.toString(result.getRow());

                        logEntities.add(result2Entity(result));

                    }

                    return logEntities.iterator();

                });

}

private static LogEntity result2Entity(Result result) {

        JSONObject root = new JSONObject();

        JSONObject ext = new JSONObject();

        JSONObject device = new JSONObject();

        JSONObject location = new JSONObject();

        for (Cell cell : result.rawCells()) {

            byte[] family = CellUtil.cloneFamily(cell);

            byte[] column = CellUtil.cloneQualifier(cell);

            byte[] value = CellUtil.cloneValue(cell);

            String columnName = Bytes.toString(column);

            if ("ext".equals(Bytes.toString(family))) {

                if ("durationTime".equals(columnName)) {

                    ext.put(columnName, Bytes.toLong(value));

                } else if ("time".equals(columnName)) {

                    root.put(columnName, Bytes.toString(value));

                    root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));

                } else {

                    ext.put(columnName, Bytes.toString(value));

                }

            } else if ("device".equals(Bytes.toString(family))) {

                device.put(columnName, Bytes.toString(value));

            } else if ("location".equals(Bytes.toString(family))) {

                location.put(columnName, Bytes.toString(value));

            }

        }

        JSONObject data = new JSONObject();

        if (device.keySet().size() > 0) {

            data.put("device", device);

        }

        if (location.keySet().size() > 0) {

            data.put("location", location);

        }

        data.put("ext", ext);

        root.put("data", data);

        return JSON.parseObject(root.toString(), LogEntity.class);

    }

```

---

![](https://upload-images.jianshu.io/upload_images/9028759-07315bb8dadcd082.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容

  • > 要导入大量数据,Hbase的BulkLoad是必不可少的,在导入历史数据的时候,我们一般会选择使用BulkLo...
    kikiki1阅读 616评论 0 2
  • 在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制Table...
    大猪大猪阅读 1,309评论 0 16
  • 在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制Table...
    kikiki2阅读 140评论 0 1
  • 在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制Table...
    大猪大猪阅读 248评论 0 1
  • 在Flink中我们有时候需要分析数据1点到2点的范围,可是经过Region又比较慢,这时候我们就可以定制Table...
    kikiki2阅读 80评论 0 3