Template overview
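The templates are generated by compiling Hive's grammar files (xxx.g) with Antlr4, such as the ones in the Hive source tree.
Compilation produces the corresponding *.java files. For details on Antlr4, see: Antlr4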
Parsing flow
Parser
Key point: extract the tables and columns involved in SELECT operations. Other operations are only resolved down to the field level.
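- Approach: traverse the AST depth-first. When an operation token is encountered, determine the current operation. When a clause is encountered, push the current processing state onto a stack and process the clause. When the clause is finished, pop the stack.
- While processing a clause, each subquery encountered has its information saved and its relationship to its parent query determined, so the queries end up forming a tree.
- When a field or condition is encountered, the current field and condition information is recorded into a Block; this is applied recursively.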
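The traversal described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's parser. `AstNode` and `QueryBlock` are hypothetical stand-ins for the Antlr-generated tree and the project's Block. The token names are loosely modelled on Hive's `TOK_*` naming, and the tree shape is simplified. Each `TOK_QUERY` opens a block, which is pushed onto a stack and linked to its parent, so nested subqueries form a tree. Column references are recorded in whichever block is on top of the stack.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: AstNode and QueryBlock are illustrative stand-ins, not the
// classes generated from the Hive grammar. Token names are loosely modelled on Hive's TOK_*.
public class AstWalkSketch {

    /** Simplified AST node: token type, optional text, child nodes. */
    public record AstNode(String type, String text, List<AstNode> children) {
        public static AstNode of(String type, AstNode... kids) {
            return new AstNode(type, null, List.of(kids));
        }
        public static AstNode leaf(String type, String text) {
            return new AstNode(type, text, List.of());
        }
    }

    /** One query or subquery; parent/children links form the query tree. */
    public static final class QueryBlock {
        public final QueryBlock parent;
        public final List<QueryBlock> children = new ArrayList<>();
        public final List<String> columns = new ArrayList<>();

        QueryBlock(QueryBlock parent) {
            this.parent = parent;
        }
    }

    private final Deque<QueryBlock> stack = new ArrayDeque<>();
    private QueryBlock root;

    /** Walks the tree depth-first and returns the outermost query block. */
    public QueryBlock walk(AstNode ast) {
        visit(ast);
        return root;
    }

    private void visit(AstNode node) {
        switch (node.type()) {
            case "TOK_QUERY" -> {
                QueryBlock parent = stack.peek();      // null for the outermost query
                QueryBlock block = new QueryBlock(parent);
                if (parent == null) {
                    root = block;
                } else {
                    parent.children.add(block);        // subquery: link it to its parent
                }
                stack.push(block);                     // enter the clause
                node.children().forEach(this::visit);
                stack.pop();                           // clause finished: back to the parent
            }
            // Assumes column references only occur inside a TOK_QUERY
            case "TOK_TABLE_OR_COL" -> stack.peek().columns.add(node.text());
            default -> node.children().forEach(this::visit);
        }
    }
}
```

Consider a query that selects `name` from a subquery selecting `id`. The walk yields a root block with columns `[name]` and a single child block with columns `[id]` whose `parent` is the root. An explicit `Deque` keeps the "current block" separate from the recursion, which mirrors the push/pop wording above.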
TableBlood
Key point: TableBlood mainly consists of HiveTableNode and HiveTableEdge objects.
- Getting node information:
- Getting edge information:
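A minimal sketch of how such a node/edge model might look is shown below. The getter names (`getEdges`, `getSource`, `getTarget`, `getDbName`, `getTableName`) match the calls in the usage example later in this post. Everything else is an assumption: the record layout, the `addDependency` helper and the deduplication through sets. Because records supply `equals`/`hashCode`, the same table or the same dependency is stored only once.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: getter names mirror the usage example; bodies and the
// addDependency helper are assumptions, not the project's implementation.
public class TableBloodSketch {

    /** A table node identified by database and table name. */
    public record HiveTableNode(String dbName, String tableName) {
        public String getDbName() { return dbName; }
        public String getTableName() { return tableName; }
    }

    /** A directed edge: data flows from source table to target table. */
    public record HiveTableEdge(HiveTableNode source, HiveTableNode target) {
        public HiveTableNode getSource() { return source; }
        public HiveTableNode getTarget() { return target; }
    }

    public static final class TableBlood {
        // Insertion-ordered sets: duplicates are ignored, order stays predictable
        private final Set<HiveTableNode> nodes = new LinkedHashSet<>();
        private final Set<HiveTableEdge> edges = new LinkedHashSet<>();

        /** Registers "source feeds target" together with both endpoint nodes. */
        public void addDependency(HiveTableNode source, HiveTableNode target) {
            nodes.add(source);
            nodes.add(target);
            edges.add(new HiveTableEdge(source, target));
        }

        public Set<HiveTableNode> getNodes() { return nodes; }
        public List<HiveTableEdge> getEdges() { return new ArrayList<>(edges); }
    }
}
```

For the sample SQL used later (`temp.c1 join temp.c2` into `temp.d1`), this model would hold three nodes and two edges: `c1 -> d1` and `c2 -> d1`.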
TableFields
Core processing logic. Key point: the dependencies between source fields and target fields.
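To make the source-field/target-field dependency concrete, here is a minimal sketch, not the project's code. It resolves each SELECT item's column references through the FROM-clause alias map and returns `db.table.column` source fields. The names `FieldDependencySketch`, `FieldDependency` and `resolve` are hypothetical. The sketch assumes that every reference is alias-qualified and that output names equal the target table's column names. Hive's INSERT maps columns by position, so that second assumption does not hold in general, and the sketch therefore keeps the position.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: resolves each SELECT item's column references through the
// FROM-clause alias map to "db.table.column" source fields. Assumes every reference
// is alias-qualified and that output names match the target table's column names.
public class FieldDependencySketch {

    /** One output column; position is kept because output names may repeat. */
    public record FieldDependency(int position, String targetField, Set<String> sourceFields) {}

    public static List<FieldDependency> resolve(String targetTable,
                                                Map<String, String> aliasToTable,
                                                List<String> outputNames,
                                                List<List<String>> columnRefs) {
        List<FieldDependency> result = new ArrayList<>();
        for (int i = 0; i < outputNames.size(); i++) {
            Set<String> sources = new LinkedHashSet<>();
            for (String ref : columnRefs.get(i)) {             // e.g. "t1.age"
                String[] parts = ref.split("\\.", 2);          // [alias, column]
                String table = aliasToTable.get(parts[0]);
                if (parts.length < 2 || table == null) {
                    throw new IllegalArgumentException("unresolved column reference: " + ref);
                }
                sources.add(table + "." + parts[1]);
            }
            result.add(new FieldDependency(i, targetTable + "." + outputNames.get(i), sources));
        }
        return result;
    }
}
```

For the sample query later in this post, `resolve` is called with target `"temp.d1"`, aliases `{t1=temp.c1, t2=temp.c2}`, outputs `[id, name, age, age]` and references `[[t1.id], [t1.name], [t2.age], [t1.age, t2.age]]`. The fourth entry then depends on both `temp.c1.age` and `temp.c2.age`.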
FieldBloodByTable
Key point: the dependencies between source fields and target fields, and obtaining the nodes and edges of the lineage graph.
- Field lineage
- Node and edge lineage
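One way to turn a field lineage tree into graph edges is sketched below, under explicit assumptions. The tree's root is a target field and each node's children are the fields it is derived from. Intermediate nodes, such as an expression like `t1.age+t2.age`, are assumed to carry `db == null`. In this sketch such nodes are bridged over, so every table-backed field is linked directly to its nearest table-backed ancestor. `FieldNode`, `FieldEdge` and `collectEdges` are hypothetical names. Whether the project's own tree handles intermediate nodes this way is not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a field lineage tree whose root is a target field and whose
// children are the fields it is derived from. Intermediate nodes (expressions,
// subquery columns) carry db == null and are bridged over rather than emitted.
public class FieldTreeSketch {

    public static final class FieldNode {
        public final String db;       // null for intermediate (non-table) nodes
        public final String table;
        public final String column;
        public final List<FieldNode> children = new ArrayList<>();

        public FieldNode(String db, String table, String column, FieldNode... sources) {
            this.db = db;
            this.table = table;
            this.column = column;
            children.addAll(List.of(sources));
        }

        public String id() {
            return db + "." + table + "." + column;
        }
    }

    /** Edge from a source field to the field derived from it. */
    public record FieldEdge(String source, String target) {}

    /** Depth-first: link every table-backed node to its nearest table-backed ancestor. */
    public static void collectEdges(FieldNode node, FieldNode ancestor, List<FieldEdge> out) {
        boolean tableBacked = node.db != null;
        if (tableBacked && ancestor != null) {
            out.add(new FieldEdge(node.id(), ancestor.id()));
        }
        FieldNode nextAncestor = tableBacked ? node : ancestor;
        for (FieldNode child : node.children) {
            collectEdges(child, nextAncestor, out);
        }
    }
}
```

Take `temp.d1.age` derived from the expression node, which in turn reads `temp.c1.age` and `temp.c2.age`. Calling `collectEdges(root, null, edges)` then produces exactly two edges, `temp.c1.age -> temp.d1.age` and `temp.c2.age -> temp.d1.age`.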
Usage example
In this project, the SQL is passed in through spark-submit.
Overall architecture
SQL test case
insert overwrite table temp.d1 select t1.id, t1.name, t2.age,t1.age+t2.age as age from temp.c1 t1 join temp.c2 t2 on t1.id = t2.id;
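The snippets in this section pass the statements as `hqls` to `Arrays.asList`, but the post does not show how that array is built. The following is a minimal sketch under assumptions. The SQL file's path is assumed to be the first application argument, which spark-submit forwards after the application jar. The script is split naively on `;`, which breaks if a semicolon appears inside a string literal or a comment. `HqlSplitSketch` and `splitStatements` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical helper: builds a String[] hqls from a SQL file whose path is passed as
// the first application argument (spark-submit forwards arguments after the jar).
public class HqlSplitSketch {

    /** Naive split on ';' (does not handle ';' inside string literals or comments). */
    public static String[] splitStatements(String script) {
        return Arrays.stream(script.split(";"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) throws IOException {
        String script = Files.readString(Path.of(args[0]));
        String[] hqls = splitStatements(script);
        System.out.println(hqls.length + " statement(s) read");
    }
}
```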
Getting table dependencies
// Project-specific imports (TableBlood, TableDependencyMapper, TableDependencyDO, ...) omitted
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.ibatis.session.SqlSession;

TableBlood tableBlood = bloodEngine.getTableBlood(Arrays.asList(hqls));
printJsonString(tableBlood);
SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory(PROPERTIES_FILE).openSession();
TableDependencyMapper mapper = sqlSession.getMapper(TableDependencyMapper.class);
// One row per table-level edge: the target table depends on the source (parent) table
List<TableDependencyDO> tableDependencyList = new ArrayList<>();
tableBlood.getEdges().forEach(edge -> {
    TableDependencyDO tableDependencyDO = new TableDependencyDO();
    tableDependencyDO.setDb(edge.getTarget().getDbName());
    tableDependencyDO.setTname(edge.getTarget().getTableName());
    tableDependencyDO.setParentDb(edge.getSource().getDbName());
    tableDependencyDO.setParentTname(edge.getSource().getTableName());
    tableDependencyDO.setExpr("");
    tableDependencyDO.setPtype("");
    tableDependencyDO.setCreateTime(new Date());
    tableDependencyList.add(tableDependencyDO);
});
try {
    mapper.batchInsert(tableDependencyList);
    sqlSession.commit();
} catch (Throwable e) {
    // Roll back the transaction
    sqlSession.rollback();
    System.err.println("batchInsert failed, sqlSession transaction rolled back: " + e);
    e.printStackTrace();
} finally {
    sqlSession.close();
}
Result
Getting field dependencies
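// Project-specific imports (FieldBlood, FieldBloodTree, HiveTable, TableColumnDependencyMapper, ...) omitted
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ibatis.session.SqlSession;

FieldBlood fieldBlood = bloodEngine.getFieldBloodByTable(Arrays.asList(hqls), new HiveTable("temp", "d1"));
printJsonString(fieldBlood);
SqlSession sqlSession = MybatisSessionFactory.getSqlSessionFactory(PROPERTIES_FILE).openSession();
TableColumnDependencyMapper mapper = sqlSession.getMapper(TableColumnDependencyMapper.class);
List<TableColumnDependencyDO> tableColumnDependencyList = new ArrayList<>();
// Flatten the field lineage trees into column-dependency rows
Set<FieldBloodTree> bloodTrees = new HashSet<>(fieldBlood.values());
getFieldBloodGraph(tableColumnDependencyList, bloodTrees, null);
// Keep only rows that resolve to a database (entries whose db is null are dropped)
List<TableColumnDependencyDO> suitableTableColumnDependencyList = tableColumnDependencyList.stream()
        .filter(item -> item.getDb() != null)
        .collect(Collectors.toList());
try {
    mapper.batchInsert(suitableTableColumnDependencyList);
    sqlSession.commit();
} catch (Throwable e) {
    // Roll back the transaction
    sqlSession.rollback();
    e.printStackTrace();
} finally {
    sqlSession.close();
}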
Result
Download
Link: https://pan.baidu.com/s/1vrQ1vEEI4RxST6Wqlcju0w Extraction code: txkd