hbase-phoenix集合的应用

我们知道hbase是列式存储的分布式数据库,数据是以kv形式存储的,hbase官方也开放了API接口供我们使用,进行数据的各种交互也是很方便,但是hbase本身是nosql数据库,不支持sql的查询,于是phoenix横空出世,就是为了解决hbase的sql化查询而生。
下面我们就介绍phoenix结合hbase的应用:
1,建表映射hbase中的表
create table IF NOT EXISTS "phoenixtest"("ROW" varchar primary key, "info"."name" varchar , "info"."age" varchar,"info"."addr" varchar);
把HBASE中的ROW当作主键
表名和列族以及列名需要用双引号括起来,因为HBase是区分大小写的,如果不用双引号括起来的话Phoenix在创建表的时候会自动将小写转换为大写字母
2,sql查询
select * from "phoenixtest";


image.png

3,java API查询
pom依赖:
<dependencies>

    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-core -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-server-client -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-server-client</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>1.1.2</version>
        <type>pom</type>
    </dependency>

</dependencies>

编写一个phoenix连接池:
JdbcConnectionPool
/**

  • phoenix连接池
    */
    public class JdbcConnectionPool implements DataSource {
    private static PropertyUtil property = PropertyUtil.getInstance();
    private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(JdbcConnectionPool.class);
    private static String driver;
    private static String url;
    private static int jdbcConnectionInitSize;//最小连接数量
    private static int max = 10; //当前最大连接数量
    private static LinkedList list = new LinkedList<Connection>();

    static {
    try {
    url = property.getString("url", "");
    driver = property.getString("driver", "");
    Class.forName(driver);
    jdbcConnectionInitSize = property.getInt("jdbcConnectionInitSize", 0);
    //创建最小数据库的连接
    for (int i = 0; i < jdbcConnectionInitSize; i++) {
    final Connection conn = DriverManager.getConnection(url);
    System.out.println("connected to phoenix...");
    list.add(conn);
    }
    } catch (SQLException e) {
    e.printStackTrace();
    } catch (ClassNotFoundException e) {
    e.printStackTrace();
    }
    }

    @Override
    public Connection getConnection() throws SQLException {
    if (list.size() == 0 && max <= 50) {
    try {
    Class.forName(driver);
    } catch (ClassNotFoundException e) {
    e.printStackTrace();
    }
    for (int i = 0; i < jdbcConnectionInitSize; i++) {
    final Connection conn = DriverManager.getConnection(url);
    list.add(conn);
    }
    max++;
    }
    if (list.size() > 0) {
    final Connection conn1 = (Connection) list.removeFirst();
    return (Connection) Proxy.newProxyInstance(JdbcConnectionPool.class.getClass().getClassLoader(),
    new Class[]{Connection.class}, new InvocationHandler() {

                     @Override
                     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                         if (!method.getName().equalsIgnoreCase("close")) {
                             return method.invoke(conn1, args);
                         } else {
                             list.add(conn1);
                             return null;
                         }
                     }
                 });
     } else {
         System.out.println("connect phoenix error.");
     }
     return null;
    

    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
    return null;
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
    return null;
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
    return false;
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
    return null;
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {

    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {

    }

    @Override
    public int getLoginTimeout() throws SQLException {
    return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
    return null;
    }

}

编写连接工具:
JdbcConnection
public class JdbcConnection {
private static JdbcConnectionPool pool = new JdbcConnectionPool();
private final static Connection conn = null;
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(JdbcConnection.class);

/**
 * 获取链接
 *
 * @return
 * @throws SQLException
 */
public static Connection getConnection() throws SQLException {
    return pool.getConnection();
}

/**
 * 释放连接
 *
 * @param conn
 * @param st
 * @param rs
 */
public static void release(final Connection conn, final PreparedStatement st, final ResultSet rs) {
    Runtime.getRuntime().addShutdownHook(new Thread() {

        @Override
        public void run() {
            super.run();
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (st != null) {
                try {
                    st.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    });
}

}

加载配置文件:
//加载配置文件
public class PropertyUtil extends Properties {

private static final long serialVersionUID = 50440463580273222L;

private static PropertyUtil instance = null;

public static synchronized PropertyUtil getInstance() {
    if (instance == null) {
        instance = new PropertyUtil();
    }
    return instance;
}

public String getProperty(String key, String defaultValue) {
    String val = getProperty(key);
    return (val == null || val.isEmpty()) ? defaultValue : val;
}

public String getString(String name, String defaultValue) {
    return this.getProperty(name, defaultValue);
}

public int getInt(String name, int defaultValue) {
    String val = this.getProperty(name);
    return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}

public long getLong(String name, long defaultValue) {
    String val = this.getProperty(name);
    return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
}

public float getFloat(String name, float defaultValue) {
    String val = this.getProperty(name);
    return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
}

public double getDouble(String name, double defaultValue) {
    String val = this.getProperty(name);
    return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
}

public byte getByte(String name, byte defaultValue) {
    String val = this.getProperty(name);
    return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
}

public PropertyUtil() {
    InputStream in;
    try {
        in = this.getClass().getClassLoader().getResourceAsStream("cloudConfig.properties");
        this.load(in);
        in.close();
    } catch (IOException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }
}

}
编写连接测试:
public class PhoenixTest {

public static void main(String[] args) {
    Connection conn = null;
    Statement statement = null;
    final PropertyUtil property = PropertyUtil.getInstance();
    final String url = property.getString("url", "");
    final String driver = property.getString("driver", "");
    final String sql = property.getString("sql", "");
    try {
        Class.forName(driver);
        conn = DriverManager.getConnection(url);
        statement = conn.createStatement();
        System.out.println("--------------------------");
        final ResultSet rs = statement.executeQuery(sql);
        while (rs.next()) {
            System.out.println(rs.getString("name"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            conn.close();
            statement.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

}

配置文件
cloudConfig.properties:

jdbc info

url=jdbc:phoenix:192.168.1.12:2181
driver=org.apache.phoenix.jdbc.PhoenixDriver
jdbcConnectionInitSize=10
sql=select * from "phoenixtest"

注意此时直接运行是连接不上的,为什么?
因为phoenix需要加载hbase的配置文件才能找到需要的相关参数,于是我们将hbase-site.xml放入到source 目录conf下
hbase-site.xml
<configuration>

<property>
  <name>dfs.domain.socket.path</name>
  <value>/var/lib/hadoop-hdfs/dn_socket</value>
</property>

<property>
  <name>hbase.bulkload.staging.dir</name>
  <value>/apps/hbase/staging</value>
</property>

<property>
  <name>hbase.client.keyvalue.maxsize</name>
  <value>1048576</value>
</property>

<property>
  <name>hbase.client.retries.number</name>
  <value>35</value>
</property>

<property>
  <name>hbase.client.scanner.caching</name>
  <value>100</value>
</property>

<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

<property>
  <name>hbase.coprocessor.master.classes</name>
  <value></value>
</property>

<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint</value>
</property>

<property>
  <name>hbase.custom-extensions.root</name>
  <value>/hdp/ext/2.6/hbase</value>
</property>

<property>
  <name>hbase.defaults.for.version.skip</name>
  <value>true</value>
</property>

<property>
  <name>hbase.hregion.majorcompaction</name>
  <value>604800000</value>
</property>

<property>
  <name>hbase.hregion.majorcompaction.jitter</name>
  <value>0.50</value>
</property>

<property>
  <name>hbase.hregion.max.filesize</name>
  <value>10737418240</value>
</property>

<property>
  <name>hbase.hregion.memstore.block.multiplier</name>
  <value>4</value>
</property>

<property>
  <name>hbase.hregion.memstore.flush.size</name>
  <value>134217728</value>
</property>

<property>
  <name>hbase.hregion.memstore.mslab.enabled</name>
  <value>true</value>
</property>

<property>
  <name>hbase.hstore.blockingStoreFiles</name>
  <value>10</value>
</property>

<property>
  <name>hbase.hstore.compaction.max</name>
  <value>10</value>
</property>

<property>
  <name>hbase.hstore.compactionThreshold</name>
  <value>3</value>
</property>

<property>
  <name>hbase.local.dir</name>
  <value>${hbase.tmp.dir}/local</value>
</property>

<property>
  <name>hbase.master.info.bindAddress</name>
  <value>0.0.0.0</value>
</property>

<property>
  <name>hbase.master.info.port</name>
  <value>16010</value>
</property>

<property>
  <name>hbase.master.namespace.init.timeout</name>
  <value>2400000</value>
</property>

<property>
  <name>hbase.master.port</name>
  <value>16000</value>
</property>

<property>
  <name>hbase.master.ui.readonly</name>
  <value>false</value>
</property>

<property>
  <name>hbase.master.wait.on.regionservers.timeout</name>
  <value>30000</value>
</property>

<property>
  <name>hbase.region.server.rpc.scheduler.factory.class</name>
  <value>org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory</value>
</property>

<property>
  <name>hbase.regionserver.executor.openregion.threads</name>
  <value>20</value>
</property>

<property>
  <name>hbase.regionserver.global.memstore.size</name>
  <value>0.4</value>
</property>

<property>
  <name>hbase.regionserver.handler.count</name>
  <value>30</value>
</property>

<property>
  <name>hbase.regionserver.info.port</name>
  <value>16030</value>
</property>

<property>
  <name>hbase.regionserver.port</name>
  <value>16020</value>
</property>

<property>
  <name>hbase.regionserver.wal.codec</name>
  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

<property>
  <name>hbase.rootdir</name>
  <value>hdfs://node3:8020/apps/hbase/data</value>
</property>

<property>
  <name>hbase.rpc.protection</name>
  <value>authentication</value>
</property>

<property>
  <name>hbase.rpc.timeout</name>
  <value>90000</value>
</property>

<property>
  <name>hbase.security.authentication</name>
  <value>simple</value>
</property>

<property>
  <name>hbase.security.authorization</name>
  <value>false</value>
</property>

<property>
  <name>hbase.superuser</name>
  <value>hbase</value>
</property>

<property>
  <name>hbase.tmp.dir</name>
  <value>/tmp/hbase-${user.name}</value>
</property>

<property>
  <name>hbase.zookeeper.property.clientPort</name>
  <value>2181</value>
</property>

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>node1,node2,node3,node4,node6,node7,node8,node9,node10</value>
</property>

<property>
  <name>hbase.zookeeper.useMulti</name>
  <value>true</value>
</property>

<property>
  <name>hfile.block.cache.size</name>
  <value>0.4</value>
</property>

<property>
  <name>phoenix.functions.allowUserDefinedFunctions</name>
  <value>true</value>
</property>

<property>
  <name>phoenix.query.timeoutMs</name>
  <value>60000</value>
</property>

<property>
  <name>zookeeper.recovery.retry</name>
  <value>6</value>
</property>

<property>
  <name>zookeeper.session.timeout</name>
  <value>90000</value>
</property>

<property>
  <name>zookeeper.znode.parent</name>
  <value>/hbase-unsecure</value>
</property>

</configuration>

程序目录结构如下:


image.png

select * from "ott_deviceinfo_buffer" where "userid"='13860507270';
注意,13860507270字符串必须用单引号,userid字段必须用双引号

创建二级索引
create index idex0 on "ott_deviceinfo_buffer" ("info"."deviceid");
创建多级索引
create local index idex2 on "ott_deviceinfo_buffer" ("info"."terminalid","info"."terminalmode");
查看是否应用了二级索引
explain select * from "ott_deviceinfo_buffer" where "userid"='13860507270';


image.png

创建异步索引表
CREATE INDEX async_index ON ""ott_deviceinfo_buffer"" (v) ASYNC
通过create index的时候指定 ASYNC 关键字来指定异步创建索引。执行这个命令之后并不会引起索引表与源表的直接同步。这个时候查询并不会使用这个索引表。那么索引数据的导入还需要采用phoenix提供的索引同步工具类 IndexTool , 这是一个mapreduce工具类,使用方式如下:
${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool
--schema MY_SCHEMA --data-table MY_TABLE --index-table ASYNC_IDX
--output-path ASYNC_IDX_HFILES
执行结束以后才会应用索引表

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容