JDBC连接池封装MaxCompute/Hive/Oracle/Mysql

有时候需要连接第三方的各种数据源,总是要去写不同的代码,于是将MaxCompute, Hive, Oracle, Mysql等JDBC连接封装起来,只需要传入不同的参数即可创建一个不同类型的连接池。

连接参数基础类封装

封装了JDBC基础的连接参数,如果不需要这些属性可以继承该类,增加新的属性即可。

@Data

public class BaseJdbcConnParam implements Serializable {

    /**

    * driver name

    */

    private String driverName;

    /**

    * IP

    */

    private String ip;

    /**

    * db server port

    */

    private Integer port;

    /**

    * db name

    */

    private String dbName;

    /**

    * db connection username

    */

    private String username;

    /**

    * db connection password

    */

    private String password;

}

抽象连接工具类封装

功能如下:

1、构造函数:根据连接参数不同构建不同的连接对象

2、构建具体的连接,子类实现buildConnection()

3、获取连接,构建好之后直接获取getConnection()

/**

* @Description 抽象连接工具类父类

* @Author itdl

* @Date 2022/08/15 09:54

*/

public abstract class AbstractConnUtil<P extends BaseJdbcConnParam> {

    /**

    * connection params

    */

    protected final P connParam;

    /**

    * jdbc connection object

    */

    protected final Connection connection;

    /**

    * 构造函数, 构造工具类对象

    * @param connParam 连接参数

    */

    public AbstractConnUtil(P connParam) {

        this.connParam = connParam;

        this.connection = buildConnection();

    }

    /**

    * 构建连接对象

    * @return 连接对象

    */

    protected abstract Connection buildConnection();

    /**

    * 获取连接

    */

    public Connection getConnection() {

        return connection;

    }

}

连接池管理

功能如下:

1、根据不同的连接参数,和最大连接数去创建一个对应类型的连接池。

2、获取连接方法,如果连接没有了,等待其他线程释放(最多等待十分钟)

3、释放连接方法,将连接放回连接池,然后唤醒等待的线程

4、关闭连接池所有的连接

/**

* @Description 连接池管理

* @Author itdl

* @Date 2022/08/16 09:42

*/

@Slf4j

public class DbConnPool<T extends BaseJdbcConnParam> {

    /**

    * 用于存放连接

    */

    private final LinkedList<Connection> connPool = new LinkedList<Connection>();

    /**

    * 最大连接池数量

    */

    private final Integer maxPoolSize;

    private final T connParam;

    /**

    * 构造函数

    * @param connParam 连接参数

    * @param maxPoolSize 连接池大小

    */

    public DbConnPool(T connParam, Integer maxPoolSize)  {

        this.maxPoolSize = maxPoolSize;

        this.connParam = connParam;

        // 初始化连接池

        for (int i = 0; i < maxPoolSize; i++) {

            connPool.addLast(this.createConnection());

        }

    }

    /**

    * 创建数据库连接

    * @return 连接

    */

    private Connection createConnection() {

        if (connParam instanceof OracleJdbcConnParam){

            final OracleConnUtil util = new OracleConnUtil((OracleJdbcConnParam) connParam);

            return util.getConnection();

        }

        if (connParam instanceof HiveJdbcConnParam){

            final HiveConnUtil util = new HiveConnUtil((HiveJdbcConnParam) connParam);

            return util.getConnection();

        }

        if (connParam instanceof MysqlJdbcConnParam){

            final MysqlConnUtil util = new MysqlConnUtil((MysqlJdbcConnParam) connParam);

            return util.getConnection();

        }

        if (connParam instanceof MaxComputeJdbcConnParam){

            final MaxComputeJdbcUtil util = new MaxComputeJdbcUtil((MaxComputeJdbcConnParam) connParam);

            return util.getConnection();

        }

        throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);

    }

    /**

    * 获取连接

    * @return 连接

    */

    public synchronized Connection getConnection(){

        if (connPool.size() == 0){

//            throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);

            // 最长等待十分钟

            try {

                log.info("==========连接池已经空了, 请等待其他线程释放==========");

                wait(10 * 60 * 1000);

            } catch (InterruptedException e) {

                log.info("==========连接池已经空了, 等待了10分钟还没有释放,抛出异常==========");

                e.printStackTrace();

                throw new BizException(ResultCode.CONN_POOL_EMPTY_ERR);

            }

        }

        // 去除最上面一个连接 如果没有连接了,将会抛出异常

        return connPool.removeFirst();

    }

    /**

    * 用完后释放连接

    * @param conn 要释放的连接

    */

    public synchronized void freeConnection(Connection conn){

        // 通知连接已经释放

        notifyAll();

        this.connPool.addLast(conn);

    }

    /**

    * 关闭连接池

    */

    public synchronized void close(){

        for (Connection connection : connPool) {

            SqlUtil.close(connection);

        }

    }

}

