


private[spark] trait RpcEndpoint { 
/*** 当前RpcEndpoint所注册的[[RpcEnv]]***/
     val rpcEnv: RpcEnv  /* 

    * 注意:在`onStart`方法被调用之前,[[RpcEndpoint]]对象还未进行注册,所以就没有有效的[[RpcEndpointRef]] */
final def self: RpcEndpointRef = { require(rpcEnv != null, "rpcEnv has not been initialized") rpcEnv.endpointRef(this) } 

    * 用于处理从`RpcEndpointRef.send` 或 `RpcCallContext.reply`接收到的消息。
    * 如果接收到一个不匹配的消息,将会抛出SparkException异常,并发送给`onError`
    * 通过上面的receive方法,接收由RpcEndpointRef.send方法发送的消息,
    * 该类消息不需要进行响应消息(Reply),而只是在RpcEndpoint端进行处理。  */ 
       def receive: PartialFunction[Any, Unit] = { case _ => throw new SparkException(self + " does not implement 'receive'") } 

   /* * 处理来自`RpcEndpointRef.ask`的消息,RpcEndpoint端处理完消息后,需要给调用RpcEndpointRef.ask的通信端返回响应消息。 */
       def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case _ => context.sendFailure(new SparkException(self + " won't reply anything")) }

    * 在处理消息期间出现异常的话将被调用  */ 
       def onError(cause: Throwable): Unit = { // By default, throw e and let RpcEnv handle it throw cause } 

    * 当有远端连接到当前服务器时会被调用*/ 
       def onConnected(remoteAddress: RpcAddress): Unit = { // By default, do nothing. } 

    * 当远端与当前服务器断开时,该方法会被调用 */
       def onDisconnected(remoteAddress: RpcAddress): Unit = { // By default, do nothing. } 

    * 当前节点与远端之间的连接发生错误时,该方法将会被调用*/
       def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { // By default, do nothing. }

    * 在 [[RpcEndpoint]] 开始处理消息之前被调用*/
       def onStart(): Unit = { // By default, do nothing. } 

    * 当[[RpcEndpoint]]正在停止时,该方法将会被调用。
    * `self`将会在该方法中被置位null,因此你不能使用它来发送消息。*/
       def onStop(): Unit = { // By default, do nothing. }

    * A convenient method to stop [[RpcEndpoint]].*/
       final def stop(): Unit = { val _self = self if (_self != null) { rpcEnv.stop(_self) } } }


 * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {

  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

   * return the address for the [[RpcEndpointRef]]
  def address: RpcAddress

  def name: String

   * Sends a one-way asynchronous message. Fire-and-forget semantics.
  def send(message: Any): Unit

   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within the specified timeout.
   * This method only sends the message once and never retries.
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   * This method only sends the message once and never retries.
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
   * default timeout, throw an exception if this fails.
   * Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
   * loop of [[RpcEndpoint]].

   * @param message the message to send
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a
   * specified timeout, throw an exception if this fails.
   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
   * loop of [[RpcEndpoint]].
   * @param message the message to send
   * @param timeout the timeout duration
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)

表示远程的RpcEndpointRef的地址,Host + Port。

 * Address for an RPC environment, with hostname and port.
private[spark] case class RpcAddress(host: String, port: Int) {

  def hostPort: String = host + ":" + port

  /** Returns a string in the form of "spark://host:port". */
  def toSparkURL: String = "spark://" + hostPort

  override def toString: String = hostPort

private[spark] object RpcAddress {

  /** Return the [[RpcAddress]] represented by `uri`. */
  def fromURIString(uri: String): RpcAddress = {
    val uriObj = new
    RpcAddress(uriObj.getHost, uriObj.getPort)

  /** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
  def fromSparkURL(sparkUrl: String): RpcAddress = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    RpcAddress(host, port)
  1. RpcEnv
 * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
 * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or logging them if no such sender or `NotSerializableException`.
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
private[spark] abstract class RpcEnv(conf: SparkConf) {

  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
   * [[RpcEndpoint.self]]. Return `null` if the corresponding [[RpcEndpointRef]] does not exist.
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

   * Return the address that [[RpcEnv]] is listening to.
  def address: RpcAddress

   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
   * guarantee thread-safety.
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

   * Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

   * Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {

   * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.
   * This is a blocking action.
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)

   * Stop [[RpcEndpoint]] specified by `endpoint`.
  def stop(endpoint: RpcEndpointRef): Unit

   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully,
   * call [[awaitTermination()]] straight after [[shutdown()]].
  def shutdown(): Unit

   * Wait until [[RpcEnv]] exits.
   * TODO do we need a timeout parameter?
  def awaitTermination(): Unit

   * [[RpcEndpointRef]] cannot be deserialized without [[RpcEnv]]. So when deserializing any object
   * that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
  def deserialize[T](deserializationAction: () => T): T

   * Return the instance of the file server used to serve files. This may be `null` if the
   * RpcEnv is not operating in server mode.
  def fileServer: RpcEnvFileServer

   * Open a channel to download a file from the given URI. If the URIs returned by the
   * RpcEnvFileServer use the "spark" scheme, this method will be called by the Utils class to
   * retrieve the files.
   * @param uri URI with location of the file.
  def openChannel(uri: String): ReadableByteChannel

 * A server used by the RpcEnv to server files to other processes owned by the application.
 * The file server can return URIs handled by common libraries (such as "http" or "hdfs"), or
 * it can return "spark" URIs which will be handled by `RpcEnv#fetchFile`.
private[spark] trait RpcEnvFileServer {

   * Adds a file to be served by this RpcEnv. This is used to serve files from the driver
   * to executors when they're stored on the driver's local file system.
   * @param file Local file to serve.
   * @return A URI for the location of the file.
  def addFile(file: File): String

   * Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
   * `SparkContext.addJar`.
   * @param file Local file to serve.
   * @return A URI for the location of the file.
  def addJar(file: File): String

   * Adds a local directory to be served via this file server.
   * @param baseUri Leading URI path (files can be retrieved by appending their relative
   *                path to this base URI). This cannot be "files" nor "jars".
   * @param path Path to the local directory.
   * @return URI for the root of the directory in the file server.
  def addDirectory(baseUri: String, path: File): String

  /** Validates and normalizes the base URI for directories. */
  protected def validateDirectoryUri(baseUri: String): String = {
    val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
    require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
      "Directory URI cannot be /files nor /jars.")



  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)








  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342


  • 本文基于spark源码2.11 1. 概要 spark中网络通信无处不在,例如 driver和master的通信,...
    aaron1993阅读 3,375评论 1 3
  • Spark Network 模块分析 为什么用Netty通信框架代替Akka 一直以来,基于Akka实现的RPC通...
    Alex90阅读 2,775评论 0 3
  • 相关概念 主要涉及RpcEnv,RpcEndpoint,RpcEndpointRef,其中RpcEnv是通信的基础...
    wangdy12阅读 2,921评论 0 3
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • rpc可以说是一个分布式系统最基础的组件了。这里解析一下spark的内部rpc框架。 RpcEndpoint Rp...
    WJL3333阅读 395评论 0 2