Hive源码解读(4)命令处理器(CommandProcessor)

在前文中,我们知道命令的执行最终是由命令处理器CommandProcessor的run方法来完成的,命令处理器的构建采用设计模式中的工厂模式。
命令处理器主要分为两类,我们称之为Driver处理器和非Driver处理器。

非Driver命令处理器

非Driver处理器在org.apache.hadoop.hive.ql.processors包下。

image.png

我们先从HiveCommand枚举类的find方法入手。

HiveCommand

public enum HiveCommand {
  SET(),
  RESET(),
  DFS(),
  CRYPTO(true),
  ERASURE(true),
  ADD(),
  LIST(),
  LLAP_CLUSTER(),
  LLAP_CACHE(),
  RELOAD(),
  DELETE(),
  COMPILE();
  ...
}

我们注意到,最常见的select方法并不在HiveCommand枚举中。

find方法

处理原则:

  • role开头的命令不处理
  • from开头的命令不处理
  • set autocommit命令不处理
  • llap 开头的命令交给LlapSubCommand处理
  • 如果是非测试模式,且包好在枚举值范围内,返回对应的枚举命令。
public static HiveCommand find(String[] command, boolean findOnlyForTesting) {
    if (null == command){
      return null;
    }
    String cmd = command[0];
    if (cmd != null) {
      cmd = cmd.trim().toUpperCase();
      if (command.length > 1 && "role".equalsIgnoreCase(command[1])) {
        // special handling for set role r1 statement
        return null;
      } else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
        //special handling for SQL "delete from <table> where..."
        return null;
      } else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) {
        return null;//don't want set autocommit true|false to get mixed with set hive.foo.bar...
      } else if (command.length > 1 && "llap".equalsIgnoreCase(command[0])) {
        return getLlapSubCommand(command);
      } else if (COMMANDS.contains(cmd)) {
        HiveCommand hiveCommand = HiveCommand.valueOf(cmd);

        if (findOnlyForTesting == hiveCommand.isOnlyForTesting()) {
          return hiveCommand;
        }

        return null;
      }
    }
    return null;
  }

...

private static HiveCommand getLlapSubCommand(final String[] command) {
    if ("cluster".equalsIgnoreCase(command[1])) {
      return LLAP_CLUSTER;
    } else if ("cache".equalsIgnoreCase(command[1])) {
      return LLAP_CACHE;
    } else {
      return null;
    }
  }

可以看出,find方法只处理HiveCommand枚举中非测试模式的命令。了解了HiveCommand枚举类之后,我们再来看命令处理器工厂类会更容易理解得多了。

非Driver命令处理器工厂类(CommandProcessorFactory)

在工厂类中,先通过HiveCommand枚举的find方法获取合法的命令枚举类,然后通过不同的命令枚举,构建相应的命令处理器。
这里我们说是合法的命令枚举类,有两层含义:一是没有匹配的枚举类,find方法返回null;另一种是,虽然获取到了命令枚举类,但不在我们的命令白名单内。

hive执行命令权限白名单。我们可以通过属性hive.security.command.whitelist设置可以允许执行的命令白名单,如白名单默认值:HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist",

  "set,reset,dfs,add,list,delete,reload,compile,llap",
    "Comma separated list of non-SQL Hive commands users are authorized to execute")

当然我们可以通过覆盖该属性来改变白名单。

这里的处理逻辑在getForHiveCommandInternal方法中。

public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf,
                                                           boolean testOnly)
    throws SQLException {
    HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
    if (hiveCommand == null || isBlank(cmd[0])) {
      return null;
    }
    if (conf == null) {
      conf = new HiveConf();
    }
    Set<String> availableCommands = new HashSet<String>();
    for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST)
      .split(",")) {
      availableCommands.add(availableCommand.toLowerCase().trim());
    }
    if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
      throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
    }
    if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0])
      && "function".equalsIgnoreCase(cmd[1])) {
      // special handling for SQL "reload function"
      return null;
    }
    switch (hiveCommand) {
    case SET:
      return new SetProcessor();
    case RESET:
      return new ResetProcessor();
    case DFS:
      SessionState ss = SessionState.get();
      return new DfsProcessor(ss.getConf());
    case ADD:
      return new AddResourceProcessor();
    case LIST:
      return new ListResourceProcessor();
    case LLAP_CLUSTER:
      return new LlapClusterResourceProcessor();
    case LLAP_CACHE:
      return new LlapCacheResourceProcessor();
    case DELETE:
      return new DeleteResourceProcessor();
    case COMPILE:
      return new CompileProcessor();
    case RELOAD:
      return new ReloadProcessor();
    case CRYPTO:
      try {
        return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
      } catch (HiveException e) {
        throw new SQLException("Fail to start the command processor due to the exception: ", e);
      }
    case ERASURE:
      try {
        return new ErasureProcessor(conf);
      } catch (IOException e) {
        throw new SQLException("Fail to start the erasure command processor due to the exception: ", e);
      }
    default:
      throw new AssertionError("Unknown HiveCommand " + hiveCommand);
    }
  }