SQL操作工具类

根据连接对象Connection和数据库房源,封装不同的sql执行。执行SQL核心功能封装。

/**

* @Description SQL操作工具类

* @Author itdl

* @Date 2022/08/10 17:13

*/

@Slf4j

public class SqlUtil {

    /**查询mysql表注释sql*/

    public static final String SELECT_TABLES_MYSQL = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";

    /**查询MaxCompute表注释sql*/

    public static final String SELECT_TABLES_MAX_COMPUTE = "select table_name, table_comment from information_schema.tables where TABLE_SCHEMA = '%s'";

    /**查询oracle表注释sql*/

    public static final String SELECT_TABLES_ORACLE = "SELECT t2.TABLE_NAME as table_name, t2.COMMENTS as table_comment FROM user_tables t1 inner join user_tab_comments t2 on t1.TABLE_NAME = t2.TABLE_NAME";

    /**查询hive表注释sql, 先查询表名,根据表名获取建表语句,正则提取表注释*/

    public static final String SELECT_TABLES_HIVE = "show tables";

    public static final String SELECT_TABLES_2_HIVE = "describe extended %s";

    /**分页数量统计Mysql*/

    private static final String SELECT_COUNT_MYSQL = "select count(1) from (%s) z";

    /**分页数量统计MaxCompute*/

    private static final String SELECT_COUNT_MAX_COMPUTE = "select count(1) from (%s) z;";

    /**分页数量统计Hive*/

    private static final String SELECT_COUNT_ORACLE = "select count(1) from (%s) z";

    /**分页数量统计Oracle*/

    private static final String SELECT_COUNT_HIVE = "select count(1) from (%s) z";

    /**maxCompute开启全表扫描sql*/

    private static final String FULL_SCAN_MAX_COMPUTE = "set odps.sql.allow.fullscan=true;";

    /**分页查询sql-Mysql*/

    private static final String SELECT_PAGE_MYSQL = "select z.* from (%s) z limit %s, %s";

    /**分页查询sql-MaxCompute*/

    private static final String SELECT_PAGE_MAX_COMPUTE = "select z.* from (%s) z limit %s, %s;";

    /**分页查询sql-Hive*/

    private static final String SELECT_PAGE_HIVE = "select * from (select row_number() over () as row_num_01,u.* from (%s) u) mm where mm.row_num_01 between %s and %s";

    /**分页查询sql-Oracle*/

    private static final String SELECT_PAGE_ORACLE = "select * from (SELECT ROWNUM as row_num_01,z.* from (%s) z) h where h.row_num_01 > %s and h.row_num_01 <= %s";

    /**数据库连接*/

    private final Connection connection;

    /**数据库方言*/

    private final Integer dbDialect;

    /**支持的方言列表*/

    private static final List<Integer> supportDbTypes =

            Arrays.asList(DbDialectEnum.ORACLE.getCode(), DbDialectEnum.HIVE.getCode(), DbDialectEnum.MYSQL.getCode(), DbDialectEnum.MAX_COMPUTE.getCode());

    public SqlUtil(Connection connection, Integer dbDialect) {

        if (!supportDbTypes.contains(dbDialect)){

            throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);

        }

        this.connection = connection;

