基于TBDS的数据迁移(postgreSQL到hive)第二部分

一、tbds工作流xml文件

1.文件头:workflow_head.xml

<BatchFolderWorkflows>
  <folderWorkFlows>
    <FolderWorkflow>
      <folder>
        <id>$folderId</id>
        <name>$folderName</name>
      </folder>
        <workFlows>

2.文件肚:workflow_belly.xml


        <workFlow>
          <workflowId>$workflowId</workflowId>
          <workflowName>$workflowName</workflowName>
          <workflowDesc>$workflowDesc</workflowDesc>
          <projectId>4</projectId>
          <projectIdent>tbase_hive</projectIdent>
          <projectName>tbase_hive</projectName>
          <ownerId>4</ownerId>
          <owner>dsjj_user01</owner>
          <createTime class="sql-timestamp">$dateNow</createTime>
          <modifyTime class="sql-timestamp">$dateNow</modifyTime>
          <tasks>
            <task>
              <isAdmin>false</isAdmin>
              <taskId>$taskId1</taskId>
              <workflowId>$workflowId</workflowId>
              <workflowName>$workflowName</workflowName>
              <taskStatus>2</taskStatus>
              <taskName>$taskName1</taskName>
              <taskTypeId>70</taskTypeId>
              <taskTypeName>HIVE SQL脚本</taskTypeName>
              <ownerId>4</ownerId>
              <owner>dsjj_user01</owner>
              <resourceQueue>tbase_hive</resourceQueue>
              <cycle>D</cycle>
              <dataStartTime class="sql-timestamp">$dateYesZero</dataStartTime>
              <endTime class="sql-timestamp">2025-12-30 00:00:00</endTime>
              <startMin>0</startMin>
              <startHour>0</startHour>
              <startDay>0</startDay>
              <selfDepend>2</selfDepend>
              <properties>{"retriable":"1","endDate":"1657987200000","source_server":"Hive_Cluster","sql.file.name":"$sqlFileName","task_priority":"6","tryLimit":"5","cycleNum":"1","brokerIp":"any"}</properties>
              <createTime class="sql-timestamp">$dateNow</createTime>
              <creator>dsjj_user01</creator>
              <modifyTime class="sql-timestamp">$dateNow</modifyTime>
              <projectId>4</projectId>
              <projectIdent>tbase_hive</projectIdent>
              <projectName>tbase_hive</projectName>
              <webCanvas>left: 6px; top: 255px;</webCanvas>
              <category>lhotse</category>
              <realTaskId>20200721165909020</realTaskId>
              <managed>false</managed>
            </task>
            <task>
              <isAdmin>false</isAdmin>
              <taskId>$taskId2</taskId>
              <workflowId>$workflowId</workflowId>
              <workflowName>$workflowName</workflowName>
              <taskStatus>2</taskStatus>
              <taskName>$taskName2</taskName>
              <taskTypeId>119</taskTypeId>
              <taskTypeName>PG到HDFS</taskTypeName>
              <ownerId>4</ownerId>
              <owner>dsjj_user01</owner>
              <resourceQueue>tbase_hive</resourceQueue>
              <cycle>D</cycle>
              <dataStartTime class="sql-timestamp">$dateYesZero</dataStartTime>
              <endTime class="sql-timestamp">2025-12-30 00:00:00</endTime>
              <startMin>0</startMin>
              <startHour>0</startHour>
              <startDay>0</startDay>
              <selfDepend>2</selfDepend>
              <properties>{"endDate":"1657814400000","retriable":"1","source_server":"$sourceServer","target_server":"HDFS_CLUSTER","sql":"$sqlStatements","target.path":"$targetPath","charset.encoding":"UTF-8","separator":"001","ignore.empty.datasource":"true","errorThreshold":"10000","readConcurrency":"1","writeConcurrency":"4","task_priority":"6","tryLimit":"5","cycleNum":"1","brokerIp":"any"}</properties>
              <createTime class="sql-timestamp">$dateNow</createTime>
              <creator>dsjj_user01</creator>
              <modifyTime class="sql-timestamp">$dateNow</modifyTime>
              <projectId>4</projectId>
              <projectIdent>tbase_hive</projectIdent>
              <projectName>tbase_hive</projectName>
              <webCanvas>left: 0px; top: 62px;</webCanvas>
              <category>lhotse</category>
              <realTaskId>20200721165559644</realTaskId>
              <managed>false</managed>
            </task>
          </tasks>
          <edges>
            <edge>
              <isAdmin>false</isAdmin>
              <sourceTaskId>$taskId2</sourceTaskId>
              <targetTaskId>$taskId1</targetTaskId>
              <bridgeId>$bridgeId</bridgeId>
              <bridgeType>lhotse_lhotse</bridgeType>
              <status>2</status>
            </edge>
          </edges>
        </workFlow>


3.文件尾:workflow_last.xml


        </workFlows>
    </FolderWorkflow>
  </folderWorkFlows>
</BatchFolderWorkflows>

二、xml文件占位符

    public static String folderId = null;
    public static String folderName = null;
    public static String workflowId = null;
    public static String workflowName = null;
    public static String workflowDesc = null;
    public static String dateNow = null;
    public static String taskId1 = null;
    public static String taskName1 = null;
    public static String dateYesZero = null;
    public static String sqlFileName = null;
    public static String taskId2 = null;
    public static String taskName2 = null;
    public static String sourceServer = null;
    public static String sqlStatements = null;
    public static String targetPath = null;
    public static String bridgeId = null;

