摘要:Java、Impala、Parquet 导数流程
通过Java直接在HDFS上落CSV数据文件,通过Impala的load语法将CSV映射成二维临时表,最后将临时表写入分区表
建表
先构建目标表,parquet格式以dt字段分区
-- Target table: parquet-backed, partitioned by dt (see PARTITIONED BY below).
-- Column list must stay in sync with fin_operation.score_week_tmp, because the
-- load step does "INSERT ... select * from" the temp table.
CREATE TABLE fin_operation.score_week (
uid STRING,
update_week STRING,
update_date STRING,
ent_name STRING,
model_base_risk DOUBLE,
model_related_risk DOUBLE,
model_seq_risk DOUBLE,
model_rule_risk DOUBLE,
score DOUBLE,
score_grade STRING,
reason_code STRING)
PARTITIONED BY (
`dt` string
)
COMMENT '评分存量表'
STORED AS PARQUET;
再构建一张临时表,格式是默认的textfile,指定分隔符是|
-- Staging table: default textfile format so LOAD DATA can ingest the raw file.
-- The '|' field delimiter here must match the delimiter used by the Java writer
-- (SaveImpala.createFile) that produces the HDFS file.
CREATE TABLE fin_operation.score_week_tmp (
uid STRING,
update_week STRING,
update_date STRING,
ent_name STRING,
model_base_risk DOUBLE,
model_related_risk DOUBLE,
model_seq_risk DOUBLE,
model_rule_risk DOUBLE,
score DOUBLE,
score_grade STRING,
reason_code STRING)
COMMENT '评分临时表'
row format delimited fields terminated by '|';
Impala的load语法不支持把text文件直接导入parquet表,因此先load到一张textfile格式的表;分隔符需要提前指定,并在Java API往HDFS写数据时保持一致
Java代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Writes '|'-delimited rows to an HDFS text file and asks ImpalaUtils to load
 * them through the textfile staging table into the parquet partition table.
 */
public class SaveImpala {

    /** Column delimiter — must match the temp table's "fields terminated by '|'". */
    private static final String DELIMITER = "|";

    /**
     * Demo entry point: builds two sample rows (11 columns each, matching the
     * score_week schema), writes them to HDFS, then loads them into the
     * dt='20201214' partition.
     */
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        List<List<String>> data = new ArrayList<>();
        data.add(Arrays.asList("qq", "123333", "123", "123", "123", "123",
                "123", "123", "123", "123", "123"));
        data.add(Arrays.asList("33", "234", "234", "234", "234", "234",
                "234", "234", "234", "234", "234"));

        String hdfsPath = "/tmp/test_data.csv";
        String tmpTable = "fin_operation.score_week_tmp";
        String parquetTable = "fin_operation.score_week"; // was misspelled "parquetTabel"
        String dt = "20201214";

        createFile(hdfsPath, data);
        ImpalaUtils.getInstance().loadTextToParquet(hdfsPath, tmpTable, parquetTable, dt);
    }

    /**
     * Creates (overwrites) {@code filePath} on HDFS with the delimited rows.
     *
     * @param filePath absolute HDFS path of the file to create
     * @param data     rows of column values; each row becomes one '|'-joined line
     * @throws IllegalStateException if the file cannot be written — failing fast
     *                               so the caller does not go on to LOAD a file
     *                               that was never created (the original version
     *                               swallowed the exception and continued)
     */
    public static void createFile(String filePath, List<List<String>> data) {
        try (FileSystem fileSystem = FileSystem.get(new URI("hdfs://xxx.xxx.xxx.xxx:8020"), new Configuration(), "impala");
             FSDataOutputStream outputStream = fileSystem.create(new Path(filePath))) {
            // Encode explicitly as UTF-8; the bare getBytes() used before depends
            // on the JVM's platform default charset.
            outputStream.write(buildContent(data).getBytes(StandardCharsets.UTF_8));
            System.out.println("文件创建成功!");
        } catch (Exception e) {
            throw new IllegalStateException("HDFS文件写入失败: " + filePath, e);
        }
    }

    /**
     * Joins each row with '|' and rows with '\n', no trailing newline.
     * Unlike the previous StringBuilder/deleteCharAt version, this is safe for
     * an empty row list (the old code threw StringIndexOutOfBoundsException).
     */
    static String buildContent(List<List<String>> data) {
        List<String> lines = new ArrayList<>(data.size());
        for (List<String> row : data) {
            lines.add(String.join(DELIMITER, row));
        }
        return String.join("\n", lines);
    }
}
ImpalaUtils
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
/**
 * Lazily-initialized singleton holding one JDBC connection to Impala and the
 * two-step load routine (HDFS text file -> textfile staging table -> parquet
 * partition table).
 */
public class ImpalaUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(ImpalaUtils.class);

    /** Shared JDBC connection, opened once when the singleton is created. */
    private Connection conn = null;

    // volatile is required for double-checked locking to be safe: without it a
    // thread may observe a non-null but not fully constructed instance.
    private static volatile ImpalaUtils instance = null;

    // NOTE(review): kept public for backward compatibility, but callers should
    // use getInstance(); a public constructor defeats the singleton.
    public ImpalaUtils() throws ClassNotFoundException, SQLException {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // NOTE(review): hard-coded endpoint and credentials — move to
        // configuration and keep secrets out of source control.
        conn = DriverManager.getConnection("jdbc:hive2://xxx.xxx.xxx.xxx:21050", "cdh_dev", "123456");
    }

    /** Returns the shared instance, creating it on first use (double-checked locking). */
    public static ImpalaUtils getInstance() throws SQLException, ClassNotFoundException {
        if (instance == null) {
            synchronized (ImpalaUtils.class) {
                if (instance == null) {
                    instance = new ImpalaUtils();
                }
            }
        }
        return instance;
    }

    /**
     * Loads an HDFS text file into {@code tmpTable}, then inserts it into the
     * {@code dt} partition of {@code parquetTable}.
     *
     * NOTE(review): table names, path and partition value are interpolated
     * straight into SQL (LOAD DATA / partition clauses cannot be parameterized),
     * so only trusted, internally-generated values may be passed here.
     */
    public void loadTextToParquet(String filePath, String tmpTable, String parquetTable, String dt) {
        // try-with-resources closes the Statement on every path; this is our own
        // instance, so use conn directly instead of ImpalaUtils.getInstance().conn.
        try (Statement statement = conn.createStatement()) {
            statement.executeUpdate(String.format("LOAD DATA INPATH 'hdfs://%s' OVERWRITE INTO TABLE %s", filePath, tmpTable));
            LOGGER.info("hdfs导入临时表完成");
            statement.executeUpdate(String.format("INSERT OVERWRITE TABLE %s PARTITION(dt='%s') select * from %s", parquetTable, dt, tmpTable));
            LOGGER.info("临时表导入parquet完成");
        } catch (Exception e) {
            // Log the exception itself — the old code logged only getMessage(),
            // discarding the stack trace.
            LOGGER.error("hdfs导入parquet失败", e);
        }
    }
}