        this.dbDialect = dbDialect;

    }

    /**

    * 根据connection获取所有的表和对应的注释

    */

    public List<TableMetaInfo> getTables(String schemaName){

        List<TableMetaInfo> result = new ArrayList<>();

        String sql = "";

        switch (this.dbDialect){

            case 1:

                sql = SELECT_TABLES_ORACLE;

                break;

            case 2:

                sql = SELECT_TABLES_HIVE;

                break;

            case 3:

                if (StringUtils.isBlank(schemaName)){

                    throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);

                }

                sql = String.format(SELECT_TABLES_MYSQL, schemaName);

                break;

            case 4:

                if (StringUtils.isBlank(schemaName)){

                    throw new BizException(ResultCode.SELECT_TABLES_SCHEMA_NOT_NULL_ERR);

                }

                sql = String.format(SELECT_TABLES_MAX_COMPUTE, schemaName);

            default:

                break;

        }

        if (StringUtils.isBlank(sql)){

            throw new BizException(ResultCode.CONN_TYPE_NOT_SUPPORT);

        }

        // 执行SQL语句

        final List<LinkedHashMap<String, Object>> resultMaps = querySql(sql);

        if (ObjectUtils.isEmpty(resultMaps)){

            return Lists.newArrayList();

        }

        // hive单独处理

        List<TableMetaInfo> result1 = getHiveTableMetaInfos(result, resultMaps);

        if (result1 != null) return result1;

        // 转换结果

        return resultMaps.stream().map(

                m->{

                    final TableMetaInfo info = new TableMetaInfo();

                    Object tableNameObj = m.get("table_name");

                    String tableName = tableNameObj == null ? m.get("TABLE_NAME") == null ? "" : String.valueOf(m.get("TABLE_NAME")) : String.valueOf(tableNameObj);

                    Object tableCommentObj = m.get("table_comment");

                    String tableComment = tableCommentObj == null ? m.get("TABLE_COMMENT") == null ? "" : String.valueOf(m.get("TABLE_COMMENT")) : String.valueOf(tableCommentObj);

                    info.setTableName(tableName);

                    info.setComment(tableComment);

                    return info;

                }

        ).collect(Collectors.toList());

    }

    /**

    * 根据schemeName,表名获取字段列表

    * @param tableName 一般是数据库 oracle是用户名

    */

    public List<TableColumnMetaInfo> getColumnsByTableName(String tableName){

        try {

            List<TableColumnMetaInfo> list = new ArrayList<>();

            final DatabaseMetaData metaData = connection.getMetaData();

            final ResultSet columns = metaData.getColumns(null, null, tableName, null);

            while (columns.next()){

                String columnName = columns.getString("COLUMN_NAME");

                String remarks = columns.getString("REMARKS");

                remarks = StringUtils.isBlank(remarks) ? "" : remarks;

                final TableColumnMetaInfo metaInfo = new TableColumnMetaInfo(tableName, columnName, remarks);

                list.add(metaInfo);

            }

            return list;

        } catch (SQLException e) {

            e.printStackTrace();

            return Lists.newArrayList();

        }

    }

    /**

    * 执行sql查询

    * @param querySql 查询sql

    * @return List<Map<String, Object>> 通过LinkedHashMap接受,序列化时可保证顺序一致

    */

    public List<LinkedHashMap<String, Object>> queryData(String querySql, boolean... fullScan){

        Statement statement = null;

        ResultSet resultSet = null;

        try {

            // 创建statement

            statement = this.connection.createStatement();

            // 执行全表扫描sql

            for (boolean b : fullScan) {

                if (b){

                    statement.execute(FULL_SCAN_MAX_COMPUTE);

                    break;

                }

            }

            // 执行查询语句

            resultSet = statement.executeQuery(querySql);

            // 构建结果返回

            return buildListMap(resultSet);

        } catch (SQLException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.SQL_EXEC_ERR);

        } finally {

            // 关闭resultSet, statement

            close(resultSet, statement);

        }

    }

    /**

    * 执行sql查询

    * @param querySql 查询sql

    * @return List<Map<String, Object>>

    */

    public List<LinkedHashMap<String, Object>> queryData(String querySql, Integer page, Integer size){

        Statement statement = null;

        ResultSet resultSet = null;

        try {

            // 1、替换分号

            querySql = querySql.replaceAll(";", "");

            // 创建statement

            statement = this.connection.createStatement();

            // 2、格式化SQL

            int offset = (page - 1 ) * size;

            String execSql = "";

            switch (this.dbDialect){

                case 1:

                    // oracle

                    execSql = String.format(SELECT_PAGE_ORACLE, querySql, offset, size);

                    break;

                case 2:

                    // hive

                    execSql = String.format(SELECT_PAGE_HIVE, querySql, offset, size);

                    break;

                case 3:

                    // mysql

                    execSql = String.format(SELECT_PAGE_MYSQL, querySql, offset, size);

                    break;

                case 4:

                    // maxCompute

                    execSql = String.format(SELECT_PAGE_MAX_COMPUTE, querySql, offset, size);

                    break;

                default:

                    break;

            }

            // maxCompute开启全表扫描

            if (DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect)){

                statement.execute(FULL_SCAN_MAX_COMPUTE);

            }

            log.info("=======>>>执行分页sql为:{}", execSql);

            // 执行查询语句

            resultSet = statement.executeQuery(execSql);

            // 构建结果返回

            return buildListMap(resultSet);

        } catch (SQLException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.SQL_EXEC_ERR);

        } finally {

            // 关闭resultSet, statement

            close(resultSet, statement);

        }

    }

    /**

    * 执行分页查询

    * @param querySql 分页查询sql

    * @param page 页码 从1开始 第n页传n

    * @param size 每页记录数

    * @return 分页查询结果

    */

    public PageResult<LinkedHashMap<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){

        // 1、替换分号

        querySql = querySql.replaceAll(";", "");

        String countSql = "";

        switch (this.dbDialect){

            case 1:

                // oracle

                countSql = String.format(SELECT_COUNT_ORACLE, querySql);

                break;

            case 2:

                // hive

                countSql = String.format(SELECT_COUNT_HIVE, querySql);

                break;

            case 3:

                // mysql

                countSql = String.format(SELECT_COUNT_MYSQL, querySql);

                break;

            case 4:

                // maxCompute

                countSql = String.format(SELECT_COUNT_MAX_COMPUTE, querySql);

                break;

            default:

                break;

        }

        log.info("=======>>>执行分页统计总数sql为:{}", countSql);

        // 查询总数

        final List<LinkedHashMap<String, Object>> countMap = queryData(countSql, DbDialectEnum.MAX_COMPUTE.getCode().equals(this.dbDialect));

        if (CollectionUtils.isEmpty(countMap)){

            return new PageResult<>(0L, new ArrayList<>());

        }

        long count = 0L;

        for (Object value : countMap.get(0).values()) {

            count = Long.parseLong(String.valueOf(value));

        }

        if (count == 0){

            return new PageResult<>(0L, new ArrayList<>());

        }

        // 执行分页查询 开启全表扫描

        final List<LinkedHashMap<String, Object>> resultList = queryData(querySql, page, size);

        return new PageResult<>(count, resultList);

    }

    /**

    * 执行分页查询

    * @param querySql 分页查询sql

    * @param page 页码 从1开始 第n页传n

    * @param size 每页记录数

    * @return 分页查询结果

    */

    public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){

        final PageResult<LinkedHashMap<String, Object>> result = pageQueryMap(querySql, page, size);

        List<T> rows = new ArrayList<>();

        for (LinkedHashMap<String, Object> row : result.getRows()) {

            final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);

            rows.add(t);

        }

        return new PageResult<>(result.getTotal(), rows);

    }

    /**

    * 获取hive的表注释

    * @param result 结果

    * @param resultMaps show tables结果

    * @return List<TableMetaInfo>

    */

    private List<TableMetaInfo> getHiveTableMetaInfos(List<TableMetaInfo> result, List<LinkedHashMap<String, Object>> resultMaps) {

        if (dbDialect.equals(DbDialectEnum.HIVE.getCode())){

            for (LinkedHashMap<String, Object> resultMap : resultMaps) {

                final String tabName = String.valueOf(resultMap.get("tab_name"));

                final String descTableCommentSql = String.format(SELECT_TABLES_2_HIVE, tabName);

                List<LinkedHashMap<String, Object>> resultMapsComments = querySql(descTableCommentSql);

//                col_name -> Detailed Table Information

                String comments = resultMapsComments.stream()

                        .filter(m -> "Detailed Table Information".equals(m.get("col_name")))

                        .map(m -> String.valueOf(m.get("data_type"))).findFirst()

                        .orElse("");

                comments = ReUtil.get("parameters:\\{(?!.*?\\().*transient_lastDdlTime.*?comment=(.*?)\\}", comments,1);

                if (StringUtils.isBlank(comments)) {

                    comments = "";

                }

                if (comments.contains(",")){

                    comments = comments.substring(0, comments.lastIndexOf(","));

                }

                result.add(new TableMetaInfo(tabName, comments));

                log.info("===========>>>获取表{}的注释成功:{}", tabName, comments);

                resultMapsComments.clear();

            }

            return result;

        }

        return null;

    }

    /**

    * 执行SQL查询

    * @param sql sql语句

    * @return 数据列表,使用LinkedHashMap是为了防止HashMap序列化后导致顺序乱序

    */

    public List<LinkedHashMap<String, Object>> querySql(String sql){

        // 执行sql

        Statement statement = null;

        ResultSet resultSet = null;

        try {

            statement = connection.createStatement();

            resultSet = statement.executeQuery(sql);

            return buildListMap(resultSet);

        } catch (SQLException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.SQL_EXEC_ERR);

        }finally {

            // 关闭

            close(resultSet, statement);

        }

    }

    /**

    * 关闭对象 传入多个时注意顺序, 需要先关闭哪个就传在参数前面

    * @param objs 对象动态数组

    */

    public static void close(Object ...objs){

        if (objs == null || objs.length == 0){

            return;

        }

        for (Object obj : objs) {

            if (obj instanceof Statement){

                try {

                    ((Statement) obj).close();

                }catch (Exception e){

                    e.printStackTrace();

                }

            }

            if (obj instanceof ResultSet){

                try {

                    ((ResultSet) obj).close();

                }catch (Exception e){

                    e.printStackTrace();

                }

            }

            if (obj instanceof Connection){

                try {

                    ((Connection) obj).close();

                }catch (Exception e){

                    e.printStackTrace();

                }

            }

        }

    }

    /**

    * @Description 功能描述:将resultSet构造为List<Map>

    * @Author itdl

    * @Date 2022/4/18 21:13

    * @Param {@link ResultSet} resultSet

    * @Return {@link List < Map <String,Object>>}

    **/

    private List<LinkedHashMap<String, Object>> buildListMap(ResultSet resultSet) throws SQLException {

        if (resultSet == null) {

            return Lists.newArrayList();

        }

        List<LinkedHashMap<String, Object>> resultList = new ArrayList<>();

        // 获取元数据

        ResultSetMetaData metaData = resultSet.getMetaData();

        while (resultSet.next()) {

            // 获取列数

            int columnCount = metaData.getColumnCount();

            LinkedHashMap<String, Object> map = new LinkedHashMap<>();

            for (int i = 0; i < columnCount; i++) {

                String columnName = metaData.getColumnName(i + 1);

                // 过滤掉查询的结果包含序号的

                if("mm.row_num_01".equalsIgnoreCase(columnName)

                        || "row_num_01".equalsIgnoreCase(columnName)){

                    continue;

                }

                // 去除hive查询结果的mm.别名前缀

                if (columnName.startsWith("mm.")){

                    columnName = columnName.substring(columnName.indexOf(".") + 1);

                }

                Object object = resultSet.getObject(columnName);

                // maxCompute里面的空返回的是使用\n

                if ("\\N".equalsIgnoreCase(String.valueOf(object))) {

                    map.put(columnName, "");

                } else {

                    map.put(columnName, object);

                }

            }

            resultList.add(map);

        }

        return resultList;

    }

}

