一、HBase简介
1.1 HBase是什么
HBase是一个分布式的、面向列的开源数据库,Hadoop 数据库。搭建基于 Hadoop 和 ZK 。
历史是基于Google的 Bigtable 、Google 文件系统等论文。HBase 在Hadoop 之上提供了类似于 Bigtable 的能力。 HBase是Apache的Hadoop项目的子项目。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。
1.2 HBase特性
1)海量存储
Hbase适合存储PB级别的海量数据,在PB级别的数据以及采用廉价PC存储的情况下,能在几十到百毫秒内返回数据。这与Hbase的极易扩展性息息相关。正式因为Hbase良好的扩展性,才为海量数据的存储提供了便利。
2)列式存储
这里的列式存储其实说的是列族存储,Hbase是根据列族来存储数据的。列族下面可以有非常多的列,列族在创建表的时候就必须指定。
3)极易扩展
Hbase的扩展性主要体现在两个方面,一个是基于上层处理能力(RegionServer)的扩展,一个是基于存储的扩展(HDFS)。通过横向添加RegionSever的机器,进行水平扩展,提升Hbase上层的处理能力,提升Hbsae服务更多Region的能力。
备注:RegionServer的作用是管理region、承接业务的访问,这个后面会详细的介绍通过横向添加Datanode的机器,进行存储层扩容,提升Hbase的数据存储能力和提升后端存储的读写能力。
4)高并发
由于目前大部分使用Hbase的架构,都是采用的廉价PC,因此单个IO的延迟其实并不小,一般在几十到上百ms之间。这里说的高并发,主要是在并发的情况下,Hbase的单个IO延迟下降并不多。能获得高并发、低延迟的服务。
5)稀疏
稀疏主要是针对Hbase列的灵活性,在列族中,你可以指定任意多的列,在列数据为空的情况下,是不会占用存储空间的。
1.3 HBase适用场景
(1)海量数据场景,天然支持数据水平扩展,处理 TB 到 PB 数据
(2)写性能突出,轻松打满网卡,增量数据抓取、批处理
(3)源于Hadoop生态,和hadoop生态各个大数据系统天然集成
1.4 Hbase应用场景
(1)半结构化或非结构化数据
对于数据结构字段不够确定或杂乱无章很难按一个概念去进行抽取的数据适合用HBase。以上面的例子为例,当业务发展需要存储author的email,phone,address信息时RDBMS需要停机维护,而HBase支持动态增加.
(2)记录非常稀疏
RDBMS的行有多少列是固定的,为null的列浪费了存储空间。而如上文提到的,HBase为null的Column不会被存储,这样既节省了空间又提高了读性能。
(3)多版本数据
如上文提到的根据Row key和Column key定位到的Value可以有任意数量的版本值,因此对于需要存储变动历史记录的数据,用HBase就非常方便了。比如上例中的author的Address是会变动的,业务上一般只需要最新的值,但有时可能需要查询到历史值。
(4)超大数据量
当数据量越来越大,RDBMS数据库撑不住了,就出现了读写分离策略,通过一个Master专门负责写操作,多个Slave负责读操作,服务器成本倍增。随着压力增加,Master撑不住了,这时就要分库了,把关联不大的数据分开部署,一些join查询不能用了,需要借助中间层。随着数据量的进一步增加,一个表的记录越来越大,查询就变得很慢,于是又得搞分表,比如按ID取模分成多个表以减少单个表的记录数。经历过这些事的人都知道过程是多么的折腾。采用HBase就简单了,只需要加机器即可,HBase会自动水平切分扩展,跟Hadoop的无缝集成保障了其数据可靠性(HDFS)和海量数据分析的高性能(MapReduce)
1.5HBase运行模式
(1)单机
(2)伪分布式
(3)分布式
(4)实战小提示:如果 DB 数据大,想要同步到 HBase 怎么做?用数据同步工具:DataX、binlog等
1.6Hbase的优点
(1)列的可以动态增加,并且列为空就不存储数据,节省存储空间.
(2)Hbase自动切分数据,使得数据存储自动具有水平scalability.
(3)Hbase可以提供高并发读写操作的支持
1.7 Hbase的缺点
(1)不能支持条件查询,只支持按照Row key来查询.
(2)暂时不能支持Master server的故障切换,当Master宕机后,整个存储系统就会挂掉.
1.8 HBase特性
强读写一致性:适合高速计数聚合操作
自动切分数据:分布式存储数据,随着数据增长进行自动切片
RegionServer自动失效备援
与HDFS集成
支持MapReduce执行大规模并行操作
提供Java Client API
提供Thrift/REST API
针对大容量查询优化的块缓存和Bloom Fliter
可视化管理界面
二、HBase安装
2.1 Zookeeper正常部署
首先保证Zookeeper集群的正常部署,并启动之:
[atguigu@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
2.2 Hadoop正常部署
Hadoop集群的正常部署并启动:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
2.3 HBase的解压
解压HBase到指定目录:
[atguigu@hadoop102 software]$ tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module
2.4 HBase的配置文件
修改HBase对应的配置文件。
1)hbase-env.sh修改内容:
export JAVA_HOME=/opt/module/jdk1.8.0_144
export HBASE_MANAGES_ZK=false
2)hbase-site.xml修改内容:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop102:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.4.10/zkData</value>
</property>
</configuration>
3)regionservers:
hadoop102
hadoop103
hadoop104
4)软连接hadoop配置文件到hbase:
[atguigu@hadoop102 module]$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml
/opt/module/hbase/conf/core-site.xml
[atguigu@hadoop102 module]$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml
/opt/module/hbase/conf/hdfs-site.xml
2.5 HBase远程发送到其他集群
[atguigu@hadoop102 module]$ xsync hbase/
2.6 HBase服务的启动
1.启动方式1
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start master
[atguigu@hadoop102 hbase]$ bin/hbase-daemon.sh start regionserver
提示:如果集群之间的节点时间不同步,会导致regionserver无法启动,抛出ClockOutOfSyncException异常。
修复提示:
a、同步时间服务
请参看帮助文档:《尚硅谷大数据技术之Hadoop入门》
b、属性:hbase.master.maxclockskew设置更大的值
<property>
<name>hbase.master.maxclockskew</name>
<value>180000</value>
<description>Time difference of regionserver from master</description>
</property>
2.启动方式2
[atguigu@hadoop102 hbase]$ bin/start-hbase.sh
对应的停止服务:
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
2.7查看HBase页面
启动成功后,可以通过“host:port”的方式来访问HBase管理页面,例如:
三、Hbase架构
3.1 Hbase之基础元素
3.2 Hbase之HTable概念
(1)Row key
行主键,HBase不支持条件查询和Order by等查询,读取记录只能按Row key(及其range)或全表扫描,因此Row key需要根据业务来设计以利用其存储排序特性(Table按Row key字典序排序如1,10,100,11,2)提高性能。
(2)Column Family(列族)
在表创建时声明,每个Column Family为一个存储单元。在上例中设计了一个HBase表blog,该表有两个列族:article和author。
(3)Column(列)
HBase的每个列都属于一个列族,以列族名为前缀,如列article:title和article:content属于article列族,author:name和author:nickname属于author列族。
Column不用创建表时定义即可以动态新增,同一Column Family的Columns会群聚在一个存储单元上,并依Column key排序,因此设计时应将具有相同I/O特性的Column设计在一个Column Family上以提高性能。同时这里需要注意的是:这个列是可以增加和删除的,这和我们的传统数据库很大的区别。所以他适合非结构化数据。
(4)Timestamp
HBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,查询时默认返回最新版本。如上例中row key=1的author:nickname值有两个版本,分别为1317180070811对应的“一叶渡江”和1317180718830对应的“yedu”(对应到实际业务可以理解为在某时刻修改了nickname为yedu,但旧值仍然存在)。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。
(5)Value
每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp=>value,例如上例中{tableName=’blog’,RowKey=’1’,ColumnName=’author:nickname’,Timestamp=’ 1317180718830’}索引到的唯一值是“yedu”。
(6)存储类型
TableName是字符串
RowKey和 ColumnName 是二进制值(Java 类型 byte[])
Timestamp是一个 64 位整数(Java 类型 long)
value是一个字节数组(Java类型 byte[])。
(7)存储结构
可以简单的将HTable的存储结构理解为即HTable按Row key自动排序,每个Row包含任意数量个Columns,Columns之间按Column key自动排序,每个Column包含任意数量个Values。理解该存储结构将有助于查询结果的迭代。
3.3 Hbase存储结构图
3.4 Hbase存储流程
1)当客户端提交变更操作(如插入put,删除delete,计数新增incr),首先客户端会连接上Zookeeper找到-Root-表的存储位置,然后根据-Root-表所提供的.Meta.表的位置找到对应的Region所在的HRegionServer。数据变更信息会先通过HRegionServer写入一个commit log,也就是WAL。当写入WAL成功后,数据变更信息会存到MemStore中。当MemStore达到设定的maximum value(hbase.hregion.memstore.flush.size,默认64MB)后,MemStore就会开始进行Flush操作,将其内容持久化到一个新的HFile中。在Flush操作过程中,MemStore通过滚动机制继续对用户提供读写服务。随着Flush操作的不断进行,HFile文件越来越多。 当HFile文件超过设定的数量后,Hbase的HouseKeeping机制就会通过Compaction特性将HFile小文件合并成一个更大的HFile文件。在Compaction的过程中,会进行版本的合并以及数据的删除。由于storeFiles是不变的,用户执行删除操作时,并不能简单地通过删除其键值对来删除数据内容。Hbase提供了一个delete marker机制(也称为tombstone marker),会告诉HRegionServer那个指定的key已经被删除了。这样其它用户检索这个key的内容时,因为已经被标记为删除,所以也不会检索出来。在进行Compaction操作中就会丢弃这些已经打标的记录。经过多次Compaction后,HFile文件会越来越大,当达到设定的值时,会触发Split操作。将当前的Region根据RowKey对等切分成两个子Region,当期的那个Region被废弃,两个子Region会被分配到其他HRegionServer上。所以刚开始时一个表只有一个Region,随着不断的split,会产生越来越多的Region,通过HMaster
的LoadBalancer调整,Region会均匀遍布到所有的HRegionServer中。
2)当HLog满时,HRegionServer就会启动LogRoller,通过执行rollWriter方法将那些所有sequence number均小于最大的那个sequence number的logfile移动到.oldLog目录中等待被删除。如果用户设置了Deferred Log Flush为true,HRegionServer会缓存有关此表的所有变更,并通过LogSyncer调用sync()方法定时将变更信息同步到filesystem。默认为false的话,一旦有变更就会立刻同步到filesystem。
3)在一个HRegionServer中只有一个WAL,所有Region共享此WAL。HLog会根据Region提交变更信息的先后顺序依次顺序写入WAL中。如果用户设置了setWriteToWAL(false)方法,则有关此表的所有Region变更日志都不会写入WAL中。这也是上图中Region将变更日志写入WAL的那个垂直向下的箭头为什么是虚线的原因。
3.5 HBase体系架构
Client:包含访问HBase的接口并维护cache来加快对HBase的访问
Master:
(1)为Region server分配region
(2)负责Region server的负载均衡
(3)发现失效的Region server并重新分配其上的region
(4)管理用户对table的增删改操作
RegionServer:
(1)Region server维护region,处理对这些region的IO请求
(2)Region server负责切分在运行过程中变得过大的region
HLog(WAL log):
(1)HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是 HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和 region名字外,同时还包括sequence number和timestamp,timestamp是” 写入时间”,sequence number的起始值为0,或者是最近一次存入文件系 统中sequence number。
2)HLog SequeceFile的Value是HBase的KeyValue对象,即对应HFile中的 KeyValue
Region:
(1)HBase自动把表水平划分成多个区域(region),每个region会保存一个表 里面某段连续的数据;每个表一开始只有一个region,随着数据不断插 入表,region不断增大,当增大到一个阀值的时候,region就会等分会 两个新的region(裂变);
(2)当table中的行不断增多,就会有越来越多的region。这样一张完整的表 被保存在多个Regionserver上。
Memstore与 storefile:
(1)一个region由多个store组成,一个store对应一个CF(列族)
(2)store包括位于内存中的memstore和位于磁盘的storefile写操作先写入 memstore,当memstore中的数据达到某个阈值,hregionserver会启动 flashcache进程写入storefile,每次写入形成单独的一个storefile
(3)当storefile文件的数量增长到一定阈值后,系统会进行合并(minor、 major compaction),在合并过程中会进行版本合并和删除工作 (majar),形成更大的storefile。
(4)当一个region所有storefile的大小和超过一定阈值后,会把当前的region 分割为两个,并由hmaster分配到相应的regionserver服务器,实现负载均衡。
(5)客户端检索数据,先在memstore找,找不到再找storefile
(6)HRegion是HBase中分布式存储和负载均衡的最小单元。最小单元就表 示不同的HRegion可以分布在不同的HRegion server上。
(7)HRegion由一个或者多个Store组成,每个store保存一个columns family。
(8)每个Strore又由一个memStore和0至多个StoreFile组成。
四、Hbase基本操作
Hbase中主要的客户端接口是HTable类,HTable提供了对数据的所有CRUD操作。需要注意的是由于创建HTabe实例比较耗时, 所以在实际使用中最好创建单例模式的HTable实例,不过如果需要多个HTable实例的话,可以考虑使用HBase的HTablePool特性(下面后讲到)。Hbase不提供直接的update操作。由于Hbase中数据存储有版本支持。所以如果需要update一条记录,一般是通过put操作,这样历史版本会在Compaction操作中被合并掉,这样就间接实现了更新。(在MemStore中有一个变量MemstoreTS,该变量是随put操作而递增的。比如首先往列A,timeStamp为T1上put一条数据data1,假设此时MemstoreTS为1;之后如果想更新这条数据,只需要往列A,timeStamp为T1上put一条数据data2,此时MemstoreTS为2,Hbase会自动会将MemstoreTS大的排在前面。MemstoreTS小的在Compaction过程中就被过滤掉了。)
4.1 put操作
Put操作就是讲数据插入到Hbase中。有两种模式,一种是对单行的操作(single put);还有一种是对多行的操作(List of put)。针对单行操作的方式如下:
(一)创建put实例有如下构造函数:需要用户指定某行,用户也可以设定时间戳作为版本标示。此外,用户还可以加入自定义的行锁,以防其它用户或者其它线程在变更期间访问此行的数据。
Put(byte[] row)
Put(byte[] row, RowLock rowLock)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)
在Hbase中参数的传递大多是byte数组类型。Hbase提供了许多静态方法将java类型转换成byte数组类型。如下:
static byte[] toBytes(ByteBuffer bb)
static byte[] toBytes(String s)
static byte[] toBytes(boolean b)
static byte[] toBytes(long val)
static byte[] toBytes(float f)
static byte[] toBytes(int val)
(二)一旦创建好put实例后,就可以通过put类提供的方法插入数据了。插入数据的操作需要指定列族,所在列等。如下:
Put add(byte[] family, byte[] qualifier, byte[] value)
Put add(byte[] family, byte[] qualifier, long ts, byte[] value)
Put add(KeyValue kv) throws IOException
(三)put组装完成后,就可以通过HTable提供的void put(Put put)throws IOException完成数据的插入操作。
如果需要对多行进行put操作,可以组装一系列的put实例,然后调用HTable提供的void put(List puts) throws IOException来完成多行插入操作。不过需要指出的是:如果在这多个Put实例中存在一个put实例有误(比如:往一个不存在的列族中插入数据),那么该put实例会报错,但是不影响其他的put实例。跟后面的get操作有点区别。
此外,Hbase还提供了一个原子型的put操作:Atomic compare-and-set ,方法如下:boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,byte[] value, Put put) throws IOException。只有校验成功后才会完成put操作.
需要注意的是,因为每次的put操作相当于一个RPC,将数据从客户端传递到服务端并返回。如果你的应用中RPC非常频繁,比如一秒内成千上万次,可能会有隐患。解决的办法就是尽量降低RPC次数,Hbase提供了一个嵌入的客户端写缓存器(Client-side Write Buffer)。它会缓存所有的put操作,然后再一次性提交。默认情况下Client-side Write Buffer是没有激活的。用户可以在创建HTable的时候通过调用table.setAutoFlush(false)方法来激活它。并且可以通过isAutoFlush()来检查是否已经激活。默认是true,表示一旦有put操作会立即发送到服务器端。当你想将所有put操作提交到服务器端时,可以调用flushCommits()操作。它会将缓存器中所有变更提交到远程服务器。Client-side Write Buffer还会自动对buffer中的所有变更进行分组,同一个HRegionServer的分到同一个组。这样每个HRegionServer通过一个RPC传送.
4.2 get操作
Get操作就是从服务器端获取数据。跟put操作一样,get操作也分为两种模式,一种是对单行的get操作(single get),另一种是对多行进行检索操作(List of gets)。
1、HTable提供的get方法如下:其返回值为Result类,该类包含了列族,列,keyvalue,
RowKey等信息。该类提供的丰富的方法供用户获取返回的各种信息。
Result get(Get get) throws IOException
2、Get类的构造函数如下,需要用户传入指定的行及行锁等参数。
Get(byte[] row)
Get(byte[] row, RowLock rowLock)
3、 一旦创建的get实例后,用户可以调用Get类提供的如下方法来框定你需要检索的数据。如下:用户可以指定列族,列,时间戳,最大版本号等。如果不设置版本号,默认是1,表示最大的版本。
Get addFamily(byte[] family)
Get addColumn(byte[] family, byte[] qualifier)
Get setTimeRange(long minStamp, long maxStamp) throws IOException
Get setTimeStamp(long timestamp)
Get setMaxVersions()
Get setMaxVersions(int maxVersions) throws IOException
跟List of put类似,对于多行的检索操作,HTable也提供了类似的如下方法:用户只要创建多个get实例,就可以通过如下方法获取需要的数据。不过需要注意的是:跟List of put不同的是,如果Get实例列表中只要存在一个Get实例有误(比如get一个不存在的列族的值),那么整体就会抛出一个异常.
Result[] get(List gets) throws IOException
4.3 delete操作
Delete操作也类似,HTable提供了两种方法,支持单个delete实例和多个delete实例的操作。如下:
void delete(Delete delete) throws IOException
void delete(List deletes) throws IOException
1、相应的delete实例构造函数有:
Delete(byte[] row)
Delete(byte[] row, long timestamp, RowLock rowLock)
2、如果你需要添加一些限制条件,可以使用delete类提供的相关方法,支持指定列族,列,时间戳等。如果你指定了一个时间戳,则表示小于等于该时间戳的时间将被删除。如果指定了列和行号,但没有指定时间戳,则默认会删掉版本号最大的那个值。
Delete deleteFamily(byte[] family)
Delete deleteFamily(byte[] family, long timestamp)
Delete deleteColumns(byte[] family, byte[] qualifier)
Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp)
Delete deleteColumn(byte[] family, byte[] qualifier)
Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp)
void setTimestamp(long timestamp)
3、当使用List of delete时,如果有一个delete实例出错,那么会抛出异常。而且delete的实例列表中只会存在那个出问题的delete实例。Delete也支持原子型的Compare-and- Delete,如下:
boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,byte[] value, Delete delete) throws IOException
4.4 Batch操作
Hbase还支持批量操作。其实上面所谈到的List of puts,gets,deletes都是基于Batch操作来的。不过List of puts,gets,deletes逐渐会被废弃。推荐使用Batch操作。HTable提供的batch操作方法如下:参数中Row类是Put,Delete,Get类的父类。表示用户可以同时传入put,get及delete实例操作。不过在一个batch中,最好不要同时传入针对同一行的put和delete实例。
(1) void batch(List actions, Object[] results) throws IOException, InterruptedException
(2) Object[] batch(List actions) throws IOException, InterruptedException上面这两个batch方法比较类似,但有比较大的区别。第一个batch方法需要用户传递一个数组,该数组用来填充batch操作中所有成功的操作的结果集。如果没有指定这个数组,比如第二个方法。一旦batch操作中某一个实例出现问题,那么Hbase只会抛出一个异常。那些成功的操作的结果并不会返回。而第一个方法则会将那些成功的操作的结果集返回给用户。
此外Batch操作不支持Client-side write buffer,Batch方法是同步的,会直接将其包含的操作发往服务器。这点需要注意!
Batch操作返回的结果可能的结果有如下几种:
1、null:表示那个操作操作连接远程服务器失败。
2、Empty Result:put和delete操作的返回结果,表示操作成功。
3、Result:get操作的返回结果集
4、Throwable:异常结果
4.5 Scan操作
Scan操作类似于传统的RDBMS中的游标的概念。其目的跟get一样,也是检索服务器端数据。Hbase也提供了一个Scan类。由于Scans类似于迭代器,所以你需要通过getScanner()方法获取。HTable提供了如下方法:如果你看了源码就会知道,后面那两个方法其实是先创建一个scan实例,并加入传入的参数,然后再调用第一个方法。
ResultScanner getScanner(Scan scan) throws IOException
ResultScanner getScanner(byte[] family) throws IOException
ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException
1、Scan类提供了多个构造函数,如下:startRow和stopRow是左闭右开的。从构造函数中可以看出,用户只需要指定rowKey的范围,或者添加相应的过滤器,Hbase能够自动检索你指定的RowKey的范围的数据。如果没有指定startRow,默认从第一行开始.
Scan()
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)
2、当创建好Scan实例后,如果想添加更多的限制条件,可以通过调用Scan提供的如下方法:允许添加列族,列,时间戳等.
Scan addFamily(byte [] family)
Scan addColumn(byte[] family, byte[] qualifier)
Scan setTimeRange(long minStamp, long maxStamp) throws IOException
Scan setTimeStamp(long timestamp)
Scan setMaxVersions()
Scan setMaxVersions(int maxVersions)
GetScanner()方法返回的是一个ResultScanner实例。需要注意的是:如果结果集存在多行,Scans并不会一次性将所有行在一个RPC里面传送给客户端,而是基于一行一行传送。这样做主要是因为多行需要耗费大量时间。
ResultScanner类包装了Result类将其每行结果以迭代的方式输出,使得Scan操作类似于get操作。此外ResultScanner类提供了如下方法供用户进行迭代使用:用户可以选择一次返回一行或者多行。不过不要认为是服务器端一次性返回多行。其实是客户端循环调用nbRows 次next()方法而已。服务器端在一个RPC里面还是只传送一行数据。这个确实有点影响心情,但Hbase就喜欢恶心下你,不过它也提供的相应的解决办法:Scanner Caching,默认是关闭的。
Result next() throws IOException
Result[] next(int nbRows) throws IOException
void close()
close()方法表示释放ResultScanner实例。因为ResultScanner实例持有了一定的资源,如果不及时释放,可能随着时间推移会占用很大的内存空间。此外,close()操作最好放在finally模块,原因你懂得!
4.6 shell基本操作
1.进入HBase客户端命令行
[atguigu@hadoop102 hbase]$ bin/hbase shell
2.查看帮助命令
hbase(main):001:0> help
3.查看当前数据库中有哪些表
hbase(main):002:0> list
4.7 shell表的操作
1.创建表
hbase(main):002:0> create 'student','info'
2.插入数据到表
hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'
3.扫描查看表数据
hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}
4.查看表结构
hbase(main):011:0> describe ‘student’
5.更新指定字段的数据
hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'
6.查看“指定行”或“指定列族:列”的数据
hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'
7.统计表数据行数
hbase(main):021:0> count 'student'
8.删除数据
删除某rowkey的全部数据:
hbase(main):016:0> deleteall 'student','1001'
删除某rowkey的某一列数据:
hbase(main):017:0> delete 'student','1002','info:sex'
9.清空表数据
hbase(main):018:0> truncate 'student'
提示:清空表的操作顺序为先disable,然后再truncate。
10.删除表
首先需要先让该表为disable状态:
hbase(main):019:0> disable 'student'
然后才能drop这个表:
hbase(main):020:0> drop 'student'
提示:如果直接drop表,会报错:ERROR: Table student is enabled. Disable it first.
11.变更表信息
将info列族中的数据存放3个版本:
hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}
五、HBase原理
5.1读流程
HBase读数据流程如图3所示
1)Client向HregionServer发送写请求;
2)HregionServer将数据写到HLog(write ahead log)。为了数据的持久化和恢复;
3)HregionServer将数据写到内存(MemStore);
4)反馈Client写成功。
1)当MemStore数据达到阈值(默认是128M,老版本是64M),将数据刷到硬盘,将内存中的数据删除,同时删除HLog中的历史数据;
2)并将数据存储到HDFS中;
3)在HLog中做标记点。
5.4数据合并过程
1)当数据块达到4块,Hmaster触发合并操作,Region将数据块加载到本地,进行合并;
2)当合并的数据超过256M,进行拆分,将拆分后的Region分配给不同的HregionServer管理;
3)当HregionServer宕机后,将HregionServer上的hlog拆分,然后分配给不同的HregionServer加载,修改.META.;
4)注意:HLog会同步到HDFS。
六、HBase API操作
6.1环境准备
新建项目后在pom.xml中添加依赖:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
6.2 HBaseAPI
6.2.1获取Configuration对象
public static Configuration conf;
static{
//使用HBaseConfiguration的单例方法实例化
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.9.102");
conf.set("hbase.zookeeper.property.clientPort", "2181");
}
6.2.2判断表是否存在
public static boolean isTableExist(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
//在HBase中管理、访问表需要先创建HBaseAdmin对象
//Connection connection = ConnectionFactory.createConnection(conf);
//HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
HBaseAdmin admin = new HBaseAdmin(conf);
return admin.tableExists(tableName);
}
6.2.3创建表
public static void createTable(String tableName, String... columnFamily) throws
MasterNotRunningException, ZooKeeperConnectionException, IOException{
HBaseAdmin admin = new HBaseAdmin(conf);
//判断表是否存在
if(isTableExist(tableName)){
System.out.println("表" + tableName + "已存在");
//System.exit(0);
}else{
//创建表属性对象,表名需要转字节
HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
//创建多个列族
for(String cf : columnFamily){
descriptor.addFamily(new HColumnDescriptor(cf));
}
//根据对表的配置,创建表
admin.createTable(descriptor);
System.out.println("表" + tableName + "创建成功!");
}
}
6.2.4删除表
public static void dropTable(String tableName) throws MasterNotRunningException,
ZooKeeperConnectionException, IOException{
HBaseAdmin admin = new HBaseAdmin(conf);
if(isTableExist(tableName)){
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("表" + tableName + "删除成功!");
}else{
System.out.println("表" + tableName + "不存在!");
}
}
6.2.5向表中插入数据
public static void addRowData(String tableName, String rowKey, String columnFamily, String
column, String value) throws IOException{
//创建HTable对象
HTable hTable = new HTable(conf, tableName);
//向表中插入数据
Put put = new Put(Bytes.toBytes(rowKey));
//向Put对象中组装数据
put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
hTable.put(put);
hTable.close();
System.out.println("插入数据成功");
}
6.2.6删除多行数据
public static void deleteMultiRow(String tableName, String... rows) throws IOException{
HTable hTable = new HTable(conf, tableName);
List<Delete> deleteList = new ArrayList<Delete>();
for(String row : rows){
Delete delete = new Delete(Bytes.toBytes(row));
deleteList.add(delete);
}
hTable.delete(deleteList);
hTable.close();
}
6.2.7获取所有数据
public static void getAllRows(String tableName) throws IOException{
HTable hTable = new HTable(conf, tableName);
//得到用于扫描region的对象
Scan scan = new Scan();
//使用HTable得到resultcanner实现类的对象
ResultScanner resultScanner = hTable.getScanner(scan);
for(Result result : resultScanner){
Cell[] cells = result.rawCells();
for(Cell cell : cells){
//得到rowkey
System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
//得到列族
System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}
6.2.8获取某一行数据
public static void getRow(String tableName, String rowKey) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
//get.setMaxVersions();显示所有版本
//get.setTimeStamp();显示指定时间戳的版本
Result result = table.get(get);
for(Cell cell : result.rawCells()){
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("时间戳:" + cell.getTimestamp());
}
}
6.2.9获取某一行指定“列族:列”的数据
public static void getRowQualifier(String tableName, String rowKey, String family, String
qualifier) throws IOException{
HTable table = new HTable(conf, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
Result result = table.get(get);
for(Cell cell : result.rawCells()){
System.out.println("行键:" + Bytes.toString(result.getRow()));
System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
6.3 MapReduce
通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到HBase的表中,比如我们从HBase中读取一些原始数据后使用MapReduce做数据分析。
6.3.1官方HBase-MapReduce
1.查看HBase的MapReduce任务的执行
$ bin/hbase mapredcp
2.环境变量的导入
(1)执行环境变量的导入(临时生效,在命令行执行下述操作)
$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HADOOP_HOME=/opt/module/hadoop-2.7.2
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2)永久生效:在/etc/profile配置
export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2
并在hadoop-env.sh中配置:(注意:在for循环之后配)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
3.运行官方的MapReduce任务
--案例一:统计Student表中有多少行数据
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar rowcounter student
--案例二:使用MapReduce将本地数据导入到HBase
1)在本地创建一个tsv格式的文件:fruit.tsv
1001 Apple Red
1002 Pear Yellow
1003 Pineapple Yellow
2)创建HBase表
hbase(main):001:0> create 'fruit','info'
3)在HDFS中创建input_fruit文件夹并上传fruit.tsv文件
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir /input_fruit/
$ /opt/module/hadoop-2.7.2/bin/hdfs dfs -put fruit.tsv /input_fruit/
[if !supportLists]4)[endif]执行MapReduce到HBase的fruit表中
$ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit
[if !supportLists]5)[endif]使用scan命令查看导入后的结果
hbase(main):001:0> scan ‘fruit’
6.3.2自定义HBase-MapReduce1
目标:将fruit表中的一部分数据,通过MR迁入到fruit_mr表中。
分步实现:
1.构建ReadFruitMapper类,用于读取fruit表中的数据
package com.atguigu;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
//将fruit的name和color提取出来,相当于将每一行数据读取出来放入到Put对象中。
Put put = new Put(key.get());
//遍历添加column行
for(Cell cell: value.rawCells()){
//添加/克隆列族:info
if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
//添加/克隆列:name
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//将该列cell加入到put对象中
put.add(cell);
//添加/克隆列:color
}else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
//向该列cell加入到put对象中
put.add(cell);
}
}
}
//将从fruit读取到的每行数据写入到context中作为map的输出
context.write(key, put);
}
}
2. 构建WriteFruitMRReducer类,用于将读取到的fruit表中的数据写入到fruit_mr表中
package com.atguigu.hbase_mr;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
throws IOException, InterruptedException {
//读出来的每一行数据写入到fruit_mr表中
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}
3.构建Fruit2FruitMRRunner extends Configured implements Tool用于组装运行Job任务
//组装Job
public int run(String[] args) throws Exception {
//得到Configuration
Configuration conf = this.getConf();
//创建Job任务
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Fruit2FruitMRRunner.class);
//配置Job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
//设置Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
TableMapReduceUtil.initTableMapperJob(
"fruit", //数据源的表名
scan, //scan扫描控制器
ReadFruitMapper.class,//设置Mapper类
ImmutableBytesWritable.class,//设置Mapper输出key类型
Put.class,//设置Mapper输出value值类型
job//设置给哪个JOB
);
//设置Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
//设置Reduce数量,最少1个
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
4.主函数中调用运行该Job任务
public static void main( String[] args ) throws Exception{
Configuration conf = HBaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
System.exit(status);
}
5.打包运行任务
$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/hbase-0.0.1-SNAPSHOT.jar
com.z.hbase.mr1.Fruit2FruitMRRunner
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建。
提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)
6.3.3自定义HBase-MapReduce2
目标:实现将HDFS中的数据写入到HBase表中。
分步实现:
1.构建ReadFruitFromHDFSMapper于读取HDFS中的文件数据
package com.atguigu;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//从HDFS中读取的数据
String lineValue = value.toString();
//读取出来的每行数据使用\t进行分割,存于String数组
String[] values = lineValue.split("\t");
//根据数据中值的含义取值
String rowKey = values[0];
String name = values[1];
String color = values[2];
//初始化rowKey
ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
//初始化put对象
Put put = new Put(Bytes.toBytes(rowKey));
//参数分别:列族、列、值
put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
put.add(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
context.write(rowKeyWritable, put);
}
}
2.构建WriteFruitMRFromTxtReducer类
package com.z.hbase.mr2;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
//读出来的每一行数据写入到fruit_hdfs表中
for(Put put: values){
context.write(NullWritable.get(), put);
}
}
}
3.创建Txt2FruitRunner组装Job
public int run(String[] args) throws Exception {
//得到Configuration
Configuration conf = this.getConf();
//创建Job任务
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(Txt2FruitRunner.class);
Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");
FileInputFormat.addInputPath(job, inPath);
//设置Mapper
job.setMapperClass(ReadFruitFromHDFSMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//设置Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRFromTxtReducer.class, job);
//设置Reduce数量,最少1个
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
4.调用执行Job
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int status = ToolRunner.run(conf, new Txt2FruitRunner(), args);
System.exit(status);
}
5.打包运行
$ /opt/module/hadoop-2.7.2/bin/yarn jar hbase-0.0.1-SNAPSHOT.jar com.atguigu.hbase.mr2.Txt2FruitRunner
提示:运行任务前,如果待数据导入的表不存在,则需要提前创建之。
提示:maven打包命令:-P local clean package或-P dev clean package install(将第三方jar包一同打包,需要插件:maven-shade-plugin)
6.4与Hive的集成
6.4.1 HBase与Hive的对比
1.Hive
(1)数据仓库
Hive的本质其实就相当于将HDFS中已经存储的文件在Mysql中做了一个双射关系,以方便使用HQL去管理查询。
(2)用于数据分析、清洗
Hive适用于离线的数据分析和清洗,延迟较高。
(3)基于HDFS、MapReduce
Hive存储的数据依旧在DataNode上,编写的HQL语句终将是转换为MapReduce代码执行。
2.HBase
(1)数据库
是一种面向列存储的非关系型数据库。
(2)用于存储结构化和非结构化的数据
适用于单表非关系型数据的存储,不适合做关联查询,类似JOIN等操作。
(3)基于HDFS
数据持久化存储的体现形式是Hfile,存放于DataNode中,被ResionServer以region的形式进行管理。
(4)延迟较低,接入在线业务使用
面对大量的企业数据,HBase可以直线单表大量数据的存储,同时提供了高效的数据访问速度。
6.4.2 HBase与Hive集成使用
尖叫提示:HBase与Hive的集成在最新的两个版本中无法兼容。所以,我们只能含着泪勇敢的重新编译:hive-hbase-handler-1.2.2.jar!!好气!!
环境准备
因为我们后续可能会在操作Hive的同时对HBase也会产生影响,所以Hive需要持有操作HBase的Jar,那么接下来拷贝Hive所依赖的Jar包(或者使用软连接的形式)。
export HBASE_HOME=/opt/module/hbase
export HIVE_HOME=/opt/module/hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
同时在hive-site.xml中修改zookeeper的属性,如下:
<property>
<name>hive.zookeeper.quorum</name>
<value>hadoop102,hadoop103,hadoop104</value>
<description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
<description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
1.案例一
目标:建立Hive表,关联HBase表,插入数据到Hive表的同时能够影响HBase表。
分步实现:
(1) 在Hive中创建表同时关联HBase
CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
提示:完成之后,可以分别进入Hive和HBase查看,都生成了对应的表
(2)在Hive中创建临时中间表,用于load文件中的数据
提示:不能将数据直接load进Hive所关联HBase的那张表中
CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
(3)向Hive中间表中load数据
hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;
(4)通过insert命令将中间表中的数据导入到Hive关联HBase的那张表中
hive> insert into table hive_hbase_emp_table select * from emp;
(5)查看Hive以及关联的HBase表中是否已经成功的同步插入了数据
Hive:
hive> select * from hive_hbase_emp_table;
HBase:
hbase> scan ‘hbase_emp_table’
2.案例二
目标:在HBase中已经存储了某一张表hbase_emp_table,然后在Hive中创建一个外部表来关联HBase中的hbase_emp_table这张表,使之可以借助Hive来分析HBase这张表中的数据。
注:该案例2紧跟案例1的脚步,所以完成此案例前,请先完成案例1。
分步实现:
(1)在Hive中创建外部表
CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
(2) 关联后就可以使用Hive函数进行一些分析操作了
hive (default)> select * from relevance_hbase_emp;
七、HBase优化
7.1高可用
在HBase中Hmaster负责监控RegionServer的生命周期,均衡RegionServer的负载,如果Hmaster挂掉了,那么整个HBase集群将陷入不健康的状态,并且此时的工作状态并不会维持太久。所以HBase支持对Hmaster的高可用配置。
1.关闭HBase集群(如果没有开启则跳过此步)
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh
2.在conf目录下创建backup-masters文件
[atguigu@hadoop102 hbase]$ touch conf/backup-masters
3.在backup-masters文件中配置高可用HMaster节点
[atguigu@hadoop102 hbase]$ echo hadoop103 > conf/backup-masters
4.将整个conf目录scp到其他节点
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop103:/opt/module/hbase/
[atguigu@hadoop102 hbase]$ scp -r conf/ hadoop104:/opt/module/hbase/
5.打开页面测试查看
7.2预分区
每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。那么依照这个原则,我们可以将数据所要投放的分区提前大致的规划好,以提高HBase性能。
1.手动设定预分区
hbase> create 'staff1','info','partition1',SPLITS => ['1000','2000','3000','4000']
2.生成16进制序列预分区
create 'staff2','info','partition2',{NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}
3.按照文件中设置的规则预分区
创建splits.txt文件内容如下:
aaaa
bbbb
cccc
dddd
然后执行:
create 'staff3','partition3',SPLITS_FILE => 'splits.txt'
4.使用JavaAPI创建预分区
//自定义算法,产生一系列Hash散列值存储在二维数组中
byte[][] splitKeys =某个散列值函数
//创建HBaseAdmin实例
HBaseAdmin hAdmin = new HBaseAdmin(HBaseConfiguration.create());
//创建HTableDescriptor实例
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
//通过HTableDescriptor实例和散列值二维数组创建带有预分区的HBase表
hAdmin.createTable(tableDesc, splitKeys);
7.3 RowKey设计
一条数据的唯一标识就是rowkey,那么这条数据存储于哪个分区,取决于rowkey处于哪个一个预分区的区间内,设计rowkey的主要目的 ,就是让数据均匀的分布于所有的region中,在一定程度上防止数据倾斜。接下来我们就谈一谈rowkey常用的设计方案。
1.生成随机数、hash、散列值
比如:
原本rowKey为1001的,SHA1后变成:dd01903921ea24941c26a48f2cec24e0bb0e8cc7
原本rowKey为3001的,SHA1后变成:49042c54de64a1e9bf0b33e00245660ef92dc7bd
原本rowKey为5001的,SHA1后变成:7b61dec07e02c188790670af43e717f0f46e8913
在做此操作之前,一般我们会选择从数据集中抽取样本,来决定什么样的rowKey来Hash后作为每个分区的临界值。
2.字符串反转
20170524000001转成10000042507102
20170524000002转成20000042507102
这样也可以在一定程度上散列逐步put进来的数据。
3.字符串拼接
20170524000001_a12e
20170524000001_93i7
7.4内存优化
HBase操作过程中需要大量的内存开销,毕竟Table是可以缓存在内存中的,一般会分配整个可用内存的70%给HBase的Java堆。但是不建议分配非常大的堆内存,因为GC过程持续太久会导致RegionServer处于长期不可用状态,一般16~48G内存就可以了,如果因为框架占用内存过高导致系统内存不足,框架一样会被系统服务拖死。
7.5基础优化
1.允许在HDFS的文件中追加内容
hdfs-site.xml、hbase-site.xml
属性:dfs.support.append
解释:开启HDFS追加同步,可以优秀的配合HBase的数据同步和持久化。默认值为true。
2.优化DataNode允许的最大文件打开数
hdfs-site.xml
属性:dfs.datanode.max.transfer.threads
解释:HBase一般都会同一时间操作大量的文件,根据集群的数量和规模以及数据动作,设置为4096或者更高。默认值:4096
3.优化延迟高的数据操作的等待时间
hdfs-site.xml
属性:dfs.image.transfer.timeout
解释:如果对于某一次数据操作来讲,延迟非常高,socket需要等待更长的时间,建议把该值设置为更大的值(默认60000毫秒),以确保socket不会被timeout掉。
4.优化数据的写入效率
mapred-site.xml
属性:
mapreduce.map.output.compress
mapreduce.map.output.compress.codec
解释:开启这两个数据可以大大提高文件的写入效率,减少写入时间。第一个属性值修改为true,第二个属性值修改为:org.apache.hadoop.io.compress.GzipCodec或者其他压缩方式。
5.设置RPC监听数量
hbase-site.xml
属性:hbase.regionserver.handler.count
解释:默认值为30,用于指定RPC监听的数量,可以根据客户端的请求数进行调整,读写请求较多时,增加此值。
6.优化HStore文件大小
hbase-site.xml
属性:hbase.hregion.max.filesize
解释:默认值10737418240(10GB),如果需要运行HBase的MR任务,可以减小此值,因为一个region对应一个map任务,如果单个region过大,会导致map任务执行时间过长。该值的意思就是,如果HFile的大小达到这个数值,则这个region会被切分为两个Hfile。
7.优化hbase客户端缓存
hbase-site.xml
属性:hbase.client.write.buffer
解释:用于指定HBase客户端缓存,增大该值可以减少RPC调用次数,但是会消耗更多内存,反之则反之。一般我们需要设定一定的缓存大小,以达到减少RPC次数的目的。
8.指定scan.next扫描HBase所获取的行数
hbase-site.xml
属性:hbase.client.scanner.caching
解释:用于指定scan.next方法获取的默认行数,值越大,消耗内存越大。
9.flush、compact、split机制
当MemStore达到阈值,将Memstore中的数据Flush进Storefile;compact机制则是把flush出来的小文件合并成大的Storefile文件。split则是当Region达到阈值,会把过大的Region一分为二。
涉及属性:
即:128M就是Memstore的默认阈值
hbase.hregion.memstore.flush.size:134217728
即:这个参数的作用是当单个HRegion内所有的Memstore大小总和超过指定值时,flush该HRegion的所有memstore。RegionServer的flush是通过将请求添加一个队列,模拟生产消费模型来异步处理的。那这里就有一个问题,当队列来不及消费,产生大量积压请求时,可能会导致内存陡增,最坏的情况是触发OOM。
hbase.regionserver.global.memstore.upperLimit:0.4
hbase.regionserver.global.memstore.lowerLimit:0.38
即:当MemStore使用内存总量达到hbase.regionserver.global.memstore.upperLimit指定值时,将会有多个MemStores flush到文件中,MemStore flush 顺序是按照大小降序执行的,直到刷新到MemStore使用内存略小于lowerLimit