目录
前言
又是很久没有连载,万分抱歉。今天(注:其实也包含昨天)需要盯着双11各个实时任务的运行,目前仍然无事发生,抽空来写几笔吧。
在前面的文章中,我们了解了块管理器BlockManager管理下的读写流程。并且已经知道,BlockManager读取块时,如果块在本地找不到,就会去集群内的远端节点去获取。同理,如果BlockManager写入块时需要复制,那么除了在本地写入之外,也要再写一份到远端节点。BlockManager与远端节点的交互就得依赖块传输服务BlockTransferService。但是BlockTransferService需要依赖之前偷懒没有讲过的RPC底层组件,所以现在得把这个坑填上,计划用3篇文章来填。
由于Spark 2.x的RPC环境是完全基于Netty搞的,所以如果看官对Netty有基本的了解的话,读起来会顺畅一点。
RPC底层概览
在系列的文章#8中,我们讲到了RPC环境——即NettyRpcEnv的构建和属性成员。来复习一下。
代码#B1.1 - o.a.s.rpc.netty.NettyRpcEnv中的部分属性成员
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
"rpc",
conf.getInt("spark.rpc.io.threads", 0))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
@volatile private var server: TransportServer = _
- TransportConf:传输配置,作用在RPC环境中类似于SparkConf,负责管理与RPC相关的各种配置。
- Dispatcher:调度器,或者叫分发器,用于将消息路由到正确的RPC端点,它的逻辑在文章#9中已经讲过,就不再提了。
- StreamManager:流式管理器,用于处理RPC环境中的文件,如自定义的配置文件或者JAR包。
- TransportContext:传输上下文,作用在RPC环境中类似于SparkContext,负责管理RPC的服务端(TransportServer)与客户端(TransportClient),和它们之间的Netty Channel和Pipeline。
- TransportClientFactory:创建RPC客户端TransportClient的工厂。
- TransportServer:RPC环境中的服务端,负责提供基础且高效的流式服务。
这些东西就是Spark RPC底层主要的组成部分,之前并没有了解过,下面我们从TransportConf、TransportContext这两样开始探究。
传输配置TransportConf
在Spark源码中并不会显式地创建TransportConf实例,而是通过SparkTransportConf对象代为实现。该对象的源码很短,如下。
代码#B1.2 - o.a.s.network.netty.SparkTransportConf对象
object SparkTransportConf {
private val MAX_DEFAULT_NETTY_THREADS = 8
def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
val numThreads = defaultNumThreads(numUsableCores)
conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)
override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
conf.getAll.toMap.asJava.entrySet()
}
})
}
private def defaultNumThreads(numUsableCores: Int): Int = {
val availableCores =
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
}
}
可见,SparkTransportConf.fromSparkConf()方法负责从SparkConf持有的参数创建TransportConf。TransportConf接受的构造参数有二:
- module:该RPC配置所属的模块名,比如代码#B.1中创建NettyRpcEnv时,模块名就是"rpc"。
- ConfigProvider:一个简单的负责数据类型转化的类,能够读取SparkConf中与RPC相关的配置,并传递给TransportConf。在如上的代码中,是直接初始化了一个匿名类。
这样,我们就可以通过SparkConf持有、并通过ConfigProvider获取Spark RPC的所有配置参数。换句话说,TransportConf就是SparkConf的一个子集,SparkConf仍然是配置的唯一入口,方便统一管理。
上面代码中需要初始化的参数如下:
- spark.rpc.io.numConnectionsPerPeer:在每对RPC实体间建立的连接数量,默认是1;
- spark.rpc.io.threads:RPC服务端和客户端的线程数,默认为0,即不设置。在此值不设置的情况下,最终会采用可用的核心数与MAX_DEFAULT_NETTY_THREADS常量(为8)之间的较小值。
TransportConf类的实现就非常简单了,主要由很多get方法组成。以下是部分代码,不再赘述。
代码#B1.3 - o.a.s.network.netty.TransportConf类的部分代码
public TransportConf(String module, ConfigProvider conf) {
this.module = module;
this.conf = conf;
SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
// ...略...
}
public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT); }
public boolean preferDirectBufs() { return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true); }
public int connectionTimeoutMs() {
long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
conf.get("spark.network.timeout", "120s"));
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
}
public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); }
public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }
public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
传输上下文TransportContext
既然SparkContext是Spark Core功能的主要入口,那么TransportContext自然就是Spark RPC环境的入口了。它比前面讲过的NettyRpcEnv更加底层,如果没有它,RPC环境也就无从谈起了。
成员属性与构造方法
代码#B1.4 - o.a.s.network.TransportContext类的成员属性与构造方法
private final TransportConf conf;
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
this(conf, rpcHandler, false);
}
public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;
}
以下3个成员属性同时也是TransportContext构造方法的参数:
- conf:即RPC传输配置TransportConf;
- rpcHandler:RPC消息处理器RpcHandler,是一个抽象类。顾名思义,它其中包含了所有RPC消息的具体处理逻辑;
- closeIdleConnections:表示是否关闭空闲连接的标志。
另外还有两个常量ENCODER和DECODER。前者是消息编码器MessageEncoder的实例,由Netty提供的MessageToMessageEncoder派生而来,RPC服务端使用它来编码向客户端发送的消息;后者是消息解码器MessageDecoder的实例,由Netty提供的MessageToMessageDecoder派生而来,RPC客户端使用它来解码从服务端收到的消息。
下面具体看看TransportContext提供的方法,通过这些方法,我们能见识到更多其他由TransportContext创建的RPC组件。
创建传输客户端工厂TransportClientFactory
代码#B1.5 - o.a.s.network.TransportContext.createClientFactory()方法
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
return new TransportClientFactory(this, bootstraps);
}
public TransportClientFactory createClientFactory() {
return createClientFactory(new ArrayList<>());
}
createClientFactory()方法负责创建传输客户端工厂TransportClientFactory,由TransportClientFactory进而可以创建更多的传输客户端TransportClient。其参数中的TransportClientBootstrap表示传输客户端的初始化逻辑(通常是一些一次性的工作,比如让它们携带SASL认证的token等)。
createClientFactory()方法是直接调用了TransportClientFactory的构造方法,关于它的逻辑,下篇文章详细讲。
创建传输服务端TransportServer
代码#B1.6 - o.a.s.network.TransportContext.createServer()方法
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, null, port, rpcHandler, bootstraps);
}
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
return createServer(0, bootstraps);
}
public TransportServer createServer() {
return createServer(0, new ArrayList<>());
}
createServer()方法有4个重载,可以指定RPC服务端要绑定到的主机地址和端口号,以及初始化逻辑TransportServerBootstrap。TransportServer的构造方法和相关细节会与TransportClientFactory一起讲。
初始化Netty ChannelPipeline与ChannelHandler
先来看initializePipeline()方法。
代码#B1.7 - o.a.s.network.TransportContext.initializePipeline()方法
public TransportChannelHandler initializePipeline(SocketChannel channel) {
return initializePipeline(channel, rpcHandler);
}
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
}
这段代码看起来可能有些sophisticated,我们先回忆一下Netty的基础知识。
在Netty架构中,Channel(通道)是消息通信的载体,ChannelHandler(通道处理器)则负责Channel中消息通信具体逻辑的实现。而ChannelPipeline(通道管线)将多个ChannelHandler按顺序组织起来,ChannelEvent(消息体)就按照ChannelPipeline规定的顺序流转。下面用一幅简图来表示它们之间的关系。
在ChannelPipeline内部用双链表来维护ChannelHandler以及它对应的上下文实例ChannelHandlerContext,另外还有特殊的头节点HeadContext和尾节点TailContext。Netty的这种设计可以让用户专注于实现ChannelHandler的逻辑细节,这大概也是Spark开发者们所看重的优点之一吧。
从上文代码中的initializePipeline()方法可以看出,通过链式调用ChannelPipeline.addLast()方法,按顺序添加了以下ChannelHandler:
- 消息编码器MessageEncoder;
- 帧解码器TransportFrameDecoder;
- 消息解码器MessageDecoder;
- 空闲状态处理器IdleStateHandler;
- 真正的RPC处理逻辑TransportChannelHandler。它是Spark RPC环境专用的ChannelHandler。
它们的类图如下所示。
其中,实现了ChannelInboundHandler接口的处理器用于处理请求消息,而实现了ChannelOutboundHandler接口的处理器用于处理响应消息,并且它们的顺序是相反的。因此,处理请求的流程是:TransportFrameDecoder→MessageDecoder→IdleStateHandler→TransportChannelHandler,处理响应的流程是:IdleStateHandler→MessageEncoder。
那么TransportChannelHandler是哪里来的呢?它是由createChannelHandler()方法创建的。
代码#B1.8 - o.a.s.network.TransportContext.createChannelHandler()方法
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
由代码可见,一个TransportChannelHandler实际上由三个组件组成:
- TransportResponseHandler,是客户端处理服务端发回的响应消息的处理器;
- TransportClient,单独的RPC客户端,不受TransportClientFactory的管理;
- TransportRequestHandler,是服务端处理客户端发来的请求消息的处理器。
注意,虽然TransportResponseHandler和TransportRequestHandler的名称里都有“Handler”,但它们不是Netty层面上的东西,仅仅是Spark内置的MessageHandler抽象类的实现而已,它规定了处理请求和响应的一些基本规范,后文会讲解到。
总结
本文讲解了RPC环境中的传输配置TransportConf与传输上下文TransportContext的细节,探究了由TransportContext初始化的传输客户端工厂TransportClientFactory、传输服务端TransportServer,最后结合Netty的部分知识讲解了ChannelPipeline与ChannelHandler的初始化逻辑。
民那晚安。