Livy Session 详解(上)

本文基于 incubator-livy 0.4.0-incubating

Livy Rest Api的介绍中我们可以知道,livy 共有两种 job,分别是 session 和 batch。然而,在源码实现中,session 和 batch 都是 Session 的子类,rest api 中的 session 对应源码中的 InteractivateSession;rest api 中的 batch 对应源码中的 BatchSession。在之后关于 livy 的所有文章中,session 或 batch 对应 rest api 中的含义,InteractivateSessionBatchSessionSession 都对应代码中的含义。

session 和 batch 的创建过程也很不相同,batch 的创建以对应的 spark app 启动为终点;而 session 除了要启动相应的 spark app,还要能支持共享 sparkContext 来接受一个个 statements 的提交及运行,我将 session 的创建分为两个大步骤:

  • client 端:运行在 LivyServer 中,接受 request 直到启动 spark app(注意,这里虽然叫 client 端,但是运行在 LivyServer 中的)
  • server 端:session 对应的 spark app driver 的启动

这篇文章主要讲讲 client 端 都做了些什么

一:整体流程

create session-livy client side.png

一图胜千言,上图就是创建一个 session 在 client 端的主要流程,我们将以注释的方式来说明那些没那么重要或复杂的流程,而核心的流程都在下文中分小节进行剖析。

二:启动 session 对应的 spark app

接下来直捣黄龙,直接到第 (8) 步 ContextLauncher#startDriver 看看 session 对应的 spark app 是如何启动的。ContextLauncher#startDriver 可以分为两个大步骤:

  1. 启动 spark app
  2. 等待 SparkSubmit 退出

2.2:启动 spark app

startDriver.png

如上图,startDriver 无非就是 new 了一个 SparkLauncher 对象,进行了配置、资源、mainClass 等设置,然后调用 launch() 方法拿到了 SparkSubmit 进程的 对应的 Process 对象 process。
可以看到,session 对应的 spark app 的 mainClass 为 org.apache.livy.rsc.driver.RSCDriverBootstrapper

2.3:等待 SparkSubmit 退出

SparkLauncher#launch() 返回的进程是 SparkSubmit 进程,再返回 process 后,会 new 一个 ContextLauncher.ChildProcess 对象,在过程中会新启动一个线程来一直等待 SparkSubmit 进程退出,该线程中的逻辑如下:若 SparkSubmit 非正常退出(exitCode != 0),表示 Spark App 启动失败,会抛异常

public void run() {
  try {
    int exitCode = child.waitFor();
    if (exitCode != 0) {
      LOG.warn("Child process exited with code {}.", exitCode);
      fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
    }
  } catch (InterruptedException ie) {
    LOG.warn("Waiting thread interrupted, killing child process.");
    Thread.interrupted();
    child.destroy();
  } catch (Exception e) {
    LOG.warn("Exception while waiting for child process.", e);
  }
}

三:与 driver 建立连接

我们知道,session 最大的特点就是可以共享 SparkContext,让用户提交的多个代码片段都能跑在一个 SparkContext 上,这有两个好处:

  1. 大大加速任务的启动速度:我们知道,在 yarn 上启动一个 app 是比较耗时的,一般都需要 20s 左右;而使用 session,除了启动 session 也需要相当的耗时外,之后提交的代码片段都将立即执行
  2. 共享 RDD、table:持久化的 RDD、table 都可以被之后的代码片段使用,这在不同用户需要在相同的 RDD、table 上做计算的场景非常有用

而共享 SparkContext 就需要 client 与 driver 之间建立起连接,能让 client 向 driver 发送代码片段、查询运行状态、获取运行结果等

3.1:client 传递其 RpcServer 信息给 driver