MaxCompute JDBC连接池封装

MaxCompute 已经有了JDBC连接方式 也就是 odbc-jdbc, 最终能够获取一个Connection. 官方文档:https://help.aliyun.com/document_detail/161246.html

封装MaxCompute JDBC连接参数

/**

* @author itdl

* @description maxCompute使用JDBC的连接参数

* @date 2022/08/08 10:07

*/

@Data

public class MaxComputeJdbcConnParam extends BaseJdbcConnParam{

    /**阿里云accessId 相当于用户名 */

    private String aliyunAccessId;

    /**阿里云accessKey 相当于密码 */

    private String aliyunAccessKey;

    /** maxcompute_endpoint */

    private String endpoint;

    /**项目名称*/

    private String projectName;

}

封装MaxCompute JDBC连接实现类

就是实现父类AbstractConnUtil,实现抽象方法buildConnection

/**

* @Description maxCompute JDBC连接实现

* @Author itdl

* @Date 2022/08/08 14:26

*/

@Slf4j

public class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam>{

    /**JDBC 驱动名称*/

    private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";

    /**

    * 构造函数, 构造工具类对象

    *

    * @param connParam 连接参数

    */

    public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {

        super(connParam);

    }

    @Override

    protected Connection buildConnection() {

        return buildConn();

    }

