Spark消息总线实现

消息总线概述

消息总线是Spark内部进行消息传递,触发事件的框架,消息总线的核心是由三个抽象的对象组成:

  • Event: 定义了一个事件;
  • Listener:定义了一个监听器,是用于对Event作出响应的实体;
  • Bus: 将Event路由到某个Listener的管道,负责接收Event、注册Listener,可以说Bus是Listener与Event的连接器。

ListenerBus

在Spark中,消息总线最顶层的抽象是:ListenerBus,其源码实现如下:

package org.apache.spark.util

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.codahale.metrics.Timer

import org.apache.spark.internal.Logging

/**
 * An event bus which posts events to its listeners.
 */
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  /**
   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
   * This method is intended to be overridden by subclasses.
   */
  protected def getTimer(listener: L): Option[Timer] = None

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be called
   * in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
  def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
      } catch {
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

  /**
   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
   * thread for all listeners.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }

}

其中listenersPlusTimers是一个线程安全的List对象,用于存储注册在该总线上的Listener,此外还定义了注册Listener的方法-addListener和注销Listener的方法-removeListener。

在这个抽象类中最重要的方法是postToAll,该方法负责将传入的Event广播给listenersPlusTimers中注册的每一个Listener,通过调用doPostEvent方法将Event和Listener进行关联。doPostEvent在这个特质中没有给出具体的实现,可以通过继承ListenerBus来实现自定义的消息总线。

在Spark中ListenerBus有很多中实现,直接实现包括SparkListenerBus,ExternalCatalog等。每一种实现又根据特定的场景有细分,SparkListenerBus是Spark中最重要的消息总线实现,下面重点描述。

SparkListenerBus

SparkListenerBus的实现源码如下:

package org.apache.spark.scheduler

import org.apache.spark.util.ListenerBus

/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }

}

由代码可以看出,SparkListenerBus重写了ListenerBus的doPostEvent方法,定义了自己的Listener类型:SparkListenerInterface和Event类型:SparkListenerEvent,在doPostEvent方法中通过类型匹配,只处理特定的事件,匹配到特定的事件后,将事件交给SparkListenerInterface的对应方法去处理。
SparkListenerInterface是一个特质,定义了所有SparkListenerEvent的处理方法,最常见的实现SparkListenerInterface的类是驻守在Spark Driver进程中的HeartbeatReceiver类,这个类用于维护Driver与Executor之间的心跳,后续会对这个类展开详细解读。

异步通信的消息总线AsyncEventQueue

SparkListenerBus的doPostEvent方法将消息路由到了Listener的具体方法之中,并未定义Listener是以什么形似对Event作出响应。
AsyncEventQueue通过继承SparkListenerBus,定义了一种异步响应Event的消息框架,其基本原理如下:
1 . 定义一个FIFO且线程安全的Queue-eventQueue,eventQueue用于存放等待触发执行的Event。

  // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
  // it's perpetually being added to more quickly than it's being drained.
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
  1. 定义一个线程dispatchThread,用于循环从eventQueue中取出Event,并将Event dispatch到Listener。
  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }

其中tryOrStopSparkContext方法保证了在dispatch遇到无法控制的异常时将SparkContext自动退出,避免内存泄漏。

dispatch()函数的实现逻辑如下:

  private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    try {
      var next: SparkListenerEvent = eventQueue.take()
      while (next != POISON_PILL) {
        val ctx = processingTime.time()
        try {
          super.postToAll(next)
        } finally {
          ctx.stop()
        }
        eventCount.decrementAndGet()
        next = eventQueue.take()
      }
      eventCount.decrementAndGet()
    } catch {
      case ie: InterruptedException =>
        logInfo(s"Stopping listener queue $name.", ie)
    }
  }

该方法定义了一个循环体不停的从eventQueue中取出event,并派发到总线中注册的Listener中。该总线定义了一个“哨兵”Event-POISON_PILL,用于标记stop, 当调用stop时,这个Event-POISON_PILL会被添加到eventQueue的尾部,当取出Event-POISON_PILL时,退出循环,从而退出线程dispatch线程。当线程退出时,eventCount变为0.

  1. 定义post方法,将Event添加到eventQueue中
 def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }

    eventCount.incrementAndGet()
    if (eventQueue.offer(event)) {
      return
    }

    eventCount.decrementAndGet()
    droppedEvents.inc()
    droppedEventsCounter.incrementAndGet()
    if (logDroppedEvent.compareAndSet(false, true)) {
      // Only log the following message once to avoid duplicated annoying logs.
      logError(s"Dropping event from queue $name. " +
        "This likely means one of the listeners is too slow and cannot keep up with " +
        "the rate at which tasks are being started by the scheduler.")
    }
    logTrace(s"Dropping event $event")

    val droppedCount = droppedEventsCounter.get
    if (droppedCount > 0) {
      // Don't log too frequently
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        // There may be multiple threads trying to decrease droppedEventsCounter.
        // Use "compareAndSet" to make sure only one thread can win.
        // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
        // then that thread will update it.
        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          val previous = new java.util.Date(prevLastReportTimestamp)
          logWarning(s"Dropped $droppedEvents events from $name since $previous.")
        }
      }
    }
  }

整个实现是一个典型的“生产者-消费者”模式,post方法是Event的生产者,负责向eventQueue中添加Event,dispatchThread线程是消费者,负责从eventQueue取出Event并派发到Listener。该方法中eventCount 是一个AtomicLong对象,用于单独记录Event的个数,单独记录Event,而不使用eventQueue的原因是能够保证当eventCount为0时,所有的Event都被处理了,而不仅仅是从eventQueue中取出了,这个可以在类的dispatch方法中看出。

当AsyncEventQueue的stop被调用后,post不再接收新的Event。

LiveListenerBus 这个AsyncEventQueue的容器

LiveListenerBus从面向对象的角度来讲,它不是一个bus,它定义了一个CopyOnWriteArrayList对象queues来存储AsyncEventQueue,LiveListenerBus负责添加、移除queues中的AsyncEventQueue,Spark在queues中预定义了三个AsyncEventQueue:

  • SHARED_QUEUE
  • APP_STATUS_QUEUE
  • EXECUTOR_MANAGEMENT_QUEUE
  • EVENT_LOG_QUEUE
    分别处理特定类型的Spark Event。

LiveListenerBus是SparkContext的重要组成部分。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • NSThread 第一种:通过NSThread的对象方法 NSThread *thread = [[NSThrea...
    攻城狮GG阅读 778评论 0 3
  • 史上最全的iOS面试题及答案 iOS面试小贴士———————————————回答好下面的足够了----------...
    Style_伟阅读 2,341评论 0 35
  • 多线程、特别是NSOperation 和 GCD 的内部原理。运行时机制的原理和运用场景。SDWebImage的原...
    LZM轮回阅读 1,999评论 0 12
  • ———————————————回答好下面的足够了---------------------------------...
    恒爱DE问候阅读 1,706评论 0 4