不管是那种命令处理器,其核心入口方法都是run方法,由于非Driver命令处理器的种类太多,这里就不展开分析各自的run方法了。
我们可以简单看看set命令的处理器的实现。

SetProcessor
  • 如果set命令后为空,则会更新HiveConf。这里可以使新修改的配置生效。
  • 如果是-v,且引擎(HIVE_EXECUTION_ENGINE)是tez,则通过org.apache.tez.dag.api.TezConfiguration特殊处理。否则更新HiveConf。
  • 如果是time-zone,则对命令要特殊处理。
  • 对等号的键值对拆分,并赋值。

最后的赋值方法是:setVaribale。这里不展开分析了。

  @Override
  public CommandProcessorResponse run(String command) {
    SessionState ss = SessionState.get();

    String nwcmd = command.trim();
    if (nwcmd.equals("")) {
      dumpOptions(ss.getConf().getChangedProperties());
      return createProcessorSuccessResponse();
    }

    if (nwcmd.equals("-v")) {
      Properties properties = null;
      if (ss.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
        Class<?> clazz;
        try {
          clazz = Class.forName("org.apache.tez.dag.api.TezConfiguration");

          Configuration tezConf =
              (Configuration) clazz.getConstructor(Configuration.class).newInstance(ss.getConf());
          properties = HiveConf.getProperties(tezConf);
        } catch (Exception e) {
          return new CommandProcessorResponse(1, e.getMessage(), "42000", e);
        }
      } else {
        properties = ss.getConf().getAllProperties();
      }
      dumpOptions(properties);
      return createProcessorSuccessResponse();
    }

    // Special handling for time-zone
    Matcher matcher = TIME_ZONE_PATTERN.matcher(nwcmd);
    if (matcher.find()) {
      nwcmd = HiveConf.ConfVars.HIVE_LOCAL_TIME_ZONE.varname + "=" + nwcmd.substring(matcher.end());
    }

    String[] part = new String[2];
    int eqIndex = nwcmd.indexOf('=');

    if (nwcmd.contains("=")){
      if (eqIndex == nwcmd.length() - 1) { //x=
        part[0] = nwcmd.substring(0, nwcmd.length() - 1);
        part[1] = "";
      } else { //x=y
        part[0] = nwcmd.substring(0, eqIndex).trim();
        part[1] = nwcmd.substring(eqIndex + 1).trim();
      }
      if (part[0].equals("silent")) {
        ss.setIsSilent(getBoolean(part[1]));
        return new CommandProcessorResponse(0);
      }
      return executeSetVariable(part[0],part[1]);
    }
    try {
      return getVariable(nwcmd);
    } catch (Exception e) {
      return new CommandProcessorResponse(1, e.getMessage(), "42000", e);
    }
  }

Driver处理器

Driver处理器在org.apache.hadoop.hive.ql包下。

image.png

Driver处理器实现了IDriver接口,IDriver接口继承自CommandProcessor。从前面我们知道,入口方法是run方法。

Driver命令处理器工厂类(DriverFactory)

这里注意到IDriver接口的实现类有两个:Driver和ReExecDriver。从字面意思理解,一个是直接执行,一个重新执行,具体还得在后面深入之后才能明白真正的含义(初步推测可能类似于mysql数据库的缓存机制),暂且不管他。

入口函数

入口函数newDriver有两个重载,一个是一个参数HiveConf,一个是三个参数:

  • QueryState:查询状态
  • userName:用户
  • QueryInfo:查询信息