    /**

    * 创建连接

    * @return 数据库连接

    */

    private Connection buildConn() {

        try {

            Class.forName(DRIVER_NAME);

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR);

        }

        try {

            Properties dbProperties = new Properties();

            dbProperties.put("user", connParam.getAliyunAccessId());

            dbProperties.put("password", connParam.getAliyunAccessKey());

            dbProperties.put("remarks", "true");

            // JDBCURL连接模板

            String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";

            // 使用驱动管理器连接获取连接

            return DriverManager.getConnection(

                    String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), dbProperties);

        } catch (SQLException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.CONN_USER_PWD_ERR);

        }

    }

}

连接测试代码一起放在结尾,将会开启多个线程获取连接,然后去获取表名,表注释,字段名,字段注释,传入page, size和普通sql就可以实现分页查询的封装方法

Hive JDBC连接池封装

Hive JDBC连接参数

Hive连接参数封装,除了基础的JDBC所需字段,还需要kerberos相关字段,因为hive开启kerberos认证后,需要使用kertab密钥文件和kbr5.conf配置文件去认证。将会在参数和测试代码中得到重复的体现。

/**

* @Description Hive JDBC connection params

* @Author itdl

* @Date 2022/08/10 16:40

*/

@Data

@EqualsAndHashCode(callSuper = false)

public class HiveJdbcConnParam extends BaseJdbcConnParam {

    /**

    * enable kerberos authentication

    */

    private boolean enableKerberos;

    /**

    * principal

    */

