> In Flink we sometimes need to analyze data for a given time range, say 1:00 to 2:00, but scanning every Region for it is slow. In that case we can customize `TableInputFormat` so that each split only covers the rows we actually need, and then read the data with Flink's `DataSet` API. A `Spark` reading example is also given below.

## Tutorial

Md5Util.java

```
import org.apache.commons.codec.binary.Hex;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Util {

    public static String md5(byte[] key) {
        return md5(key, 0, key.length);
    }

    public static String md5(byte[] key, int offset, int length) {
        try {
            MessageDigest e = MessageDigest.getInstance("MD5");
            e.update(key, offset, length);
            byte[] digest = e.digest();
            return new String(Hex.encodeHex(digest));
        } catch (NoSuchAlgorithmException var5) {
            throw new RuntimeException("Error computing MD5 hash", var5);
        }
    }

    public static String md5(String str) {
        return md5(str.getBytes());
    }

    public static String md5(String str, int offset, int length) {
        return md5(str.getBytes(), offset, length);
    }
}
```

Data `Split` scheme

```
private Connection connection;
private Admin admin;

@Before
public void init() throws Exception {
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    System.setProperty("sun.security.krb5.debug", "false");
    final String user = "hbase/abc.demo.com@DEMO.COM";
    final String keyPath = "/home/dounine/kerberos/lake.keytab";

    Configuration conf = new Configuration();
    conf.addResource("hbase-site.xml");

    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(user, keyPath);

    connection = ConnectionFactory.createConnection(conf);
    admin = connection.getAdmin();
}

@Test
public void createTable() throws IOException {
    TableName table = TableName.valueOf("logTable1");
    TableDescriptorBuilder tableDesc = TableDescriptorBuilder.newBuilder(table);
    // split on the first two characters of the rowkey (the MD5 prefix)
    tableDesc.setValue(TableDescriptorBuilder.SPLIT_POLICY, KeyPrefixRegionSplitPolicy.class.getName());
    tableDesc.setValue(KeyPrefixRegionSplitPolicy.PREFIX_LENGTH_KEY, "2");

    ColumnFamilyDescriptor extCF = ColumnFamilyDescriptorBuilder.newBuilder("ext".getBytes()).build();
    ColumnFamilyDescriptor deviceCF = ColumnFamilyDescriptorBuilder.newBuilder("device".getBytes()).build();
    ColumnFamilyDescriptor locationCF = ColumnFamilyDescriptorBuilder.newBuilder("location".getBytes()).build();
    tableDesc.setColumnFamilies(Arrays.asList(extCF, locationCF, deviceCF));

    try {
        // four split keys -> five initial regions
        byte[][] splitKeys = new byte[4][];
        splitKeys[0] = Bytes.toBytes("00");
        splitKeys[1] = Bytes.toBytes("40");
        splitKeys[2] = Bytes.toBytes("80");
        splitKeys[3] = Bytes.toBytes("c0");
        admin.createTable(tableDesc.build(), splitKeys);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```
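To see how this layout spreads the load, the two-character MD5 prefix places every user into one of 256 buckets, which in turn fall into the four initial regions created above. A minimal sketch using the `Md5Util` class above; the class name `PrefixDemo` and the user IDs are made up for illustration:

```
public class PrefixDemo {

    public static void main(String[] args) {
        // the split keys used in createTable(); since every MD5 prefix is >= "00",
        // rows land in one of the regions starting at 00, 40, 80 or c0
        String[] splitKeys = {"00", "40", "80", "c0"};
        String[] userIds = {"user-0001", "user-0002", "user-0003"}; // made-up IDs

        for (String userId : userIds) {
            String prefix = Md5Util.md5(userId).substring(0, 2);
            String regionStart = splitKeys[0];
            for (String splitKey : splitKeys) {
                if (prefix.compareTo(splitKey) >= 0) {
                    regionStart = splitKey; // largest split key <= prefix
                }
            }
            System.out.println(userId + " -> bucket " + prefix + " -> region starting at " + regionStart);
        }
    }
}
```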
Writing data to `logTable1`

```
public class HbaseKerberos {

    private static final Logger LOGGER = LoggerFactory.getLogger(HbaseKerberos.class);
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
    private static final String TABLE_NAME = "logTable1";

    public void insertDataToHbase1(String appKey, List<Log> hasDatas) throws IOException {
        Table table = HbaseUtils.getTable(TABLE_NAME);
        Long sumCount = 0L;

        /**
         * Regular fields
         */
        byte[] extCF = Bytes.toBytes("ext"); // the "ext" column family
        Random random = new Random();
        List<Put> rows = new ArrayList<>();

        for (Log logEntity : hasDatas) {
            JSONObject dataJsonObject = logEntity.getData();
            JSONObject extJsonObject = dataJsonObject.getJSONObject("ext");
            String userId = extJsonObject.getString("userId");
            String timeStr = logEntity.getTime().format(dtf);

            String md5Str = Md5Util.md5(userId);
            String rowKey = new StringBuilder()
                    .append(md5Str.substring(0, 2)) // first two hex chars of the MD5: 00~ff gives 256 buckets, plenty of room to add Regions later
                    .append("|")
                    .append(timeStr)                // event time
                    .append("|")
                    .append(CrcUtil.getCrcValue(appKey))
                    .append("|")
                    .append(md5Str.substring(2, 8))
                    .append("|")
                    .append(Md5Util.md5(UUID.randomUUID().toString()).substring(0, 2))
                    .toString();

            Put row = new Put(Bytes.toBytes(rowKey));
            for (String keyName : extJsonObject.keySet()) {
                String value = extJsonObject.getString(keyName);
                if (StringUtils.isNotBlank(value)) {
                    row.addColumn(extCF, Bytes.toBytes(keyName), Bytes.toBytes(value));
                }
            }
            row.addColumn(extCF, Bytes.toBytes("time"), Bytes.toBytes(logEntity.getTime().toString()));

            /**
             * Device info
             */
            putFieldToRow(logEntity.getData(), "device", row);
            /**
             * Location info
             */
            putFieldToRow(logEntity.getData(), "location", row);

            rows.add(row);
        }

        // write in batches of at most 1000 Puts
        for (Integer[] duration : LimitUtil.getLimits(rows.size(), 1000)) {
            Object[] results = new Object[(duration[1] - duration[0])];
            try {
                table.batch(rows.subList(duration[0], duration[1]), results);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sumCount += (duration[1] - duration[0]);
        }
        LOGGER.info("write data count:" + sumCount);
    }
}
```
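The write path relies on two helpers that are not shown in the post: `CrcUtil.getCrcValue(appKey)`, used as the third rowkey segment, and `LimitUtil.getLimits(size, 1000)`, used to batch the `Put`s. A minimal sketch of what they might look like, assuming a plain CRC32 checksum for the appKey segment and fixed-size index ranges for batching; the class name `RowKeyHelpers` and these signatures are assumptions, not the author's code:

```
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

public class RowKeyHelpers {

    // Hypothetical stand-in for CrcUtil.getCrcValue: a CRC32 of the appKey,
    // giving a compact numeric segment for the rowkey.
    public static long getCrcValue(String appKey) {
        CRC32 crc = new CRC32();
        crc.update(appKey.getBytes());
        return crc.getValue();
    }

    // Hypothetical stand-in for LimitUtil.getLimits: split [0, size) into
    // [start, end) index pairs of at most batchSize elements each.
    public static List<Integer[]> getLimits(int size, int batchSize) {
        List<Integer[]> limits = new ArrayList<>();
        for (int start = 0; start < size; start += batchSize) {
            int end = Math.min(start + batchSize, size);
            limits.add(new Integer[]{start, end});
        }
        return limits;
    }
}
```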
`logTable1` data (HBase shell scan output)

```
00|20180518203401772|2352356512|4519f3|f1 column=ext:appKey, timestamp=1533646292389, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203401772|2352356512|4519f3|f1 column=ext:channelCode, timestamp=1533646292389, value=guanlan-resurrection-002-
00|20180518203401772|2352356512|4519f3|f1 column=ext:createDateTime, timestamp=1533646292389, value=1526646836093
00|20180518203401772|2352356512|4519f3|f1 column=ext:retain, timestamp=1533646292389, value=17670
00|20180518203401772|2352356512|4519f3|f1 column=ext:scene, timestamp=1533646292389, value=1007
00|20180518203401772|2352356512|4519f3|f1 column=ext:shareId, timestamp=1533646292389, value=ogJmG5ItE_nBCS3pg5XCvGotGI1c
00|20180518203401772|2352356512|4519f3|f1 column=ext:time, timestamp=1533646292389, value=2018-05-18T20:34:01
00|20180518203401772|2352356512|4519f3|f1 column=ext:type, timestamp=1533646292389, value=login_in
00|20180518203401772|2352356512|4519f3|f1 column=ext:userId, timestamp=1533646292389, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203406167|2352356512|4519f3|54 column=ext:appKey, timestamp=1533646347725, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203406167|2352356512|4519f3|54 column=ext:channelCode, timestamp=1533646347725, value=guanlan-regular-001-
00|20180518203406167|2352356512|4519f3|54 column=ext:createDateTime, timestamp=1533646347725, value=1526646839075
00|20180518203406167|2352356512|4519f3|54 column=ext:retain, timestamp=1533646347725, value=17670
00|20180518203406167|2352356512|4519f3|54 column=ext:shareId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203406167|2352356512|4519f3|54 column=ext:time, timestamp=1533646347725, value=2018-05-18T20:34:06
00|20180518203406167|2352356512|4519f3|54 column=ext:type, timestamp=1533646347725, value=sharesuccess
00|20180518203406167|2352356512|4519f3|54 column=ext:userId, timestamp=1533646347725, value=ogJmG5KRcIxtyg7UmcRHFCn6YiAQ
00|20180518203407144|2352356512|5ca1c4|bc column=ext:appKey, timestamp=1533646294045, value=898b7e90-5754-11e8-983c-6b4bcc3b7c2e
00|20180518203407144|2352356512|5ca1c4|bc column=ext:createDateTime, timestamp=1533646294045, value=1526646849745
00|20180518203407144|2352356512|5ca1c4|bc column=ext:retain, timestamp=1533646294045, value=17670
00|20180518203407144|2352356512|5ca1c4|bc column=ext:scene, timestamp=1533646294045, value=1037
00|20180518203407144|2352356512|5ca1c4|bc column=ext:time, timestamp=1533646294045, value=2018-05-18T20:34:07
00|20180518203407144|2352356512|5ca1c4|bc column=ext:type, timestamp=1533646294045, value=login_in
```

CustomTableInputFormat.java

```
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.RegionSizeCalculator;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.net.DNS;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class CustomTableInputFormat extends TableInputFormat {

    private HashMap<InetAddress, String> reverseDNSCacheMap = new HashMap<>();
    private List<String> keys = new ArrayList<>();

    public CustomTableInputFormat() {
        super();
        // all 256 possible two-character hex prefixes: 00, 01, ..., fe, ff
        for (int i = 0; i < 256; i++) {
            keys.add(StringUtils.substring("00" + Integer.toHexString(i), -2));
        }
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        super.initialize(context);
        TableName tableName = super.getTable().getName();
        RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(getRegionLocator(), getAdmin());
        List<InputSplit> splits = new ArrayList<>();

        // one split per prefix, scanning [prefix|startRow, prefix|stopRow)
        for (String key : keys) {
            HRegionLocation location = getRegionLocator().getRegionLocation(Bytes.toBytes(key), false);
            InetSocketAddress isa = new InetSocketAddress(location.getHostname(), location.getPort());
            InetAddress regionAddress = isa.getAddress();
            String regionLocation = reverseDNS(regionAddress);

            byte[] regionName = location.getRegion().getRegionName();
            String encodedRegionName = location.getRegion().getEncodedName();
            long regionSize = sizeCalculator.getRegionSize(regionName);

            byte[] splitStart = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStartRow());
            byte[] splitStop = Bytes.add(Bytes.toBytes(key + "|"), this.getScan().getStopRow());

            TableSplit split = new TableSplit(tableName, this.getScan(),
                    splitStart, splitStop, regionLocation, encodedRegionName, regionSize);
            splits.add(split);
        }
        return splits;
    }

    String reverseDNS(InetAddress ipAddress) throws UnknownHostException {
        String hostName = this.reverseDNSCacheMap.get(ipAddress);
        if (hostName == null) {
            String ipAddressString = null;
            try {
                ipAddressString = DNS.reverseDns(ipAddress, null);
            } catch (Exception e) {
                ipAddressString = InetAddress.getByName(ipAddress.getHostAddress()).getHostName();
            }
            if (ipAddressString == null) {
                throw new UnknownHostException("No host found for " + ipAddress);
            }
            hostName = Strings.domainNamePointerToHostName(ipAddressString);
            this.reverseDNSCacheMap.put(ipAddress, hostName);
        }
        return hostName;
    }
}
```
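To make the effect of `getSplits` concrete: with the scan used in the Flink and Spark examples below (start row `201805182039`, stop row `201805182040`), each of the 256 prefixes yields one split that covers exactly that minute inside its bucket. A standalone sketch that only reproduces the key construction, with no HBase connection required (`SplitRangeDemo` is an illustrative name):

```
import org.apache.commons.lang3.StringUtils;

public class SplitRangeDemo {

    public static void main(String[] args) {
        String startRow = "201805182039";
        String stopRow = "201805182040";
        for (int i = 0; i < 256; i++) {
            String prefix = StringUtils.substring("00" + Integer.toHexString(i), -2);
            // same start/stop construction as CustomTableInputFormat.getSplits()
            System.out.println("[" + prefix + "|" + startRow + ", " + prefix + "|" + stopRow + ")");
        }
    }
}
```

Each split is also pinned to the region server that hosts its prefix, so every one of these narrow scans is served locally by its region.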
## Flink example

```
static Configuration conf;

static {
    HadoopKrbLogin.login();
    conf = new Configuration();
    String tableName = "logTable1";

    conf.addResource("hbase-site.xml");

    Scan scan = new Scan();
    scan.setCaching(1000);
    scan.withStartRow("201805182039".getBytes());
    scan.withStopRow("201805182040".getBytes());
    scan.setCacheBlocks(false);

    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, tableName);
    ClientProtos.Scan proto = null;
    try {
        proto = ProtobufUtil.toScan(scan);
    } catch (IOException e) {
        e.printStackTrace();
    }
    String scanToString = Base64.encodeBytes(proto.toByteArray());
    conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN, scanToString);
}

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<Tuple2<ImmutableBytesWritable, Result>> hbase = env.createInput(
            HadoopInputs.createHadoopInput(
                    new CustomTableInputFormat(),
                    ImmutableBytesWritable.class,
                    Result.class,
                    Job.getInstance(conf)
            )
    );
    DataSet<LogEntity> toTuple = hbase.map(
            new MapFunction<Tuple2<ImmutableBytesWritable, Result>, LogEntity>() {
                public LogEntity map(Tuple2<ImmutableBytesWritable, Result> record) throws Exception {
                    Result result = record.f1;
                    return result2Entity(result);
                }
            });
}

private static LogEntity result2Entity(Result result) {
    JSONObject root = new JSONObject();
    JSONObject ext = new JSONObject();
    JSONObject device = new JSONObject();
    JSONObject location = new JSONObject();
    for (Cell cell : result.rawCells()) {
        byte[] family = CellUtil.cloneFamily(cell);
        byte[] column = CellUtil.cloneQualifier(cell);
        byte[] value = CellUtil.cloneValue(cell);
        String columnName = Bytes.toString(column);
        if ("ext".equals(Bytes.toString(family))) {
            if ("durationTime".equals(columnName)) {
                ext.put(columnName, Bytes.toLong(value));
            } else if ("time".equals(columnName)) {
                root.put(columnName, Bytes.toString(value));
                root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value))));
            } else {
                ext.put(columnName, Bytes.toString(value));
            }
        } else if ("device".equals(Bytes.toString(family))) {
            device.put(columnName, Bytes.toString(value));
        } else if ("location".equals(Bytes.toString(family))) {
            location.put(columnName, Bytes.toString(value));
        }
    }
    JSONObject data = new JSONObject();
    if (device.keySet().size() > 0) {
        data.put("device", device);
    }
    if (location.keySet().size() > 0) {
        data.put("location", location);
    }
    data.put("ext", ext);
    root.put("data", data);
    return JSON.parseObject(root.toString(), LogEntity.class);
}
```
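As written, `main` only builds the `DataSet`; nothing triggers execution. For a quick local check, something like the following could be appended at the end of `main` (one possible way to materialize the result, not part of the original post):

```
// Print a sample to the client; for the DataSet API this call
// collects the data and implicitly executes the job.
toTuple.first(10).print();

// or simply count the rows selected by the custom splits:
// long total = toTuple.count();
```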
("durationTime".equals(columnName)) { ext.put(columnName, Bytes.toLong(value)); } else if ("time".equals(columnName)) { root.put(columnName, Bytes.toString(value)); root.put("timeLong", DateUtil.getMill(LocalDateTime.parse(Bytes.toString(value)))); } else { ext.put(columnName, Bytes.toString(value)); } } else if ("device".equals(Bytes.toString(family))) { device.put(columnName, Bytes.toString(value)); } else if ("location".equals(Bytes.toString(family))) { location.put(columnName, Bytes.toString(value)); } } JSONObject data = new JSONObject(); if (device.keySet().size() > 0) { data.put("device", device); } if (location.keySet().size() > 0) { data.put("location", location); } data.put("ext", ext); root.put("data", data); return JSON.parseObject(root.toString(), LogEntity.class); } ``` --- ![](https://upload-images.jianshu.io/upload_images/9028759-07315bb8dadcd082.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)w