[spark] BlockManager 解析

概述

BlockManager是spark自己的存储系统,RDD-Cache、 Shuffle-output、broadcast 等的实现都是基于BlockManager来实现的,BlockManager也是分布式结构,在driver和所有executor上都会有blockmanager节点,每个节点上存储的block信息都会汇报给driver端的blockManagerMaster作统一管理,BlockManager对外提供get和set数据接口,可将数据存储在memory, disk, off-heap。

blockManager的创建与注册

blockManagerMaster和blockManager都是在构造SparkEnv的时候创建的,Driver端是创建SparkContext的时候创建SparkEnv,Executor端的SparkEnv是在其守护进程CoarseGrainedExecutorBackend创建的时候创建的,下面看blockManager是怎么在sparkEnv中创建的:

// get&put 远程block的时候就是通过blockTransferService 完成的
val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)

 val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

构造blockManagerMaster的时候在Driver端是创建了一个BlockManagerMasterEndpoint并注册到了rpcEnv中,而在executor端是获取到了 Driver端BlockManagerMasterEndpoint的引用 BlockManagerMasterRef,以便后面的通信。随后都创建了自己blockManager,创建blockManager的时候都创建了BlockManagerSlaveEndpoint。

blockManager创建后还不能直接使用,接着都会调用blockManager的initialize方法,通过与master通信向master进行注册,master收到消息后会将blockManager的信息存到blockManagerInfo的map中,key为blockManagerId(保存着executorId、host、post等信息),value为BlockManagerInfo(保存着具体的block状态信息及 BlockManagerSlaveEndpoint 的引用 ),注册完后就可以真正干活了。

master与slave间的消息传递

slave -> master

    // slave向master注册,会保存在master的blockManagerInfo中
    case RegisterBlockManager(blockManagerId, maxMemSize, slaveEndpoint) =>
      context.reply(register(blockManagerId, maxMemSize, slaveEndpoint))
    
    // 一个Block的更新消息,BlockId作为一个Block的唯一标识,会保存Block所在的节点和位置关系,以及block 存储级别,大小 占用内存和磁盘大小
    case _updateBlockInfo @
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
      context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
  
    // 用于获取指定 blockId 的 block 所在的 BlockManagerId 列表
    case GetLocations(blockId) =>
      context.reply(getLocations(blockId))
    
    // 获取多个Block所在 的位置,位置中会反映Block位于哪个 executor, host 和端口
    case GetLocationsMultipleBlockIds(blockIds) =>
      context.reply(getLocationsMultipleBlockIds(blockIds))

    // 一个block有可能在多个节点上存在,返回一个节点列表
    case GetPeers(blockManagerId) =>
      context.reply(getPeers(blockManagerId))
    
    // 根据BlockId,获取所在executorEndpointRef 也就是 BlockManagerSlaveEndpoint的引用
    case GetExecutorEndpointRef(executorId) =>
      context.reply(getExecutorEndpointRef(executorId))

    // 获取所有节点上的BlockManager的最大内存和剩余内存
    case GetMemoryStatus =>
      context.reply(memoryStatus)
    
    // 获取所有节点上的BlockManager的最大磁盘空间和剩余磁盘空间
    case GetStorageStatus =>
      context.reply(storageStatus)

    // 获取一个Block的状态信息,位置,占用内存和磁盘大小
    case GetBlockStatus(blockId, askSlaves) =>
      context.reply(blockStatus(blockId, askSlaves))

    // 获取一个Block的存储级别和所占内存和磁盘大小
    case GetMatchingBlockIds(filter, askSlaves) =>
      context.reply(getMatchingBlockIds(filter, askSlaves))
 
    // 删除Rdd对应的Block数据
    case RemoveRdd(rddId) =>
      context.reply(removeRdd(rddId))
 
    // 删除 shuffleId对应的BlockId的Block
    case RemoveShuffle(shuffleId) =>
      context.reply(removeShuffle(shuffleId))

    // 删除Broadcast对应的Block数据
    case RemoveBroadcast(broadcastId, removeFromDriver) =>
      context.reply(removeBroadcast(broadcastId, removeFromDriver))
    
    // 删除一个Block数据,会找到数据所在的slave,然后向slave发送一个删除消息
    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      context.reply(true)
    
    // 从BlockManagerInfo中删除一个BlockManager, 并且删除这个 BlockManager上的所有的Blocks
    case RemoveExecutor(execId) =>
      removeExecutor(execId)
      context.reply(true)

    // 用于停止 driver 或 executor 端的 BlockManager
    case StopBlockManagerMaster =>
      context.reply(true)
      stop()

    // slave 发送心跳给 master , 证明自己还活着
    case BlockManagerHeartbeat(blockManagerId) =>
      context.reply(heartbeatReceived(blockManagerId))
    
    // 用于检查 executor 是否有缓存 blocks(广播变量的 blocks 不作考虑,因为广播变量的 block 不会汇报给 Master)
    case HasCachedBlocks(executorId) =>
      blockManagerIdByExecutor.get(executorId) match {
        case Some(bm) =>
          if (blockManagerInfo.contains(bm)) {
            val bmInfo = blockManagerInfo(bm)
            context.reply(bmInfo.cachedBlocks.nonEmpty)
          } else {
            context.reply(false)
          }
        case None => context.reply(false)
      }

