Spark Core源码精读计划#31:BlockManager块写入流程

目录

前言

一个多月没顾上这个专题,惭愧惭愧。十一期间尽量赶上进度吧。

上一篇文章讲完了BlockManager管理下的数据读取流程,今天就来看写入的流程,顺便将读写放在一起总结一下。

块写入流程

块写入的入口

由于距离上一篇已经过去很久了,这里再贴一次BlockManager提供的getOrElseUpdate()方法的源码。该方法提供了块读写的统一入口。

代码#31.1 - o.a.s.storage.BlockManager.getOrElseUpdate()方法

  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
    }
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        val blockResult = getLocalValues(blockId).getOrElse {
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
       Right(iter)
    }
  }

该方法会首先根据块ID尝试读取数据(先从本地,后从远端)。如果获取不到,就调用传入的makeIterator函数将数据转化为迭代器并写入之。最终将读取或写入的数据封装在BlockResult结构中返回。写入数据的方法是doPutIterator(),下面来看它的代码。

doPutIterator()方法

代码#31.2 - o.a.s.storage.BlockManager.doPutIterator()方法

  private def doPutIterator[T](
      blockId: BlockId,
      iterator: () => Iterator[T],
      level: StorageLevel,
      classTag: ClassTag[T],
      tellMaster: Boolean = true,
      keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
      val startTimeMs = System.currentTimeMillis
      var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None

      var size = 0L
      if (level.useMemory) {
        if (level.deserialized) {
          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
            case Right(s) =>
              size = s
            case Left(iter) =>
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(iter)
              }
          }
        } else {
          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
            case Right(s) =>
              size = s
            case Left(partiallySerializedValues) =>
              if (level.useDisk) {
                logWarning(s"Persisting block $blockId to disk instead.")
                diskStore.put(blockId) { channel =>
                  val out = Channels.newOutputStream(channel)
                  partiallySerializedValues.finishWritingToStream(out)
                }
                size = diskStore.getSize(blockId)
              } else {
                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
              }
          }
        }

      } else if (level.useDisk) {
        diskStore.put(blockId) { channel =>
          val out = Channels.newOutputStream(channel)
          serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
        }
        size = diskStore.getSize(blockId)
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, info)
      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
      if (blockWasSuccessfullyStored) {
        info.size = size
        if (tellMaster && info.tellMaster) {
          reportBlockStatus(blockId, putBlockStatus)
        }
        addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
        if (level.replication > 1) {
          val remoteStartTime = System.currentTimeMillis
          val bytesToReplicate = doGetLocalBytes(blockId, info)
          val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
            scala.reflect.classTag[Any]
          } else {
            classTag
          }
          try {
            replicate(blockId, bytesToReplicate, level, remoteClassTag)
          } finally {
            bytesToReplicate.dispose()
          }
          logDebug("Put block %s remotely took %s"
            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
        }
      }
      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
      iteratorFromFailedMemoryStorePut
    }
  }