    private String principal;

    /**

    * kbr5 file path in dick

    */

    private String kbr5FilePath;

    /**

    * keytab file path in dick

    */

    private String keytabFilePath;

}

Hive JDBC获取连接实现

Hive获取JDBC连接之后,本来可以从Connection的元数据中获取表的注释,但是获取的中文注释居然是乱码,但是我们Hue上查看表注释又是正常,暂时没找到这种方式如何解决,从而退而求其次,通过表名去获取建表语句,从建表语句中通过正则表达式提取表的注释。

/**

* @Description hive connection util

* @Author itdl

* @Date 2022/08/10 16:52

*/

@Slf4j

public class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam>{

    public HiveConnUtil(HiveJdbcConnParam connParam) {

        super(connParam);

    }

    /**

    * 获取连接

    * @return 连接

    */

    public Connection getConnection() {

        return connection;

    }

    @Override

    protected Connection buildConnection(){

        try {

//            Class.forName("org.apache.hive.jdbc.HiveDriver");

            Class.forName(connParam.getDriverName());

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR);

        }

        // 开启kerberos后需要私钥

        // 拼接jdbcUrl

        String jdbcUrl = "jdbc:hive2://%s:%s/%s";

        String ip = connParam.getIp();

        String port = connParam.getPort() + "";

        String dbName = connParam.getDbName();

        final String username = connParam.getUsername();

        final String password = connParam.getPassword();

        // is enable kerberos authentication

        final boolean enableKerberos = connParam.isEnableKerberos();

        // 格式化

        Connection connection;

        // 获取连接

        try {

            Properties dbProperties = new Properties();

            dbProperties.put("user", username);

            dbProperties.put("password", password);

            // 加上remark后, 能够获取到标注释 但是会出现中文乱码

            dbProperties.put("remarks", "true");

            if (!enableKerberos) {

                jdbcUrl = String.format(jdbcUrl, ip, port, dbName);

                connection = DriverManager.getConnection(jdbcUrl, dbProperties);

            } else {

                final String principal = connParam.getPrincipal();

                final String kbr5FilePath = connParam.getKbr5FilePath();

                final String secretFilePath = connParam.getKeytabFilePath();

                String format = "jdbc:hive2://%s:%s/%s;principal=%s";

                jdbcUrl = String.format(format, ip, port, dbName, principal);

                // 使用hadoop安全认证

                System.setProperty("java.security.krb5.conf", kbr5FilePath);

                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

                // 解决windows中执行可能出现找不到HADOOP_HOME或hadoop.home.dir问题

                // Kerberos认证

                org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

                conf.set("hadoop.security.authentication", "Kerberos");

                conf.set("keytab.file", secretFilePath);

                conf.set("kerberos.principal", principal);

                UserGroupInformation.setConfiguration(conf);

                try {

                    UserGroupInformation.loginUserFromKeytab(username, secretFilePath);

                } catch (IOException e) {

                    e.printStackTrace();

                    throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR);

                }

                try {

                    connection = DriverManager.getConnection(jdbcUrl, dbProperties);

                } catch (SQLException e) {

                    e.printStackTrace();

                    throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR);

                }

            }

            log.info("=====>>>获取hive连接成功:username:{},jdbcUrl: {}", username, jdbcUrl);

            return connection;

        } catch (SQLException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR);

        } catch (BizException e){

            throw e;

        }

        catch (Exception e) {

            e.printStackTrace();

            throw new BizException(ResultCode.HIVE_CONN_ERR);

        }

    }

}

Oracle JDBC连接参数封装

只需要继承父类即可

/**

* @Description Oracle连接的JDBC参数

* @Author itdl

* @Date 2022/08/15 09:50

*/

public class OracleJdbcConnParam extends BaseJdbcConnParam{


}

Oracle JDBC连接实现类

包括了普通用户的认证和dba用户的认证

/**

* @Description Oracle获取jdbc连接工具类

* @Author itdl

* @Date 2022/08/15 09:52

*/

@Slf4j

public class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> {

    /**

    * 构造函数, 构造工具类对象

    *

    * @param connParam 连接参数

    */

    public OracleConnUtil(OracleJdbcConnParam connParam) {

        super(connParam);

    }

    @Override

    protected Connection buildConnection() {

        try {

            Class.forName("oracle.jdbc.driver.OracleDriver");

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR);

        }

        // 拼接jdbcUrl

        String jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s";

        final String ip = connParam.getIp();

        final String port = connParam.getPort() + "";

        final String dbName = connParam.getDbName();

        final String username = connParam.getUsername();

        final String password = connParam.getPassword();

        // 格式化

