一.分区器的区别
- HashPartitioner分区可能HashPartitioner导致每个分区中数据量的不均匀。
- RangePartitioner分区尽量保证每个分区中数据量的均匀,将一定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。
二.RangePartitioner分区执行原理概述
1.计算总体的数据抽样大小sampleSize,计算规则是:至少每个分区抽取20个数据或者最多1e6的样本的数据量。
2.根据sampleSize和分区数量计算每个分区的数据抽样样本数量最大值sampleSizePrePartition。
3.根据以上两个值进行水塘抽样,返回RDD的总数据量,分区中总元素的个数和每个分区的采样数据。
4.计算出数据量较大的分区通过RDD.sample进行重新抽样。
5.通过抽样数组 candidates: ArrayBuffer[(K, wiegth)]计算出分区边界的数组BoundsArray
6.在取数据时,如果分区数小于128则直接获取,如果大于128则通过二分法,获取当前Key属于那个区间,返回对应的BoundsArray下标即为partitionsID。
源码分析可参考以下几篇博客
下面只对RanagePartitioner的核心机制进行分析总结。
三.RangePartitioner的实现机制
1.在总数不知道的情况下如何等概率地从中抽取N行?
类比水塘抽样法,该方法可以解决在总数不知道的情况下如何等概率地从中抽取一行数据
定义取出的行号为choice,第一次直接以第一行作为取出行 choice ,而后第二次以二分之一概率决定是否用第二行替换 choice ,第三次以三分之一的概率决定是否以第三行替换 choice ……,以此类推。
由上面的分析我们可以得出结论,在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数,算法结束。
解决方案:在RangePartition中如何实现在总数不知道的情况下如何等概率地从中抽取N行数据:
采样算法是RangePartitioner分区的核心,其内部使用的就是水塘抽样,而这个抽样特别适合那种总数很大而且未知,并无法将所有的数据全部存放到主内存中的情况。也就是我们不需要事先知道RDD中元素的个数。
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// 把前K个元素放入到数组reservoir中,k为设置的每个分区的样本数,及sampleSizePerPartition
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// 如果分区记录数少于设置的分区样本数,则直接返回
// 否则使用迭代器,每次迭代出的数据,为其生成一个0至 l 的随机数,如果随机数小于K,则把reservoir数组中的对应记录替换
if (i < k) {
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
// l 的值不断迭代的
l += 1
val replacementIndex = (rand.nextDouble() * l).toLong
//如果随机数小于K,则把reservoir数组中的对应记录替换
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
}
(reservoir, l)
}
}
分析
RangePartition中水塘抽样的理解,首先从池塘中先从前往后取出足够的样本数据,暂且称这批数据为旧数据,之后对未遍历的数据进行遍历,此次i值为该分区元素的第 i 条记录,i = k。
之后 i 值不断的迭代,然后取出(0,i)的随机数,如果该随机数小于k则,进行替换,保留第 i 个数,随着不断的迭代,i 值是一定是逐渐变大的,k 值是不变的,所以取出的新数据替换旧数据的几率就越来越小,例如,一开始,i = k,随机值小于 k 的概率为 (k-1)/k,迭代 n 次之后,该几率为 (k-1)/k+n。
用伪代码表示如下所示
从S中抽取首k项放入「水塘」中
对于每一个S[j]项(j ≥ k):
随机产生一个范围0到j的整数r
若 r < k 则把水塘中的第r项换成S[j]项
2.计算样本权重,对数据多的分区再进行抽样
父RDD各分区中的数据量可能不均匀,在极端情况下,有些分区内的数据量会占有整个RDD的绝大多数的数据,如果按照水塘抽样进行采样,会导致该分区所采样的数据量不足,因此需要对取样数不足的分区还需要重新进行采样。
通过(采样因子*分区记录数)得到每个分区应采样本数。
如果fraction * 分区内记录数 > sampleSizePerPartition,则该分区会再进行一次抽样,否则计算权重weight为 1 /(总样本数/总记录总数),因为sample中的比例就是(总样本数/总记录总数)。
如果fraction * 分区内记录数 < sampleSizePerPartition,权重weight 为(分区总数 / 采样总数),为该分区的取出的样本的真实权重,可能会比平均权重大,因为有可能在上面的reservoirSampleAndCount水塘抽样中采样总数已经达到了该分区记录数的最大值。
// 计算总样本数量和总记录数的占比,占比最大为1.0
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
// 保存样本数据的集合buffer
val candidates = ArrayBuffer.empty[(K, Float)]
// 保存数据分布不均衡的分区id(数据量超过fraction比率的分区)
val imbalancedPartitions = mutable.Set.empty[Int]
// 计算抽取出来的样本数据
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
// 如果fraction乘以当前分区中的数据量大于之前计算的每个分区的抽象数据大小,
// 那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,需要重新抽取
imbalancedPartitions += idx
} else {
// 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
// 对于数据分布不均衡的RDD分区,重新进行数据抽样
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability.
// 获取数据分布不均衡的RDD分区,并构成RDD
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
// 随机种子
val seed = byteswap32(-rdd.id - 1)
// 利用rdd的sample抽样函数API进行数据抽样
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
// 将最终的抽样数据计算出rangeBounds出来
RangePartitioner.determineBounds(candidates, partitions)
3.根据样本权重解决分区边界问题
先将candidate(Array[(key, weight)])按照key排序,计算总权重sumWeights,除以分区数,得到每个分区的平均权重step,接下来while循环遍历已排序的candidate,累加其权重cumWeight,每当累加权重达到一个分区的平均权重step,就获取一个key作为分区间隔符,最后返回所有获取到的分隔符,determineBounds执行完毕,也就返回了变量rangeBounds作为每个分区边界的key的集合。
def determineBounds[K: Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// 按照数据进行数据排序,默认升序排列
val ordered = candidates.sortBy(_._1)
// 获取总的样本数量大小
val numCandidates = ordered.size
// 计算总的权重大小
val sumWeights = ordered.map(_._2.toDouble).sum
// 计算步长
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
// 获取排序后的第i个数据及权重
val (key, weight) = ordered(i)
// 累计权重
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
// 权重已经达到一个步长的范围,计算出一个分区id的值
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
// 上一个边界值为空,或者当前边界key数据大于上一个边界的值,那么当前key有效,进行计算
// 添加当前key到边界集合中
bounds += key
// 累计target步长界限
target += step
// 分区数量加1
j += 1
// 上一个边界的值重置为当前边界的值
previousBound = Some(key)
}
}
i += 1
}
// 返回结果
bounds.toArray
}
4.由rangeBounds计算分区数和key属于哪个分区
rangeBounds少于128,直接遍历比较key和分隔符,得到PartitionId,否则使用二分查找,并做了边界条件的判断,最后,根据升序还是降序返回PartitionId。
5.父RDD中每个分区采样样本数的确定
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
父RDD每个分区需要采样的数据量是正常数的3倍。
因为父RDD各分区中的数据量可能会出现倾斜的情况,乘于3的目的就是保证数据量小的分区能够采样到足够的数据,而对于数据量大的分区会进行第二次采样。