Spark Core源码精读计划 番外篇B-1:重回Spark RPC环境

目录

前言

又是很久没有连载,万分抱歉。今天(注:其实也包含昨天)需要盯着双11各个实时任务的运行,目前仍然无事发生,抽空来写几笔吧。

在前面的文章中,我们了解了块管理器BlockManager管理下的读写流程。并且已经知道,BlockManager读取块时,如果块在本地找不到,就会去集群内的远端节点去获取。同理,如果BlockManager写入块时需要复制,那么除了在本地写入之外,也要再写一份到远端节点。BlockManager与远端节点的交互就得依赖块传输服务BlockTransferService。但是BlockTransferService需要依赖之前偷懒没有讲过的RPC底层组件,所以现在得把这个坑填上,计划用3篇文章来填。

由于Spark 2.x的RPC环境是完全基于Netty搞的,所以如果看官对Netty有基本的了解的话,读起来会顺畅一点。

RPC底层概览

在系列的文章#8中,我们讲到了RPC环境——即NettyRpcEnv的构建和属性成员。来复习一下。

代码#B1.1 - o.a.s.rpc.netty.NettyRpcEnv中的部分属性成员

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var server: TransportServer = _
  • TransportConf:传输配置,作用在RPC环境中类似于SparkConf,负责管理与RPC相关的各种配置。
  • Dispatcher:调度器,或者叫分发器,用于将消息路由到正确的RPC端点,它的逻辑在文章#9中已经讲过,就不再提了。
  • StreamManager:流式管理器,用于处理RPC环境中的文件,如自定义的配置文件或者JAR包。
  • TransportContext:传输上下文,作用在RPC环境中类似于SparkContext,负责管理RPC的服务端(TransportServer)与客户端(TransportClient),和它们之间的Netty Channel和Pipeline。
  • TransportClientFactory:创建RPC客户端TransportClient的工厂。
  • TransportServer:RPC环境中的服务端,负责提供基础且高效的流式服务。

这些东西就是Spark RPC底层主要的组成部分,之前并没有了解过,下面我们从TransportConf、TransportContext这两样开始探究。

传输配置TransportConf

在Spark源码中并不会显式地创建TransportConf实例,而是通过SparkTransportConf对象代为实现。该对象的源码很短,如下。

代码#B1.2 - o.a.s.network.netty.SparkTransportConf对象

object SparkTransportConf {
  private val MAX_DEFAULT_NETTY_THREADS = 8

  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
        conf.getAll.toMap.asJava.entrySet()
      }
    })
  }

  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}

可见,SparkTransportConf.fromSparkConf()方法负责从SparkConf持有的参数创建TransportConf。TransportConf接受的构造参数有二:

  • module:该RPC配置所属的模块名,比如代码#B.1中创建NettyRpcEnv时,模块名就是"rpc"。
  • ConfigProvider:一个简单的负责数据类型转化的类,能够读取SparkConf中与RPC相关的配置,并传递给TransportConf。在如上的代码中,是直接初始化了一个匿名类。

这样,我们就可以通过SparkConf持有、并通过ConfigProvider获取Spark RPC的所有配置参数。换句话说,TransportConf就是SparkConf的一个子集,SparkConf仍然是配置的唯一入口,方便统一管理。

上面代码中需要初始化的参数如下:

  • spark.rpc.io.numConnectionsPerPeer:在每对RPC实体间建立的连接数量,默认是1;
  • spark.rpc.io.threads:RPC服务端和客户端的线程数,默认为0,即不设置。在此值不设置的情况下,最终会采用可用的核心数与MAX_DEFAULT_NETTY_THREADS常量(为8)之间的较小值。

TransportConf类的实现就非常简单了,主要由很多get方法组成。以下是部分代码,不再赘述。

代码#B1.3 - o.a.s.network.netty.TransportConf类的部分代码

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
    // ...略...
  }

  public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT); }

  public boolean preferDirectBufs() { return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true); }

  public int connectionTimeoutMs() {
    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
      conf.get("spark.network.timeout", "120s"));
    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
    return (int) defaultTimeoutMs;
  }

  public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); }

  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }

  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }

  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }

传输上下文TransportContext

既然SparkContext是Spark Core功能的主要入口,那么TransportContext自然就是Spark RPC环境的入口了。它比前面讲过的NettyRpcEnv更加底层,如果没有它,RPC环境也就无从谈起了。

成员属性与构造方法

代码#B1.4 - o.a.s.network.TransportContext类的成员属性与构造方法

  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
  private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections) {
    this.conf = conf;
    this.rpcHandler = rpcHandler;
    this.closeIdleConnections = closeIdleConnections;
  }

以下3个成员属性同时也是TransportContext构造方法的参数:

  • conf:即RPC传输配置TransportConf;
  • rpcHandler:RPC消息处理器RpcHandler,是一个抽象类。顾名思义,它其中包含了所有RPC消息的具体处理逻辑;
  • closeIdleConnections:表示是否关闭空闲连接的标志。

