Spark Streaming(4) - 反压

1. 前言

Spark Streaming在处理不断流入的数据时通过每间隔一段时间(batch interval)将这段时间内的流入的数据积累为一个batch,然后以这个batch内的数据作为job DAG的输入rdd提交新的job运行。当一个batch的的处理时间大于batch interval时,意味着数据处理速度跟不上数据接收速度,此时在数据接收端(即Receiver一般数据接收端都运行在executor上)就会积累数据,而数据是通过BlockManager管理的,如果数据存储采取MEMORY_ONLY模式就会导致OOM,采取MEMORY_AND_DISK多余的数据保存到磁盘上反而会增加数据读取时间。

说到这里,反压实际就是一种根据当前系统的处理能力来动态调节接收数据速率的功能。

2. 反压

前言中提到数据接收端Receiver,可以参考文章Spark stream receiver,简单说就是stream job运行期间会有一个或者多个Receiver运行在Executor上专门接收数据,并以batch interval为时间间隔将流式数据分割为一个个batch,然后以一个batch的数据启动job。但是stream job中Receiver并不是必然存在的,例外的情况是当数据源是kafka时,spark内置了一种叫DirectKafkaInputDStream的输入源(可以通过KafkaUtils.createDirectStream(...)创建),这种类型的InputDStream(输入源会实现这个类)没有Receiver。对于那些带Receiver的InputDStream实现类,当从InputDStream创建RDD时,源头RDD中的数据就是Receiver接收的数据,而从DirectKafkaInputDStream创建RDD时,数据实际上还没有从kafka读取过来,这个时候的RDD只包含了kafka的topic以及offset信息,等到rdd对应的task运行时才从kafka中获取数据。

由于存在有Receiver和没有两种情况,实际上反压的实现也不一样。有Receiver时控制Receiver接收数据的速率就可以了,没有Receiver的DirectKafkaInputDStream时的实现会在后文单独提一下。

关于spark stream的原理可以参考:

  1. Spark Streaming(1) - 基本原理
  2. Spark Streaming(2) - JobScheduler、JobGenerator
  3. Spark Streaming(3) - Receiver和ReceiverTacker

2.1 开启反压

指定配置spark.streaming.backpressure.enabled为true即可开启反压。

2.2 有Receiver时反压原理

反压的原理是根据之前系统的处理能力来调节未来系统接收数据速率,它的过程是下面这样的:

stream (stream id标志)里所有job完成后
-> 反馈运行信息(包括,开始结束时间、处理本次处理记录数等信息) 给JobScheduler
   -> JobScheduler将信息交给RateController(通过一个job成功事件,下文会说到) ,RateController根据反馈信息计算接下来应该控制住Receiver接收多少条数据 
      -> RateController委托JobScheduler的receiverTracker将的计算结果通知给所有在Executor上运行的Receiver 
          -> Receiever控制接收数据速率

上面过程中JobScheduler处在核心的位置,由它来负责协调,接下来分别讲述RateController,以及Recceiver是如何控制速率的。

2.2.1 RateController

RateController实现了StreamingListener, 它作为JobScheduler的lister,监听这个stream job的提交,开始运行,以及完成。其实它只关心StreamingListenerBatchCompleted事件的发生(该事件表示任务成功执行),这个事件包含了如下信息:

case class BatchInfo(
    batchTime: Time,   
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
  ) 
  1. RateController从何而来?
    RateContoller 是InputDStream的成员,并在子类ReceiverInputDStream中初始化了具体的实例,如下:
abstract class ReceiverInputDStream {
override protected[streaming] val rateController: Option[RateController] = {
    // 开启反压的情况下创建了了ReceiverRateController
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
    } else {
      None
    }
  }
...
}

本节中接下来关注ReceiverRateController的实现。

  1. RateController工作原理
    上面说到RateController实现了StreamingListener,并且只关注StreamingListenerBatchCompleted,该事件发生时,会调用RateController # onBatchCompleted方法,方法体如下:
 override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      workDelay <- batchCompleted.batchInfo.processingDelay
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

上面方法拿到stream job结束时间processingEnd, job真实运行时间workDelay, job从提交到结束时间waitDelay, 本次job处理记录数elems,
然后调用computeAndPublish计算,computeAndPublish方法如下:

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
     // 使用rateEstimator来计算接下来的接收速率,
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
        rateLimit.set(s.toLong)
        // publish是一个抽象方法,也就是将计算出来的速率通知出处
        publish(getLatestRate())
      }
}

下面是ReceiverRateController的publish的实现:

override def publish(rate: Long): Unit =
     // 通过JobScheduler的receiverTracker将计算出来的速率通知给所有的Receiver
      ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)

附:ReceiverTracker
这里顺带提一下ReceiverTracker中涉及的反压过程的成员或方法吧:
(这里涉及到spark rpc,可以参考spark rpc)

class ReceiverTracker{
   // 这个rpc一端是运行在JobScheduler上的,负责接收Receiver传过来的消息,
  // 它是类ReceiverTrackerEndpoint(ReceiverTracker的内部类)的实例
   private var endpoint: RpcEndpointRef = null
   // stream id到Receiver的信息,其中就包含了receiver的rpc通信信息
   private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