从调用链return DriverFactory.newDriver(conf);可知,这里调用的是一个参数的newDriver方法。

public static IDriver newDriver(HiveConf conf) {
    return newDriver(getNewQueryState(conf), null, null);
  }

可以看出,只传入了QueryState,其他两个参数传了null;所以重点在QueryState。

QueryState是如何构造的呢?

通过方法getNewQueryState方法创建。最终是依靠内部类Builder的build方法创建。

private static QueryState getNewQueryState(HiveConf conf) {
    return new QueryState.Builder().withGenerateNewQueryId(true).withHiveConf(conf).build();
  }

Builder的builder方法:

  • 隔离查询配置(防止配置污染)。默认情况下是弃用配置隔离的,除非是在测试的时候。
  • 如果设置了特定的参数,则将特定参数添加到HiveConf中。
  • 如果需要构建新的查询ID,则构建一个新的查询ID。
  • 创建QueryState。
  • 如果存在血缘状态,则设置血缘状态。
    public QueryState build() {
      HiveConf queryConf;

      if (isolated) {
        // isolate query conf
        if (hiveConf == null) {
          queryConf = new HiveConf();
        } else {
          queryConf = new HiveConf(hiveConf);
        }
      } else {
        queryConf = hiveConf;
      }

      // Set the specific parameters if needed
      if (confOverlay != null && !confOverlay.isEmpty()) {
        // apply overlay query specific settings, if any
        for (Map.Entry<String, String> confEntry : confOverlay.entrySet()) {
          try {
            queryConf.verifyAndSet(confEntry.getKey(), confEntry.getValue());
          } catch (IllegalArgumentException e) {
            throw new RuntimeException("Error applying statement specific settings", e);
          }
        }
      }

      // Generate the new queryId if needed
      if (generateNewQueryId) {
        String queryId = QueryPlan.makeQueryId();
        queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
        setApplicationTag(queryConf, queryId);

        // FIXME: druid storage handler relies on query.id to maintain some staging directories
        // expose queryid to session level
        if (hiveConf != null) {
          hiveConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
        }
      }

      QueryState queryState = new QueryState(queryConf);
      if (lineageState != null) {
        queryState.setLineageState(lineageState);
      }
      return queryState;
    }

真正创建IDriver

这里有个参数hive.query.reexecution.enabled,默认情况下系统设置为true。这个参数到底什么含义,还期望后面的学习中能够解答。

public static IDriver newDriver(QueryState queryState, String userName, QueryInfo queryInfo) {
    boolean enabled = queryState.getConf().getBoolVar(ConfVars.HIVE_QUERY_REEXECUTION_ENABLED);
    if (!enabled) {
      return new Driver(queryState, userName, queryInfo);
    }

    String strategies = queryState.getConf().getVar(ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES);
    strategies = Strings.nullToEmpty(strategies).trim().toLowerCase();
    ArrayList<IReExecutionPlugin> plugins = new ArrayList<>();
    for (String string : strategies.split(",")) {
      if (string.trim().isEmpty()) {
        continue;
      }
      plugins.add(buildReExecPlugin(string));
    }

    return new ReExecDriver(queryState, userName, queryInfo, plugins);
  }

Driver的run方法(敲黑板,划重点)

其实在run方法中主要是runInternal方法,其他大篇幅的都是异常处理。所以,我们的关注点又转移到了runInternal方法上来。

public CommandProcessorResponse run(String command) {
    return run(command, false);
  }

  public CommandProcessorResponse run(String command, boolean alreadyCompiled) {

    try {
      runInternal(command, alreadyCompiled);
      return createProcessorResponse(0);
    } catch (CommandProcessorResponse cpr) {
    //这里都是很大篇幅的异常处理
     ...
    }
  }

runInternal方法

这里主要逻辑如下:

  • 如果已经编译了,则修改Driver的状态为执行,否则修改状态为编译中。
  • 获取HiveDriver上下文的钩子。
  • 获取所有的运行钩子,并且预处理他们。
  • 如果没有编译,则进行编译(编译也是个复杂的过程,我们在后面着重分析)。编译的入口在compileInternal方法。
  • 最后调用执行方法execute

execute方法