三、postgresql导出的表结构格式

1.导出表结构的sql

select 
t1.relname as table_name,
t3.attname as field,
format_type(t3.atttypid,t3.atttypmod) as field_type, 
case when t3.atttypid=1043 then '是' when t3.atttypid=1042 then '是' when t3.atttypid=25 then '是' else '否' end as is_string,
t3.attnum field_number,
t4.description field_comment,
t5.description table_comment
from pg_class t1
left join pg_user t2 on t1.relowner=t2.usesysid
left join pg_attribute t3 on t3.attrelid=t1.oid
left join pg_description t4 on t4.objsubid=t3.attnum and t4.objoid=t1.oid
left join pg_description t5 on t5.objoid=t1.oid and t5.objsubid=0
where t2.usename='cqsswj' 
and t1.relname in(select tablename from pg_tables where schemaname='cqsswj' and tableowner='cqsswj') and t1.relname not in('cqdc_ggdmb','cqdc_sjdzb','cqdc_sjgxbs')
and attnum>0
order by relname, attnum

2.数据格式


将数据导入到bmk_second中

四、代码部分

代码结构图:


代码结构图1
代码结构图2
  • pom.xml
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
            <type>pom</type>
        </dependency>
    </dependencies>
mysql java连接包

1.jdbc.properties

#mysql
jdbc.driverClassName=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://hadoop102:3306/bmk?useSSH=true&characterEncoding=utf-8
jdbc.username=root
jdbc.password=111111

2.mysql连接
MysqlConnection.java

package com.dfjx.postgresql.pg2hive.jdbc;

import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

/**
 * mysql连接
 */
public class MysqlConnection {
    /**
     * 功能:获取mysql连接非静态方法
     *
     * @return
     */
    public Connection getConnection(){
        //初始化变量
        String driver=null;
        String url=null;
        String username=null;
        String password=null;
        Connection conn=null;

        try {
            //从配置文件中获取配置
            Properties properties = new Properties();
            //获取配置文件路径
            String path = Thread.currentThread().getContextClassLoader().getResource("jdbc.properties").getPath();
            //初始化文件输入流
            FileInputStream in = null;
            in = new FileInputStream(path);
            properties.load(in);

            //获取配置文件中的信息,放入到对应的变量中
            driver=properties.getProperty("jdbc.driverClassName");
            url=properties.getProperty("jdbc.url");
            username=properties.getProperty("jdbc.username");
            password=properties.getProperty("jdbc.password");
            //关闭文件输入流
            in.close();
            //加载驱动
            Class.forName(driver);
            //连接数据库
            conn = DriverManager.getConnection(url, username, password);
            System.out.println( "数据库连接为:" + conn);
        }catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * 测试mysql连接
     */
/*    public static void main(String[] args) {
        Connection connection = null;
        MysqlConnection mysqlConn2 = new MysqlConnection();
        connection = mysqlConn2.getConnection();
        System.out.println(connection+"========");
    }*/
}

3.读取流返回字节数组
BytesStream.java

package com.dfjx.postgresql.pg2hive.utils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class BytesStream {
    /**
     * 功能:读取输入流,返回byte[]数组
     * @param instream
     * @return
     * @throws IOException
     */
    public static byte[] read(InputStream instream) throws IOException
    {
        //字节数组输出流
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        //缓冲区
        byte[] buffer = new byte[1024];
        int len = 0;
        //将instream流中的内容读到buffer缓冲区,写到字节数组输出流中
        while ((len = instream.read(buffer)) != -1)
        {
            bos.write(buffer, 0, len);
        }
        //返回值:字节数组
        return bos.toByteArray();
    }
}

4.文件写类
FileWrite.java

package com.dfjx.postgresql.pg2hive.utils;
import java.io.FileWriter;
import java.io.IOException;

public class FileWrite {

