Kyuubi服务源码解析:FrontendService

FrontendService(Thrift服务Server端)

  FrontendService负责与客户端进行交互:维护与客户端的连接,并将SQL执行结果返回至客户端。

FrontendService.scala:

FrontendService的类声明如下所示:

  从FrontendService类声明中可以看出,该类实现了Runnable接口,看一下它如何重写的run方法。

  • run()方法
  override def run(): Unit = {
    try {
      // Server thread pool
      val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS.key).toInt
      val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS.key).toInt
      //使用Java线程池ThreadPoolExecutor
      val executorService = new ThreadPoolExecutor(
        minThreads,
        maxThreads,
        conf.getTimeAsSeconds(FRONTEND_WORKER_KEEPALIVE_TIME.key),
        TimeUnit.SECONDS,
        new SynchronousQueue[Runnable],
        new NamedThreadFactory(threadPoolName))

      // Thrift configs
      authFactory = new KyuubiAuthFactory(conf)
      val transportFactory = authFactory.getAuthTransFactory
      val processorFactory = authFactory.getAuthProcFactory(this)
      //使用TServerSocket创建阻塞式IO,TServerSocket继承自TTransport
      //也就是说,TServerSocket属于TTransport这一层
      val serverSocket: TServerSocket = getServerSocket(serverIPAddress, portNum)

      // Server args
      val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE.key).toInt
      val requestTimeout = conf.getTimeAsSeconds(FRONTEND_LOGIN_TIMEOUT.key).toInt
      val beBackoffSlotLength = conf.getTimeAsMs(FRONTEND_LOGIN_BEBACKOFF_SLOT_LENGTH.key).toInt
      val args = new TThreadPoolServer.Args(serverSocket)
        .processorFactory(processorFactory)
        .transportFactory(transportFactory)
        //客户端协议要一致,这里使用的协议是TBinaryProtocol,它使用二进制格式
        .protocolFactory(new TBinaryProtocol.Factory)
        .inputProtocolFactory(
          new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
        .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
        .beBackoffSlotLength(beBackoffSlotLength)
        .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
        .executorService(executorService)
      // TCP Server
      //建立TThreadPoolServer线程池服务模型
      server = Some(new TThreadPoolServer(args))
      server.foreach(_.setServerEventHandler(serverEventHandler))
      info(s"Starting $name on host ${serverIPAddress.getCanonicalHostName} at port $portNum with" +
        s" [$minThreads, $maxThreads] worker threads")
      //启动服务
      server.foreach(_.serve())
    } catch {
      case t: Throwable =>
        error("Error starting " + name +  " for KyuubiServer", t)
        System.exit(-1)
    }
  }

  其中,TThreadPoolServer涉及到Thrift服务的通信模型,理解Thrift服务可以参照这篇博文:Thrift 通信模型
  下面这张图是Thrift服务的通信协议栈,TThreadPoolServer属于TServer的一种模型(图中红色方框标注处)。TServer主要作用是接收Client的请求,并转到某个TProcessor上进行请求处理。针对不同的访问规模,Thrift提供了不同的TServer模型。TThreadPoolServer使用阻塞IO的多线程服务器,使用线程池管理处理线程。
  TThreadPoolServer的示例代码参见:
  Thrift 多线程阻塞式IO服务模型-TThreadPoolServer

Thrift通信协议栈

  我们与Hive源码中org.apache.hive.service.cli.thrift包下ThriftBinaryCLIService(该类继承自ThriftCLIService)的run方法进行类比,可以看出代码逻辑几乎完全相似。

@Override
  public void run() {
      // Server thread pool
      String threadPoolName = "HiveServer2-Handler-Pool";
      ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads,
          maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName),
          oomHook);
      // Thrift configs
      hiveAuthFactory = new HiveAuthFactory(hiveConf);
      TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
      ***省略部分代码***
      // Server args
      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
      int requestTimeout = (int) hiveConf.getTimeVar(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
      int beBackoffSlotLength = (int) hiveConf.getTimeVar(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
          .processorFactory(processorFactory).transportFactory(transportFactory)
          .protocolFactory(new TBinaryProtocol.Factory())
          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
          .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
          .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
          .executorService(executorService);

      // TCP Server
      server = new TThreadPoolServer(sargs);
      server.setServerEventHandler(new TServerEventHandler() {
          ***省略部分代码***
      }
      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
          + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
      LOG.info(msg);
      server.serve();
    } catch (Throwable t) {
      LOG.error(
          "Error starting HiveServer2: could not start "
              + ThriftBinaryCLIService.class.getSimpleName(), t);
      System.exit(-1);
    }
  }

是不是几乎一毛一样?哈哈~~


Thrift服务Hive JDBC客户端

  既然Thrift服务是基于C/S模式,而FrontendService又负责与客户端交互。上文通过解析FrontendService的代码,可以看出Thrift Server端的大致逻辑。下面简单分析一下Thrift Client端的代码逻辑。
  无论是MySQL还是Hive,通过JDBC连接时(包括Beeline客户端连接)的逻辑大致相同,在连接时都会基于反射机制去加载jar包中的代码。感兴趣的同学可以参考以下博文(这两篇分别是两个作者写的):JAVA JDBC(MySQL)驱动源码分析(一)JDBC源码解析(二):获取connection
  Hive JDBC连接的jar包叫org.apache.hadoop.hive.jdbc.HiveDriver,当JDBC连接时会调用HiveDriver的connect方法,connect方法中又会实例化一个HiveConnection的对象。

HiveConnection.java:

  • HiveConnection构造方法
 public HiveConnection(String uri, Properties info) throws SQLException {
 ***省略部分代码***
      for (int numRetries = 0;;) {
        try {
          // open the client transport
          //这里的transport我理解为socket,当然transport不仅仅是socket
          openTransport();
          // set up the client
          client = new TCLIService.Client(new TBinaryProtocol(transport));
          // open client session
          openSession();
          //这行代码相关的JIRA还是Hive Commiter帮我提交进去的,哈哈哈~
          executeInitSql();

          break;
        } 
 ***省略部分代码***
 }

  Hive源码中的注释已经写得很清楚,我们来分析这一行代码:
  client = new TCLIService.Client(new TBinaryProtocol(transport));
  这行代码做了一层层的封装,最后创建了Client对象。结合Thrift服务的通信模型,也就是从底层往上一层层地进行封装:TTransport => TProtocol => Client。
  openSession()的解析,请看我写的这篇博文:Kyuubi服务源码解析:OpenSession解析
  这里有必要分析一下openTransport()的过程,因为涉及到Kerberos认证,有助于梳理Kerberos认证的流程。详细内容见我写的Kerberos文集中HiveConnection之openTransport()方法解析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容