概述
很多时候,我们需要将外部的数据导入到HBase集群中,例如:将一些历史的数据导入到HBase做备份。可以使用HBase的Java API,通过put方式可以将数据写入到HBase中,也可以通过MapReduce编写代码将HDFS中的数据导入到HBase。但这些方式都是基于HBase的原生API方式进行操作的。这些方式有一个共同点,就是需要与HBase连接,然后进行操作。HBase服务器要维护、管理这些连接,以及接受来自客户端的操作,会给HBase的存储、计算、网络资源造成较大消耗。此时,在需要将海量数据写入到HBase时,通过Bulk load(大容量加载)的方式,会变得更高效。可以这么说,进行大量数据操作,Bulk load是必不可少的。
HBase的数据最终是需要持久化到HDFS。HDFS是一个文件系统,那么数据可定是以一定的格式存储到里面的。例如:Hive我们可以以ORC、Parquet等方式存储。而HBase也有自己的数据格式,那就是HFile。Bulk Load就是直接将数据写入到StoreFile(HFile)中,从而绕开与HBase的交互,HFile生成后,直接一次性建立与HBase的关联即可。使用BulkLoad,绕过了Write to WAL,Write to MemStore及Flush to disk的过程。
更多可以参考官方对Bulk load的描述:https://hbase.apache.org/book.html#arch.bulk.load
一、使用java API的方式
java API中的put操作可以将数据导入到hbase中 其中包含单条和批量导入两种方式
@Test
public void test5() throws IOException {
// 获取Hbase配置文件的对象
// HBaseConfiguration conf=(HBaseConfiguration) HBaseConfiguration.create();
Configuration conf = HBaseConfiguration.create();
// 设置conf的zk访问路径
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
// 创建hbase连接
Connection conn = ConnectionFactory.createConnection(conf);
System.out.println(conn);
// 获取dml的句柄
// 一个Htable对象代表一个表
HTable table = (HTable) conn.getTable(TableName.valueOf("test1"));
// 数据导入 重点****************
// 插入单条数据 Put对象是封装需要插入的数据,每一条数据都要封装一个普通对象
Put put = new Put("rk001".getBytes());
// 参数1是列簇 参数2 是列 参数3 是值
put.addColumn("info1".getBytes(), "age".getBytes(), "100".getBytes());
table.put(put);
}
// 批量数据导入 list
// 先将插入的数据放在list集合(也就是放在内存中)并没有提交,等放置完成之后一起提交,这种情况有可能出现内存溢出,因为list集合太大的话就Juin占满内存
@Test
public void test6() throws IOException {
long start = System.currentTimeMillis();
// 获取Hbase配置文件的对象
// HBaseConfiguration conf=(HBaseConfiguration) HBaseConfiguration.create();
Configuration conf = HBaseConfiguration.create();
// 设置conf的zk访问路径
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
// 创建hbase连接
Connection conn = ConnectionFactory.createConnection(conf);
System.out.println(conn);
// 获取dml的句柄
// 一个Htable对象代表一个表
HTable table = (HTable) conn.getTable(TableName.valueOf("test1"));
// 创建list
List<Put> list = new ArrayList<>();
for (int i = 0; i < 10000; i++) {
Put put = new Put(("rk" + i).getBytes());
put.addColumn("info1".getBytes(), "age".getBytes(), ("" + i).getBytes());
list.add(put);
}
table.put(list);
long end = System.currentTimeMillis();
System.out.println("用时:" + (end - start));
}
// 利用本地缓存批量数据导入,本地缓存是基于磁盘的,不会占用太多的内存,但是这种方式是没有list集合的方法速度快
@Test
public void test7() throws IOException {
long start = System.currentTimeMillis();
// 获取Hbase配置文件的对象
// HBaseConfiguration conf=(HBaseConfiguration) HBaseConfiguration.create();
Configuration conf = HBaseConfiguration.create();
// 设置conf的zk访问路径
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
// 创建hbase连接
Connection conn = ConnectionFactory.createConnection(conf);
System.out.println(conn);
// 获取dml的句柄
// 一个Htable对象代表一个表
HTable table = (HTable) conn.getTable(TableName.valueOf("test1"));
// 设置是否需要自动刷新自动提交put对象,默认true,默认一条数据就会提交一次
// 将参数改为false 不会立即提交,达到我们设定的值才会提交
table.setAutoFlushTo(false);
for (int i = 0; i < 10000; i++) {
Put put = new Put(("rk" + i).getBytes());
put.addColumn("info1".getBytes(), "age".getBytes(), ("" + i).getBytes());
// 这时候不会自动提交到hbase了 提交到本地缓存了
table.put(put);
// 如果设置缓存的大小一般就不用设置指定条数提交了,但是这两种方式注意最后提交一次
table.setWriteBufferSize(10 * 1024 * 1024);// 这是设置缓存的大小,
if (i % 3000 == 0) {
table.flushCommits();
}
}
table.flushCommits();
long end = System.currentTimeMillis();
System.out.println("用时:" + (end - start));
}
二、使用MAPREDUCE JOB的方式进行导入
具体的导入的方式可以参考下面的博客
https://blog.csdn.net/CHANGGUOLONG/article/details/90732931
这篇博客详细介绍了Hbase的Mapreduce操作,并有相关案例
三、采用BULKLOAD的方式进行导入
在put数据时会先将数据的更新操作信息和数据信息写入WAL,在写入到WAL后,数据就会被放到MemStore中,当MemStore满后数据就会被flush到磁盘(即形成HFile文件),在这过程涉及到的flush,split,compaction等操作都容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能,避免这些问题最好的方法就是使用BlukLoad的方式来加载数据到HBase中。
首先明白一点:Hbase中的内容再hdfs中是以Hfile文件格式进行存储的, HBase中每张Table在根目录(/HBase)下用一个文件夹存储,Table名为文件夹名,在Table文件夹下每个Region同样用一个文件夹存储,每个Region文件夹下的每个列族也用文件夹存储,而每个列族下存储的就是一些HFile文件,HFile就是HBase数据在HFDS下存储格式,其整体目录结构如下:
/hbase/data/default/<tbl_name>/<region_id>/<cf>/<hfile_id>
3.1 Bulk load MapReduce程序开发
Bulk load的流程主要分为两步:
- 通过MapReduce准备好数据文件(Store Files)
- 加载数据文件到HBase
3.2 银行转账记录海量冷数据存储案例
银行每天都产生大量的转账记录,超过一定时期的数据,需要定期进行备份存储。本案例,在MySQL中有大量转账记录数据,需要将这些数据保存到HBase中。因为数据量非常庞大,所以采用的是Bulk Load方式来加载数据。
- 项目组为了方便数据备份,每天都会将对应的转账记录导出为CSV文本文件,并上传到HDFS。我们需要做的就将HDFS上的文件导入到HBase中。
- 因为我们只需要将数据读取出来,然后生成对应的Store File文件。所以,我们编写的MapReduce程序,只有Mapper,而没有Reducer。
3.2.1 数据集
字段 | 字段名称 |
---|---|
id | ID |
code | 流水单号 |
rec_account | 收款账户 |
rec_bank_name | 收款银行 |
rec_name | 收款人姓名 |
pay_account | 付款账户 |
pay_name | 付款人姓名 |
pay_comments | 转账附言 |
pay_channel | 转账渠道 |
pay_way | 转账方式 |
status | 转账状态 |
timestamp | 转账时间 |
money | 转账金额 |
3.2.2 项目准备工作
HBase中创建银行转账记录表
create_namespace "ITCAST_BANK"
# disable "TRANSFER_RECORD"
# drop "TRANSFER_RECORD"
create "ITCAST_BANK:TRANSFER_RECORD", { NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => "HexStringSplit"}
3.2.3 导入POM依赖
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<target>1.8</target>
<source>1.8</source>
</configuration>
</plugin>
</plugins>
</build>
3.2.4 导入配置文件
将 core-site.xml、hbase-site.xml、log4j.properties三个配置文件拷贝到resources目录中。
如果添加下面属性可以运行的话,可以不用导入上面配置文件
//创建配置文件对象
Configuration conf=new Configuration();
//设置zk的
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
3.2.5 确保Windows环境变量配置正确
需要装一个windows版本Hadoop客户端
-
HADOOP_HOME
-
HADOOP_USER_NAME
3.2.6 编写实体类
实现步骤:
- 创建实体类TransferRecord
- 添加一个parse静态方法,用来将逗号分隔的字段,解析为实体类
- 使用以下数据测试解析是否成功
7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0
参考代码:
public class TransferRecord {
private String id;
private String code;
private String rec_account;
private String rec_bank_name;
private String rec_name;
private String pay_account;
private String pay_name;
private String pay_comments;
private String pay_channel;
private String pay_way;
private String status;
private String timestamp;
private String money;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getRec_account() {
return rec_account;
}
public void setRec_account(String rec_account) {
this.rec_account = rec_account;
}
public String getRec_bank_name() {
return rec_bank_name;
}
public void setRec_bank_name(String rec_bank_name) {
this.rec_bank_name = rec_bank_name;
}
public String getRec_name() {
return rec_name;
}
public void setRec_name(String rec_name) {
this.rec_name = rec_name;
}
public String getPay_account() {
return pay_account;
}
public void setPay_account(String pay_account) {
this.pay_account = pay_account;
}
public String getPay_name() {
return pay_name;
}
public void setPay_name(String pay_name) {
this.pay_name = pay_name;
}
public String getPay_comments() {
return pay_comments;
}
public void setPay_comments(String pay_comments) {
this.pay_comments = pay_comments;
}
public String getPay_channel() {
return pay_channel;
}
public void setPay_channel(String pay_channel) {
this.pay_channel = pay_channel;
}
public String getPay_way() {
return pay_way;
}
public void setPay_way(String pay_way) {
this.pay_way = pay_way;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMoney() {
return money;
}
public void setMoney(String money) {
this.money = money;
}
@Override
public String toString() {
return "TransferRecord{" +
"id='" + id + '\'' +
", code='" + code + '\'' +
", rec_account='" + rec_account + '\'' +
", rec_bank_name='" + rec_bank_name + '\'' +
", rec_name='" + rec_name + '\'' +
", pay_account='" + pay_account + '\'' +
", pay_name='" + pay_name + '\'' +
", pay_comments='" + pay_comments + '\'' +
", pay_channel='" + pay_channel + '\'' +
", pay_way='" + pay_way + '\'' +
", status='" + status + '\'' +
", timestamp='" + timestamp + '\'' +
", money='" + money + '\'' +
'}';
}
public static TransferRecord parse(String line) {
TransferRecord transferRecord = new TransferRecord();
String[] fields = line.split(",");
transferRecord.setId(fields[0]);
transferRecord.setCode(fields[1]);
transferRecord.setRec_account(fields[2]);
transferRecord.setRec_bank_name(fields[3]);
transferRecord.setRec_name(fields[4]);
transferRecord.setPay_account(fields[5]);
transferRecord.setPay_name(fields[6]);
transferRecord.setPay_comments(fields[7]);
transferRecord.setPay_channel(fields[8]);
transferRecord.setPay_way(fields[9]);
transferRecord.setStatus(fields[10]);
transferRecord.setTimestamp(fields[11]);
transferRecord.setMoney(fields[12]);
return transferRecord;
}
public static void main(String[] args) {
String str = "7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0";
TransferRecord tr = parse(str);
System.out.println(tr);
}
}
3.2.7 构建读取数据的Mapper
HBase提供了两个类来专门对MapReduce支持:
- ImmutableBytesWritable:对应rowkey
- MapReduceExtendedCell:对应 列 → 值(键值对)
实现步骤:
- 1、创建一个BankRecordMapper的类继承Mapper类,Mapper的泛型为
- a)输入key:LongWritable
- b)输入value:Text
- c)输出key:ImmutableBytesWritable
- d)输出value:MapReduceExtendedCell
- 2、将Mapper获取到Text文本行,转换为TransferRecord实体类
- 3、从实体类中获取ID,并转换为rowkey
- 4、使用KeyValue类构建单元格,每个需要写入到表中的字段都需要构建出来单元格
- 5、使用context.write将输出输出
- a)构建输出key:new ImmutableBytesWrite(rowkey)
- b)构建输出的value:new MapReduceExtendedCell(keyvalue对象)
参考代码:
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class BankRecordMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// HBase需要有rowkey,列名 => 值
TransferRecord transferRecord = TransferRecord.parse(value.toString());
String rowkey = transferRecord.getId();
// 列蔟
byte[] cf = Bytes.toBytes("C1");
byte[] colId = Bytes.toBytes("id");
byte[] colCode = Bytes.toBytes("code");
byte[] colRec_account = Bytes.toBytes("rec_account");
byte[] colRec_bank_name = Bytes.toBytes("rec_bank_name");
byte[] colRec_name = Bytes.toBytes("rec_name");
byte[] colPay_account = Bytes.toBytes("pay_account");
byte[] colPay_name = Bytes.toBytes("pay_name");
byte[] colPay_comments = Bytes.toBytes("pay_comments");
byte[] colPay_channel = Bytes.toBytes("pay_channel");
byte[] colPay_way = Bytes.toBytes("pay_way");
byte[] colStatus = Bytes.toBytes("status");
byte[] colTimestamp = Bytes.toBytes("timestamp");
byte[] colMoney = Bytes.toBytes("money");
KeyValue idKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getId()));
KeyValue codeKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getCode()));
KeyValue rec_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_account()));
KeyValue rec_bank_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_bank_name()));
KeyValue rec_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_name()));
KeyValue pay_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_account()));
KeyValue pay_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_name()));
KeyValue pay_commentsKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_comments()));
KeyValue pay_channelKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_channel()));
KeyValue pay_wayKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_way()));
KeyValue statusKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getStatus()));
KeyValue timestampKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getTimestamp()));
KeyValue moneyKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getMoney()));
ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));
context.write(rowkeyWritable, new MapReduceExtendedCell(idKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(codeKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(rec_accountKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(rec_bank_nameKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(rec_nameKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_accountKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_nameKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_commentsKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_channelKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(pay_wayKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(statusKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(timestampKeyValue));
context.write(rowkeyWritable, new MapReduceExtendedCell(moneyKeyValue));
}
}
3.2.8 编写驱动类
分析:
- MapReduce执行时,需要读取HBase表的region相关信息,故需要获取HBase的表
实现步骤:
- 1.使用HBaseConfiguration.create()加载配置文件
- 2.创建HBase连接
- 3.获取HTable
- 4.构建MapReduce JOB
- a)使用Job.getInstance构建一个Job对象
- b)调用setJarByClass设置要执行JAR包的class
- c)调用setInputFormatClass为TextInputFormat.class
- d)设置MapperClass
- e)设置输出键Output Key Class
- f)设置输出值Output Value Class
- g)设置输入输出到HDFS的路径,输入路径/bank/input,输出路径/bank/output
- FileInputFormat.setInputPaths
- FileOutputFormat.setOutputPath
- h)使用connection.getRegionLocator获取HBase Region的分布情况
- i)使用HFileOutputFormat2.configureIncrementalLoad配置HFile输出
- 5.调用job.waitForCompletion执行MapReduce程序
public class BulkloadDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 加载配置文件
Configuration configuration = HBaseConfiguration.create();
// 配置JOB
Job instance = Job.getInstance(configuration);
instance.setJarByClass(BulkloadDriver.class);
instance.setMapperClass(BankRecordMapper.class);
// 配置输入
instance.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(instance, new Path("hdfs://node1.itcast.cn:8020/bank/input"));
// 配置输出
FileOutputFormat.setOutputPath(instance, new Path("hdfs://node1.itcast.cn:8020/bank/output"));
instance.setOutputKeyClass(ImmutableBytesWritable.class);
instance.setOutputValueClass(MapReduceExtendedCell.class);
// 配置HFileoutputFormat2
Connection connection = ConnectionFactory.createConnection(configuration);
Table table = connection.getTable(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
// 获取表的Region检索对象
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
HFileOutputFormat2.configureIncrementalLoad(instance, table, regionLocator);
// 执行job
if (instance.waitForCompletion(true)) {
System.out.println("任务执行成功!");
}
else {
System.out.println("任务执行失败!");
System.exit(1);
}
}
}
参考:
https://blog.csdn.net/CSDNGuoYuying/article/details/128803798