[spark streaming] ReceiverTracker 数据产生与存储

前言

在Spark Streaming里,总体负责任务的动态调度是JobScheduler,而JobScheduler有两个很重要的成员:JobGeneratorReceiverTrackerJobGenerator 负责将每个 batch 生成具体的 RDD DAG ,而ReceiverTracker负责数据的来源。

需要在executor上运行的receiver接收数据的InputDStream都需要继承ReceiverInputDStream,ReceiverInputDStream有一个def getReceiver(): Receiver[T]方法,子类都需要实现这个方法。如KafkaInputDStream对应KafkaReceiverFlumeInputDStream对应FlumeReceiverTwitterInputDStream对应TwitterReceiver等。

流程概述:

  • ReceiverTracker 启动,获取所有InputDStreams对应的receivers
  • 根据调度策略确定每个Receiver的优先位置(能在哪些executor上执行)
  • 将Receiver包装成RDD并通过sc提交一个job,执行函数为创建supervisor实例,调用start()方法,也即调用了Receiver的onStart()方法
  • Receiver的onStart不断接收数据,通过store方法最终调用supervisor来存储块
  • 存储后通知ReceiverTracker此Block的信息
  • ReceiverTracker将Block消息交给ReceivedBlockTracker管理

启动 Receiver

先看看receiverTracker的启动过程:

ssc.start()
    scheduler.start()
        receiverTracker.start()
        jobGenerator.start()
----
 def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

在start方法中先创建了ReceiverTracker的Endpoint,接着调用launchReceivers()方法来启动Recivers:

 private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    }

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

遍历所有的InputStream,并得到所对应的Receiver集合receivers。并向ReceiverTrackerEndpoint发送了StartAllReceivers消息,看看接收到该消息后是如何处理的:

 case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }

通过调度策略来计算决定每个receiver的一组优先位置,即一个Receiver改在哪个executor节点上启动,调度的主要原则是:

  • 满足Receiver的preferredLocation。
  • 其次保证将Receiver分布的尽量均匀。

接着遍历所有receivers调用了startReceiver(receiver, executors)方法来启动receiver:

 private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

注意这里巧妙的将receiver包装成了RDD,并把scheduledLocations作为RDD的优先位置locationPrefs。

然后通过sc提交了一个Spark Core Job,执行函数是startReceiverFunc(也就是要在executor上执行的),在该方法中创建一个ReceiverSupervisorImpl对象,并调用了start()方法,在该方法中会调用 receiver的onStart 后立即返回。

receiver的onStart 方法一般会新建线程或线程池来接收数据,比如在 KafkaReceiver 中,就新建了线程池,在线程池中接收 topics 的数据。

supervisor.start() 返回后,由 supervisor.awaitTermination() 阻塞住线程,以让这个 task 一直不退出,从而可以源源不断接收数据。

Receiver 数据处理

前面提到receiver的onStart()方法会新建线程或线程池来接收数据,那接收的数据怎么处理的呢?都会调用receiver的store(),而store方法又调用了supervisor的方法。对应的store方法有多种形式:

  • pushSingle: 对应单条小数据,需要通过BlockGenerator聚集多条数据后再成块的存储
  • pushArrayBuffer: 对应数组形式的数据
  • pushIterator: 对应 iterator 形式数据
  • pushBytes: 对应 ByteBuffer 形式的块数据

除了pushSingle需要通过BlockGenerator将数据聚集成一个块的时候再存储,其他方法都是直接成块存储。

看看pushSingle是怎么通过聚集的方式存储块的:

def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }
------
def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

这里的先调用waitToPush(),会有rateLimiter检查速率,防止加入过快,如果过快会block住等到下一秒再添加,一秒能添加的条数受spark.streaming.receiver.maxRate控制,即一个Receiver每秒能添加的条数。
检查完后会将数据添加到一个变长数组currentBuffer中。

另外,BlockGenerator被初始化的时候就创建了一个定时器:

private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

定时间隔默认200ms,可通过spark.streaming.blockInterval配置,每次定时执行的是updateCurrentBuffer方法:

private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
  • 将 currentBuffer 赋值给 newBlockBuffer
  • 重新为currentBuffer分配一个新对象,以供存储新的数据
  • 将currentBuffer封装为Block后添加到blocksForPushing中,blocksForPushing是一个默认长度为10的Queue,可通过spark.streaming.blockQueueSize配置

BlockGenerator初始化的时候还启动了一个线程来从blocksForPushing队列中取出Block通过supervisor来存储块的:

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

supervisor 存储数据块

先存储再向上报告:

#pushAndReportBlock
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

存储数据块有对应的receivedBlockHandler,在启用了WAL(spark.streaming.receiver.writeAheadLog.enable为true)的情况下对应的是WriteAheadLogBasedBlockHandler(启用了WAL的情况下在应用程序挂掉后可以从WAL恢复数据),未启用的情况下对应的是BlockManagerBasedBlockHandler。

private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }

storeBlock方法部分代码:

case ArrayBufferBlock(arrayBuffer) =>
    numRecords = Some(arrayBuffer.size.toLong)
    blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)
case IteratorBlock(iterator) =>
    val countIterator = new CountingIterator(iterator)
    val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true)
    numRecords = countIterator.count
    putResult
case ByteBufferBlock(byteBuffer) =>
    blockManager.putBytes(blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)

两种handler都是通过blockManager来存储block到内存或者磁盘,存储的细节可见BlockManager 解析

通知 ReceiverTracker

存储了block后,接着创建了ReceivedBlockInfo实例,对应该block的一些信息,包括streamId(一个InputDStream对应一个Receiver,一个Receiver对应一个streamId)、block中数据的条数、storeResult等信息。

接着将receivedBlockInfo作为参数和ReceiverTracker通信发送AddBlock消息,ReceiverTracker收到消息后的处理如下:

 case AddBlock(receivedBlockInfo) =>
        if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
          walBatchingThreadPool.execute(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              if (active) {
                context.reply(addBlock(receivedBlockInfo))
              } else {
                throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
              }
            }
          })
        } else {
          context.reply(addBlock(receivedBlockInfo))
        }

都会调用addBlock(receivedBlockInfo)方法:

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

ReceiverTracker有个专门管理block的成员receivedBlockTracker,通过addBlock(receivedBlockInfo)来添加block信息:

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }

若启用WAL则会先将block信息以WAL保存,之后都会将block信息保存到streamIdToUnallocatedBlockQueuesmutable.HashMap[Int, ReceivedBlockQueue]中,其中key为InputDStream唯一id,value为已存储但未分配的block信息。之后为 batch 分配blocks,会访问该结构来获取每个 InputDStream 对应的未消费的 blocks。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容