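Since Spark 1.6, Spark's internal messaging has gone through an RPC layer built on Netty (an NIO-based framework).
Let's dig into it with some source-code analysis.
Before that, here are a few key terms.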
1.RpcEndpoint
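This trait defines the server-side object in RPC communication. Each RpcEndpoint represents one party that needs to communicate (for example the Master, a Worker, or the Driver), and its main job is to handle the messages it receives. An RpcEndpoint goes through these stages in order: constructor -> onStart -> receive -> onStop. onStart is called before the endpoint starts handling messages. receive and receiveAndReply handle the messages that another RpcEndpoint (or the endpoint itself) sent with send and ask, respectively. The trait also defines the endpoint's event-driven callbacks (connect, disconnect, network error), but for the Spark framework the core job is still receiving and processing messages.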
private[spark] trait RpcEndpoint {
  /** The [[RpcEnv]] this [[RpcEndpoint]] is registered to. */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` becomes valid when `onStart` is called
   * and becomes null when `onStop` is called.
   * Note: before `onStart` is called, the [[RpcEndpoint]] is not registered yet, so it has no
   * valid [[RpcEndpointRef]].
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from `RpcEndpointRef.send` or `RpcCallContext.reply`. An unmatched message
   * causes a SparkException, which is passed to `onError`.
   * Messages sent with RpcEndpointRef.send need no reply; they are only handled on this side.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  /**
   * Process messages from `RpcEndpointRef.ask`. After handling such a message, the endpoint must
   * send a reply back to the caller of RpcEndpointRef.ask.
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  /** Invoked when an exception is thrown while handling a message. */
  def onError(cause: Throwable): Unit = {
    // By default, throw e and let RpcEnv handle it
    throw cause
  }

  /** Invoked when a remote node connects to this endpoint's server. */
  def onConnected(remoteAddress: RpcAddress): Unit = {} // By default, do nothing.

  /** Invoked when a remote node disconnects from this endpoint's server. */
  def onDisconnected(remoteAddress: RpcAddress): Unit = {} // By default, do nothing.

  /** Invoked when an error occurs on the connection between this node and a remote one. */
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {} // By default, do nothing.

  /** Invoked before the [[RpcEndpoint]] starts handling any message. */
  def onStart(): Unit = {} // By default, do nothing.

  /**
   * Invoked when the [[RpcEndpoint]] is stopping. `self` becomes null in this method,
   * so it cannot be used to send messages.
   */
  def onStop(): Unit = {} // By default, do nothing.

  /** A convenient method to stop [[RpcEndpoint]]. */
  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}
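RpcEndpoint is `private[spark]`, so it can't be used outside Spark. The sketch below therefore imitates it with hypothetical stand-ins: `ToyEndpoint`, `ToyCallContext`, `Greet` and `Add` are invented for illustration and are not Spark classes. It shows the lifecycle order (onStart, then messages, then onStop) and the split between a one-way message handled by `receive` and a request handled by `receiveAndReply`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical messages for this sketch only.
case class Greet(name: String) // one-way: handled by receive
case class Add(a: Int, b: Int) // request-reply: handled by receiveAndReply

// Hypothetical stand-in for RpcCallContext: it just records the reply.
final class ToyCallContext {
  var replied: Option[Any] = None
  def reply(response: Any): Unit = replied = Some(response)
}

// Hypothetical class that mimics the hooks of RpcEndpoint; it is not Spark's trait.
class ToyEndpoint {
  val events: ListBuffer[String] = ListBuffer.empty[String]

  def onStart(): Unit = events += "onStart"

  // Like RpcEndpoint.receive: a PartialFunction, so unmatched messages can be detected.
  def receive: PartialFunction[Any, Unit] = {
    case Greet(name) => events += s"hello $name"
  }

  // Like RpcEndpoint.receiveAndReply: the answer goes back through the context.
  def receiveAndReply(context: ToyCallContext): PartialFunction[Any, Unit] = {
    case Add(a, b) => context.reply(a + b)
  }

  def onStop(): Unit = events += "onStop"
}

object ToyEndpointDemo {
  // Drives the endpoint in the order an RpcEnv would: onStart, messages, onStop.
  def run(): (List[String], Option[Any]) = {
    val endpoint = new ToyEndpoint
    endpoint.onStart()
    endpoint.receive(Greet("worker"))
    val ctx = new ToyCallContext
    endpoint.receiveAndReply(ctx)(Add(1, 2))
    // Spark would turn an unmatched message into a SparkException; here we only detect it.
    if (!endpoint.receive.isDefinedAt("unknown")) endpoint.events += "unmatched"
    endpoint.onStop()
    (endpoint.events.toList, ctx.replied)
  }
}
```

`ToyEndpointDemo.run()` returns the recorded lifecycle events together with the reply (`Some(3)`) that the ask-style message produced.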
2.RpcEndpointRef
RpcEndpointRef is a reference to a remote RpcEndpoint. To send a message to a specific RpcEndpoint, we normally first obtain a reference to it and then send the message through that reference.
/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
*/
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
/**
* return the address for the [[RpcEndpointRef]]
*/
def address: RpcAddress
def name: String
/**
* Sends a one-way asynchronous message. Fire-and-forget semantics.
*/
def send(message: Any): Unit
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
* receive the reply within the specified timeout.
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to
* receive the reply within a default timeout.
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* default timeout, throw an exception if this fails.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
* @param message the message to send
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
* specified timeout, throw an exception if this fails.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
* @param timeout the timeout duration
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
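}
The English comments are simple, so I won't translate them one by one. Here is a summary of the methods of this class.
send is fire-and-forget: no reply is expected.
ask expects a reply and returns a Future. One overload uses the default timeout and the other takes an explicit RpcTimeout. Either way the message is sent only once and never retried.
askSync is the blocking version of ask: it waits for the reply within the timeout and throws an exception if that fails.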
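To make the send / ask / askSync difference concrete, here is a minimal sketch built on plain `scala.concurrent`. It is not Spark's API. `ToyRef` is a hypothetical class whose `handler` function plays the remote endpoint's receiveAndReply, and `Await.result` stands in for `RpcTimeout.awaitResult`.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag

// Hypothetical reference: `handler` plays the role of the remote endpoint's receiveAndReply.
final class ToyRef(handler: Any => Any)(implicit ec: ExecutionContext) {
  private val defaultTimeout: FiniteDuration = 2.seconds

  // send: fire-and-forget; the result (if any) is never seen by the caller.
  def send(message: Any): Unit = {
    Future(handler(message))
    ()
  }

  // ask: returns at once with a Future of the reply; sent once, no retries.
  def ask[T: ClassTag](message: Any): Future[T] = Future(handler(message)).mapTo[T]

  // askSync: a blocking ask, analogous to timeout.awaitResult(future) in askSync above.
  def askSync[T: ClassTag](message: Any, timeout: FiniteDuration = defaultTimeout): T =
    Await.result(ask[T](message), timeout)
}

object ToyRefDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val ref = new ToyRef({
    case n: Int => n * 2
    case other  => s"echo $other"
  })
}
```

As with the real askSync, the blocking variant should not be called from inside an endpoint's message loop, because it ties up that thread until the reply arrives.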
3.RpcAddress
Represents the address of a remote RpcEndpointRef (more precisely, of the RpcEnv it lives in): host + port.
/**
* Address for an RPC environment, with hostname and port.
*/
private[spark] case class RpcAddress(host: String, port: Int) {
def hostPort: String = host + ":" + port
/** Returns a string in the form of "spark://host:port". */
def toSparkURL: String = "spark://" + hostPort
override def toString: String = hostPort
}
private[spark] object RpcAddress {
/** Return the [[RpcAddress]] represented by `uri`. */
def fromURIString(uri: String): RpcAddress = {
val uriObj = new java.net.URI(uri)
RpcAddress(uriObj.getHost, uriObj.getPort)
}
/** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
def fromSparkURL(sparkUrl: String): RpcAddress = {
val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
RpcAddress(host, port)
}
}
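Here is a quick usage sketch of the spark://host:port round trip. Because RpcAddress and Utils are private to Spark, `Addr` below is a hypothetical mirror of RpcAddress. The checks in `fromSparkUrl` (scheme must be "spark", host and port must be present) are a simplified assumption of what `Utils.extractHostPortFromSparkUrl` validates, not a copy of it.

```scala
import java.net.URI

// Hypothetical mirror of Spark's private RpcAddress.
final case class Addr(host: String, port: Int) {
  def hostPort: String = host + ":" + port
  def toSparkURL: String = "spark://" + hostPort
}

object Addr {
  // Simplified assumption of the spark:// URL check: scheme, host and port must be present.
  def fromSparkUrl(sparkUrl: String): Addr = {
    val uri = new URI(sparkUrl)
    require(uri.getScheme == "spark", s"not a spark URL: $sparkUrl")
    require(uri.getHost != null && uri.getPort >= 0, s"missing host or port: $sparkUrl")
    Addr(uri.getHost, uri.getPort)
  }
}
```

For example, `Addr.fromSparkUrl("spark://192.168.1.10:7077")` yields `Addr("192.168.1.10", 7077)`, and `toSparkURL` turns it back into the same string.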
4.RpcEnv
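RpcEnv provides the environment in which RpcEndpoints process messages. It manages the whole lifecycle of an RpcEndpoint: registering endpoints, routing messages between endpoints, and stopping endpoints. An RpcEnv is created by an RpcEnvFactory. Spark's RPC is implemented on Netty, so the concrete implementation class is NettyRpcEnv.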
/**
* An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
* receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
* nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
* [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
* sender, or logging them if no such sender or `NotSerializableException`.
*
* [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
* [[RpcEndpoint.self]]. Return `null` if the corresponding [[RpcEndpointRef]] does not exist.
*/
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
/**
* Return the address that [[RpcEnv]] is listening to.
*/
def address: RpcAddress
/**
* Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
* guarantee thread-safety.
*/
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
/**
* Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
*/
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
/**
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}
/**
* Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.
* This is a blocking action.
*/
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
}
/**
* Stop [[RpcEndpoint]] specified by `endpoint`.
*/
def stop(endpoint: RpcEndpointRef): Unit
/**
* Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully,
* call [[awaitTermination()]] straight after [[shutdown()]].
*/
def shutdown(): Unit
/**
* Wait until [[RpcEnv]] exits.
*
* TODO do we need a timeout parameter?
*/
def awaitTermination(): Unit
/**
* [[RpcEndpointRef]] cannot be deserialized without [[RpcEnv]]. So when deserializing any object
* that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
*/
def deserialize[T](deserializationAction: () => T): T
/**
* Return the instance of the file server used to serve files. This may be `null` if the
* RpcEnv is not operating in server mode.
*/
def fileServer: RpcEnvFileServer
/**
* Open a channel to download a file from the given URI. If the URIs returned by the
* RpcEnvFileServer use the "spark" scheme, this method will be called by the Utils class to
* retrieve the files.
*
* @param uri URI with location of the file.
*/
def openChannel(uri: String): ReadableByteChannel
}
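The three RpcEnv responsibilities listed above (register an endpoint under a name, route messages to it, stop it) can be sketched as an in-process registry. Everything here is hypothetical: `MiniEnv`, `MiniRef` and `MiniEndpoint` are not Spark classes. Messages are also delivered synchronously, whereas NettyRpcEnv queues them through the Dispatcher and Inbox.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

// Hypothetical minimal types for this sketch; none of them are Spark classes.
trait MiniEndpoint {
  def receive: PartialFunction[Any, Unit]
}

final class MiniRef(val name: String, env: MiniEnv) {
  def send(message: Any): Unit = env.deliver(name, message)
}

// Register endpoints by name, look them up, route messages to them, and stop them.
final class MiniEnv {
  private val endpoints = new ConcurrentHashMap[String, MiniEndpoint]()

  def setupEndpoint(name: String, endpoint: MiniEndpoint): MiniRef = {
    if (endpoints.putIfAbsent(name, endpoint) != null) {
      throw new IllegalArgumentException(s"There is already an endpoint called $name")
    }
    new MiniRef(name, this)
  }

  def lookup(name: String): Option[MiniRef] =
    Option(endpoints.get(name)).map(_ => new MiniRef(name, this))

  // Delivered synchronously to keep the sketch short; the real RpcEnv queues
  // messages through the Dispatcher and each endpoint's Inbox.
  def deliver(name: String, message: Any): Unit = Option(endpoints.get(name)) match {
    case Some(e) if e.receive.isDefinedAt(message) => e.receive(message)
    case Some(_) => throw new IllegalStateException(s"$name cannot handle $message")
    case None    => throw new IllegalStateException(s"No endpoint named $name")
  }

  def stop(ref: MiniRef): Unit = endpoints.remove(ref.name)
}

object MiniEnvDemo {
  // Returns (messages received, found before stop, found after stop).
  def run(): (List[Any], Boolean, Boolean) = {
    val received = ListBuffer.empty[Any]
    val env = new MiniEnv
    val ref = env.setupEndpoint("Master", new MiniEndpoint {
      def receive: PartialFunction[Any, Unit] = { case m => received += m }
    })
    ref.send("RegisterWorker")
    val foundBeforeStop = env.lookup("Master").isDefined
    env.stop(ref)
    (received.toList, foundBeforeStop, env.lookup("Master").isDefined)
  }
}
```

Registering a name twice fails fast, which matches the idea that each RpcEndpoint name is unique within one RpcEnv.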
/**
* A server used by the RpcEnv to serve files to other processes owned by the application.
*
* The file server can return URIs handled by common libraries (such as "http" or "hdfs"), or
* it can return "spark" URIs which will be handled by `RpcEnv#fetchFile`.
*/
private[spark] trait RpcEnvFileServer {
/**
* Adds a file to be served by this RpcEnv. This is used to serve files from the driver
* to executors when they're stored on the driver's local file system.
*
* @param file Local file to serve.
* @return A URI for the location of the file.
*/
def addFile(file: File): String
/**
* Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
* `SparkContext.addJar`.
*
* @param file Local file to serve.
* @return A URI for the location of the file.
*/
def addJar(file: File): String
/**
* Adds a local directory to be served via this file server.
*
* @param baseUri Leading URI path (files can be retrieved by appending their relative
* path to this base URI). This cannot be "files" nor "jars".
* @param path Path to the local directory.
* @return URI for the root of the directory in the file server.
*/
def addDirectory(baseUri: String, path: File): String
/** Validates and normalizes the base URI for directories. */
protected def validateDirectoryUri(baseUri: String): String = {
val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
"Directory URI cannot be /files nor /jars.")
fixedBaseUri
}
}
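From the class comment we can see that RpcEnv also dispatches messages:
it processes messages coming from RpcEndpointRefs and from remote nodes, and delivers them to the corresponding RpcEndpoints.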
5.Dispatcher
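The message router. For every registered endpoint it keeps an EndpointData, which bundles the endpoint's name, the RpcEndpoint itself, its reference (NettyRpcEndpointRef), and an Inbox. The Dispatcher is, in effect, the relay station for messages.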
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)
}
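To see how the Dispatcher relays messages, here is a simplified sketch. It is loosely modeled on the Spark 2.x layout: a map of EndpointData, plus a queue of endpoints that have pending messages, which dispatcher-event-loop threads drain. All class names are hypothetical simplifications. Only one loop thread is used, and the EndpointData here keeps just a name and an inbox.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified inbox: a FIFO queue plus the endpoint's handler.
final class SimpleInbox(handler: Any => Unit) {
  private val messages = new java.util.LinkedList[Any]()
  def post(message: Any): Unit = synchronized { messages.add(message) }
  // Hand every queued message to the endpoint, in order.
  def process(): Unit = {
    var next: Any = synchronized { messages.poll() }
    while (next != null) {
      handler(next)
      next = synchronized { messages.poll() }
    }
  }
}

// Here EndpointData keeps only a name and an inbox (Spark's also keeps the endpoint and its ref).
final class EndpointData(val name: String, val inbox: SimpleInbox)

final class SimpleDispatcher {
  private val endpoints = new ConcurrentHashMap[String, EndpointData]()
  // Endpoints that have pending messages, waiting for the event loop.
  private val receivers = new LinkedBlockingQueue[EndpointData]()
  private val PoisonPill = new EndpointData("poison-pill", new SimpleInbox(_ => ()))

  def register(name: String, handler: Any => Unit): Unit =
    endpoints.put(name, new EndpointData(name, new SimpleInbox(handler)))

  // Route a message to the named endpoint's inbox and wake the event loop.
  def postMessage(name: String, message: Any): Unit = {
    val data = endpoints.get(name)
    if (data == null) throw new IllegalStateException(s"Could not find $name")
    data.inbox.post(message)
    receivers.offer(data)
  }

  // One event-loop thread: take an endpoint that has mail and let its inbox process it.
  private val loop = new Thread(() => {
    var data = receivers.take()
    while (data ne PoisonPill) {
      data.inbox.process()
      data = receivers.take()
    }
  }, "dispatcher-event-loop-0")
  loop.setDaemon(true)
  loop.start()

  // Queue the poison pill behind all pending work, then wait for the loop to exit.
  def stop(): Unit = {
    receivers.offer(PoisonPill)
    loop.join()
  }
}

object DispatcherDemo {
  def run(): List[String] = {
    val seen = ListBuffer.empty[String]
    val dispatcher = new SimpleDispatcher
    dispatcher.register("Master", m => seen += s"Master got $m")
    dispatcher.postMessage("Master", "RegisterWorker")
    dispatcher.postMessage("Master", "Heartbeat")
    dispatcher.stop() // join() makes the loop thread's writes to `seen` visible here
    seen.toList
  }
}
```

The design point is that posting a message never runs endpoint code on the caller's thread. The caller only enqueues, and the event loop does the actual processing.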
6.Outbox
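The outbox, used on the client side. It mirrors the Inbox: there is one Outbox per remote address, and it has a one-to-one relationship with a TransportClient. A TransportClient in turn corresponds to one TransportRequestHandler, and each of these corresponds to one channel.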
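The Outbox idea (queue outgoing messages for one remote address and flush them in order once a connection exists) can be sketched as follows. This is an assumption-laden simplification, not Spark's Outbox. `FakeClient` stands in for TransportClient, and `connect` is a hypothetical function that may not be able to connect yet. The connection attempt is synchronous here, whereas Spark's Outbox connects asynchronously.

```scala
import scala.collection.mutable

// Hypothetical stand-in for TransportClient.
trait FakeClient { def send(message: String): Unit }

// Sketch: one outbox per remote address; messages wait in order until a client is available.
final class SimpleOutbox(address: String, connect: String => Option[FakeClient]) {
  private val messages = mutable.Queue.empty[String]
  private var client: Option[FakeClient] = None

  def send(message: String): Unit = synchronized {
    messages.enqueue(message)
    drain()
  }

  def pending: Int = synchronized { messages.size }

  // Connect if needed; if the connection is not available yet, keep the messages queued.
  private def drain(): Unit = {
    if (client.isEmpty) client = connect(address)
    client.foreach { c =>
      while (messages.nonEmpty) c.send(messages.dequeue())
    }
  }
}

object OutboxDemo {
  // Returns (what reached the wire, messages still queued after the first send).
  def run(): (List[String], Int) = {
    val wire = mutable.ListBuffer.empty[String]
    var attempts = 0
    val outbox = new SimpleOutbox("master:7077", _ => {
      attempts += 1
      if (attempts < 2) None
      else Some(new FakeClient { def send(m: String): Unit = wire += m })
    })
    outbox.send("RegisterWorker") // first connect attempt fails, message stays queued
    val pendingAfterFirst = outbox.pending
    outbox.send("Heartbeat") // second attempt succeeds, both are flushed in order
    (wire.toList, pendingAfterFirst)
  }
}
```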
7.Inbox
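The inbox. As the architecture diagram shows, the Inbox is used on the server side. It has a one-to-one relationship with an RpcEndpoint: every uniquely named RpcEndpoint has one thread-safe Inbox, and every message sent to that RpcEndpoint is stored in its Inbox. Inbox provides a process method, which is called by threads of the dispatcher-event-loop thread pool and hands the messages in the Inbox to the associated RpcEndpoint for consumption.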
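Note that if an endpoint extends ThreadSafeRpcEndpoint, its messages must not be processed concurrently. If it extends plain RpcEndpoint, its messages may be processed concurrently. Inside process, enableConcurrent being false means only one thread may process this inbox at a time. In that case, if numActiveThreads is greater than 0, another thread is already processing, so process returns immediately and skips this round.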
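The enableConcurrent / numActiveThreads check described above can be sketched in isolation. `SketchInbox` and `InboxDemo` are hypothetical. Spark's real Inbox also handles OnStart/OnStop messages and re-checks enableConcurrent inside its loop, and both are omitted here. The demo posts messages from a thread pool, calls process after every post, and records how many handlers ran at the same time.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, simplified Inbox that mirrors the enableConcurrent / numActiveThreads check.
final class SketchInbox(enableConcurrent: Boolean, handler: Any => Unit) {
  private val messages = new java.util.LinkedList[Any]()
  private var numActiveThreads = 0

  def post(message: Any): Unit = synchronized { messages.add(message) }

  def process(): Unit = {
    // Claim the inbox, or give up if a single-threaded endpoint is already being served.
    var message: Any = synchronized {
      if (!enableConcurrent && numActiveThreads != 0) null
      else {
        val m = messages.poll()
        if (m != null) numActiveThreads += 1
        m
      }
    }
    // Keep draining; the thread leaves (and decrements the counter) once the queue is empty.
    while (message != null) {
      handler(message)
      message = synchronized {
        val m = messages.poll()
        if (m == null) numActiveThreads -= 1
        m
      }
    }
  }
}

object InboxDemo {
  // Returns (messages handled, maximum number of handlers running at the same time).
  def run(enableConcurrent: Boolean, n: Int = 200): (Int, Int) = {
    val running = new AtomicInteger(0)
    val maxRunning = new AtomicInteger(0)
    val handled = new AtomicInteger(0)
    val inbox = new SketchInbox(enableConcurrent, _ => {
      val now = running.incrementAndGet()
      maxRunning.updateAndGet(m => math.max(m, now))
      Thread.sleep(1)
      handled.incrementAndGet()
      running.decrementAndGet()
    })
    val pool = Executors.newFixedThreadPool(8)
    (1 to n).foreach(i => pool.execute(() => { inbox.post(i); inbox.process() }))
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    (handled.get, maxRunning.get)
  }
}
```

With `enableConcurrent = false`, the maximum concurrency stays at 1 even though 8 threads call process, and no message is lost. A thread that gives up early can rely on the active thread, which polls again under the same lock before it leaves.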
Figure source: [落枫寒2017]
Practice:
A source-level walkthrough of the worker -> master heartbeat