master -> slave

    // slave删除自己BlockManager上的一个Block
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }
     
    // 删除Rdd对应的Block数据
    case RemoveRdd(rddId) =>
      doAsync[Int]("removing RDD " + rddId, context) {
        blockManager.removeRdd(rddId)
      }

    // 删除 shuffleId对应的BlockId的Block
    case RemoveShuffle(shuffleId) =>
      doAsync[Boolean]("removing shuffle " + shuffleId, context) {
        if (mapOutputTracker != null) {
          mapOutputTracker.unregisterShuffle(shuffleId)
        }
        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
      }

    // 删除 BroadcastId对应的BlockId的Block
    case RemoveBroadcast(broadcastId, _) =>
      doAsync[Int]("removing broadcast " + broadcastId, context) {
        blockManager.removeBroadcast(broadcastId, tellMaster = true)
      }

    // 获取一个Block的存储级别和所占内存和磁盘大小
    case GetBlockStatus(blockId, _) =>
      context.reply(blockManager.getStatus(blockId))

    case GetMatchingBlockIds(filter, _) =>
      context.reply(blockManager.getMatchingBlockIds(filter))

    case TriggerThreadDump =>
      context.reply(Utils.getThreadDump())

存储

在blockManager被创建的时候创建了MemoryStore和DiskStore两个对象用以存取block。

DiskStore

diskSore就是基于磁盘来存储数据的,diskStore有一个成员DiskBlockManager,其主要作用就是逻辑block和磁盘block的映射,block的blockId对应磁盘文件中的一个文件。

def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    val subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }

    new File(subDir, filename)
  }

通过blockId的hash值和localDirs的个数求余来决定在哪个localDir下创建文件,这里的localDirs是可配置的多个目录,可通过SPARK_LOCAL_DIRS进行设置,多个目录以逗号分割,配置多个目录的目的是可分散磁盘的读写压力。另外spark在每个localDir中创建了64(可通过spark.diskStore.subDirectories配置)个子目录来分散文件,子文件的选择也是通过blockId的hash值来计算的。

在diskStore中的putButes方法就是真正写数据到磁盘的方法:

def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    put(blockId) { fileOutputStream =>
      val channel = fileOutputStream.getChannel
      Utils.tryWithSafeFinally {
        bytes.writeFully(channel)
      } {
        channel.close()
      }
    }
  }
def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
    if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
    }
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val fileOutputStream = new FileOutputStream(file)
    var threwException: Boolean = true
    try {
      writeFunc(fileOutputStream)
      threwException = false
    } finally {
      try {
        Closeables.close(fileOutputStream, threwException)
      } finally {
         if (threwException) {
          remove(blockId)
        }
      }
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
  }

接收一个blockId和要写的字节数据,通过blockId获取到要写的具体文件并得到对应的文件输出流,将该bytes直接write这个流里,完成写文件。

diskStore还有一个重要的方法getBytes方法,即读磁盘文件的方法,通过blockId获取到对应的磁盘文件,以字节 buffer 的形式返回。

此外还有查询blockId对应文件的大小、是否存在blockId对应的文件、删除blockId对应的文件等方法。

MemoryStore

memorySore是基于JVM的堆内存来存储数据,可以用于存数据的内存大小为:

(Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong

其中memoryFraction 是可通过配置的一个比例(spark.storage.memoryFraction,默认0.6),safetyFraction是一个安全比例,可通过spark.storage.safetyFraction设置。

MemoryStore内部维护了一个hash map来管理所有的block,以block id为key将block存放到hash map中。

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

放内存就意味着要有足够的内存来放,不然会导致OOM。

  • 若以blockId 对应的数据以bytes数据的方式存放,则可根据其size大小来判断是否有这么多内存来存入,不够的可以放磁盘,对应的方法是:

    putBytes[T: ClassTag](blockId: BlockId, size: Long, memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean
    
  • 若是以blockId 对应的数据通过迭代器的方式写入内存,则无法提前知道其数据大小,这里的做法是逐步展开迭代器来检查是否还有空余内存。如果迭代器顺利展开了,那么用来展开迭代器的内存直接转换为存储内存,而不用再去分配内存来存储该 block 数据。如果未能完全开展迭代器,则返回一个包含 block 数据的迭代器,其对应的数据是由多个局部块组合而成的 block 数据,对应的方法是:

    putIteratorAsValues[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
    
    putIteratorAsBytes[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]
    

通过memoryStore读数据也有两种方式,一个是以字节buffer的形式返回指定的block数据,另一个是以迭代器的形式返回指定的block数据。

blockManager对外服务

blockManager典型的几个应用场景如下:

  • spark shuffle过程的数据就是通过blockManager来存储的。
  • spark broadcast 将task调度到多个executor的时候,broadCast 底层使用的数据存储就是blockManager。
  • 对一个rdd进行cache的时候,cache的数据就是通过blockManager来存放的。
  • spark streaming 一个 ReceiverInputDStream 接受到的数据也是先放在 BlockManager 中, 然后封装为一个 BlockRdd 进行下一步运算的。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容