Hbase的主要客户端接口通过org.apache.hadoop.hbase.client包中的HTable类来实现,用户可以通过它实现对Hbase进行数据的增查改删(CRUD)操作。
需要注意的是HTable实例的创建是有代价的,实例化时会进行一些检查和其他操作,这些操作都是比较耗时的。因此,建议给每个线程创建一个HTable实例,并在客户端应用的生存周期内复用这个实例。
put方法
put方法实现向Hbase存入数据,可以分为两类:一类作用于单行;另一类作用于多行
单行put
void put(Put put) throws IOException
该方法以单个Put或存储在列表中的一组Put对象作为入参,Put对象有如下几个构造函数:
Put(byte[] row)
Put(byte[] row, RowLock rowLock)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)
创建Put实例用户需要提供一个行键,Hbase中每行数据都有唯一的行键,就像关系型数据库的主键
创建完Put实例后,调用下面的几个方法向实例中添加数据:
Put add(byte[] family, byte[] qualifier, byte[] value)
Put add(byte[] family, byte[] qualifier, long ts, byte[] value)
Put add(KeyValue kv)
每次调用add方法可以向实例中添加一列数据,如果不指定时间戳ts参数,会使用构造Put实例时指定的时间戳,如果也没指定,将会由region server指定
获取Put实例内部添加的数据通过get方法:
List<KeyValue> get(byte[] family, byte[] qualifier)
Map<byte[], List<KeyValue>> getFamilyMap()
如果想不通过遍历整个集合而确定数据是否存在,可以使用:
boolean has(byte[] family, byte[] qualifier)
boolean has(byte[] family, byte[] qualifier, long ts)
boolean has(byte[] family, byte[] qualifier, byte[] value)
boolean has(byte[] family, byte[] qualifier, long ts, byte[] value)
如果找到匹配的列返回true
Put类还提供了一些其他常用方法:
方法 | 用途 |
---|---|
getRow() | 返回创建Put实例时指定的行键 |
getRowLock() | 返回当前Put实例的RowLock实例 |
getTimeStamp() | 返回Put实例的时间戳,未被设定时返回Long.MAX_VALUE |
heapSize() | 计算当前Put实例的堆大小,既包括数据也包含数据结构所占空间 |
isEmpty() | 检查FamilyMap是否含有任何KeyValue实例 |
numFamilies() | FamilyMap的大小,即所有KeyValue实例中列族的个数 |
size() | 本次Put会添加的KeyValue(单元格)实例的数量 |
调用实例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class PutExample {
public static void main(String[] args) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "testtable");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2")
Bytes.toBytes("val2"));
table.put(put);
}
}
数据版本化
Hbase的一个特殊功能是可以为一个单元格(一个特定列的值)存储多个版本的数据,每个版本的数据都有一个时间戳,并且按照降序存储。
举个例子:
> create "test", "cf1"
> put "test", "row1", "cf1", "val1"
> put "test", "row1", "cf1", "val2"
> scan "test"
> scan "test",{ VERSIONS => 3}
上面这些shell命令创建了一个test表和cf1列族,紧接着使用两个put命令向同一个行键和列键中存入数据,但是值不同;然后用scan命令查看数据。正常情况下,你可能认为val2会覆盖val1,但Hbase中不是的,默认情况Hbase会保留3个版本的数据。scan操作和get操作只返回最新版本的数据,调用时加入最大版本参数就会得到多版本大数据
写缓冲区
在Hbase中,每个put操作都会被转换成一次 RPC 操作,从而将客户端的数据提交给服务器并接受服务器的处理结果。如果某个应用程序需要每秒提交上千行数据到Hbase中,这样使用一行已提交的方式就太过于不合理了
Hbase的api提供了一个客户端的写缓冲区,它负责收集客户端提交的put操作,在合适的时机通过一次RPC调用将所有put送往服务器,通过下面的方法可以控制和查看缓冲区的使用状态:
void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()
Hbase客户端默认情况下是禁用客户端缓冲区的,可以显示地将 autoFlush 设置成 false 来激活缓冲区:
table.setAutoFlush(false)
同时可以通过调用 isAutoFlush() 方法来查看缓冲区是否启用,返回false表示已启用缓冲区
显示激活缓冲区后,客户端单行put操作不会直接产生RPC调用,此时提交的put实例被保存在客户端进程的内存中,如果想强制把数据刷新到Hbase服务器,可以调用api方法:
void flushCommits() throws IOException
该方法会将所有的修改提交给远程Hbase服务器,被缓冲的put实例可以跨多行
通常用户不需要自己手动的强制刷缓冲区,Hbase客户端api会跟踪缓冲区中数据大小,一旦超出配置的大小限制,会自动隐式调用写命令,可以通过下面的方法查看和设置缓冲区大小(单位是字节,默认是2097152字节,即2M):
long getWriteBufferSize()
void setWriteBufferSize(long writeBufferSize) throws IOException
此外还可以通过修改Hbase的配置文件 hbase-site.xml 来设置缓冲区大小,这样可以避免给每个HTable实例都设置一次大小:
<property>
<name>hbase.client.write.buffer</name>
<value>20971520</value>
</property>
写缓冲区实例
HTable table = new HTable(conf, "testtable");
System.out.println("Auto flush:" + table.isAutoFlush());
table.setAutoFlush(false);
Put put1 = new Put(Bytes.toBytes("row1));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
table.put(put1);
Put pu2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val2"));
table.put(put2);
Put pu3 = new Put(Bytes.toBytes("row3"));
put3.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val3"));
table.put(put3);
Get get = new Get(Bytes.toBytes("row1"));
Result res1 = table.get(get);
System.out.println("Result:" + res1);
table.flushCommits();
Result res2 = table.get(get);
System.out.println("Result:" + res2);
可以通过 getWriteBuffer() 方法获取提交到缓冲区的put实例列表,不过直接操作这个列表是比较危险的,因为这将绕过堆大小的检查,并且有可能这个缓冲区正在刷写内容到服务器
批量Put
Hbase客户端api提供批量提交put实例的方法,调用方法如下:
void put(List<Put> puts) throws IOException
由于批量操作可能会涉及多行,有可能部分修改会失败,那么Hbase服务器将放弃当前修改的处理,继续处理下一个,最后通过一个IOException反馈给客户端。服务器处理失败的put实例会被保存在客户端的写缓冲区中,再下一次刷写缓冲区的时候会重试
值得一提的是,客户端会做一些基本的检查工作,例如put实例内容是否为空,是否制定了列等。如果检查不通过,客户端会抛出异常,同时刷写数据的时候这些put实例不会被处理
使用批量put时,需要注意的是:客户端无法控制服务器端执行put的顺序,即用户无法保证数据的写入顺序!
批量put实例
List<Put> puts = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
puts.add(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val2"));
puts.add(put2);
table.put(puts);
原子性操作
Hbase客户端api提供了一种特殊的put调用,它能够保证操作的原子性:(检查写 check and put):
boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException
该方法自带检查功能,检查通过才执行put操作,否则放弃本次修改,可以保证服务端put操作的原子性。
此原子性操作api限制了检查和修改的数据必须属于同一行,因此只提供了同一行数据的原子性保证;检查和修改分别针对不同行时会抛出异常
原子操作实例
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
// 值不存在的情况下执行
boolean res1 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("colfam1"),
Bytes.toBytes("qual1"),
null, put1);
Put put2 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val2"));
// 当上一次提交的值存在,则执行
boolean res2 = table.checkAndPut(Bytes.toBytes("row1"),
Bytes.toBytes("colfam1"),
Bytes.toBytes("qual1"),
Bytes.toBytes("val1"),
put2);
原子性操作使Hbase提供了不同客户端并发修改数据的功能