7.spark core之数据分区

简介

  spark一个最重要的特性就是对数据集在各个节点的分区进行控制。控制数据分布可以减少网络开销,极大地提升整体性能。
  只有Pair RDD才有分区,非Pair RDD分区的值是None。如果RDD只被扫描一次,没必要预先分区处理;如果RDD多次在诸如连接这种基于键的操作中使用时,分区才有作用。

分区器

  分区器决定了RDD的分区个数及每条数据最终属于哪个分区。
  spark提供了两个分区器:HashPartitioner和RangePartitioner,它们都继承于org.apache.spark.Partitioner类并实现三个方法。

  • numPartitions: Int: 指定分区数
  • getPartition(key: Any): Int: 分区编号(0~numPartitions-1)
  • equals(): 检查分区器对象是否和其他分区器实例相同,判断两个RDD分区方式是否一样。

HashPartitioner分区

  HashPartitioner分区执行原理:对于给定的key,计算其hashCode,再除以分区数取余,最后的值就是这个key所属的分区ID。实现如下:

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
 
  def numPartitions: Int = partitions
 
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
 
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }
 
  override def hashCode: Int = numPartitions
}

RangePartitioner分区

  HashPartitioner分区可能导致每个分区中数据量的不均匀。而RangePartitioner分区则尽量保证每个分区中数据量的均匀,将一定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。
  RangePartitioner分区执行原理:

  • 计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1M的数据量。
  • 根据sampleSize和分区数量计算每个分区的数据抽样样本数量sampleSizePrePartition
  • 调用RangePartitioner的sketch函数进行数据抽样,计算出每个分区的样本。
  • 计算样本的整体占比以及数据量过多的数据分区,防止数据倾斜。
  • 对于数据量比较多的RDD分区调用RDD的sample函数API重新进行数据抽取。
  • 将最终的样本数据通过RangePartitoner的determineBounds函数进行数据排序分配,计算出rangeBounds。
class RangePartitioner[K: Ordering : ClassTag, V](
                                                   partitions: Int,
                                                   rdd: RDD[_ <: Product2[K, V]],
                                                   private var ascending: Boolean = true)
  extends Partitioner {

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")

  // 获取RDD中K类型数据的排序器
  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      // 如果给定的分区数小于等于1的情况下,直接返回一个空的集合,表示数据不进行分区
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // 给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每个RDD分区至少抽取20条数据
      val sampleSize = math.min(20.0 * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // RDD各分区中的数据量可能会出现倾斜的情况,乘于3的目的就是保证数据量小的分区能够采样到足够的数据,而对于数据量大的分区会进行第二次采样
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
      // 从rdd中抽取数据,返回值:(总rdd数据量, Array[分区id,当前分区的数据量,当前分区抽取的数据])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // 如果总的数据量为0(RDD为空),那么直接返回一个空的数组
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // 计算总样本数量和总记录数的占比,占比最大为1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // 保存样本数据的集合buffer
        val candidates = ArrayBuffer.empty[(K, Float)]
        // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)
        val imbalancedPartitions = mutable.Set.empty[Int]
        // 计算抽取出来的样本数据
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽象数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
            imbalancedPartitions += idx
          } else {
            // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.size).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        // 对于数据分布不均衡的RDD分区,重新进行数据抽样
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // 获取数据分布不均衡的RDD分区,并构成RDD
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // 随机种子
          val seed = byteswap32(-rdd.id - 1)
          // 利用rdd的sample抽样函数API进行数据抽样
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }

        // 将最终的抽样数据计算出rangeBounds出来
        RangePartitioner.determineBounds(candidates, partitions)
      }
    }
  }

  // 下一个RDD的分区数量是rangeBounds数组中元素数量+ 1个
  def numPartitions: Int = rangeBounds.length + 1

  // 二分查找器,内部使用java中的Arrays类提供的二分查找方法
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  // 根据RDD的key值返回对应的分区id。从0开始
  def getPartition(key: Any): Int = {
    // 强制转换key类型为RDD中原本的数据类型
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // 如果分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      // 如果分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标;
      // 但是如果k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比所有数据都大)
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition - 1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }

    // 根据数据排序是升序还是降序进行数据的排列,默认为升序
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

影响分区的算子操作

  影响分区的算子操作有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、repartition()、coalesce()、sort()、mapValues()(如果父RDD有分区方式)、flatMapValues()(如果父RDD有分区方式)。
  对于执行两个RDD的算子操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。不过,如果其中一个父RDD设置过分区方式,结果就采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD采用第一个父RDD的分区方式。

repartition和partitionBy的区别

  repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner。但是二者之间的区别有:

  • partitionBy只能用于Pair RDD
  • 都作用于Pair RDD时,结果也不一样


    repartition和partitionBy的区别.jpg

  其实partitionBy的结果才是我们所预期的。repartition 其实使用了一个随机生成的数来当作 key,而不是使用原来的key。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
}
 
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]
 
      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
}

repartition和coalesce的区别

  两个算子都是对RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)

  • N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。
  • 如果N>M并且N和M相差不多(假如N是1000,M是100),这时可以将shuffle设置为false,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。在shuffle为false的情况下,如果N<M时,coalesce是无效的。
  • 如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能。如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。

实例分析

需求

  统计用户访问其未订阅主题页面的情况。

  • 用户信息表:由(UserID,UserInfo)组成的RDD,UserInfo包含该用户所订阅的主题列表。
  • 事件表:由(UserID,LinkInfo)组成的RDD,存放着每五分钟内网站各用户访问情况。

代码实现

val sc = new SparkContext()
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...").persist
def processNewLogs(logFileName:String){
    val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
    //RDD of (UserID,(UserInfo,LinkInfo)) pairs
    val joined = usersData.join(events)
    val offTopicVisits = joined.filter {
        // Expand the tuple into its components
        case (userId, (userInfo, linkInfo)) => 
            !userInfo.topics.contains(linkInfo.topic)
    }.count()
    println("Number of visits to non-subscribed opics: " + offTopicVisits)
}

缺点

  连接操作会将两个数据集中的所有键的哈希值都求出来,将哈希值相同的记录通过网络传到同一台机器上,然后再对所有键相同的记录进行连接操作。userData表数据量很大,所以这样进行哈希计算和跨节点数据混洗非常耗时。

改进代码实现

val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...")
.partionBy(new HashPartiotioner(100))
.persist()

优点

  userData表进行了重新分区,将键相同的数据都放在一个分区中。然后调用persist持久化结果数据,不用每次都计算哈希和跨节点混洗。程序运行速度显著提升。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容