主要逻辑如下:

  • 设置driver的状态为执行中
  • 从执行计划(plan)中拿出所有的task,并执行。其中是while循环来实现的。其中包括了异常task的处理等。

总结

走读到这里,我们已经发现整个流程已经完成了。这里有很复杂的编译sql并获得MR的task的过程。这里并没有展开讨论。下面我们着重看一下编译到底经历了什么样的蜕变。

细心的话,发现上面描述地很粗,编译,执行都一笔带过,其实,这和其复杂性息息相关,都需要单独展开来分析。之前我们在说hive架构的时候,在编译模块中,我们有转化(Parser),执行计划(Plan),语义分析(SematicAnalyzer)等。

编译的方法入口是compileInternal,最后调用了compile方法。

  private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse {
      ...
      try {
        compile(command, true, deferClose);
      } catch (CommandProcessorResponse cpr) {
        ...
        throw cpr;
      }
      ...
    }
  }

如何发现SQL转化器(Parser)的调用踪迹?

之前框架分析过,SQL转化器(Parser)是在编译器中调用的。这里我们发现将SQL转化为了树(ASTNode tree)。

下面的代码在Driver类的617行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。

      ...
      // 下面的代码在Driver类的617行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.PARSE);

      // Trigger query hook before compilation
      hookRunner.runBeforeParseHook(command);

      ASTNode tree;
      try {
        tree = ParseUtils.parse(command, ctx);
      } catch (ParseException e) {
        parseError = true;
        throw e;
      } finally {
        hookRunner.runAfterParseHook(command, parseError);
      }
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PARSE);

如何发现语义分析(SemanticAnalyzer)的调用踪迹?

转化后,便紧挨着是语义分析。这里都不展开分析了,在后面开辟专题来分析这一块的逻辑。
下面的代码在Driver类的633行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。

      hookRunner.runBeforeCompileHook(command);
      // clear CurrentFunctionsInUse set, to capture new set of functions
      // that SemanticAnalyzer finds are in use
      SessionState.get().getCurrentFunctionsInUse().clear();
      perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ANALYZE);

      // Flush the metastore cache.  This assures that we don't pick up objects from a previous
      // query running in this same thread.  This has to be done after we get our semantic
      // analyzer (this is when the connection to the metastore is made) but before we analyze,
      // because at that point we need access to the objects.
      Hive.get().getMSC().flushCache();

      backupContext = new Context(ctx);
      boolean executeHooks = hookRunner.hasPreAnalyzeHooks();

      HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
      if (executeHooks) {
        hookCtx.setConf(conf);
        hookCtx.setUserName(userName);
        hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
        hookCtx.setCommand(command);
        hookCtx.setHiveOperation(queryState.getHiveOperation());

        tree =  hookRunner.runPreAnalyzeHooks(hookCtx, tree);
      }

      // Do semantic analysis and plan generation
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);

      if (!retrial) {
        if ((queryState.getHiveOperation() != null)
                && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
          setLastReplIdForDump(queryState.getConf());
        }
        openTransaction();
        generateValidTxnList();
      }

      sem.analyze(tree, ctx);

      if (executeHooks) {
        hookCtx.update(sem);
        hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
      }

      LOG.info("Semantic Analysis Completed (retrial = {})", retrial);

如何发现查询计划(Plan)和优化器(Optimizer)的调用踪迹?

下面的代码在Driver类的685行开始,当然不同版本的源码可能行号不同。但只要记住是在compile方法中即可。

      // validate the plan
      sem.validate();
      perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);

      checkInterrupted("after analyzing query.", null, null);

      // get the output schema
      schema = getSchema(sem, conf);
      plan = new QueryPlan(queryStr, sem, queryDisplay.getQueryStartTime(), queryId,
          queryState.getHiveOperation(), schema);
      // save the optimized plan and sql for the explain
      plan.setOptimizedCBOPlan(ctx.getCalcitePlan());
      plan.setOptimizedQueryString(ctx.getOptimizedSql());

      conf.set("mapreduce.workflow.id", "hive_" + queryId);
      conf.set("mapreduce.workflow.name", queryStr);

至此,我们对编译的框架流程已经有了一个整体的还有点模糊的印象。具体HiveQL是怎么转化的为MR的,其实快要拨开迷雾了,但是看源码真心好痛苦。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容