    /**
     * 功能:将字符串写入到目标文件
     * @param filePath  目标文件路径
     * @param writeString  要写入文件的String
     */
    public static boolean IOFileWrite(String filePath,String writeString) {
        //初始化文件写对象
        FileWriter fwriter = null;
        try {
            // true表示不覆盖原来的内容,而是加到文件的后面。若要覆盖原来的内容,直接省略这个参数就好
            fwriter = new FileWriter(filePath, true);
            fwriter.write(writeString);
            return true;
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            try {
                fwriter.flush();
                fwriter.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        return false;
    }
}

5.hive建表
BuildTableStatements.java

package com.dfjx.postgresql.pg2hive.buildingHiveTables;

import com.dfjx.postgresql.pg2hive.jdbc.MysqlConnection;
import com.dfjx.postgresql.pg2hive.utils.FileWrite;
import java.io.File;
import java.sql.*;

/**
 * BuildTableStatements
 * 类功能:
 *          1.批量拼接hive建表sql
 *          2.hive字段加注释
 */
public class BuildTableStatements {

    //初始化文件写类
    public static FileWrite fileWrite;

    public static String dept_name;       //部门名   该处指mysql中schema中表的名称
    public static String table_name;      //表字段   字段table_name内容
    public static String field;           //表字段   字段名称
    public static String field_type;      //表字段   字段类型
    public static String is_string;       //表字段   是否文本
    public static int field_number;       //表字段   字段序号
    public static String field_comment;   //表字段   字段注释
    public static String table_comment;   //表字段   表注释

    /**
     * create_table()
     * 功能:拼接hive建表语句
     *      stemp建普通表
     *      ods建分区表
     * 静态方法
     * @throws SQLException
     */
    public static void create_table() throws SQLException {
        //todo  查mysql库,获取需要的部门名称
        //初始化连接
        Connection connection1;
        MysqlConnection mysqlConnection1 = new MysqlConnection();
        connection1 = mysqlConnection1.getConnection();
        //sql statements
        String sql1 = "select table_name from information_schema.tables where table_schema='bmk_second'";
        PreparedStatement preparedStatement1 = connection1.prepareStatement(sql1);
        ResultSet rs1 = preparedStatement1.executeQuery();

        //todo 遍历结果集,拼接建表sql
        while (rs1.next()){
            //获取部门名(该处指的是mysql的表名)
            dept_name = rs1.getString(1);
            System.out.println("===================================    "+ dept_name + "   =================================");

            //todo 获取表字段信息:
            Connection connection2;
            MysqlConnection mysqlConnection2 = new MysqlConnection();
            connection2 = mysqlConnection2.getConnection();
            String sql2 = "select * from bmk_second." + dept_name;
            //预编译
            Statement stmt = connection2.createStatement(
                    ResultSet.TYPE_SCROLL_INSENSITIVE,   //TYPE_SCROLL_INSENSITIVE 该常量指示可滚动但通常不受其他的更改影响的 ResultSet 对象的类型。
                    ResultSet.CONCUR_READ_ONLY          //CONCUR_READ_ONLY 该常量指示不可以更新的 ResultSet 对象的并发模式。
            );
            ResultSet rs2 = stmt.executeQuery(sql2);

            // TODO: 2020/9/7  遍历结果集
            while (rs2.next()) {

                //获取表字段内容
                table_name = rs2.getString("table_name");
                field = rs2.getString("field");
                field_type = rs2.getString("field_type");
                is_string = rs2.getString("is_string");
                field_number = rs2.getInt("field_number");
                field_comment = rs2.getString("field_comment");
                table_comment = rs2.getString("table_comment");

                // TODO: 2020/9/7 建表
                /*
                 * 建表sql格式:
                 * drop table if exists stemp.stemp_bmk_sfpb_pkh;
                 * create table if not exists stemp.stemp_bmk_sfpb_pkh(`所在省` string,`所在市` string,`预留字段` string )
                 * row format delimited fields terminated by '\001' stored as textfile;
                 */
                String stemp_t1 = "drop table if exists stemp.stemp_bmk_" + dept_name + "_" + table_name + ";";
                String ods_t1 = "drop table if exists ods.ods_bmk_" + dept_name + "_" + table_name + ";";

                String t2 = "create table if not exists stemp.stemp_bmk_" + dept_name + "_" + table_name + "(" ;
                String ods_t2 = "create table if not exists ods.ods_bmk_" + dept_name + "_" + table_name + "(" ;

                String t3 =  "`" + field + "`" + " string,";
                String t5 =  "`" + field + "`" + " string )";
                String stemp_t4 = "row format delimited fields terminated by '\\001' stored as textfile;\n";
                String ods_t4 = "partitioned by(data_dt string) stored as orc;\n";

                //stemp     将建表语句写入到文件
                String stemp_filePath="src/main/output/stemp/"+dept_name;
                //创建文件夹
                File file2 = new File(stemp_filePath);
                file2.mkdir();

                //ods       将建表语句写入到文件
                String odsFilePath="src/main/output/ods/"+dept_name;
                //创建文件夹
                File file3 = new File(odsFilePath);
                file3.mkdir();

                //stemp     建表sql文件
                String stemp_filePathLoad = stemp_filePath+"/stemp_bmk_"+ dept_name + "_" + rs2.getString("table_name") + ".sql";
                //ods       建表sql文件
                String ods_FilePathLoad = odsFilePath+"/ods_bmk_"+ dept_name + "_" + rs2.getString("table_name") + ".sql";

                //将建表语句汇总
                //stemp  建表语句汇总地址
                String stemp_fileTotalPath = "src/main/output/stemp/"+"stemp_all.sql";
                //ods    建表语句汇总地址
                String ods_FileTotalPath = "src/main/output/ods/"+"ods_all.sql";

                if (field_number == 1) {
                    //当字段排序为1
                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad,stemp_t1+t2+t3);
                    fileWrite.IOFileWrite(stemp_fileTotalPath,stemp_t1+t2+t3);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad,ods_t1 + ods_t2 + t3);
                    fileWrite.IOFileWrite(ods_FileTotalPath,ods_t1 +ods_t2 + t3);
                    System.out.println( rs2.getInt("field_number")+"          " + rs2.getString("table_name") +  "              111111111111111111111111111111111");

                } else if(rs2.next() && rs2.getInt("field_number")==1 ){
                    //当ResultSet下一个有值,且字段排序为1,表示下一张新表开始的第一个字段
                    System.out.println("指针上移之前:  "+rs2.getInt("field_number"));
                    rs2.previous(); //指针上移一位,因为if条件中rs2.next()指针下移了一位
                    System.out.println("指针上移之后:  "+rs2.getInt("field_number"));

                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad, t5 + stemp_t4);
                    fileWrite.IOFileWrite(stemp_fileTotalPath, t5 + stemp_t4);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad, t5 + ods_t4);
                    fileWrite.IOFileWrite(ods_FileTotalPath, t5 + ods_t4);

                } else if(rs2.previous() && !rs2.next()){
                    //条件:因为上一个if判断条件中指针下移了一位,所以rs2.previous()一定为真,!rs2.next()判断下一个是否存在,即判断该指针位置是否是表的最后一条数据
                    rs2.previous();     //指针上移一位,是因为该if判断条件!rs2.next()使指针下移了一位
                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad, t5 + stemp_t4);
                    fileWrite.IOFileWrite(stemp_fileTotalPath, t5 + stemp_t4);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad, t5 + ods_t4);
                    fileWrite.IOFileWrite(ods_FileTotalPath, t5 + ods_t4);
                } else {
                    rs2.previous();     //指针上移一位,因为上一个if条件指针上移了一位有下移了一位,但是上上一个if条件指针下移了一位
                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad,t3);
                    fileWrite.IOFileWrite(stemp_fileTotalPath,t3);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad,t3);
                    fileWrite.IOFileWrite(ods_FileTotalPath,t3);
                }

                // TODO: 2020/9/7 给字段加注释语句
                //格式:alter table stemp.stemp_bmk_表名 change column bank_name bank_name string comment '分行/支行名称'
                String cm1 = "alter table ";
                String cm2 = "stemp.stemp_bmk_";
                String cm2_2 = "ods.ods_bmk_";
                String cm3 = " change column ";
                String cm4 = " string comment '";

                //给字段Comments文件位置
                String stempComentsTotalPath = "src/main/output/alterTalbeComments/"+"stemp_field_allComments.sql";   //stemp表
                String odsComentsTotalPath = "src/main/output/alterTalbeComments/"+"ods_field_allComments.sql";       //ods表
                //如果字段field_comment不为空,才写入到comments文件中
                if(field_comment != null){
                    fileWrite.IOFileWrite(stempComentsTotalPath,cm1 + cm2 + dept_name +"_"+ table_name+ cm3 + "`" + field + "` `" + field + "`" + cm4 + field_comment +"';\n");
                    fileWrite.IOFileWrite(odsComentsTotalPath,cm1 + cm2_2 + dept_name +"_"+ table_name+ cm3 + "`" + field + "` `" + field + "`" + cm4 + field_comment +"';\n");
                }

                // TODO: 2020/9/7 给表加注释
                /*
                 * 格式:ALTER TABLE 表名 SET TBLPROPERTIES ('comment' = '注释内容')
                 * 示例: ALTER TABLE da.shop_recharge_serial_monthly SET TBLPROPERTIES ('comment' = '财务月结数据表')
                 */
                String tb1= "ALTER TABLE ";
                String tb2 = " SET TBLPROPERTIES ('comment' = '";

                //给表Comments文件位置
                String stempTableComentsTotalPath = "src/main/output/alterTalbeComments/"+"stemp_table_allComments.sql";
                String odsTableComentsTotalPath = "src/main/output/alterTalbeComments/"+"ods_table_allComments.sql";

                //如果字段排序为第一个,字段table_comment不为空,才写入到comments文件中
                if(field_number==1 &&  table_comment != null){
                    fileWrite.IOFileWrite(stempTableComentsTotalPath,tb1+"stemp.stemp_bmk_"+dept_name+"_"+table_name + tb2 + table_comment+ "');" + "\n");
                    fileWrite.IOFileWrite(odsTableComentsTotalPath,tb1+"ods.ods_bmk_"+dept_name+"_"+table_name + tb2 + table_comment+ "');" + "\n");
                }
            }
        }
    }

    /**
     * 测试类
     * @param args
     * @throws SQLException
     */
