Spark-Core源码精读(7)、“Driver“服务启动流程解析

本文将解析Spark中Driver服务的开启流程,闲言少叙,直接进入源码。

首先Driver服务的开启是在创建Driver的运行时环境的时候完成的,如下所示:

SparkContext中:

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

可以看到执行的是SparkEnv的createDriverEnv:

private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  // 创建Driver的运行时环境,注意这里的numDriverCores是local模式下用来执行计算的cores的个数,如果不是本地模式的话就是0
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

numDriverCores的计算:

/**
 * The number of driver cores to use for execution in local mode, 0 otherwise.
 */
private[spark] def numDriverCores(master: String): Int = {
  def convertToInt(threads: String): Int = {
    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
  }
  master match {
    case "local" => 1
    case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
    case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
    case _ => 0 // driver is not used for execution
  }
}

在SparkEnv中创建Driver运行时环境的代码:

/**
 * Create a SparkEnv for the driver.
 */
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val hostname = conf.get("spark.driver.host")
  val port = conf.get("spark.driver.port").toInt
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,  // "driver"
    hostname,
    port,
    isDriver = true,
    isLocal = isLocal,
    numUsableCores = numCores,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}

我们在前面的文章中大致的浏览过,现在聚焦Driver服务启动相关的部分:

// 这里我们是Driver,所以actorSystemName是"sparkDriver"
// 注意Spark2.x中已经移除了对Akka的依赖,所以在Spark2.x中这里是driverSystemName和executorSystemName
// Create the ActorSystem for Akka and get the port it binds to.
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
// 创建Driver的运行时环境,注意这里的clientMode等于false
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
  clientMode = !isDriver)

接下来是RpcEnv的create方法:

def create(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean = false): RpcEnv = {
  // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
  // 封装成RpcEnvConfig,这里的name是"sparkDriver",host是"driver",clientMode是"false"
  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
  // 这里实际上是通过反射的到的是NettyRpcEnvFactory,所以调用的是NettyRpcEnvFactory的create()方法
  getRpcEnvFactory(conf).create(config)
}

底层实现是NettyRpcEnvFactory的create方法:

def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  // 实例化了NettyRpcEnv,名字为config.host,即driver
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
  // 传进来的clientMode为false,所以这里的判断为true
  if (!config.clientMode) {
    // 定义了一个函数startNettyRpcEnv
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(actualPort)
      // 返回NettyRpcEnv及其端口号
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      // 开启“sparkDriver”服务,注意此处传进了上面定义的函数,这里的config.name是"sparkDriver",最后返回了NettyRpcEnv
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  // 返回NettyRpcEnv
  nettyEnv
}

Utils中的startServiceOnPort方法:

def startServiceOnPort[T](
    startPort: Int,
    startService: Int => (T, Int),
    conf: SparkConf,
    serviceName: String = ""): (T, Int) = {
  // 我们传进来的startPort为0,所以会生成一个随机的端口号
  require(startPort == 0 || (1024 <= startPort && startPort < 65536),
    "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
  // " 'sparkDriver'"
  val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
  // 通过"spark.port.maxRetries"设置,如果没有设置,而设置中包括"spark.testing",
  // 最大重试次数就是100次,否则最大重试次数就是10次
  val maxRetries = portMaxRetries(conf)
  for (offset <- 0 to maxRetries) {
    // 设置端口号
    // Do not increment port if startPort is 0, which is treated as a special port
    val tryPort = if (startPort == 0) {
      startPort
    } else {
      // If the new port wraps around, do not try a privilege port
      ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    }
    try {
      // 开启服务,并返回服务和端口号,注意这里的startService是上面传进来的那个函数startNettyRpcEnv
      // 所以我们实际上执行的是startNettyRpcEnv(tryPort),而根据startNettyRpcEnv函数的定义,实际
      // 上是调用了nettyEnv.startServer(tryPort)方法
      val (service, port) = startService(tryPort)
      // 创建成功后打印日志,serviceString就是"sparkDriver"
      logInfo(s"Successfully started service$serviceString on port $port.")
      // 返回服务和端口号
      return (service, port)
    } catch {
      case e: Exception if isBindCollision(e) =>
        if (offset >= maxRetries) {
          val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
            s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
            s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
            "port or increasing spark.port.maxRetries."
          val exception = new BindException(exceptionMessage)
          // restore original stack trace
          exception.setStackTrace(e.getStackTrace)
          throw exception
        }
        logWarning(s"Service$serviceString could not bind on port $tryPort. " +
          s"Attempting port ${tryPort + 1}.")
    }
  }
  // Should never happen
  throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}

下面我们就具体看一下NettyRpcEnv中的这个startServer方法,具体的启动方法我们不再追踪了,最后实际上创建了一个TransportServer。

def startServer(port: Int): Unit = {
  // 首先实例化bootstraps
  val bootstraps: java.util.List[TransportServerBootstrap] =
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
    } else {
      java.util.Collections.emptyList()
    }
  // 实例化server
  server = transportContext.createServer(host, port, bootstraps)
  // 向dispatcher注册
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}

再回到SparkEnv中,开启了"sparkDriver"服务后,又创建了Akka的ActorSystem,具体的创建过程就不分析了。

// 开启了sparkDriverActorSystem服务,spark2.x中已经移除了对Akka的依赖
val actorSystem: ActorSystem =
  if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
    rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
  } else {
    val actorSystemPort =
      if (port == 0 || rpcEnv.address == null) {
        port
      } else {
        rpcEnv.address.port + 1
      }
    // Create a ActorSystem for legacy codes
    AkkaUtils.createActorSystem(
      actorSystemName + "ActorSystem",
      hostname,
      actorSystemPort,
      conf,
      securityManager
    )._1
  }
  
// 最后使用开启的服务的端口替换掉原来的端口
if (isDriver) {
  conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
  conf.set("spark.executor.port", rpcEnv.address.port.toString)
}

我们使用spark-submit的client模式提交应用程序时,就可以看到关于这部分的日志信息:

17/03/02 09:38:28 INFO Utils: Successfully started service 'sparkDriver' on port 33861.
17/03/02 09:38:29 INFO Slf4jLogger: Slf4jLogger started
17/03/02 09:38:29 INFO Remoting: Starting remoting
17/03/02 09:38:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.17.0.2:34803]
17/03/02 09:38:29 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 34803.

注意:本文基于的是Spark 1.6.3版本的源码,并对Spark 2.x版本的改变进行了相应的说明,这里给出具体的连接供大家参考:

Spark 1.6.3 源码

Spark 2.1.0 源码

本文为原创,欢迎转载,转载请注明出处、作者,谢谢!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容