另外还有两个常量ENCODER和DECODER。前者是消息编码器MessageEncoder的实例,由Netty提供的MessageToMessageEncoder派生而来,RPC服务端使用它来编码向客户端发送的消息;后者是消息解码器MessageDecoder的实例,由Netty提供的MessageToMessageDecoder派生而来,RPC客户端使用它来解码从服务端收到的消息。

下面具体看看TransportContext提供的方法,通过这些方法,我们能见识到更多其他由TransportContext创建的RPC组件。

创建传输客户端工厂TransportClientFactory

代码#B1.5 - o.a.s.network.TransportContext.createClientFactory()方法

  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

  public TransportClientFactory createClientFactory() {
    return createClientFactory(new ArrayList<>());
  }

createClientFactory()方法负责创建传输客户端工厂TransportClientFactory,由TransportClientFactory进而可以创建更多的传输客户端TransportClient。其参数中的TransportClientBootstrap表示传输客户端的初始化逻辑(通常是一些一次性的工作,比如让它们携带SASL认证的token等)。

createClientFactory()方法是直接调用了TransportClientFactory的构造方法,关于它的逻辑,下篇文章详细讲。

创建传输服务端TransportServer

代码#B1.6 - o.a.s.network.TransportContext.createServer()方法

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, new ArrayList<>());
  }

createServer()方法有4个重载,可以指定RPC服务端要绑定到的主机地址和端口号,以及初始化逻辑TransportServerBootstrap。TransportServer的构造方法和相关细节会与TransportClientFactory一起讲。

初始化Netty ChannelPipeline与ChannelHandler

先来看initializePipeline()方法。

代码#B1.7 - o.a.s.network.TransportContext.initializePipeline()方法

  public TransportChannelHandler initializePipeline(SocketChannel channel) {
    return initializePipeline(channel, rpcHandler);
  }

  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

这段代码看起来可能有些sophisticated,我们先回忆一下Netty的基础知识。

在Netty架构中,Channel(通道)是消息通信的载体,ChannelHandler(通道处理器)则负责Channel中消息通信具体逻辑的实现。而ChannelPipeline(通道管线)将多个ChannelHandler按顺序组织起来,ChannelEvent(消息体)就按照ChannelPipeline规定的顺序流转。下面用一幅简图来表示它们之间的关系。

图#B1.1 - Netty Channel、ChannelPipeline、ChannelHandler之间的关系

在ChannelPipeline内部用双链表来维护ChannelHandler以及它对应的上下文实例ChannelHandlerContext,另外还有特殊的头节点HeadContext和尾节点TailContext。Netty的这种设计可以让用户专注于实现ChannelHandler的逻辑细节,这大概也是Spark开发者们所看重的优点之一吧。

从上文代码中的initializePipeline()方法可以看出,通过链式调用ChannelPipeline.addLast()方法,按顺序添加了以下ChannelHandler:

  • 消息编码器MessageEncoder;
  • 帧解码器TransportFrameDecoder;
  • 消息解码器MessageDecoder;
  • 空闲状态处理器IdleStateHandler;
  • 真正的RPC处理逻辑TransportChannelHandler。它是Spark RPC环境专用的ChannelHandler。

它们的类图如下所示。

图#B1.2 - Spark RPC ChannelPipeline内的组件类图

其中,实现了ChannelInboundHandler接口的处理器用于处理请求消息,而实现了ChannelOutboundHandler接口的处理器用于处理响应消息,并且它们的顺序是相反的。因此,处理请求的流程是:TransportFrameDecoder→MessageDecoder→IdleStateHandler→TransportChannelHandler,处理响应的流程是:IdleStateHandler→MessageEncoder。

那么TransportChannelHandler是哪里来的呢?它是由createChannelHandler()方法创建的。

代码#B1.8 - o.a.s.network.TransportContext.createChannelHandler()方法

  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred());
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

由代码可见,一个TransportChannelHandler实际上由三个组件组成:

  • TransportResponseHandler,是客户端处理服务端发回的响应消息的处理器;
  • TransportClient,单独的RPC客户端,不受TransportClientFactory的管理;
  • TransportRequestHandler,是服务端处理客户端发来的请求消息的处理器。

注意,虽然TransportResponseHandler和TransportRequestHandler的名称里都有“Handler”,但它们不是Netty层面上的东西,仅仅是Spark内置的MessageHandler抽象类的实现而已,它规定了处理请求和响应的一些基本规范,后文会讲解到。

图#B1.3 - MessageHandler类图

总结

本文讲解了RPC环境中的传输配置TransportConf与传输上下文TransportContext的细节,探究了由TransportContext初始化的传输客户端工厂TransportClientFactory、传输服务端TransportServer,最后结合Netty的部分知识讲解了ChannelPipeline与ChannelHandler的初始化逻辑。

民那晚安。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容