1. Background
Hive is a data warehouse tool built on top of Hadoop for data extraction, transformation, and loading (ETL); it provides a mechanism for storing, querying, and analyzing large-scale data kept in Hadoop. Hive maps structured data files to database tables and offers SQL query capability by translating SQL into MapReduce jobs. Its main advantage is the low learning curve: SQL-like statements yield MapReduce-based statistics quickly, without having to write dedicated MapReduce programs, which makes Hive well suited to the statistical analysis work of a data warehouse.
2. Goal
Starting from a simple statement, DESCRIBE TABLE, this article walks through the Hive code and explains how Hive compiles and executes that statement, so as to give readers an overall picture of Hive.
3. A First Look at the Apache Hive Code
3.1 Entry Point
/hive/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
public static void main(String[] args) throws Exception {
  int ret = new CliDriver().run(args);
  System.exit(ret);
}
The main function is the entry point: it creates a CliDriver object and calls run() with the command-line arguments.
The subsequent call path is as follows (a hedged sketch of driving a query through this path is given after the list):
a. /hive/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java: run()
b. /hive/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java: executeDriver()
c. /hive/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java: processLine()
d. /hive/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java: processCmd()
e. /hive/cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java: processLocalCmd()
f. /hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java: run()
g. /hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java: runInternal()
h. /hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java: compileInternal()
i. /hive/ql/src/java/org/apache/hadoop/hive/ql/Driver.java: compile()
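To make the call path concrete, here is a minimal sketch of submitting a DESCRIBE statement programmatically through the same Driver entry point. It assumes an older Hive release where Driver exposes run(String), a reachable metastore, and a hypothetical table named t; it is an illustration of the call chain above, not part of CliDriver itself.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class DescribeTableDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();   // reads hive-site.xml from the classpath
    SessionState.start(conf);         // Driver expects an active SessionState
    Driver driver = new Driver(conf);
    // run() follows the path listed above: runInternal() -> compileInternal()
    // -> compile(), and then executes the tasks produced by compilation.
    int ret = driver.run("DESCRIBE t").getResponseCode();
    System.exit(ret);
  }
}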
3.2 Compilation
a. Build the abstract syntax tree: this step turns a SQL statement such as DESCRIBE TABLE into a syntax tree. The process is fairly involved and relies on compiler theory, so it is not expanded here; roughly speaking, it checks the statement for syntax errors and identifies the entities referenced in the SQL and the operation applied to each of them.
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
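As a hedged illustration of what this step produces, the snippet below feeds a DESCRIBE statement to ParseDriver directly and prints the resulting tree. The no-Context parse(String) overload and ASTNode.dump() are taken from older Hive releases, and the table name t is hypothetical.

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

public class ParseDemo {
  public static void main(String[] args) throws Exception {
    ParseDriver pd = new ParseDriver();
    // parse() runs the ANTLR lexer/parser and returns the root ASTNode
    ASTNode tree = pd.parse("DESCRIBE FORMATTED t");
    // dump() prints the tree, e.g. a TOK_DESCTABLE node whose children carry
    // the table name and the KW_FORMATTED option
    System.out.println(tree.dump());
  }
}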
b. Semantic analysis
sem.analyze(tree, ctx);
Since DESCRIBE is a DDL operation, the class that actually performs the semantic analysis is /hive/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java: analyzeDescribeTable(). Let us look at this function in detail:
private void analyzeDescribeTable(ASTNode ast) throws SemanticException {
  ASTNode tableTypeExpr = (ASTNode) ast.getChild(0);
  // Resolve the fully qualified table name, e.g. system.availability
  String qualifiedName =
      QualifiedNameUtil.getFullyQualifiedName((ASTNode) tableTypeExpr.getChild(0));
  // Verify against the metastore that the table exists
  String tableName =
      QualifiedNameUtil.getTableName(db, (ASTNode) (tableTypeExpr.getChild(0)));
  // Verify against the metastore that the database exists
  String dbName =
      QualifiedNameUtil.getDBName(db, (ASTNode) (tableTypeExpr.getChild(0)));
  // Verify against the metastore that the partition exists
  Map<String, String> partSpec =
      QualifiedNameUtil.getPartitionSpec(db, tableTypeExpr, tableName);
  // DESCRIBE can also drill down to a single column
  String colPath = QualifiedNameUtil.getColPath(
      db, tableTypeExpr, (ASTNode) tableTypeExpr.getChild(0), qualifiedName, partSpec);

  // if database is not the one currently in use, validate the database
  if (dbName != null) {
    validateDatabase(dbName);
  }
  if (partSpec != null) {
    validateTable(tableName, partSpec);
  }

  // Create a DescTableDesc object describing the work the describe task has to do.
  // Three output styles are supported: formatted, extended and pretty.
  DescTableDesc descTblDesc = new DescTableDesc(
      ctx.getResFile(), tableName, partSpec, colPath);
  boolean showColStats = false;
  if (ast.getChildCount() == 2) {
    int descOptions = ast.getChild(1).getType();
    descTblDesc.setFormatted(descOptions == HiveParser.KW_FORMATTED);
    descTblDesc.setExt(descOptions == HiveParser.KW_EXTENDED);
    descTblDesc.setPretty(descOptions == HiveParser.KW_PRETTY);
    // in case of "DESCRIBE FORMATTED tablename column_name" statement, colPath
    // will contain tablename.column_name. If column_name is not specified
    // colPath will be equal to tableName. This is how we can differentiate
    // if we are describing a table or column
    if (!colPath.equalsIgnoreCase(tableName) && descTblDesc.isFormatted()) {
      showColStats = true;
    }
  }

  inputs.add(new ReadEntity(getTable(tableName)));
  // Create the task; for DESCRIBE this is a DDLTask
  Task<? extends Serializable> ddlTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
      descTblDesc), conf);
  rootTasks.add(ddlTask);
  String schema = DescTableDesc.getSchema(showColStats);
  setFetchTask(createFetchTask(schema));
  LOG.info("analyzeDescribeTable done");
}
The semantic analysis phase distinguishes the kind of operation the ASTNode represents, such as DDL, EXPLAIN, or DML, and handles each kind separately. A DDL operation like DESCRIBE does not need a MapReduce job; its data is fetched directly from the metastore. The phase ends by generating the corresponding Task. A hedged sketch of how the matching analyzer is chosen is given below.
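The sketch below imitates the relevant part of Driver.compile() in older Hive releases to show where the analyzer selection happens; the exact factory signature differs between versions, so treat this as an illustration rather than the definitive implementation.

import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;

public class CompileSketch {
  // A stripped-down imitation of Driver.compile(): parse, pick an analyzer, analyze.
  static List<Task<? extends Serializable>> compile(String command, HiveConf conf) throws Exception {
    Context ctx = new Context(conf);
    ParseDriver pd = new ParseDriver();
    ASTNode tree = pd.parse(command, ctx);
    tree = ParseUtils.findRootNonNullToken(tree);
    // The factory inspects the root token (TOK_DESCTABLE, TOK_EXPLAIN, TOK_QUERY, ...)
    // and returns the matching analyzer; for DESCRIBE that is a DDLSemanticAnalyzer.
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
    sem.analyze(tree, ctx);
    // rootTasks now holds the DDLTask built in analyzeDescribeTable(), and a
    // FetchTask has been registered to read back the result file it will write.
    return sem.getRootTasks();
  }
}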
3.3 Task Execution
Once the DDLTask has been generated, Hive runs it by calling its execute() function: /hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java: execute(). For a DESCRIBE TABLE operation, execute() dispatches to describeTable(); a hedged sketch of that dispatch is shown below, followed by the actual describeTable() implementation.
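The excerpt-style fragment below shows roughly how that dispatch looks inside DDLTask.execute() in older Hive releases; the DDLWork accessor names and the surrounding error handling are simplified, so verify them against the version you are reading.

// Inside DDLTask.execute(), simplified: DDLWork carries at most one non-null
// *Desc object, and execute() probes them in turn to decide what to run.
DescTableDesc descTbl = work.getDescTblDesc();
if (descTbl != null) {
  // DESCRIBE [FORMATTED | EXTENDED | PRETTY] ends up here
  return describeTable(db, descTbl);
}
// ... other DDL operations (SHOW TABLES, DROP TABLE, ...) are probed the same way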
private int describeTable(Hive db, DescTableDesc descTbl) throws HiveException {
  String colPath = descTbl.getColumnPath();
  String tableName = descTbl.getTableName();

  // describe the table - populate the output stream
  Table tbl = db.getTable(tableName, false);
  Partition part = null;
  DataOutputStream outStream = null;
  try {
    Path resFile = new Path(descTbl.getResFile());
    if (tbl == null) {
      FileSystem fs = resFile.getFileSystem(conf);
      outStream = fs.create(resFile);
      outStream.close();
      outStream = null;
      throw new HiveException(ErrorMsg.INVALID_TABLE, tableName);
    }
    if (descTbl.getPartSpec() != null) {
      part = db.getPartition(tbl, descTbl.getPartSpec(), false);
      if (part == null) {
        FileSystem fs = resFile.getFileSystem(conf);
        outStream = fs.create(resFile);
        outStream.close();
        outStream = null;
        throw new HiveException(ErrorMsg.INVALID_PARTITION,
            StringUtils.join(descTbl.getPartSpec().keySet(), ','), tableName);
      }
      tbl = part.getTable();
    }
  } catch (IOException e) {
    throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
  } finally {
    IOUtils.closeStream(outStream);
  }

  try {
    LOG.info("DDLTask: got data for " + tbl.getTableName());
    Path resFile = new Path(descTbl.getResFile());
    FileSystem fs = resFile.getFileSystem(conf);
    outStream = fs.create(resFile);

    List<FieldSchema> cols = null;
    List<ColumnStatisticsObj> colStats = null;
    if (colPath.equals(tableName)) {
      cols = (part == null || tbl.getTableType() == TableType.VIRTUAL_VIEW) ?
          tbl.getCols() : part.getCols();
      if (!descTbl.isFormatted()) {
        cols.addAll(tbl.getPartCols());
      }
    } else {
      Deserializer deserializer = tbl.getDeserializer(true);
      if (deserializer instanceof AbstractSerDe) {
        String errorMsgs = ((AbstractSerDe) deserializer).getConfigurationErrors();
        if (errorMsgs != null && !errorMsgs.isEmpty()) {
          throw new SQLException(errorMsgs);
        }
      }
      cols = Hive.getFieldsFromDeserializer(colPath, deserializer);
      if (descTbl.isFormatted()) {
        // when column name is specified in describe table DDL, colPath
        // will be table_name.column_name
        String colName = colPath.split("\\.")[1];
        String[] dbTab = Utilities.getDbTableName(tableName);
        List<String> colNames = new ArrayList<String>();
        colNames.add(colName.toLowerCase());
        if (null == part) {
          colStats = db.getTableColumnStatistics(dbTab[0].toLowerCase(),
              dbTab[1].toLowerCase(), colNames);
        } else {
          List<String> partitions = new ArrayList<String>();
          partitions.add(part.getName());
          colStats = db.getPartitionColumnStatistics(dbTab[0].toLowerCase(),
              dbTab[1].toLowerCase(), partitions, colNames).get(part.getName());
        }
      }
    }

    fixDecimalColumnTypeName(cols);
    // In case the query is served by HiveServer2, don't pad it with spaces,
    // as HiveServer2 output is consumed by JDBC/ODBC clients.
    boolean isOutputPadded = !SessionState.get().isHiveServerQuery();
    formatter.describeTable(outStream, colPath, tableName, tbl, part,
        cols, descTbl.isFormatted(), descTbl.isExt(),
        descTbl.isPretty(), isOutputPadded, colStats);
    LOG.info("DDLTask: written data for " + tbl.getTableName());
    outStream.close();
    outStream = null;
  } catch (SQLException e) {
    throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
  } catch (IOException e) {
    throw new HiveException(e, ErrorMsg.GENERIC_ERROR, tableName);
  } finally {
    IOUtils.closeStream(outStream);
  }
  return 0;
}
This function reads the table's metadata from the metastore by tableName and writes it to the result file in the expected schema; the formatter handles the formatted, extended and pretty variants. Readers interested in the details can study the code further. A hedged sketch of how the result then reaches the client follows.
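The result file is not handed back to the client directly: the FetchTask registered by setFetchTask() in analyzeDescribeTable() reads it back, and CliDriver.processLocalCmd() prints the rows it obtains through Driver.getResults(). The fragment below is a minimal sketch of that last step, reusing the driver instance from the sketch in section 3.1; getResults() signatures vary slightly across Hive versions.

// Continuing the DescribeTableDemo sketch from section 3.1:
java.util.ArrayList<String> rows = new java.util.ArrayList<String>();
if (driver.run("DESCRIBE t").getResponseCode() == 0) {
  // getResults() pulls rows from the FetchTask, which reads the result file
  // the DDLTask wrote; each row is roughly "col_name  data_type  comment".
  while (driver.getResults(rows)) {
    for (String row : rows) {
      System.out.println(row);
    }
    rows.clear();
  }
}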
4. Summary
This article has walked through how the DESCRIBE TABLE operation is compiled and executed in Hive. As can be seen, DESCRIBE TABLE is a fairly simple operation: it reads the table's metadata from the metastore and writes it out. Tracing this operation also leads to a few general conclusions:
a. Hive distinguishes different kinds of operations, such as DDL, EXPLAIN, and DML; each kind has its own compilation and execution path.
b. DDL operations do not require a MapReduce job; their data is read directly from the metastore.