        jdbcUrl = String.format(jdbcUrl, ip, port, dbName);

        // 获取连接

        Connection connection;

        try {

            Properties dbProperties = new Properties();

            // 用户名 如果是dba,则后面跟了as sysdba

            String dba = "as sysdba";

            dbProperties.put("password", password);

            dbProperties.put("remarks", "true");

            if (username.trim().endsWith(dba)) {

                dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1));

                dbProperties.put("defaultRowPrefetch", "15");

                dbProperties.put("internal_logon", "sysdba");

                connection = DriverManager.getConnection(jdbcUrl, dbProperties);

            } else {

                dbProperties.put("user", username);

                connection = DriverManager.getConnection(jdbcUrl, dbProperties);

            }

            log.info("=====>>>获取oracle连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);

            return connection;

        } catch (SQLException e) {

            e.printStackTrace();

            if (e.getMessage().contains("TNS:listener")) {

                throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR);

            }

            if (e.getMessage().contains("ORA-01017")) {

                throw new BizException(ResultCode.CONN_USER_PWD_ERR);

            }

            if (e.getMessage().contains("IO 错误: Got minus one from a read call")) {

                throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR);

            }

            throw new BizException(ResultCode.CONN_UNKNOWN_ERR);

        } catch (Exception e) {

            throw new BizException(ResultCode.CONN_UNKNOWN_ERR);

        }

    }

}

Mysql JDBC连接池封装

Mysql JDBC连接参数封装

只需要继承父类即可

/**

* @Description Mysql连接的JDBC参数

* @Author itdl

* @Date 2022/08/15 09:50

*/

public class MysqlJdbcConnParam extends BaseJdbcConnParam{


}

Mysql JDBC连接实现

需要注意的是连接的属性里面配置useInformationSchema=true,表示可以直接从Connection中获取表和字段的注释。

/**

* @Description Mysql获取jdbc连接工具类

* @Author itdl

* @Date 2022/08/15 09:52

*/

@Slf4j

public class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> {

    /**

    * 构造函数, 构造工具类对象

    *

    * @param connParam 连接参数

    */

    public MysqlConnUtil(MysqlJdbcConnParam connParam) {

        super(connParam);

    }

    @Override

    protected Connection buildConnection() {

        try {

            Class.forName("com.mysql.cj.jdbc.Driver");

        } catch (ClassNotFoundException e) {

            e.printStackTrace();

            throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR);

        }

        // 拼接jdbcUrl

        String jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";

        final String ip = connParam.getIp();

        final String port = connParam.getPort() + "";

        final String dbName = connParam.getDbName();

        final String username = connParam.getUsername();

        final String password = connParam.getPassword();

        // 格式化

        jdbcUrl = String.format(jdbcUrl, ip, port, dbName);

        // 获取连接

        try {

            Properties dbProperties = new Properties();

            dbProperties.put("user", username);

            dbProperties.put("password", password);

            dbProperties.put("remarks", "true");

            // 设置可以获取tables remarks信息

            dbProperties.setProperty("useInformationSchema", "true");

            Connection connection = DriverManager.getConnection(jdbcUrl,dbProperties);

            log.info("=====>>>获取mysql连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);

            return connection;

        } catch (SQLException e) {

            e.printStackTrace();

            if (e.getMessage().contains("Unknown database")){

                throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR);

            }

            throw new BizException(ResultCode.CONN_USER_PWD_ERR);

        } catch (Exception e) {

            throw new BizException(ResultCode.CONN_UNKNOWN_ERR);

        }

    }

}

测试代码连接各自数据库

@SpringBootTest(classes = DbConnectionDemoApplication.class)

@RunWith(value = SpringRunner.class)

@Slf4j

class DbConnectionDemoApplicationTests {

    private DbConnPool<?> connPool = null;

    @Test

    public void testMysqlConn() throws InterruptedException {

        // 创建连接参数

        final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam();

        final String ip = "localhost";

        final Integer port = 3306;

        final String username = "root";

        final String password = "root";

        final String dbname = "test_db";

        // 设置参数

        connParam.setDriverName(Driver.class.getName());

        connParam.setIp(ip);

        connParam.setPort(port);

        connParam.setUsername(username);

        connParam.setPassword(password);

        connParam.setDbName(dbname);

        // 创建连接池

        connPool = new DbConnPool<>(connParam, 2);

        handler01(dbname, DbDialectEnum.MYSQL);

        new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();

        new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();

        Thread.sleep(60 * 1000);

    }

    @Test