时序图中的第 (5) 步:RSCClientFactory#createClient,在该调用中创建了一个 org.apache.livy.rsc.rpc.RpcServer(后文简称 RpcServer)对象赋值给成员 server。该 server 会在 driver 启动时被 driver 中的 rpc client 连接并告知 driver 中的 RpcServer 的信息,以便之后 client 端可以通过该信息向 driver 中的 RpcServer 发起连接及请求。由于 driver 可能被 yarn 调度到任何一个节点启动,所以无法由 LivyServer 主动与 driver 建立连接,而是预先在 client 端建立好 RpcServer 等待 driver 来连接。

另外,RpcServer 与 rpc client 是通过一个由 RpcServer 自身生成的 secret 进行匹配的。要能让 driver 连接到该 RpcServer,还需要知道 LivyServer 的 host 和 port,这这些信息都是通过 conf 传给 driver 的,在 ContextLauncher 构造函数中实现:

// 生成 client id
this.clientId = UUID.randomUUID().toString();
// 由server 生成 secret
this.secret = factory.getServer().createSecret();

...

conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
conf.set(LAUNCHER_PORT, factory.getServer().getPort());
conf.set(CLIENT_ID, clientId);
conf.set(CLIENT_SECRET, secret);

这些配置最终也将作为启动 driver 的 conf 的一部分传给 driver,这样 driver 在启动后就知道 client 中的 RpcServer 的地址和 secret 了

3.2:driver 连接 client 并传递其 RpcServer 信息

rscdriver-init.png

该过程在 RSCDriver#initializeServer 中实现,是 seesion driver 的初始化步骤

3.3:client 接收 driver rpcServer 地址信息并连接

client 传递其 RpcServer 信息给 driver 之前已经为 RSCClientFactory 对象的成员 server: RpcServer 注册了 client 以及相应 client 成功连接的处理函数:

final RegistrationHandler handler = new RegistrationHandler();
factory.getServer().registerClient(clientId, secret, handler);

这里的 clientId、secret 即 3.1 小节中传递给 driver 的。Registration 类用来处理 driver 端的 rpcClient 连接到 server 时的处理逻辑,即:

private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
  ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
  if (promise.trySuccess(info)) {
    timeout.cancel(true);
  }
}

参数 RemoteDriverAddress msg 即在 3.2 小节中 driver 中的 rpcClient 发送给 server 的 driver 中 rpcServer 的地址(包括 address、host),之后再结合 clientId、serrect 来构造 ContextInfo info 来触发 promise.trySuccess(info),info 表名了 driver 中 rpcServer 的地址已经发起连接需要的 clientId、secret,这与 3.2 小节中 driver 中的 rpcServer 注册的 client 信息相符。

在创建 RSCClient 对象时会在 promise 上 add 相应的 listener,promise.trySuccess(info) 会触发 onSuccess(ContextInfo info) 进而调用 connectToContext(info)

Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
  @Override
  public void onSuccess(ContextInfo info) throws Exception {
    connectToContext(info);
    ...
  }

  @Override
  public void onFailure(Throwable error) {
    connectionError(error);
    ...
  }
})

connectToContext(info) 方法中会使用拿到的 driver 端 rpcServer 的连接信息发起连接得到 driverRpc,即用于向 driver 端 rpcServer 发送 rpc 调用的 client,这是 RSCClient 的成员,之后 RSCClient 和 driver 之间的通信都通过 driverRpc 来进行。

四:Session 的创建与初始化

new InteractiveSession and init

在与 driver 建立连接之后,会使用 rscClient、livyConf 等信息来创建 InteractiveSession 对象并进行初始化,流程如上。初始化过程汇总,比较关键的步骤是将 session 信息存储到 state store 中以便livy server 挂掉后能进行 recovery;再就是向 driver 发送一个空的 PingJob 来确定 driver 的状态是否 ok,若 PingJob 成功执行,则说明 driver 状态 ok,将 session 置为 running 状态;若出错或失败,则说明 driver 出了一些问题,则将 session 的状态置为 error。

在成功完成 session 的创建及初始化后,会将 session 添加到 SessionManager 中进行统一管理。SessionManager 的主要职责包括:

  • 持有所有 sessions
  • 清理过期 session
  • 从 state store 中恢复 sessions
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容