/*    public static void main(String[] args) throws SQLException {
        BuildTableStatements.create_table();
    }*/
}

6.查询postgresql的select语句
SelectReplaceStatements.java

package com.dfjx.postgresql.pg2hive.postgresqlSelects;

import com.dfjx.postgresql.pg2hive.jdbc.MysqlConnection;
import com.dfjx.postgresql.pg2hive.utils.FileWrite;
import java.io.File;
import java.sql.*;

/**
 * SelectReplaceStatements
 * 功能:生成查询源服务器postgresql的查询语句
 */
public class SelectReplaceStatements {
    //初始化文件写类
    public static FileWrite fileWrite;

    //初始化变量
    public static String dept_name;   //部门名 同时指mysql中schema中表的名称
    public static String table_name;  //表字段 table_name
    public static String field;       //表字段 table_name
    public static String field_type;  //表字段 field_type
    public static String is_string;   //表字段 is_string
    public static int field_number;   //表字段 field_number

    /**
     * selectReplace()
     * 功能:生成查询源服务器postgresql的查询语句
     * 静态方法
     * sql样式:select replace(replace(ss1,chr(10),''),chr(13),'') as ss1,replace(replace(ss2,chr(10),''),chr(13),'') as ss2 from sfpb.pkh;
     * 说明:因为数据中有换行符,写到hdfs中时,一条数据会被分割成两条数据,所以通过replace全部替换为空字符串
     * @throws SQLException
     */
    public static void selectReplace() throws SQLException {

        //todo 01 查mysql库,获取表名(部门名称)
        Connection connection1;
        MysqlConnection mysqlConnection1 = new MysqlConnection();
        connection1 = mysqlConnection1.getConnection();
        //sql statements
        String sql1 = "select table_name from information_schema.tables where table_schema='bmk_second'";
        PreparedStatement preparedStatement1 = connection1.prepareStatement(sql1);
        ResultSet rs1 = preparedStatement1.executeQuery();

        // TODO: 2020/9/7 遍历结果集
        while (rs1.next()){
            //获取部门名(这里指的是mysql的表名)
            dept_name = rs1.getString(1);
            System.out.println("===================================    "+ dept_name + "   =================================");

            // TODO: 2020/9/7 获取字段信息 :table_name,field,field_type,is_string,field_number
            Connection connection2;
            MysqlConnection mysqlConnection2 = new MysqlConnection();
            connection2 = mysqlConnection2.getConnection();
            String sql2 = "select * from bmk_second." + dept_name;
            Statement stmt = connection2.createStatement(
                    ResultSet.TYPE_SCROLL_INSENSITIVE,   //TYPE_SCROLL_INSENSITIVE 该常量指示可滚动但通常不受其他的更改影响的 ResultSet 对象的类型。
                    ResultSet.CONCUR_READ_ONLY          //CONCUR_READ_ONLY 该常量指示不可以更新的 ResultSet 对象的并发模式。
            );
            ResultSet rs2 = stmt.executeQuery(sql2);
            while (rs2.next()) {
                table_name = rs2.getString("table_name");
                field = rs2.getString("field");
                field_type = rs2.getString("field_type");
                is_string = rs2.getString("is_string");
                field_number = rs2.getInt("field_number");

                /**
                 * sql样式
                 * select replace(replace(ss1,chr(10),''),chr(13),'') as ss1,replace(replace(ss2,chr(10),''),chr(13),'') as ss2 from sfpb.pkh;
                 */
                String s1 = "select ";
                String s2 = "replace(replace(";
                String s3 = ",chr(10),''),chr(13),'') as ";
                String s4 = " from ";

                //select语句目标文件夹
                String filePath="src/main/output/selects/"+dept_name;
                //创建文件夹
                File file2 = new File(filePath);
                file2.mkdir();
                //select语句目标文件
                String filePathLoad = filePath+"/"+ rs2.getString("table_name") + ".sql";

                if (field_number == 1) {

                    //当字段排序为1
                    String s = "是";
                    if (rs2.getString("is_string").equalsIgnoreCase(s)) {
                        fileWrite.IOFileWrite(filePathLoad,s1 + s2 + field + s3 + field + ",");
                        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>是");
                    } else {
                        fileWrite.IOFileWrite(filePathLoad,s1 + field + ",");
                        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>否");
                    }

                } else if(rs2.next() &&  rs2.getInt("field_number" ) == 1){

                    //当ResultSet下一个有值,且字段排序为1,表示下一张新表开始的第一个字段
                    rs2.previous(); //指针上移一位,因为if条件中rs2.next()指针下移了一位
                    if (rs2.getString("is_string").equalsIgnoreCase("是")) {
                        fileWrite.IOFileWrite(filePathLoad, s2 + field + s3 + field + s4 + dept_name + "." + table_name + ";");
                    } else {
                        fileWrite.IOFileWrite(filePathLoad, field + s4 + dept_name + "." + table_name + ";");
                    }

                } else if(rs2.previous() && !rs2.next()){
                    //条件:因为上一个if判断条件中指针下移了一位,所以rs2.previous()一定为真,!rs2.next()判断下一个是否存在,即判断该指针位置是否是表的最后一条数据
                    rs2.previous();     //指针上移一位,是因为该if判断条件!rs2.next()使指针下移了一位
                    if (rs2.getString("is_string").equalsIgnoreCase("是")) {
                        fileWrite.IOFileWrite(filePathLoad, s2 + field + s3 + field + s4 + dept_name + "." + table_name + ";");
                    } else {
                        fileWrite.IOFileWrite(filePathLoad,  field + s4 + dept_name + "." + table_name + ";");
                    }

                } else {

                    rs2.previous();     //指针上移一位,因为上一个if条件指针上移了一位有下移了一位,但是上上一个if条件指针下移了一位
                    if (rs2.getString("is_string").equalsIgnoreCase("是")) {
                        fileWrite.IOFileWrite(filePathLoad, s2 + field + s3 + field + ",");
                    } else {
                        fileWrite.IOFileWrite(filePathLoad, field + ",");
                    }

                }
            }
        }
    }