    public void testOracleConn() throws InterruptedException {

        // 创建连接参数

        final OracleJdbcConnParam connParam = new OracleJdbcConnParam();

        final String ip = "你的Oracle的IP地址";

        final Integer port = 1521;

        // 如果是admin账号 用户后面+ as sysdba

        final String username = "用户名";

        final String password = "密码";

        final String dbname = "实例/服务名";

        // 设置参数

        connParam.setDriverName(Driver.class.getName());

        connParam.setIp(ip);

        connParam.setPort(port);

        connParam.setUsername(username);

        connParam.setPassword(password);

        connParam.setDbName(dbname);

        // 创建连接池

        connPool = new DbConnPool<>(connParam, 2);

        final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE;

        // 处理操作(oracle的schemaName就是用户名)

        handler01(username, dbDialectEnum);

        // 新建两个线程获取连接

        new Thread(() -> handler01(username, dbDialectEnum)).start();

        new Thread(() -> handler01(username, dbDialectEnum)).start();

        Thread.sleep(60 * 1000);

    }

    @Test

    public void testHiveConn() throws InterruptedException {

        // 创建连接参数

        final HiveJdbcConnParam connParam = new HiveJdbcConnParam();

        final String ip = "连接的域名";

        final Integer port = 10000;

        // 如果是admin账号 用户后面+ as sysdba

        final String username = "账号@域名";

        final String password = "";

        final String dbname = "数据库名";

        final String principal = "hive/_HOST@域名";

        final String kbr5FilePath = "C:\\workspace\\krb5.conf";

        final String keytabFilePath = "C:\\workspace\\zhouyu.keytab";

        // 设置参数

        connParam.setDriverName(Driver.class.getName());

        connParam.setIp(ip);

        connParam.setPort(port);

        connParam.setUsername(username);

        connParam.setPassword(password);

        connParam.setDbName(dbname);

        connParam.setEnableKerberos(true);

        connParam.setPrincipal(principal);

        connParam.setKbr5FilePath(kbr5FilePath);

        connParam.setKeytabFilePath(keytabFilePath);

        // 创建连接池

        connPool = new DbConnPool<>(connParam, 2);

        final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE;

        // 处理操作(oracle的schemaName就是用户名)

        handler01(username, dbDialectEnum);

        // 新建两个线程获取连接

        new Thread(() -> handler01(username, dbDialectEnum)).start();

        new Thread(() -> handler01(username, dbDialectEnum)).start();

        Thread.sleep(10 * 60 * 1000);

    }

    @Test

    public void testMaxComputeConn() throws InterruptedException {

        // 创建连接参数

        final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();

        String accessId = "你的阿里云accessId";

        String accessKey = "你的阿里云accessKey";

        String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api";

        String projectName = "项目名=数据库名";

        // 设置参数

        connParam.setDriverName(Driver.class.getName());

        connParam.setAliyunAccessId(accessId);

        connParam.setAliyunAccessKey(accessKey);

        connParam.setEndpoint(endpoint);

        connParam.setProjectName(projectName);

        // 创建连接池

        connPool = new DbConnPool<>(connParam, 2);

        final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE;

        // 处理操作(oracle的schemaName就是用户名)

        handler01(projectName, dbDialectEnum);

        // 新建两个线程获取连接

        new Thread(() -> handler01(projectName, dbDialectEnum)).start();

        new Thread(() -> handler01(projectName, dbDialectEnum)).start();

        Thread.sleep(60 * 1000);

    }

    private void handler01(String schemaName, DbDialectEnum dbDialectEnum) {

        final Connection connection = connPool.getConnection();

        // 构建工具类

        final SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode());

        // 获取表和注释

        final List<TableMetaInfo> tables = sqlUtil.getTables(schemaName);

        log.info("===============获取所有表和注释开始===================");

        log.info(tables.toString());

        log.info("===============获取所有表和注释结束===================");

        // 获取字段和注释

        final String tableName = tables.get(0).getTableName();

        final List<TableColumnMetaInfo> columns = sqlUtil.getColumnsByTableName(tableName);

        log.info("===============获取第一个表的字段和注释开始===================");

        log.info(columns.toString());

        log.info("===============获取第一个表的字段和注释结束===================");

        final PageResult<LinkedHashMap<String, Object>> pageResult = sqlUtil.pageQueryMap("select * from " + tableName, 1, 10);

        log.info("===============SQL分页查询开始===================");

        log.info("总数:{}", pageResult.getTotal());

        log.info("记录数:{}", JSONObject.toJSONString(pageResult.getRows()));

        log.info("===============SQL分页查询结束===================");

        connPool.freeConnection(connection);

    }

    @After

    public void close(){

        if (connPool != null){

            connPool.close();

            log.info("==================连接池成功关闭================");

        }

    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容