Spark Core源码精读计划#12:Spark序列化及压缩机制浅析

目录

前言

SparkEnv的初始化过程中,在RPC环境与广播管理器之间还夹着一个,即序列化管理器SerializerManager。本来它并不在这个系列的计划内(因为没有什么比较难的点),但是最近斟酌了一下,序列化和反序列化确实是渗透在Spark Core的每个角落中的,今后不会少见。并且SerializerManager除了负责序列化之外,还会负责一部分压缩和加密的工作,所以本文就来了解一下与它相关的具体实现。

SerializerManager在SparkEnv中的初始化参见文章#7及代码#7.6,这里不再赘述。

SerializerManager类

该类接受3个主构造方法参数。

  • defaultSerializer:默认序列化器。这个序列化器在SparkEnv中初始化时已经创建好了,类型是JavaSerializer。
  • conf:即配置项SparkConf。
  • encryptionKey:加密时使用的密钥,是可选的,当其存在时才会启用加密。

成员属性列表

代码#12.1 - o.a.s.serializer.SerializerManager类的成员属性

  private[this] val kryoSerializer = new KryoSerializer(conf)

  private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
  private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
    val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean,
      ClassTag.Byte,
      ClassTag.Char,
      ClassTag.Double,
      ClassTag.Float,
      ClassTag.Int,
      ClassTag.Long,
      ClassTag.Null,
      ClassTag.Short
    )
    val arrayClassTags = primitiveClassTags.map(_.wrap)
    primitiveClassTags ++ arrayClassTags
  }

  private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
  private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
  private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
  private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

  private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec(conf)
  • kryoSerializer:采用Google Kryo序列化库的序列化器。它的效率比普通的JavaSerializer更高,但是会有一定的限制,比如原生支持的类型比较少,如果必须使用自定义的类型,需要提前注册。
  • stringClassTag:String类的类型标记(ClassTag)。因为泛型类型在编译期会被擦除(即type erasure),故ClassTag在Scala中用来在运行期指定无法识别的泛型类型。
  • primitiveAndPrimitiveArrayClassTags:Scala基本类型(primitive types)及它们对应数组(即Array[...])的所有ClassTag。基本类型有Boolean、Byte、Char、Double、Float、Int、Long、Null、Short八种。
  • compressBroadcast:是否压缩广播变量,对应配置项spark.broadcast.compress,默认值true。
  • compressShuffle:是否压缩Shuffle过程的输出数据,对应配置项spark.shuffle.compress,默认值true。
  • compressRdds:是否压缩序列化RDD的分区数据,对应配置项spark.rdd.compress,默认值false。
  • compressShuffleSpill:是否压缩Shuffle过程中向磁盘溢写的数据,对应配置项spark.shuffle.spill.compress,默认值true。
  • compressionCodec:压缩编解码器,是CompressionCodec特征的实现类,并且它会延迟初始化。

获取序列化器

代码#12.2 - o.a.s.serializer.SerializerManager.getSerializer()与canUseKryo()方法

  def canUseKryo(ct: ClassTag[_]): Boolean = {
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
  }

  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer = {
    if (autoPick && canUseKryo(ct)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

  def getSerializer(keyClassTag: ClassTag[_], valueClassTag: ClassTag[_]): Serializer = {
    if (canUseKryo(keyClassTag) && canUseKryo(valueClassTag)) {
      kryoSerializer
    } else {
      defaultSerializer
    }
  }

由上可见,SerializerManager取得序列化器时,会先调用canUseKryo()方法判断要序列化的对象类型是否落在8种Scala基本类型与String类型中。如果确实属于以上的类型,就会获取KryoSerializer,反之就会获取默认的JavaSerializer。

获取序列化器的getSerializer()方法也有两种重载,其中第二种重载方法专门用来确定Pair RDD在Shuffle过程中的序列化器。

对输入/输出流的包装

SerializerManager提供了多种方法来对输入流(InputStream)和输出流(OutputStream)进行包装,也就是将它们转化为压缩的或者加密的流。代码如下。

代码#12.3 - SerializerManager的wrap类方法

  def wrapForEncryption(s: InputStream): InputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoInputStream(s, conf, key) }
      .getOrElse(s)
  }

  def wrapForEncryption(s: OutputStream): OutputStream = {
    encryptionKey
      .map { key => CryptoStreamUtils.createCryptoOutputStream(s, conf, key) }
      .getOrElse(s)
  }

  def wrapForCompression(blockId: BlockId, s: OutputStream): OutputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
  }

  def wrapForCompression(blockId: BlockId, s: InputStream): InputStream = {
    if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
  }