    /**
     * 测试类
     * @param args
     * @throws SQLException
     */
/*    public static void main(String[] args) throws SQLException {
        SelectReplaceStatements.selectReplace();
    }*/
}

7.将4中生成的select语句插入到bmk库中
InsertSelectsIntoTables.java

package com.dfjx.postgresql.pg2hive.postgresqlSelects;

import com.dfjx.postgresql.pg2hive.jdbc.MysqlConnection;

import java.io.*;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class InsertSelectsIntoTables {

    public static String dept_name;
    public static String table_name;
    public static String selectStatements;

    /**
     * getFileDirOrName(String path)
     * 功能:将SelectReplaceStatements生成的select语句插入到mysql库里面
     * @param path
     * @throws SQLException
     * @throws IOException
     */
    public static void getFileDirOrName(String path) throws SQLException, IOException {

        Connection connection1;
        MysqlConnection mysqlConnection1 = new MysqlConnection();
        connection1 = mysqlConnection1.getConnection();

        // TODO: 2020/9/7 读src/main/output/selects/下的文件
        File dirFile = new File(path);
        if (dirFile.exists()) {
            File[] files = dirFile.listFiles();
            if (files != null) {
                for (File fileChildDir : files) {
                    if (fileChildDir.isDirectory()) {
                       dept_name = fileChildDir.getName();

                        //建表sqlStatements
                        String sql1 = "CREATE TABLE `" + dept_name + "` (`table_name` varchar(1000) DEFAULT NULL,`sql_statements` varchar(18000) DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
                        PreparedStatement preparedStatement1 = connection1.prepareStatement(sql1);
                        preparedStatement1.execute();

                        //递归调用,获取文件夹名称
                        System.out.println(fileChildDir.getAbsolutePath());
                        getFileDirOrName(fileChildDir.getAbsolutePath());

                    } else {

                        //是文件,则获取文件名称
                        String absolutePath = fileChildDir.getAbsolutePath();
                        File file2 = new File(absolutePath);
                        FileReader reader = new FileReader(file2);//定义一个fileReader对象,用来初始化BufferedReader
                        BufferedReader bReader = new BufferedReader(reader);//new一个BufferedReader对象,将文件内容读取到缓存
                        StringBuilder sb = new StringBuilder();//定义一个字符串缓存,将字符串存放缓存中
                        String s = "";
                        while ((s =bReader.readLine()) != null) {//逐行读取文件内容,不读取换行符和末尾的空格
                            sb.append(s + "\n");//将读取的字符串添加换行符后累加存放在缓存中
                            System.out.println(s);
                        }

                        bReader.close();

                        selectStatements = sb.toString();
                        System.out.println(selectStatements );

                        String file_name = fileChildDir.getName();
                        System.out.println(file_name);
                        String[] splits = file_name.split("\\.");
                        table_name = splits[0];
                        System.out.println(table_name + "   =================");

                        String sql2 = "insert into " + dept_name + "(table_name,sql_statements) values(?,?)";
                        PreparedStatement preparedStatement1 = connection1.prepareStatement(sql2);
                        preparedStatement1.setString(1, table_name);//设置占位符的值
                        preparedStatement1.setString(2, selectStatements);
                        preparedStatement1.execute();

                    }
                }
            }
        }
    }


/*    public static void main(String[] args) throws SQLException, IOException {
        InsertSelectsIntoTables .getFileDirOrName("src/main/output/selects");
    }
    */
}