虽然它本质上调用的是doPut()方法,但是整个写入的主体逻辑是作为一个函数(即doPut()方法的柯里化参数)传入的,因此我们可以先看上面的逻辑。阅读代码,可以整理出如下的执行逻辑:

  1. 检查块的存储等级(即StorageLevel)是否会使用内存,如果会,就优先将块展开到内存中。
  2. 如果存储等级是反序列化的,就调用MemoryStore.putIteratorAsValues()方法(具体逻辑见文章#26)将块数据作为对象写入。反之,如果是序列化的,就调用MemoryStore.putIteratorAsBytes()方法将块数据作为字节流写入。
  3. 检查上一步调用MemoryStore相关方法的结果,是否成功地展开了。如果只展开了一部分,说明内存中无法容纳块数据,因此在存储等级会使用磁盘的情况下,要继续调用DiskStore.put()方法,将多出的数据序列化地溢写到磁盘。若最终仍然没有完全展开,就将剩余的数据记录在iteratorFromFailedMemoryStorePut这个迭代器中(类型为PartiallyUnrolledIterator)。
  4. 若该块的存储等级不允许使用内存,而只允许使用磁盘,就直接调用DiskStore.put()方法写到磁盘中。
  5. 块数据写入完毕之后,如果tellMaster标记为真,调用reportBlockStatus()方法将新块的信息报告给BlockManagerMaster。
  6. 检查块的存储等级是否有副本(即后缀为_2的),如果有,还需要调用replicate()方法将块向其他节点复制一份。
  7. 方法返回iteratorFromFailedMemoryStorePut迭代器。

可见,块写入的过程符合Spark积极使用内存的特征。另外,通过阅读replicate()方法的代码(这里略去),可以发现复制块的过程是阻塞的。这就是在一般情况下不推荐使用带副本的StorageLevel的原因,会造成块写入性能下降,以及造成较大的网络传输开销。

然后就来看看doPut()方法的逻辑。

doPut()方法

该方法是doPutBytes()和doPutIterator()两个方法公用的方法,为写入的逻辑做一些前置和后置工作。代码如下。

代码#31.3 - o.a.s.storage.BlockManager.doPut()方法

  private def doPut[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[_],
      tellMaster: Boolean,
      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
    require(blockId != null, "BlockId is null")
    require(level != null && level.isValid, "StorageLevel is null or invalid")

    val putBlockInfo = {
      val newInfo = new BlockInfo(level, classTag, tellMaster)
      if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
        newInfo
      } else {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        if (!keepReadLock) {
          releaseLock(blockId)
        }
        return None
      }
    }

    val startTimeMs = System.currentTimeMillis
    var exceptionWasThrown: Boolean = true
    val result: Option[T] = try {
      val res = putBody(putBlockInfo)
      exceptionWasThrown = false
      if (res.isEmpty) {
        if (keepReadLock) {
          blockInfoManager.downgradeLock(blockId)
        } else {
          blockInfoManager.unlock(blockId)
        }
      } else {
        removeBlockInternal(blockId, tellMaster = false)
        logWarning(s"Putting block $blockId failed")
      }
      res
    } catch {
      case NonFatal(e) =>
        logWarning(s"Putting block $blockId failed due to exception $e.")
        throw e
    } finally {
      if (exceptionWasThrown) {
        removeBlockInternal(blockId, tellMaster = tellMaster)
        addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
      }
    }
    if (level.replication > 1) {
      logDebug("Putting block %s with replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    } else {
      logDebug("Putting block %s without replication took %s"
        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    }
    result
  }

其中,putBody即为写入块数据的函数,keepReadLock标记表示是否在写入后还继续持有对块的读锁。方法的执行流程如下:

  1. 为块生成新的BlockInfo,并调用BlockInfoManager.lockNewBlockForWriting()加写锁,准备写入。BlockInfoManager的相关细节在文章#22中已经讲过,不再赘述。
  2. 调用putBody函数的逻辑,真正地写入块数据。
  3. 若写入成功,当keepReadLock为真时,就调用BlockInfoManager.downgradeLock()方法将原先持有的写锁降级为读锁,方便后续读取。反之,当keepReadLock为假时,就直接调用BlockInfoManager.unlock()方法直接释放锁。
  4. 若putBody未能写入全部的块数据(返回的迭代器不为空)或者中途抛出了异常,说明写入不成功,调用removeBlockInternal()方法移除失败的块。该方法的实现如下,比较简单。

代码#31.4 - o.a.s.storage.BlockManager.removeBlockInternal()方法

  private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
    val removedFromMemory = memoryStore.remove(blockId)
    val removedFromDisk = diskStore.remove(blockId)
    if (!removedFromMemory && !removedFromDisk) {
      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
    }
    blockInfoManager.removeBlock(blockId)
    if (tellMaster) {
      reportBlockStatus(blockId, BlockStatus.empty)
    }
  }

总结

为了方便理解,下面用图示出BlockManager读写流程的方法调用链。

图#31.1 - BlockManager的读写流程(不含BlockTransferService)

由上图可见,BlockTransferService充当了本地BlockManager与远程BlockManager之间交互的媒介,要想补全BlockManager读写过程的全貌,还得必须研究BlockTransferService的实现细节才可以。下一篇文章会详细地讲解。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容