在前文中,我们知道命令的执行最终是由命令处理器CommandProcessor的run方法来完成的,命令处理器的构建采用设计模式中的工厂模式。
命令处理器主要分为两类,我们称之为Driver处理器和非Driver处理器。
非Driver命令处理器
非Driver处理器在org.apache.hadoop.hive.ql.processors
包下。
我们先从HiveCommand枚举类的find方法入手。
HiveCommand
public enum HiveCommand {
SET(),
RESET(),
DFS(),
CRYPTO(true),
ERASURE(true),
ADD(),
LIST(),
LLAP_CLUSTER(),
LLAP_CACHE(),
RELOAD(),
DELETE(),
COMPILE();
...
}
我们注意到,最常见的select
方法并不在HiveCommand枚举中。
find方法
处理原则:
- role开头的命令不处理
- from开头的命令不处理
- set autocommit命令不处理
- llap 开头的命令交给LlapSubCommand处理
- 如果是非测试模式,且包好在枚举值范围内,返回对应的枚举命令。
public static HiveCommand find(String[] command, boolean findOnlyForTesting) {
if (null == command){
return null;
}
String cmd = command[0];
if (cmd != null) {
cmd = cmd.trim().toUpperCase();
if (command.length > 1 && "role".equalsIgnoreCase(command[1])) {
// special handling for set role r1 statement
return null;
} else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
//special handling for SQL "delete from <table> where..."
return null;
} else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) {
return null;//don't want set autocommit true|false to get mixed with set hive.foo.bar...
} else if (command.length > 1 && "llap".equalsIgnoreCase(command[0])) {
return getLlapSubCommand(command);
} else if (COMMANDS.contains(cmd)) {
HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
if (findOnlyForTesting == hiveCommand.isOnlyForTesting()) {
return hiveCommand;
}
return null;
}
}
return null;
}
...
private static HiveCommand getLlapSubCommand(final String[] command) {
if ("cluster".equalsIgnoreCase(command[1])) {
return LLAP_CLUSTER;
} else if ("cache".equalsIgnoreCase(command[1])) {
return LLAP_CACHE;
} else {
return null;
}
}
可以看出,find方法只处理HiveCommand枚举中非测试模式的命令。了解了HiveCommand枚举类之后,我们再来看命令处理器工厂类会更容易理解得多了。
非Driver命令处理器工厂类(CommandProcessorFactory)
在工厂类中,先通过HiveCommand枚举的find方法获取合法的命令枚举类,然后通过不同的命令枚举,构建相应的命令处理器。
这里我们说是合法的命令枚举类,有两层含义:一是没有匹配的枚举类,find方法返回null;另一种是,虽然获取到了命令枚举类,但不在我们的命令白名单内。
hive执行命令权限白名单。我们可以通过属性
hive.security.command.whitelist
设置可以允许执行的命令白名单,如白名单默认值:HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist",
"set,reset,dfs,add,list,delete,reload,compile,llap",
"Comma separated list of non-SQL Hive commands users are authorized to execute")
当然我们可以通过覆盖该属性来改变白名单。
这里的处理逻辑在getForHiveCommandInternal方法中。
public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf,
boolean testOnly)
throws SQLException {
HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
if (hiveCommand == null || isBlank(cmd[0])) {
return null;
}
if (conf == null) {
conf = new HiveConf();
}
Set<String> availableCommands = new HashSet<String>();
for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
.split(",")) {
availableCommands.add(availableCommand.toLowerCase().trim());
}
if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
}
if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0])
&& "function".equalsIgnoreCase(cmd[1])) {
// special handling for SQL "reload function"
return null;
}
switch (hiveCommand) {
case SET:
return new SetProcessor();
case RESET:
return new ResetProcessor();
case DFS:
SessionState ss = SessionState.get();
return new DfsProcessor(ss.getConf());
case ADD:
return new AddResourceProcessor();
case LIST:
return new ListResourceProcessor();
case LLAP_CLUSTER:
return new LlapClusterResourceProcessor();
case LLAP_CACHE:
return new LlapCacheResourceProcessor();
case DELETE:
return new DeleteResourceProcessor();
case COMPILE:
return new CompileProcessor();
case RELOAD:
return new ReloadProcessor();
case CRYPTO:
try {
return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
} catch (HiveException e) {
throw new SQLException("Fail to start the command processor due to the exception: ", e);
}
case ERASURE:
try {
return new ErasureProcessor(conf);
} catch (IOException e) {
throw new SQLException("Fail to start the erasure command processor due to the exception: ", e);
}
default:
throw new AssertionError("Unknown HiveCommand " + hiveCommand);
}
}
不管是那种命令处理器,其核心入口方法都是run方法,由于非Driver命令处理器的种类太多,这里就不展开分析各自的run方法了。
我们可以简单看看set
命令的处理器的实现。
SetProcessor
- 如果set命令后为空,则会更新HiveConf。这里可以使新修改的配置生效。
- 如果是-v,且引擎(HIVE_EXECUTION_ENGINE)是tez,则通过
org.apache.tez.dag.api.TezConfiguration
特殊处理。否则更新HiveConf。 - 如果是time-zone,则对命令要特殊处理。
- 对等号的键值对拆分,并赋值。
最后的赋值方法是:setVaribale。这里不展开分析了。
@Override
public CommandProcessorResponse run(String command) {
SessionState ss = SessionState.get();
String nwcmd = command.trim();
if (nwcmd.equals("")) {
dumpOptions(ss.getConf().getChangedProperties());
return createProcessorSuccessResponse();
}
if (nwcmd.equals("-v")) {
Properties properties = null;
if (ss.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
Class<?> clazz;
try {
clazz = Class.forName("org.apache.tez.dag.api.TezConfiguration");
Configuration tezConf =
(Configuration) clazz.getConstructor(Configuration.class).newInstance(ss.getConf());
properties = HiveConf.getProperties(tezConf);
} catch (Exception e) {
return new CommandProcessorResponse(1, e.getMessage(), "42000", e);
}
} else {
properties = ss.getConf().getAllProperties();
}
dumpOptions(properties);
return createProcessorSuccessResponse();
}
// Special handling for time-zone
Matcher matcher = TIME_ZONE_PATTERN.matcher(nwcmd);
if (matcher.find()) {
nwcmd = HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname + "=" + nwcmd.substring(matcher.end());
}
String[] part = new String[2];
int eqIndex = nwcmd.indexOf('=');
if (nwcmd.contains("=")){
if (eqIndex == nwcmd.length() - 1) { //x=
part[0] = nwcmd.substring(0, nwcmd.length() - 1);
part[1] = "";
} else { //x=y
part[0] = nwcmd.substring(0, eqIndex).trim();
part[1] = nwcmd.substring(eqIndex + 1).trim();
}
if (part[0].equals("silent")) {
ss.setIsSilent(getBoolean(part[1]));
return new CommandProcessorResponse(0);
}
return executeSetVariable(part[0],part[1]);
}
try {
return getVariable(nwcmd);
} catch (Exception e) {
return new CommandProcessorResponse(1, e.getMessage(), "42000", e);
}
}
Driver处理器
Driver处理器在org.apache.hadoop.hive.ql
包下。
Driver处理器实现了IDriver接口,IDriver接口继承自CommandProcessor。从前面我们知道,入口方法是run方法。
Driver命令处理器工厂类(DriverFactory)
这里注意到IDriver接口的实现类有两个:Driver和ReExecDriver。从字面意思理解,一个是直接执行,一个重新执行,具体还得在后面深入之后才能明白真正的含义(初步推测可能类似于mysql数据库的缓存机制),暂且不管他。
入口函数
入口函数newDriver有两个重载,一个是一个参数HiveConf,一个是三个参数:
- QueryState:查询状态
- userName:用户
- QueryInfo:查询信息
从调用链return DriverFactory.newDriver(conf);
可知,这里调用的是一个参数的newDriver方法。
public static IDriver newDriver(HiveConf conf) {
return newDriver(getNewQueryState(conf), null, null);
}
可以看出,只传入了QueryState,其他两个参数传了null;所以重点在QueryState。
QueryState是如何构造的呢?
通过方法getNewQueryState方法创建。最终是依靠内部类Builder的build方法创建。
private static QueryState getNewQueryState(HiveConf conf) {
return new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build();
}
Builder的builder方法:
- 隔离查询配置(防止配置污染)。默认情况下是弃用配置隔离的,除非是在测试的时候。
- 如果设置了特定的参数,则将特定参数添加到HiveConf中。
- 如果需要构建新的查询ID,则构建一个新的查询ID。
- 创建QueryState。
- 如果存在血缘状态,则设置血缘状态。
public QueryState build() {
HiveConf queryConf;
if (isolated) {
// isolate query conf
if (hiveConf == null) {
queryConf = new HiveConf();
} else {
queryConf = new HiveConf(hiveConf);
}
} else {
queryConf = hiveConf;
}
// Set the specific parameters if needed
if (confOverlay != null && !confOverlay.isEmpty()) {
// apply overlay query specific settings, if any
for (Map.Entry<String, String> confEntry : confOverlay.entrySet()) {
try {
queryConf.verifyAndSet(confEntry.getKey(), confEntry.getValue());
} catch (IllegalArgumentException e) {
throw new RuntimeException("Error applying statement specific settings", e);
}
}
}
// Generate the new queryId if needed
if (generateNewQueryId) {
String queryId = QueryPlan.makeQueryId();
queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
setApplicationTag(queryConf, queryId);
// FIXME: druid storage handler relies on query.id to maintain some staging directories
// expose queryid to session level
if (hiveConf != null) {
hiveConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
}
}
QueryState queryState = new QueryState(queryConf);
if (lineageState != null) {
queryState.setLineageState(lineageState);
}
return queryState;
}
真正创建IDriver
这里有个参数hive.query.reexecution.enabled
,默认情况下系统设置为true。这个参数到底什么含义,还期望后面的学习中能够解答。
public static IDriver newDriver(QueryState queryState, String userName, QueryInfo queryInfo) {
boolean enabled = queryState.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED);
if (!enabled) {
return new Driver(queryState, userName, queryInfo);
}
String strategies = queryState.getConf().getVar(ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES);
strategies = Strings.nullToEmpty(strategies).trim().toLowerCase();
ArrayList<IReExecutionPlugin> plugins = new ArrayList<>();
for (String string : strategies.split(",")) {
if (string.trim().isEmpty()) {
continue;
}
plugins.add(buildReExecPlugin(string));
}
return new ReExecDriver(queryState, userName, queryInfo, plugins);
}
Driver的run方法(敲黑板,划重点)
其实在run方法中主要是runInternal方法,其他大篇幅的都是异常处理。所以,我们的关注点又转移到了runInternal方法上来。
public CommandProcessorResponse run(String command) {
return run(command, false);
}
public CommandProcessorResponse run(String command, boolean alreadyCompiled) {
try {
runInternal(command, alreadyCompiled);
return createProcessorResponse(0);
} catch (CommandProcessorResponse cpr) {
//这里都是很大篇幅的异常处理
...
}
}
runInternal方法
这里主要逻辑如下:
- 如果已经编译了,则修改Driver的状态为执行,否则修改状态为编译中。
- 获取HiveDriver上下文的钩子。
- 获取所有的运行钩子,并且预处理他们。
- 如果没有编译,则进行编译(编译也是个复杂的过程,我们在后面着重分析)。编译的入口在
compileInternal
方法。 - 最后调用执行方法
execute
execute方法
主要逻辑如下:
- 设置driver的状态为执行中
- 从执行计划(plan)中拿出所有的task,并执行。其中是while循环来实现的。其中包括了异常task的处理等。
总结
走读到这里,我们已经发现整个流程已经完成了。这里有很复杂的编译sql并获得MR的task的过程。这里并没有展开讨论。下面我们着重看一下编译到底经历了什么样的蜕变。
细心的话,发现上面描述地很粗,编译,执行都一笔带过,其实,这和其复杂性息息相关,都需要单独展开来分析。之前我们在说hive架构的时候,在编译模块中,我们有转化(Parser),执行计划(Plan),语义分析(SematicAnalyzer)等。
编译的方法入口是compileInternal
,最后调用了compile
方法。
private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
...
try {
compile(command, true, deferClose);
} catch (CommandProcessorResponse cpr) {
...
throw cpr;
}
...
}
}
如何发现SQL转化器(Parser)的调用踪迹?
之前框架分析过,SQL转化器(Parser)是在编译器中调用的。这里我们发现将SQL转化为了树(ASTNode tree
)。
下面的代码在Driver类的617行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。
...
// 下面的代码在Driver类的617行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);
// Trigger query hook before compilation
hookRunner.runBeforeParseHook(command);
ASTNode tree;
try {
tree = ParseUtils.parse(command, ctx);
} catch (ParseException e) {
parseError = true;
throw e;
} finally {
hookRunner.runAfterParseHook(command, parseError);
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);
如何发现语义分析(SemanticAnalyzer)的调用踪迹?
转化后,便紧挨着是语义分析。这里都不展开分析了,在后面开辟专题来分析这一块的逻辑。
下面的代码在Driver类的633行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。
hookRunner.runBeforeCompileHook(command);
// clear CurrentFunctionsInUse set, to capture new set of functions
// that SemanticAnalyzer finds are in use
SessionState.get().getCurrentFunctionsInUse().clear();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);
// Flush the metastore cache. This assures that we don't pick up objects from a previous
// query running in this same thread. This has to be done after we get our semantic
// analyzer (this is when the connection to the metastore is made) but before we analyze,
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
backupContext = new Context(ctx);
boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
if (executeHooks) {
hookCtx.setConf(conf);
hookCtx.setUserName(userName);
hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
hookCtx.setCommand(command);
hookCtx.setHiveOperation(queryState.getHiveOperation());
tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
}
// Do semantic analysis and plan generation
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
if (!retrial) {
if ((queryState.getHiveOperation() != null)
&& queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
setLastReplIdForDump(queryState.getConf());
}
openTransaction();
generateValidTxnList();
}
sem.analyze(tree, ctx);
if (executeHooks) {
hookCtx.update(sem);
hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
}
LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
如何发现查询计划(Plan)和优化器(Optimizer)的调用踪迹?
下面的代码在Driver类的685行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。
// validate the plan
sem.validate();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
checkInterrupted("after analyzing query.", null, null);
// get the output schema
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, queryDisplay.getQueryStartTime(), queryId,
queryState.getHiveOperation(), schema);
// save the optimized plan and sql for the explain
plan.setOptimizedCBOPlan(ctx.getCalcitePlan());
plan.setOptimizedQueryString(ctx.getOptimizedSql());
conf.set("mapreduce.workflow.id", "hive_" + queryId);
conf.set("mapreduce.workflow.name", queryStr);
至此,我们对编译的框架流程已经有了一个整体的还有点模糊的印象。具体HiveQL是怎么转化的为MR的,其实快要拨开迷雾了,但是看源码真心好痛苦。