8.生成tbds工作流xml文件和load hdfs文件的脚本
FileDetailUtilsPlus.java

package com.dfjx.postgresql.pg2hive.workFlowXml;
/**
 * 类功能:通过读库表获取需要修改的值信息,修改xml文件对应的占位符,生成tbds workflow xml文件
 */
import com.dfjx.postgresql.pg2hive.jdbc.MysqlConnection;
import com.dfjx.postgresql.pg2hive.utils.BytesStream;
import com.dfjx.postgresql.pg2hive.utils.FileWrite;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

public class FileDetailUtilsPlus {

    //初始化文件写类
    public static FileWrite fileWrite;

    //初始化xml中占位符变量
    public static String folderId = null;
    public static String folderName = null;
    public static String workflowId = null;
    public static String workflowName = null;
    public static String workflowDesc = null;
    public static String dateNow = null;
    public static String taskId1 = null;
    public static String taskName1 = null;
    public static String dateYesZero = null;
    public static String sqlFileName = null;
    public static String taskId2 = null;
    public static String taskName2 = null;
    public static String sourceServer = null;
    public static String sqlStatements = null;
    public static String targetPath = null;
    public static String bridgeId = null;

    //todo 读取预制xml模板,返回值:InputStream
    //xml文件头
    InputStream instreamHead = this.getClass().getClassLoader().getResourceAsStream("workflow_head.xml");
    //xml文件肚
    InputStream instreamWorkFlow = this.getClass().getClassLoader().getResourceAsStream("workflow_belly.xml");
    //xml文件尾
    InputStream instreamLast = this.getClass().getClassLoader().getResourceAsStream("workflow_last.xml");