如果encryptionKey存在的话,调用wrapForEncryption()方法可以将流转化为加密的流。如果存储块的ID对应的数据类型支持压缩,调用wrapForCompression()方法可以将流数据用指定的编解码器压缩。判断是否可压缩的shouldCompress()方法代码如下。

代码#12.4 - o.a.s.serializer.SerializerManager.shouldCompress()方法

  private def shouldCompress(blockId: BlockId): Boolean = {
    blockId match {
      case _: ShuffleBlockId => compressShuffle
      case _: BroadcastBlockId => compressBroadcast
      case _: RDDBlockId => compressRdds
      case _: TempLocalBlockId => compressShuffleSpill
      case _: TempShuffleBlockId => compressShuffle
      case _ => false
    }
  }

序列化与反序列化的方法

SerializerManager对序列化器Serializer的serializeStream()及deserializeStream()方法进行了一定的封装,其代码如下。

代码#12.5 - SerializerManager的序列化与反序列化方法

  def dataSerializeStream[T: ClassTag](
      blockId: BlockId,
      outputStream: OutputStream,
      values: Iterator[T]): Unit = {
    val byteStream = new BufferedOutputStream(outputStream)
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
  }

  def dataSerializeWithExplicitClassTag(
      blockId: BlockId,
      values: Iterator[_],
      classTag: ClassTag[_]): ChunkedByteBuffer = {
    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
    val byteStream = new BufferedOutputStream(bbos)
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    val ser = getSerializer(classTag, autoPick).newInstance()
    ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
    bbos.toChunkedByteBuffer
  }

  def dataDeserializeStream[T](
      blockId: BlockId,
      inputStream: InputStream)
      (classTag: ClassTag[T]): Iterator[T] = {
    val stream = new BufferedInputStream(inputStream)
    val autoPick = !blockId.isInstanceOf[StreamBlockId]
    getSerializer(classTag, autoPick)
      .newInstance()
      .deserializeStream(wrapForCompression(blockId, stream))
      .asIterator.asInstanceOf[Iterator[T]]
  }

由上可见,序列化方法既可以直接序列化为流,也可以根据值的ClassTag序列化为ChunkedByteBuffer,即分块的字节缓存。反序列化方法则是返回值类型的迭代器。并且当存储块ID的类型为StreamBlockId(Spark Streaming中用到的块ID)时,SerializerManager就不会自动判别该使用哪种序列化器,而是完全采用用户指定的类型。

CompressionCodec特征

SerializerManager实现压缩主要靠CompressionCodec。它实际上是个仅定义了两个方法(即compressedOutputStream()与compressedInputStream())的特征,所有具体逻辑都位于其伴生对象中。下面是它的部分代码。

代码#12.6 - o.a.s.io.CompressionCodec的伴生对象

  private val configKey = "spark.io.compression.codec"

  private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
    (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
      || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
  }

  private val shortCompressionCodecNames = Map(
    "lz4" -> classOf[LZ4CompressionCodec].getName,
    "lzf" -> classOf[LZFCompressionCodec].getName,
    "snappy" -> classOf[SnappyCompressionCodec].getName,
    "zstd" -> classOf[ZStdCompressionCodec].getName)

  def getCodecName(conf: SparkConf): String = {
    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
  }

  def createCodec(conf: SparkConf): CompressionCodec = {
    createCodec(conf, getCodecName(conf))
  }

  def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
    val codecClass =
      shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
    val codec = try {
      val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
    } catch {
      case _: ClassNotFoundException | _: IllegalArgumentException => None
    }
    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
  }

可见,Spark目前支持4种压缩编解码器,分别是LZ4、LZF、Snappy和ZStd,可以通过配置项spark.io.compression.codec来设置。其中LZ4是默认值,即常量DEFAULT_COMPRESSION_CODEC的值。createCodec()方法会获得Codec短名称对应的具体类名,然后通过反射创建对应的实例。

CompressionCodec的实现类都十分简短,因此不再详细列举。

总结

本文通过阅读SerializerManager与CompressionCodec的源码,大致了解了Spark内是如何处理数据序列化、反序列化及压缩的。在之后的源码阅读过程中,我们会频繁地遇到序列化和压缩相关方法的调用,相信到时候我们就不会感到迷惑了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容