   // 上面ReceiverRateController # push就是调用这个方法去通知receiver新的速率的
   def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
      // 这个enpoint是运行在JobScheduler上的,也就是给自己发了一个UpdateReceiverRateLimit更新速率的事件
      endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
  }
}

下面是ReceiverTrackerEndpoint接收到UpdateReceiverRateLimit消息时的处理:
def receive{
...
case UpdateReceiverRateLimit(streamUID, newRate) =>
        //  拿到receiver的rpc信息eP, 然后发送UpdateRateLimit(newRate)更新速率
        for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
          eP.send(UpdateRateLimit(newRate))
        }
...
}

RateController更新速率以及通知新接收速率过程就是这样,接下来是Receiver如何去根据新的速率newRate控制接收速率。

2.2.2 Receiver控制速率

Receiver上运行了很多组件:

  • Receiver负责接收数据
  • 接收的数据上报给ReceiverSupervisorImpl
  • 如果接受的数据是一条条上报(调用方法ReceiverSupervisorImpl # putSingle), 则ReceiverSupervisorImpl 使用BlockGenerator用来将一条条的记录汇聚成block(如果Receiver一次接收并上报一批数据就不会使用BlockGenerator)

上面BlockGenerator将一条条数据汇聚成block, Receiver上控制接受速率就是通过BlockGenerator处理速度来实现的,BlockGenerator阻塞了也就相当于间接阻塞了Receiver接受速率。

但是上面说只有一条条接受的数据会走BlockGenerator,如果Receiver不使用
ReceiverSupervisorImpl # putSingle而是使用其他方法一次上报一批数据,其实反压是不起作用的。

  1. ReceiverSupervisorImpl接收UpdateRateLimit消息
    下面是ReceiverSupervisorImpl接受消息的rpc端处理代码:
 private val endpoint = env.rpcEnv.setupEndpoint(
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
         // 通知所有的BlockGenerator去更新速率
          registeredBlockGenerators.asScala.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })
  1. BlockGenerator控制速率
    BlockGenerator采用令牌桶算法实现速率控制,原理简介如下:

假设想把接收速率控制在m条记录每秒, 那么生产者只需要以恒定的速度每秒向桶中放m个令牌,数据接收者接收数据之前需要从桶中拿走一个令牌才能接收一条数据,显然数据接收速率不会超过m

来看看BlockGenerator是怎么实现的,BlockGenerator实现了RateLimiter抽象类,下面是RateLimiter的部分实现:

// 最大接收速率,接收数据速率由maxRateLimit和动态更新的反压速率共同控制
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
  // rateLimiter负责产生令牌
  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
 
 // BlockGenerator在存储数据之前会调用这个方法,相当于取令牌。
  def waitToPush() {
    rateLimiter.acquire()
  }

 // 更新速率,也就是令牌产生的速率
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
 }

BlockGenerator的父类RateLimiter实现了令牌桶速率控制算法,接下来就是BlockGenerator在接收Receiver传递过来的数据时调用waitToPush去获取令牌了,没有令牌是,BlockGenerator阻塞,那么Receiver也会阻塞下去。

2.3 DirectKafkaInputDStream时的反压实现

DirectKafkaInputDStream是没有Receiver的。

  • 有Receiver的stream job, 数据从由Receiver接收,然后组装成block然后汇报到JobScheduler,stream job提交运行前从JobScheduler拿一个block运行。

  • 但是对于DirectKafkaInputDStream而言,不存在独立运行的Receiver接收数据,而是在从DirectKafkaInputDStream创建出KafkaRDD然后提交stream job运行时,在KafkaRDD的compute方法中才开始从kafka读取数据。此时由于接收数据是在job开始运行后在task中进行的,因此反压实现也是通过控制本次task从kafka中读取多少数据来实现的。

1. 计算接收速率
不贴代码了,DirectKafkaInputDStream使用IDRateEstimator去评估每秒接收的数据量R,同时由于DirectKafkaInputDStream可以同时从一个topic的n个分区接收数据,这个R是整个全部分区的数据接收速度,在DirectKafkaInputDStream中还有一个重要的参数spark.streaming.kafka.maxRatePerPartition(每秒)控制每个分区的最大接收速度(假设是maxR)。

评估出R之后,就是计算n个分区每个分区的速度了,这个不是简单的R/n分的,
而是根据之前每一个分区消费到的offset(假设是prefOffset)和现在每个分区的最新offset(假设是curOffset)的差值/ 总差值的比例计算出来的,

假设全部n个分区从上一batch消费之后到现在的整个未消费记录数是totalLag,
分区n1未消费数是lagN1, 那么本次batch从分区n1应该消费的记录数就是:

(min( (lagN1 / totalLag) * R, maxR)) * batchDuration(换算成秒)

2. 控制速率
「1」中计算出每个分区最大消费记录数m,接下来从DirectKafkaInputDStream生成KafkaRDD(KafkaRDD的分区个数也就是topic的分区个数), 然后提交job运行后KafkaRDD的compute方法开始从kafka消费m条记录。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,179评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,229评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,032评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,533评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,531评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,539评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,916评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,813评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,568评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,654评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,354评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,937评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,918评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,152评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,852评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,378评论 2 342

推荐阅读更多精彩内容