    /**
     * parsingXml()
     * 功能:解析xml文件模板,生成workflow的xml文件
     * 静态方法
     * @throws IOException
     * @throws SQLException
     */
    public static void parsingXml() throws IOException, SQLException {

        //todo 读取xml文件,返回String
        //初始化类对象
        FileDetailUtilsPlus fileDetailUtils = new FileDetailUtilsPlus();
        //获取xml文件头,返回值:String
        String instreamHeadOldXml = new String(BytesStream.read(fileDetailUtils.instreamHead), "UTF-8");
        //获取xml文件肚,返回值:String
        String instreamWorkFlowOldXML = new String(BytesStream.read(fileDetailUtils.instreamWorkFlow), "UTF-8");
        //获取xml文件尾,返回值:String
        String instreamLastOldXML = new String(BytesStream.read(fileDetailUtils.instreamLast), "UTF-8");

        //todo 查询从postgresql中导出的表结构信息
        //初始化mysql连接
        Connection connection1;
        MysqlConnection mysqlConnection1 = new MysqlConnection();
        connection1 = mysqlConnection1.getConnection();
        //sql statements
        String sql1 = "select table_name from information_schema.tables where table_schema='bmk'";
        //预编译
        PreparedStatement preparedStatement1 = connection1.prepareStatement(sql1);
        //提交
        ResultSet rs1 = preparedStatement1.executeQuery();

        //todo 遍历ResultSet,修改xml文件占位符
        while (rs1.next()){
            //获取mysql的表名称作为部门名
            String dept_name = rs1.getString(1);
            System.out.println("===================================    "+ dept_name + "   =================================");
            //替换占位符: folderId   格式:uuid
            folderId = UUID.randomUUID().toString();
            //替换占位符:folderName  格式:bmk_部门名
            folderName = "bmk_" + dept_name;
            //替换workflow_head的占位符内容
            String instreamHeadNewXml = instreamHeadOldXml
                    .replaceAll("\\$folderId", folderId)
                    .replaceAll("\\$folderName", folderName);
            //将新的表头内容写入到一个新文件,文件名:workflow_部门名
            String filePath1 = "src/main/output/workFlows/"+"workflow_" + dept_name + ".xml";   //xml文件的目标路径

            //将workflow_head类容写入到目标文件中
            fileWrite.IOFileWrite(filePath1,instreamHeadNewXml);

            //todo 查询表字段内容:table_name,field
            Connection connection2;
            MysqlConnection mysqlConnection2 = new MysqlConnection();
            connection2 = mysqlConnection2.getConnection();
            //sql statements
            String sql2 = "select * from bmk." + dept_name;
            PreparedStatement preparedStatement2 = connection2.prepareStatement(sql2);
            ResultSet rs2 = preparedStatement2.executeQuery();

            //todo 遍历ResultSet,修改xml文件占位符
            while (rs2.next()) {
                //01    替换占位符:workflowId  格式:uuid
                workflowId = UUID.randomUUID().toString();
                //02    替换占位符:workflowName   bmk_部门名_表名
                workflowName="bmk_"+ dept_name +"_"+ rs2.getString("table_name"); //该处为字段名为table_name的内容
                //03    替换占位符:workflowDesc 格式:字段table_name的内容
                workflowDesc = rs2.getString("table_name");
                //04    替换占位符:dateNow 当前时间  格式:2020-07-21 16:44:18
                DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date date = new Date();             //获取当前时间
                dateNow  = format.format(date);     //格式化日期
                //05    替换占位符:taskId1   格式:uuid
                taskId1 = UUID.randomUUID().toString();
                //06    替换占位符:taskName1 格式:ods_bmk_部门名称_表名称
                taskName1 = "ods_bmk_" + dept_name + "_" + rs2.getString("table_name");
                //07    替换占位符:dateYesZero 昨天凌晨 格式:2020-07-20 00:00:00
                DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd");
                String yesterDay  = format2.format(new Date(System.currentTimeMillis()-1000*60*60*24));  //昨天日期
                dateYesZero = yesterDay + " 00:00:00";
                //08    替换占位符:sqlFileName   格式:/ftp/dsjj_user01/all/ods_bmk_部门名称_表名称.sql
                sqlFileName  ="/ftp/dsjj_user01/all/ods_bmk_" +dept_name + "_" +  rs2.getString("table_name")+".sql";
                System.out.println("sqlFileName: "+sqlFileName);
                //09    替换占位符:taskId2   格式:uuid
                taskId2 = UUID.randomUUID().toString();
                //10    替换占位符:taskName2   pth_bmk_部门名_表名
                taskName2 = "pth_bmk_" +dept_name + "_"  + rs2.getString("table_name");
                //11    替换占位符:sourceServer 源服务器 格式:"tbase"
                sourceServer = "tbase";
                //12    替换占位符:sqlStatements 格式:select replace(replace(cqdc_sjscbs,chr(10),''),chr(13),'') as cqdc_sjscbs from cqsxfjyzd.xfcl;
                String sql_statements_tmp = rs2.getString("sql_statements");
                String[] split = sql_statements_tmp.split("\n");//因tbds解析问题需要去除换行符
                sqlStatements = split[0];
                //13    替换占位符:targetPath hdfs文件路径   格式:/user/dsjj_user01/bmk/部门名/表名
                targetPath = "/user/dsjj_user01/bmk/"+ dept_name + "/" + rs2.getString("table_name");
                //14    替换占位符:bridgeId 多个有级联的task之间的桥接 格式:uuid
                bridgeId = UUID.randomUUID().toString();
                //替换workflow文件肚的占位符内容
                String instreamWorkFlowNewXML = instreamWorkFlowOldXML
                        .replaceAll("\\$workflowId", workflowId)
                        .replaceAll("\\$workflowName", workflowName)
                        .replaceAll("\\$workflowDesc", workflowDesc)
                        .replaceAll("\\$dateNow", dateNow)
                        .replaceAll("\\$taskId1", taskId1)
                        .replaceAll("\\$taskName1", taskName1)
                        .replaceAll("\\$dateYesZero", dateYesZero)
                        .replaceAll("\\$sqlFileName", sqlFileName)
                        .replaceAll("\\$taskId2", taskId2)
                        .replaceAll("\\$taskName2", taskName2)
                        .replaceAll("\\$sourceServer", sourceServer)
                        .replaceAll("\\$sqlStatements", sqlStatements)
                        .replaceAll("\\$targetPath", targetPath)
                        .replaceAll("\\$bridgeId", bridgeId);

                //将文件肚内容写入到对应的xml文件中
                fileWrite.IOFileWrite(filePath1,instreamWorkFlowNewXML);

                //todo 从targetPath目标路径load hdfs文件到hive,并插入到分区表
                /**
                 * insert overwrite table ods.ods_bmk_cqsxfjyzd_xfcl partition(data_dt='${YYYYMMDD}') select * from stemp.stemp_bmk_cqsxfjyzd_xfcl;
                 * 说明:partition(data_dt='${YYYYMMDD}')
                 *       ${YYYYMMDD}这个写法为tbds的特有的功能,自动获取当前日期作为分区字段日期
                 */
                //load文件的hiveSql内容
                String hsqlShell = "load data inpath \'" + "/user/dsjj_user01/bmk/"+dept_name+ "/" + rs2.getString("table_name") +"/*" + "\' overwrite into table stemp.stemp_bmk_" +dept_name + "_" +  rs2.getString("table_name") +";"
                        +"\n"+"insert overwrite table ods.ods_bmk_"+ dept_name + "_" + rs2.getString("table_name") + " partition(data_dt='${YYYYMMDD}') select * from stemp.stemp_bmk_" + dept_name + "_" + rs2.getString("table_name")+";";
                System.out.println(hsqlShell);
                //hiveSql目标文件夹路径
                String filePath="src/main/output/hsqls/"+dept_name;
                //创建目标文件夹
                File file2 = new File(filePath);
                file2.mkdir();
                //hiveSql目标sql文件
                String filePathLoad = filePath+"\\ods_bmk_"+ dept_name + "_" + rs2.getString("table_name") + ".sql";
                //将hiveSql脚本内容写入到目标文件中
                fileWrite.IOFileWrite(filePathLoad,hsqlShell);
            }
            //将文件尾内容写入到对应的xml文件中
            fileWrite.IOFileWrite(filePath1,instreamLastOldXML);
        }
    }

