We know that HBase is a column-oriented distributed database whose data is stored as key-value pairs. HBase exposes an official API, and interacting with the data through it is convenient enough, but HBase itself is a NoSQL database with no SQL query support. This is where Phoenix comes in: it exists precisely to put a SQL layer on top of HBase.
Below we walk through using Phoenix with HBase:
1. Create a table that maps an existing HBase table
create table IF NOT EXISTS "phoenixtest"("ROW" varchar primary key, "info"."name" varchar, "info"."age" varchar, "info"."addr" varchar);
This treats the HBase row key as the primary key (the ROW column).
The table name, column family, and column names must be wrapped in double quotes, because HBase is case-sensitive: without the quotes, Phoenix converts lowercase identifiers to uppercase when creating the table. For example, an unquoted phoenixtest would be created as PHOENIXTEST and would no longer match the lowercase HBase table name.
2. SQL query
select * from "phoenixtest";
3. Querying through the Java API
pom dependencies:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-server-client -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-server-client</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.1.2</version>
        <type>pom</type>
    </dependency>
</dependencies>
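Note that the Phoenix client artifact must be built against the HBase version running on the cluster (here 4.7.0-HBase-1.1 matches HBase 1.1.x); a version mismatch between client and server is a common cause of connection failures.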
We first write a Phoenix connection pool:
JdbcConnectionPool
/**
 * Phoenix connection pool
 */
public class JdbcConnectionPool implements DataSource {

    private static PropertyUtil property = PropertyUtil.getInstance();
    private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(JdbcConnectionPool.class);
    private static String driver;
    private static String url;
    private static int jdbcConnectionInitSize; // minimum number of connections
    private static int max = 10; // current cap on pool growth
    private static final LinkedList<Connection> list = new LinkedList<>();

    static {
        try {
            url = property.getString("url", "");
            driver = property.getString("driver", "");
            Class.forName(driver);
            jdbcConnectionInitSize = property.getInt("jdbcConnectionInitSize", 0);
            // create the initial set of connections
            for (int i = 0; i < jdbcConnectionInitSize; i++) {
                final Connection conn = DriverManager.getConnection(url);
                System.out.println("connected to phoenix...");
                list.add(conn);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Connection getConnection() throws SQLException {
        // grow the pool if it is empty and the cap has not been reached
        if (list.size() == 0 && max <= 50) {
            try {
                Class.forName(driver);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < jdbcConnectionInitSize; i++) {
                final Connection conn = DriverManager.getConnection(url);
                list.add(conn);
            }
            max++;
        }
        if (list.size() > 0) {
            final Connection conn1 = list.removeFirst();
            // hand out a proxy: close() returns the connection to the pool
            // instead of actually closing it
            return (Connection) Proxy.newProxyInstance(
                    JdbcConnectionPool.class.getClassLoader(),
                    new Class<?>[]{Connection.class}, new InvocationHandler() {
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            if (!method.getName().equalsIgnoreCase("close")) {
                                return method.invoke(conn1, args);
                            } else {
                                list.add(conn1);
                                return null;
                            }
                        }
                    });
        } else {
            System.out.println("connect phoenix error.");
        }
        return null;
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        return null;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return null;
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }
}
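Note the design choice in getConnection: callers receive a dynamic proxy rather than the raw connection. Every method call is forwarded to the underlying connection except close(), which puts the connection back into the pool instead of closing it, so the pool can be used through the plain DataSource and Connection interfaces.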
Next, a connection utility class:
JdbcConnection
public class JdbcConnection {

    private static JdbcConnectionPool pool = new JdbcConnectionPool();
    private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(JdbcConnection.class);

    /**
     * Get a connection from the pool
     *
     * @return a pooled connection
     * @throws SQLException
     */
    public static Connection getConnection() throws SQLException {
        return pool.getConnection();
    }

    /**
     * Release resources; the actual close is deferred to a JVM shutdown hook
     *
     * @param conn
     * @param st
     * @param rs
     */
    public static void release(final Connection conn, final PreparedStatement st, final ResultSet rs) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                super.run();
                if (rs != null) {
                    try {
                        rs.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (st != null) {
                    try {
                        st.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                if (conn != null) {
                    try {
                        // close() on the pooled proxy returns the connection to the pool
                        conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}
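A minimal usage sketch of the two classes above (PoolUsageExample is a hypothetical class name; the query assumes the phoenixtest table from section 1):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PoolUsageExample {
    public static void main(String[] args) throws Exception {
        Connection conn = JdbcConnection.getConnection();
        // quoted identifiers keep their lowercase form
        PreparedStatement st = conn.prepareStatement("select * from \"phoenixtest\"");
        ResultSet rs = st.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString("name"));
        }
        // release() defers the actual cleanup to a JVM shutdown hook; when
        // close() finally runs, the proxy returns the connection to the pool
        JdbcConnection.release(conn, st, rs);
    }
}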
Loading the configuration file:
// loads the configuration file
public class PropertyUtil extends Properties {

    private static final long serialVersionUID = 50440463580273222L;
    private static PropertyUtil instance = null;

    public static synchronized PropertyUtil getInstance() {
        if (instance == null) {
            instance = new PropertyUtil();
        }
        return instance;
    }

    public String getProperty(String key, String defaultValue) {
        String val = getProperty(key);
        return (val == null || val.isEmpty()) ? defaultValue : val;
    }

    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    public int getInt(String name, int defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    }

    public long getLong(String name, long defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Long.parseLong(val);
    }

    public float getFloat(String name, float defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    }

    public double getDouble(String name, double defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    }

    public byte getByte(String name, byte defaultValue) {
        String val = this.getProperty(name);
        return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    }

    public PropertyUtil() {
        InputStream in;
        try {
            in = this.getClass().getClassLoader().getResourceAsStream("cloudConfig.properties");
            this.load(in);
            in.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
}
Writing a connection test:
public class PhoenixTest {
    public static void main(String[] args) {
        Connection conn = null;
        Statement statement = null;
        final PropertyUtil property = PropertyUtil.getInstance();
        final String url = property.getString("url", "");
        final String driver = property.getString("driver", "");
        final String sql = property.getString("sql", "");
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url);
            statement = conn.createStatement();
            System.out.println("--------------------------");
            final ResultSet rs = statement.executeQuery(sql);
            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the statement before the connection, guarding against nulls
            try {
                if (statement != null) {
                    statement.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
The configuration file, cloudConfig.properties:
# jdbc info
url=jdbc:phoenix:192.168.1.12:2181
driver=org.apache.phoenix.jdbc.PhoenixDriver
jdbcConnectionInitSize=10
sql=select * from "phoenixtest"
Note that running this as-is will not connect. Why?
Phoenix needs HBase's configuration file to find the parameters it requires, so we put hbase-site.xml into the conf directory on the source path.
hbase-site.xml
<configuration>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>
<property>
<name>hbase.bulkload.staging.dir</name>
<value>/apps/hbase/staging</value>
</property>
<property>
<name>hbase.client.keyvalue.maxsize</name>
<value>1048576</value>
</property>
<property>
<name>hbase.client.retries.number</name>
<value>35</value>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>100</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.coprocessor.master.classes</name>
<value></value>
</property>
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint</value>
</property>
<property>
<name>hbase.custom-extensions.root</name>
<value>/hdp/ext/2.6/hbase</value>
</property>
<property>
<name>hbase.defaults.for.version.skip</name>
<value>true</value>
</property>
<property>
<name>hbase.hregion.majorcompaction</name>
<value>604800000</value>
</property>
<property>
<name>hbase.hregion.majorcompaction.jitter</name>
<value>0.50</value>
</property>
<property>
<name>hbase.hregion.max.filesize</name>
<value>10737418240</value>
</property>
<property>
<name>hbase.hregion.memstore.block.multiplier</name>
<value>4</value>
</property>
<property>
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value>
</property>
<property>
<name>hbase.hregion.memstore.mslab.enabled</name>
<value>true</value>
</property>
<property>
<name>hbase.hstore.blockingStoreFiles</name>
<value>10</value>
</property>
<property>
<name>hbase.hstore.compaction.max</name>
<value>10</value>
</property>
<property>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value>
</property>
<property>
<name>hbase.local.dir</name>
<value>${hbase.tmp.dir}/local</value>
</property>
<property>
<name>hbase.master.info.bindAddress</name>
<value>0.0.0.0</value>
</property>
<property>
<name>hbase.master.info.port</name>
<value>16010</value>
</property>
<property>
<name>hbase.master.namespace.init.timeout</name>
<value>2400000</value>
</property>
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.master.ui.readonly</name>
<value>false</value>
</property>
<property>
<name>hbase.master.wait.on.regionservers.timeout</name>
<value>30000</value>
</property>
<property>
<name>hbase.region.server.rpc.scheduler.factory.class</name>
<value>org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory</value>
</property>
<property>
<name>hbase.regionserver.executor.openregion.threads</name>
<value>20</value>
</property>
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.4</value>
</property>
<property>
<name>hbase.regionserver.handler.count</name>
<value>30</value>
</property>
<property>
<name>hbase.regionserver.info.port</name>
<value>16030</value>
</property>
<property>
<name>hbase.regionserver.port</name>
<value>16020</value>
</property>
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://node3:8020/apps/hbase/data</value>
</property>
<property>
<name>hbase.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>90000</value>
</property>
<property>
<name>hbase.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hbase.security.authorization</name>
<value>false</value>
</property>
<property>
<name>hbase.superuser</name>
<value>hbase</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>/tmp/hbase-${user.name}</value>
</property>
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>node1,node2,node3,node4,node6,node7,node8,node9,node10</value>
</property>
<property>
<name>hbase.zookeeper.useMulti</name>
<value>true</value>
</property>
<property>
<name>hfile.block.cache.size</name>
<value>0.4</value>
</property>
<property>
<name>phoenix.functions.allowUserDefinedFunctions</name>
<value>true</value>
</property>
<property>
<name>phoenix.query.timeoutMs</name>
<value>60000</value>
</property>
<property>
<name>zookeeper.recovery.retry</name>
<value>6</value>
</property>
<property>
<name>zookeeper.session.timeout</name>
<value>90000</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase-unsecure</value>
</property>
</configuration>
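As an aside, the ZooKeeper quorum, client port, and znode parent can also be given directly in the Phoenix JDBC URL in the form jdbc:phoenix:<quorum>:<port>:<znode>, instead of relying on hbase-site.xml for those three settings; the remaining client parameters still come from hbase-site.xml. A sketch (host names abbreviated from the quorum above):

import java.sql.Connection;
import java.sql.DriverManager;

public class ZnodeUrlExample { // hypothetical class name
    public static void main(String[] args) throws Exception {
        // quorum, port, and znode parent taken from the hbase-site.xml above
        Connection conn = DriverManager.getConnection(
                "jdbc:phoenix:node1,node2,node3:2181:/hbase-unsecure");
        System.out.println("connected: " + !conn.isClosed());
        conn.close();
    }
}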
[Screenshot: project directory structure]
Querying with a filter condition:
select * from "ott_deviceinfo_buffer" where "userid"='13860507270';
Note that the value 13860507270 must be wrapped in single quotes (it is a string), and the userid column must be wrapped in double quotes.
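To avoid quoting values by hand, the condition can also be bound as a parameter; a sketch reusing the pool utility above (ParamQueryExample is a hypothetical class name):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class ParamQueryExample {
    public static void main(String[] args) throws Exception {
        Connection conn = JdbcConnection.getConnection();
        // column names still need double quotes; the bound value needs no quotes
        PreparedStatement st = conn.prepareStatement(
                "select * from \"ott_deviceinfo_buffer\" where \"userid\" = ?");
        st.setString(1, "13860507270");
        ResultSet rs = st.executeQuery();
        while (rs.next()) {
            System.out.println(rs.getString("userid"));
        }
        JdbcConnection.release(conn, st, rs);
    }
}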
Creating a secondary index
create index idex0 on "ott_deviceinfo_buffer" ("info"."deviceid");
Creating a multi-column local index
create local index idex2 on "ott_deviceinfo_buffer" ("info"."terminalid","info"."terminalmode");
Check whether the secondary index is applied
explain select * from "ott_deviceinfo_buffer" where "userid"='13860507270';
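In the resulting plan, a RANGE SCAN OVER the index table generally means the index is being used, while a FULL SCAN OVER the data table means it is not (as would happen here if the filtered column is not covered by an index).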
Creating an index asynchronously
CREATE INDEX async_index ON "ott_deviceinfo_buffer" (v) ASYNC
The ASYNC keyword on create index requests asynchronous index creation. Executing this statement does not by itself synchronize the index table with the source table, and queries will not use the index at this point. The index data must instead be populated with IndexTool, the MapReduce utility Phoenix provides, used as follows:
${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool \
--schema MY_SCHEMA --data-table MY_TABLE --index-table ASYNC_IDX \
--output-path ASYNC_IDX_HFILES
Only after this job finishes will queries start using the index table.