hql的执行过程
hive在执行用户给定的hql时,会经过如下步骤:
- 语法解析
Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象 语法树AST Tree;
- 语义解析
遍历AST Tree,抽象出查询的基本组成单元QueryBlock;
生成逻辑执行计划:遍历QueryBlock,翻译为执行操作树OperatorTree;
- 优化逻辑执行计划
逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shuffle数据量;
- 生成物理执行计划
遍历OperatorTree,翻译为MapReduce任务;
- 优化物理执行计划
物理层优化器进行MapReduce任务的变换,生成最终的执行计划;
Hive SQL的编译过程
hive sql是如何被编译成mapreduce的,参考https://tech.meituan.com/hive-sql-to-mapreduce.html
使用hive API获取hql的执行计划、lineage关系
参考http://lxw1234.com/archives/2015/09/476.htm
最后也给了一个获取hql的lineage关系的小示例,
flamy hive graph
flamy是hive管理工具,它提供的一个功能就是根据给定的hql绘制会各个表的关系依赖graph。
flamy-demo提供了flamy的使用示例,这里只展示graph相关的功能使用。
cd flamy-demo
tree conf
-----------------
conf
├── flamy.properties
└── log4j2.properties
在conf文件夹下,flamy.properties是flamy的配置文件。配置了本地hsql文件的存放位置,已经hive连接相关的属性
cat flamy.properties
---------------------
### Flaminem project
flamy.model.dir.paths = model
flamy.variables.path = ${flamy.model.dir.paths}/VARIABLES.properties
flamy.env.model.hive.presets.path = ${flamy.model.dir.paths}/model_PRESETS.hql
flamy.env.local.hive.presets.path = ${flamy.model.dir.paths}/model_PRESETS.hql
flamy.env.local.hive.meta.fetcher.type = client
flamy.env.local.hive.metastore.uri = "thrift://localhost:9083"
flamy.env.local.hive.server.uri = "localhost:10000"
flamy.model.dir.paths 属性就是配置了model路径,示例中就是解析这些hql文件,生成对应的graph
tree model
-----------------------
model
├── model_PRESETS.hql
├── nasa
│ ├── facts.db
│ │ └── http_status
│ │ └── CREATE.hql
│ ├── nasa_access.db
│ │ ├── daily_logs
│ │ │ ├── CREATE.hql
│ │ │ └── POPULATE.hql
│ │ ├── daily_url_error_rates
│ │ │ ├── CREATE.hql
│ │ │ └── POPULATE.hql
│ │ ├── daily_urls
│ │ │ ├── CREATE.hql
│ │ │ └── POPULATE.hql
│ │ └── daily_urls_with_error
│ │ ├── CREATE.hql
│ │ └── POPULATE.hql
│ └── nasa_access_import.db
│ ├── daily_logs
│ │ ├── CREATE.hql
│ │ └── POPULATE.hql
│ └── raw_data
│ └── CREATE.hql
└── VARIABLES.properties
flamy的安装与配置可以参考flamy的文档
运行demo.sh
可以运行示例,并输入show graph
可以得到如下输出
这就是对model
文件夹下的hql之间的依赖关系图。
接下来就看看flamy是如何实现这个功能的。
flamy graph生成源码分析
先将flamy源码导入到idea中。
因为我们上面的运行入口是demo.sh
,所以先看看demo.sh
做了哪些工作
cat demo.sh
------------------------
#!/bin/bash
if [[ ! -f ${FLAMY_HOME}/bin/flamy ]]; then
echo 'Could not find ${FLAMY_HOME}/bin/flamy executable. Please make sure the environment variable FLAMY_HOME is correctly set.'
fi
$FLAMY_HOME/bin/flamy --config-file conf/flamy.properties shell
可以看到,他就是将配置文件传给了$FLAMY_HOME/bin/flamy
脚本,那就看看flamy又做了什么操作
cat $FLAMY_HOME/bin/flamy
-----------------------
#!/bin/bash
# Reference: http://stackoverflow.com/questions/59895/can-a-bash-script-tell-what-directory-its-stored-in
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
ARGS=("$@")
FLAMY_JAVA_OPTIONS="-XX:MaxPermSize=512M -XX:+CMSClassUnloadingEnabled"
RUN="java ${FLAMY_JAVA_OPTIONS} ${FLAMY_EXTRA_JAVA_OPTIONS} -cp $DIR/../lib/*:$DIR/../conf:$DIR/../additional_jars/* com.flaminem.flamy.Launcher"
exec ${RUN} "${ARGS[@]}"
flamy脚本配置了flamy运行时需要的jvm参数,和依赖jar包的拷贝,然后运行的main函数为com.flaminem.flamy.Launcher
,并讲参数都传给了它。所以,我们的源码入口为com.flaminem.flamy.Launcher
Launcher
object Launcher的main函数如下:
def main(args: Array[String]): Unit = {
// 初始化输出相关
FlamyOutput.init()
val returnStatus =
try {
// 以shell模式开启了一个Launcher class并启动
new Launcher(args, withShell = true).launch()
}
finally{
FlamyOutput.shutdown()
}
System.exit(returnStatus.exitCode)
}
函数内新建了一个Launcher class并将参数传给它启动。
def launch(globalOptions: FlamyGlobalOptions = new FlamyGlobalOptions()): ReturnStatus = {
/* This is necessary to avoid collisions with other running instances of flamy */
try {
// other code
unsafeLaunch(globalOptions)
}
// other code
}
Launcher.launch()方法调用了unsafeLaunch
方法,而unsafeLaunch
方法则是根据不同的Commands
进行不同的处理。
private def unsafeLaunch(rootGlobalOptions: FlamyGlobalOptions): ReturnStatus = {
if(opts.optException.isDefined) {
handleException(opts.optException.get)
}
/* This "if" cannot be moved inside the "match case" because opts.subcommands is not defined when optError is defined */
if(opts.optError.isDefined) {
opts.optError.get
}
else {
opts.subcommands match {
// other cases
case (command: FlamySubcommand) :: subCommands =>
val res: ReturnStatus = command.doCommand(rootGlobalOptions.overrideWith(opts.globalOptions), subCommands)
stats = command.stats.orNull
res
}
}
对于是FlamySubcommand
类型的command,直接调用doCommand
执行命令。
现在看看flamy定义了哪些Commands类型
class Commands(args: Seq[String]) extends Options(args) {
val show = new commands.Show
val diff = new commands.Diff
val describe = new commands.Describe
val check = new commands.Check
val run = new commands.Run
val push = new commands.Push
val repair = new commands.Repair
val count = new commands.Count
val waitForPartition = new commands.WaitForPartition
val gatherInfo = new commands.GatherInfo
}
因为show graph
是查看hql的graph关系的,所以我们只看commands.Show
查看com.flaminem.flamy.commands.Show
,它也定义一些子命令
val conf = new Subcommand("conf") ...
val schemas = new Subcommand("schemas") ...
val tables = new Subcommand("tables") ...
val partitions = new Subcommand("partitions") ...
val graph = new ShowGraph
val select = new Subcommand("select") ...
Show的doCommand方法如下:
override def doCommand(globalOptions: FlamyGlobalOptions, subCommands: List[ScallopConf]): ReturnStatus = {
subCommands match {
case (command: FlamySubcommand)::Nil => command.doCommand(globalOptions, Nil)
// other cases
}
}
也是直接调用了子命令的doCommand,所以接下来看ShowGraph.doCommand
所做的处理。
override def doCommand(globalOptions: FlamyGlobalOptions, subCommands: List[ScallopConf]): ReturnStatus = {
//初始化了flamy上下文
val context = new FlamyContext(globalOptions)
// 处理hql
showGraph(context)
ReturnSuccess
}
所以这个graph的处理就是在showGraph方法中
private def showGraph(context: FlamyContext): Unit = {
val model: Model =
if(complete()){
Model.getCompleteModel(context, Nil)
}
else{
Model.getIncompleteModel(context, Nil)
}
val g: TableGraph = TableGraph(model).applySkipViews.applyFilter
val graphDir: String = FlamyGlobalContext.RUN_DIR.getProperty + "/result"
context.getLocalFileSystem.fileSystem.mkdirs(new Path(graphDir))
val graphPath = makeGraphPath(graphDir, items())
val lightPath = s"${graphPath}_light.png"
val fullPath = s"$graphPath.png"
if(schemaOnly()){
g.export.toSchemaPng(graphPath)
println(
f"""graph printed at :
| ${FilePath(fullPath)}
""".stripMargin
)
openGraph(fullPath)
}
else {
g.export.toLightPng(graphPath + "_light")
g.export.toFullPng(graphPath)
println(
f"""graphs printed at :
| ${FilePath(fullPath)}
| ${FilePath(lightPath)}
""".stripMargin
)
openGraph(lightPath, fullPath)
}
}
从代码中可以看出来,showGraph有三个步骤:
- 获取hql的模型
- 根据模型获取TableGraph
- 输出graph
接下来逐步分析。
model的获取
以为第一次运行,model的获取走的是else,所以看Model.getIncompleteModel,源码如下
def getIncompleteModel(context: FlamyContext, items:Iterable[ItemName]): IncompleteModel = {
val index = context.getFileIndex.strictFilter(items).get
new IncompleteModelFactory(context).generateModel(index)
}
FlamyContext中定义了一个FileIndex的字段,用来获取配置的model.dir.paths下的文件位置
private lazy val fileIndex: FileIndex = FileIndex(this)
然后再看看FileIndex是如何解析的
object FileIndex {
def apply(context: FlamyContext): FileIndex = {
val index: Index = new Index()
// flamy.model.dir.paths配置的路径
val dbPaths = context.modelDirs
// dbPaths 所有文件
val files: FileList = FileUtils.listItemFiles(dbPaths)
val schemaFiles = new mutable.HashMap[SchemaName, SchemaFile]()
for {
// 遍历所有.db文件夹
dbDir: File <- listDatabaseDirectories(dbPaths)
} {
val schemaFile = new MissingSchemaFile(dbDir)
schemaFiles += schemaFile.schemaName -> schemaFile
}
for {
file <- files
fileType <- FileType.getTypeFromFileName(file.getName)
} {
if (TableFile.VALID_FILE_TYPES.contains(fileType)) {
val tableFile: TableFile = new ExistingTableFile(file, fileType)
index.addFile(tableFile)
}
else if (SchemaFile.VALID_FILE_TYPES.contains(fileType)) {
val schemaFile: SchemaFile = new ExistingSchemaFile(file)
schemaFiles += schemaFile.schemaName -> schemaFile
}
}
schemaFiles.values.foreach {
index.addFile
}
new FileIndex(index)
}
}
FileIndex就是根据model下的文件路径进行了解析,然后添加到FileIndex中。
flamy中运行定义的FileType
object FileType {
// CREATE.hql
case object CREATE extends FileType {
override val filePrefix: String = "CREATE"
override val fileExtension: String = "hql"
override val multipleFilesAllowed: Boolean = false
}
// VIEW.hql
case object VIEW extends FileType {
override val filePrefix: String = "VIEW"
override val fileExtension: String = "hql"
override val multipleFilesAllowed: Boolean = false
}
//POPULATE.hql
case object POPULATE extends FileType {
override val filePrefix: String = "POPULATE"
override val fileExtension: String = "hql"
override val multipleFilesAllowed: Boolean = true
}
//TEST.hql
case object TEST extends FileType {
override val filePrefix: String = "TEST"
override val fileExtension: String = "hql"
override val multipleFilesAllowed: Boolean = true
}
//META.properties
case object META extends FileType {
override val filePrefix: String = "META"
override val fileExtension: String = "properties"
override val multipleFilesAllowed: Boolean = false
}
//CREATE_SCHEMA.hql
case object CREATE_SCHEMA extends FileType {
override val filePrefix: String = "CREATE_SCHEMA"
override val fileExtension: String = "hql"
override val multipleFilesAllowed: Boolean = false
}
//PRESETS.hql
case object PRESETS extends FileType {
override val filePrefix: String = "PRESETS"
override val fileExtension: String = "hql"
override val multipleFilesAllowed: Boolean = false
}
}
在之前,我们看到了model文件夹下的hql文件有CREATE,POPULATE类型的。所以这里会被ExistingTableFile所封装。
因为getIncompleteModel
的items参数为nil,所以val index = context.getFileIndex.strictFilter(items).get
没有过滤。
然后就是调用IncompleteModelFactory.generateModel(index)
生成模型操作了。
// 生成模型
def generateModel(fileIndex: FileIndex): IncompleteModel = {
logger.info("Generating model")
mergeableTableInfoSet = MergeableTableInfoCollection()
//获取所有CREATE.hql文件
val creates = fileIndex.getAllTableFilesOfType(FileType.CREATE)
val views = fileIndex.getAllTableFilesOfType(FileType.VIEW)
// 获取所有POPULATE.hql文件
val populates = fileIndex.getAllTableFilesOfType(FileType.POPULATE)
// 获取所有META.properties文件
val metas = fileIndex.getAllTableFilesOfType(FileType.META)
// 解析create语句
runner.run(analyzeCreate(_: TableFile), creates)
// 解析查询语句
runner.run(analyzePopulate(_: TableFile), populates ++ views)
runner.run(analyzeMeta(_: TableFile), metas)
val stats: FileRunner#Stats = runner.getStats
FlamyOutput.out.info(stats.format("analyzed"))
if (stats.getFailCount > 0) {
throw new FlamyException("Interrupting command, some file were not validated.")
}
// 加入所有未被处理的hql文件
mergeableTableInfoSet ++= getMissingTables(fileIndex)
val result = new IncompleteModel(mergeableTableInfoSet.toTableInfoCollection, fileIndex)
logger.info("model generated")
result
}
generateModel先获取model文件夹下所有的hql文件,然后对不同类型的hql文件进行分析,这里先看analyzeCreate
以model\nasa\nasa_access_import.db\daily_logs\CREATE.hql
为例
-- DROP TABLE IF EXISTS nasa_access_import.daily_logs ;
CREATE TABLE nasa_access_import.daily_logs(
source_ip STRING,
source_url STRING,
time TIMESTAMP ,
action STRING,
url STRING,
protocol STRING,
response_code INT,
size INT,
line STRING
)
PARTITIONED BY (day STRING)
STORED AS ORC
;
analyzeCreate方法代码如下:
// 这里是获取create语句
private def analyzeCreate(tableFile: TableFile) {
//解析创建的table
val table: Table = CreateTableParser.parseText(tableFile.text)(context)
checkName(table, tableFile)
mergeableTableInfoSet += TableInfo(table)
}
首先调用CreateTableParser.parseText
解析create语句
CreateTableParser object内容如下:
object CreateTableParser {
@throws(classOf[SemanticException])
@throws(classOf[ParseException])
@throws(classOf[IOException])
def parseQuery(query: String)(implicit context: FlamyContext): Table = {
val cti: CreateTableInfo = new CreateTableInfo(context)
cti.parse(query)
}
@throws(classOf[SemanticException])
@throws(classOf[ParseException])
@throws(classOf[IOException])
def parseText(text: String)(implicit context: FlamyContext): Table = {
// 获取查询语句,用的是正则
val queries: Seq[String] = QueryUtils.cleanAndSplitQuery(text)
if (queries.size != 1) {
throw new RuntimeException("More than 1 query parsed")
}
parseQuery(queries.iterator.next)
}
}
只有两个函数:
- parseText 分割查询语句,删掉注释
- parseQuery 处理parseText获取的查询语句。
parseText只是将提供的文本进行了注释的去除和用正则表达式按;分割文本。
parseQuery就是处理单条查询语句了。在parseQuery语句中新建了一个CreateTableInfo对象。
CreateTableInfo实现了org.apache.hadoop.hive.ql.lib.NodeProcessor
接口,该接口是hive解析hql的操作接口。
/**
* Base class for processing operators which is no-op. The specific processors
* can register their own context with the dispatcher.
*/
public interface NodeProcessor {
Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException;
}
parseQuery调用了CreateTableInfo.parse方法
@throws(classOf[ParseException])
@throws(classOf[SemanticException])
@throws(classOf[IOException])
//noinspection ScalaStyle
def parse(query: String): Table = {
// 这里调用了hive提供的ParseDriver,进行句法分析
val pd: ParseDriver = new ParseDriver
// 获取语法树
val completeTree = pd.parse(query, ModelHiveContext.getLightContext(context).hiveContext)
var tree: ASTNode = completeTree
while ((tree.getToken == null) && (tree.getChildCount > 0)) {
tree = tree.getChild(0)
}
table = null
columns.clear()
partitions.clear()
val rules: util.Map[Rule, NodeProcessor] = new util.LinkedHashMap[Rule, NodeProcessor]
val disp: Dispatcher = new DefaultRuleDispatcher(this, rules, null)
val ogw: GraphWalker = new DefaultGraphWalker(disp)
val topNodes: util.ArrayList[Node] = new util.ArrayList[Node]
topNodes.add(tree)
ogw.startWalking(topNodes, null)
if (table == null) {
throw new UnexpectedBehaviorException("Could not parse this AST:" + HiveParserUtils.drawTree(completeTree))
}
table.columns = columns
table.partitions = partitions
table
}
在函数的第一行就初始化了org.apache.hadoop.hive.ql.parse.ParseDriver
,ParseDriver提供了语法树的解析生成功能,所以调用ParseDriver.parse可以获取当前查询语句的语法树。
获取到语法树后,就可以遍历语法树进行处理了。
之后又实例化了DefaultRuleDispatcher
、DefaultGraphWalker
,最后调用DefaultGraphWalker.startWalking
对语法进行遍历。
public void startWalking(Collection<Node> startNodes,
HashMap<Node, Object> nodeOutput) throws SemanticException {
toWalk.addAll(startNodes);
while (toWalk.size() > 0) {
Node nd = toWalk.remove(0);
walk(nd);
if (nodeOutput != null) {
nodeOutput.put(nd, retMap.get(nd));
}
}
}
对每个节点调用walk方法。
public void walk(Node nd) throws SemanticException {
if (opStack.empty() || nd != opStack.peek()) {
opStack.push(nd);
}
if ((nd.getChildren() == null)
|| getDispatchedList().containsAll(nd.getChildren())) {
// all children are done or no need to walk the children
if (!getDispatchedList().contains(nd)) {
dispatch(nd, opStack);
}
opStack.pop();
return;
}
// add children, self to the front of the queue in that order
getToWalk().add(0, nd);
getToWalk().removeAll(nd.getChildren());
getToWalk().addAll(0, nd.getChildren());
}
}
显示判断当前节点以及其子节点是否被遍历过,若子节点已经被遍历,自己没有被遍历,则调用dispatch进行处理。这里可以看出,DefaultGraphWalker的遍历方式是深度优先遍历。
public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
dispatchAndReturn(nd, ndStack);
}
public <T> T dispatchAndReturn(Node nd, Stack<Node> ndStack) throws SemanticException {
Object[] nodeOutputs = null;
if (nd.getChildren() != null) {
nodeOutputs = new Object[nd.getChildren().size()];
int i = 0;
for (Node child : nd.getChildren()) {
nodeOutputs[i++] = retMap.get(child);
}
}
Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
retMap.put(nd, retVal);
return (T) retVal;
}
先获取当前节点的子节点,保存在nodeOutputs数组中,然后调用dispatcher.dispatch方法。这里的dispatcher是CreateTableInfo实例的引用,所以看CreateTableInfo的process方法。
@throws(classOf[SemanticException])
def process(pt: Node, stack: util.Stack[Node], procCtx: NodeProcessorCtx, nodeOutputs: AnyRef*): AnyRef = {
pt.getToken.getType match {
case HiveParser.TOK_CREATETABLE =>
table = HiveParserUtils.getTable(TableType.TABLE, pt.getChild(0))
case HiveParser.TOK_TABCOLLIST =>
pt.getParent.getType match {
case HiveParser.TOK_CREATETABLE =>
columns.addAll(HiveParserUtils.getColumns(pt))
case HiveParser.TOK_TABLEPARTCOLS =>
partitions.addAll(HiveParserUtils.getColumns(pt).map {
new PartitionKey(_)
})
case _ =>
}
case _ =>
}
null
}
可以看到,CreateTableInfo只对三种类型做了处理
- 类型为创建table的TOK_CREATETABLE
- 类型为TOK_TABCOLLIST
- TOK_TABLEPARTCOLS
最后将遍历的节点保存在retMap中。
所以在遍历中CreateTableInfo只保存了table, columns, partitions信息。
analyzeCreate的工作就是分析了CREATE.hql中创建table的语句
analyzePopulate函数就是分析了POPULATE.hql中查询的语句了。
这里看它具体是如何处理的。
private def analyzePopulate(tableFile: TableFile) {
// 获取所有源表与目标表的依赖关系
val tables: MergeableTableDependencyCollection = PopulatePreParser.parseText(tableFile.text)
mergeableTableInfoSet ++= tables.map{TableInfo(_, tableFile)}
tables.foreach{table => checkName(table, tableFile)}
}
可以看出,只要是获取查询语句各个table的依赖关系。调用PopulatePreParser.parseText
方法获取。
def parseText(text: String): MergeableTableDependencyCollection = {
// 获取所有查询语句
val queries: Seq[String] = QueryUtils.cleanAndSplitQuery(text).map{emptyStrings}
queries.flatMap{parseQuery}.toMergeableTableDependencyCollection
}
同样是先获取查询语句,然后再对每条查询语句调用parseQuery进行处理。
// 处理查询语句
private def parseQuery(query: String): MergeableTableDependencyCollection = {
// 获取Common Table Expression tables
val cteTables: Set[String] = findCTEs(query).map{_.toLowerCase}.toSet
logger.debug(s"CTE founds: $cteTables")
// 获取所有目标tables
val destTables =
getTableNamesFromRegex(destRE,query).flatMap{ nameToTableDep(_,TableType.TABLE,Set()) }++
getTableNamesFromRegex(viewRE,query).flatMap{ nameToTableDep(_,TableType.VIEW,Set()) }
// 获取所有source tables
val srcTables = findSourceTables(query).flatMap{ nameToTableDep(_,TableType.REF, cteTables) }
// 建立起依赖关系
destTables.foreach{ t => t.tableDeps = new TableDependencyCollection(srcTables)}
destTables.toMergeableTableDependencyCollection
}
显而易见,flamy的parseQuery先是获取destTables,srcTables,然后建立起他们之间的关系。
所有tables的获取都是通过正则表达式匹配查询语句来获取的。由TableDependency保存。
一个TableDependency保存了一个table所有的结构信息以及它的依赖信息。
得到tables,和tables之间的依赖信息,然后再实例化一个IncompleteModel对象并返回,就完成了generateModel的处理。
也及时说showGraph就完成了Model的获取,接下来就是根据model生成tableGraph。
TableGraph建立了tables之间的依赖图
def apply(model: Model): TableGraph = {
var graph: Graph[TableName, DiEdge] = Graph[TableName, DiEdge]()
model.tables.foreach{
td: TableInfo =>
graph += TableName(td.fullName)
graph ++= td.tableDeps.map{t => TableName(t.fullName) ~> TableName(td.fullName)}
}
val set1: Set[TableName] = model.fileIndex.getTableNames
val set2: Set[TableName] = graph.getNodeSeq.toSet
assert(set1.forall{set2.contains}, s"Please report a bug: the following tables are in the fileIndex and not in the graph: ${set1.diff(set2)}")
new TableGraph(model, graph)
}
然后就是输出graph,它是将graph转化为dot语言,然后根据Graphviz将dot转化为png图片文件。
flamy的show graph
命令的执行逻辑就完毕了。
附
通过hive api获取Lineage信息
通过对flamy获取表创建语句的表信息,是通过ParseDriver来进行解析的,这里简单写一个例子
/**
* @(#)MyLineageInfo.java, 2017/8/28.
* <p/>
* Copyright 2017 Netease, Inc. All rights reserved.
* NETEASE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*/
package org.jjzhu.hive;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.jjzhu.hive.util.HiveParserUtil;
/**
* @author 祝佳俊(hzzhujiajun@corp.netease.com)
*/
public class MyLineageInfo implements NodeProcessor {
/**
* Stores input tables in sql.
*/
private TreeSet<String> inputTableList = new TreeSet<>();
/**
* Stores output tables in sql.
*/
private TreeSet<String> outputTableList = new TreeSet<>();
private Set<String> columns = new TreeSet<>();
private Set<String> partitions = new TreeSet<>();
public Set<String> getColumns() {
return columns;
}
public TreeSet getInputTableList() {
return inputTableList;
}
public TreeSet getOutputTableList() {
return outputTableList;
}
public Set<String> getPartitions() {
return partitions;
}
/**
* Implements the process method for the NodeProcessor interface.
*/
public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
ASTNode pt = (ASTNode) nd;
switch (pt.getToken().getType()) {
case HiveParser.TOK_CREATETABLE:
case HiveParser.TOK_TAB:
outputTableList.add(BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)));
break;
case HiveParser.TOK_TABCOLLIST:
switch (pt.getParent().getType()){
case HiveParser.TOK_CREATETABLE:
columns.addAll(HiveParserUtil.getColumns(pt));
break;
case HiveParser.TOK_TABLEPARTCOLS:
partitions.addAll(HiveParserUtil.getColumns(pt));
break;
}
break;
case HiveParser.TOK_TABREF:
ASTNode tabTree = (ASTNode) pt.getChild(0);
String table_name = (tabTree.getChildCount() == 1) ?
BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) :
BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + "." + tabTree.getChild(1);
inputTableList.add(table_name);
break;
}
return null;
}
/**
* parses given query and gets the lineage info.
*
* @param query 查询语句
* @throws ParseException exception
*/
public void getLineageInfo(String query) throws ParseException,
SemanticException {
/*
* Get the AST tree
*/
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(query);
while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
tree = (ASTNode) tree.getChild(0);
}
/*
* initialize Event Processor and dispatcher.
*/
inputTableList.clear();
outputTableList.clear();
partitions.clear();
columns.clear();
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack. The dispatcher
// generates the plan from the operator tree
Map<Rule, NodeProcessor> rules = new LinkedHashMap<>();
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(this, rules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
// Create a list of topop nodes
ArrayList<Node> topNodes = new ArrayList<>();
topNodes.add(tree);
ogw.startWalking(topNodes, null);
}
public static void main(String[] args) throws IOException, ParseException,
SemanticException {
String query = "INSERT OVERWRITE TABLE hzzhujiajun.table1 SELECT a.name FROM hzzhujiajun.table2 a join hzzhujiajun.table3 b ON (a.id = b.id)";
String query2 = "CREATE EXTERNAL TABLE facts.http_status (\n" +
" code INT,\n" +
" status_group STRING,\n" +
" message STRING,\n" +
" description STRING\n" +
")\n" +
"ROW FORMAT DELIMITED \n" +
"FIELDS TERMINATED BY '\\t'\n" +
"LINES TERMINATED BY '\\n'\n" +
"STORED AS TEXTFILE\n" +
"LOCATION \"${EXTERNAL_DATA_LOCATION}/facts.db/http_status\"\n";
String query3 = "CREATE TABLE nasa_access.daily_logs(\n" +
" source_ip STRING,\n" +
" source_url STRING,\n" +
" time TIMESTAMP ,\n" +
" action STRING,\n" +
" url STRING,\n" +
" size INT,\n" +
" line STRING\n" +
")\n" +
"PARTITIONED BY (day STRING)\n" +
"STORED AS ORC";
MyLineageInfo lep = new MyLineageInfo();
lep.getLineageInfo(query);
HiveParserUtil.output(lep);
System.out.println("------------------------------------------");
lep.getLineageInfo(query2);
HiveParserUtil.output(lep);
System.out.println("------------------------------------------");
lep.getLineageInfo(query3);
HiveParserUtil.output(lep);
}
}
运行得到的输出
Input tables = [hzzhujiajun.table2, hzzhujiajun.table3]
Output tables = [hzzhujiajun.table1]
columns = []
partitions columns:[]
------------------------------------------
Input tables = []
Output tables = [facts.http_status]
columns = [code, description, message, status_group]
partitions columns:[]
------------------------------------------
Input tables = []
Output tables = [nasa_access.daily_logs]
columns = [action, line, size, source_ip, source_url, time, url]
partitions columns:[day]