    /**
     * 测试类
     */
/*    public static void main(String[] args) throws IOException, SQLException {
        FileDetailUtilsPlus.parsingXml();
    }*/
}
  1. 程序入口
    PgMainEntry.java
package com.dfjx.postgresql.mainEntry;

import com.dfjx.postgresql.pg2hive.buildingHiveTables.BuildTableStatements;
import com.dfjx.postgresql.pg2hive.postgresqlSelects.InsertSelectsIntoTables;
import com.dfjx.postgresql.pg2hive.postgresqlSelects.SelectReplaceStatements;
import com.dfjx.postgresql.pg2hive.workFlowXml.FileDetailUtilsPlus;

import java.io.IOException;
import java.sql.SQLException;

public class PgMainEntry {
    public static void main(String[] args) throws SQLException, IOException {
        //hive建表
        BuildTableStatements.create_table();
        //查询postgreSQL的select语句
        SelectReplaceStatements.selectReplace();
        //将select语句插入到mysql库bmk中
        InsertSelectsIntoTables.getFileDirOrName("src/main/output/selects");
        //生成tbds工作流xml文件
        FileDetailUtilsPlus.parsingXml();
    }
}

五、知识点

1.通过操作ResultSet的指针来判断下一个值状态

if (field_number == 1) {
                    //当字段排序为1
                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad,stemp_t1+t2+t3);
                    fileWrite.IOFileWrite(stemp_fileTotalPath,stemp_t1+t2+t3);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad,ods_t1 + ods_t2 + t3);
                    fileWrite.IOFileWrite(ods_FileTotalPath,ods_t1 +ods_t2 + t3);
                    System.out.println( rs2.getInt("field_number")+"          " + rs2.getString("table_name") +  "              111111111111111111111111111111111");

                } else if(rs2.next() && rs2.getInt("field_number")==1 ){
                    //当ResultSet下一个有值,且字段排序为1,表示下一张新表开始的第一个字段
                    System.out.println("指针上移之前:  "+rs2.getInt("field_number"));
                    rs2.previous(); //指针上移一位,因为if条件中rs2.next()指针下移了一位
                    System.out.println("指针上移之后:  "+rs2.getInt("field_number"));

                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad, t5 + stemp_t4);
                    fileWrite.IOFileWrite(stemp_fileTotalPath, t5 + stemp_t4);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad, t5 + ods_t4);
                    fileWrite.IOFileWrite(ods_FileTotalPath, t5 + ods_t4);

                } else if(rs2.previous() && !rs2.next()){
                    //条件:因为上一个if判断条件中指针下移了一位,所以rs2.previous()一定为真,!rs2.next()判断下一个是否存在,即判断该指针位置是否是表的最后一条数据
                    rs2.previous();     //指针上移一位,是因为该if判断条件!rs2.next()使指针下移了一位
                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad, t5 + stemp_t4);
                    fileWrite.IOFileWrite(stemp_fileTotalPath, t5 + stemp_t4);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad, t5 + ods_t4);
                    fileWrite.IOFileWrite(ods_FileTotalPath, t5 + ods_t4);
                } else {
                    rs2.previous();     //指针上移一位,因为上一个if条件指针上移了一位有下移了一位,但是上上一个if条件指针下移了一位
                    //stemp
                    fileWrite.IOFileWrite(stemp_filePathLoad,t3);
                    fileWrite.IOFileWrite(stemp_fileTotalPath,t3);
                    //ods
                    fileWrite.IOFileWrite(ods_FilePathLoad,t3);
                    fileWrite.IOFileWrite(ods_FileTotalPath,t3);
                }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343