Spark Core源码精读计划#8:SparkEnv中RPC环境的基础构建

目录

前言

在之前的文章中,我们由SparkContext的初始化提到了事件总线LiveListenerBus与执行环境SparkEnv。在讲解SparkEnv的过程中,RPC环境RpcEnv又是首先被初始化的重要组件。做个不怎么恰当的比较,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。

由于RPC环境负责着Spark体系内几乎所有内部及外部通信,内容很多,所以一篇文章必然讲不完。本文还是从基础开始看起。

RPC端点及其引用

RpcEnv抽象类是Spark RPC环境的通用表示,它其中定义的setupEndpoint()方法用来向RPC环境注册一个RPC端点(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客户端要对一个RpcEndpoint发送消息,那么必须首先获得其对应的RpcEndpointRef。它们之间的关系可以用如下简图表示。


图#8.1 - RPC环境与RPC端点

既然RpcEndpoint和RpcEndpointRef是RPC环境中的基础组件,我们先来研究它们的源码。

RpcEndpoint

RpcEndpoint是一个特征,其代码如下。

代码#8.1 - o.a.s.rpc.RpcEndpoint特征

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  def onError(cause: Throwable): Unit = {
    throw cause
  }

  def onConnected(remoteAddress: RpcAddress): Unit = { }

  def onDisconnected(remoteAddress: RpcAddress): Unit = { }

  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }

  def onStart(): Unit = { }

  def onStop(): Unit = { }

  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

其中定义了如下方法,这些相当于是RPC端点在RPC环境中的“行为准则”。

  • self():取得当前RpcEndpoint对应的RpcEndpointRef。
  • receive()/receiveAndReply():接收其他RpcEndpointRef传来的消息并进行处理,receiveAndReply()方法还会发送回复。
  • onError():消息处理出现异常时调用的方法。
  • onConnected()/onDisconnected():当前RPC端点建立连接或断开连接时调用的方法。
  • onNetworkError():RPC端点的连接出现网络错误时调用的方法。
  • onStart()/onStop():RPC端点初始化与关闭时调用的方法。
  • stop():停止当前RpcEndpoint。

RpcEndpoint继承体系

RpcEndpoint的主要继承体系如下图所示。

#图8.2 - RpcEndpoint的主要继承体系

图中可以看到不少之前出现过的RPC端点,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它们时,会专门进行讲解。

另外,图中的ThreadSafeRpcEndpoint是直接继承自RpcEndpoint的特征。顾名思义,它要求RPC端点对消息的处理必须是线程安全的,用文档中的话说,线程安全RPC端点处理消息必须满足happens-before原则。

RpcEndpointRef

RpcEndpointRef是一个抽象类,其代码如下。

代码#8.2 - o.a.s.rpc.RpcEndpointRef抽象类

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  def address: RpcAddress

  def name: String

  def send(message: Any): Unit

  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
}

这个抽象类的开头有三个属性,都是通过RpcUtils工具类从Spark配置项中取出来的,如下。

  • maxRetries:最大重连次数,对应配置项为spark.rpc.numRetries,默认值3次。
  • retryWaitMs:每次重连之前等待的时长,对应配置项为spark.rpc.retry.wait,默认值3秒。
  • defaultAskTimeout:对RPC端点进行ask()操作(下面会讲到)的默认超时时长,对应配置项为spark.rpc.askTimeout与spark.network.timeout(前者优先级高于后者),默认值120秒。

值得注意的是,maxRetries与retryWaitMs两个属性在当前的2.3.3版本中都没有用到,而在之前的版本中还是有用到的,证明Spark官方取消了RPC重试机制,也就是统一为消息传递语义中的at most once语义了。当然,我们也可以自己实现带有重试机制的RPC端点引用。

address和name方法分别返回RPC端点引用对应的地址和名称,不必多讲。下面几个方法的含义如下。

  • send()方法:异步发送一条单向的消息,并且“发送即忘记”(fire-and-forget),不需要回复。
  • ask()方法:异步发送一条消息,并在规定的超时时间内等待RPC端点的回复。RPC端点会调用receiveAndReply()方法来处理。
  • askSync()方法:是ask()方法的同步实现。由于它是阻塞操作,有可能会消耗大量时间,因此必须慎用。

RpcEndpointRef只有一个子类,即NettyRpcEndpointRef。它对send()和ask()两个方法的实现如下。

代码#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()与ask()方法

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  }

可见是依赖于NettyRpcEnv类的,下面来看一下它是如何创建出来的。

NettyRpcEnv概况

创建NettyRpcEnv

在文章#7的代码#7.4~#7.5中,通过工厂类NettyRpcEnvFactory的create()方法创建出了NettyRpcEnv,它是目前Spark官方提供的RPC环境的唯一实现。该方法的代码如下。

代码#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }

可见,这个方法先创建了JavaSerializer序列化器,用于RPC传输中的序列化。然后通过NettyRpcEnv的构造方法创建NettyRpcEnv,这其中也会涉及到一些RPC基础组件的初始化,后面会讲解到。最后定义偏函数startNettyRpcEnv,并调用通用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。

NettyRpcEnv中的属性成员

我们暂时先不看NettyRpcEnv类的细节,而是先来看看它内部包含了哪些组件。

代码#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的属性成员

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var fileDownloadFactory: TransportClientFactory = _

  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  • TransportConf:传输配置,作用在RPC环境中类似于SparkConf,负责管理与RPC相关的各种配置。
  • Dispatcher:调度器,或者叫分发器,用于将消息路由到正确的RPC端点。
  • NettyStreamManager:流式管理器,用于处理RPC环境中的文件,如自定义的配置文件或者JAR包。
  • TransportContext:传输上下文,作用在RPC环境中类似于SparkContext,负责管理RPC的服务端(TransportServer)与客户端(TransportClient),与它们之间的Netty传输管道。
  • TransportClientFactory:创建RPC客户端TransportClient的工厂。
  • TransportServer:RPC环境中的服务端,负责提供基础且高效的流式服务。

TransportConf和TransportContext提供底层的基于Netty的RPC机制,TransportClient和TransportServer则是RPC端点的最低级别抽象。

总结

本文讲解了RPC环境的基本组成部分RpcEndpoint、RpcEndpointRef的细节实现,并初步了解了NettyRpcEnv的创建过程,以及它内部包含的主要组件。虽然TransportConf和TransportContext更为基础,但为了避免嵌套太深出不来,下一篇文章暂时不准备讲它们,而主要来研究NettyRpcEnv内的调度器Dispatcher,它是整个RPC环境高效运转的基础。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容