Spark应用分片介绍

引言

分布式计算的基本思路是将数据分为多个部分,将同样的数据操作方式在数据的不同部分上执行,分别获得结果,然后通过“汇聚处理”的方式得到结果。如何将数据分为多个部分(也就是“分片”)便是其中的一个重要组成部分。Spark框架同样对使用分片的操作,将数据分片(partition)处理。本文对Spark框架中的数据分片作简单介绍。

输入数据的分片

对于读取批数据生成rdd的操作,数据的分片都是通过输入文件格式本身提供的getSplit方法来对数据进行分片。
本部分主要介绍对于不同数据源的数据,spark如何定义/获取数据的分片数。

text文件分片(sc.textFile为例):

def textFile(
  path: String,
  minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat]  /* 数据文件的输入格式 :org.apache.hadoop.mapred.TextInputFormat */
, classOf[LongWritable], classOf[Text],
  minPartitions).map(pair => pair._2.toString).setName(path)
}
 hadoopFile方法生成HadoopRDD
 HadoopRDD(
  this,
  confBroadcast,
  Some(setInputPathsFunc),
  inputFormatClass,
  keyClass,
  valueClass,
  minPartitions) /* minPartitions为生成该RDD的最小分片数,表示该RDD的分片数最小值,默认为2*/
  .setName(path)

在执行action方法(如count)时,spark应用才真正开始计算,通过调用rdd.partitions.length计算出分片数

    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}

通过跟踪该方法可以看出该函数最终会调用到HadoopRDD的getPartitions方法,在该方法中通过inputFormat的getSplit方法计算分片数

    getInputFormat(jobConf).getSplits(jobConf, minPartitions)

TextInputFormat继承至FileInputFormat,FileInputFormat的getSplit方法网上有许多分析,这里不再展开,大致的原理是根据文件个数,传入的minpartitions,mapreduce.input.fileinputformat.split.minsize等参数计算出分片数。

hbase表分片

在读取HBase数据时,没有类似textFile的接口的封装,可调用如下接口生成给予hbase数据的RDD,

val hBaseRDD = sc.newAPIHadoopRDD(conf, 
classOf[TableInputFormat], /*该类的全类名为:  org.apache.hadoop.hbase.mapreduce.TableInputFormat */
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  classOf[org.apache.hadoop.hbase.client.Result])  
  
  该方法生成new NewHadoopRDD(this, fClass, kClass, vClass, jconf)

在执行action操作时,同样调用到rdd.partitions方法,跟踪至newHadoopRDD之后,发现调用到

inputFormat.getSplits(new JobContextImpl(_conf, jobId))

查看对应的getSplits方法可以看出:

默认情况下(hbase.mapreduce.input.autobalance的值为false)hbase表如果存在多个region,则每个region设置为一个split。

如果设置了开启均衡(设置hbase.mapreduce.input.autobalance的值为true:在hbase的region大小不均衡引发的数据倾斜,将导致不同的region处理耗时较多,该参数为了解决此场景),则会在每个region对应一个split的基础上,将较小(小于平均大小)的region进行合并作为一个split,较大(大于平均size的三倍(其中三可配置))的region拆分为两个region。

    splits伪代码如下(源码可参考TableInputFormatBase.calculateRebalancedSplits):
    
    while ( i < splits.size)
    {
        if(splits(i).size > averagesize * 3) {
        if(! splitAble)
            resultsplits.add(split(i))
        else{
            (split1,split2) = Split(splits(i))
            resultsplits.add(split1)
            resultsplits.add(split2)
        }
        i++ 
        }
        else if(splits(i).size > averagesize) {
            resultsplits.add(split(i))
            i++
        }else{
            startKey = split(i).getStartRow
            i++;
            while(totalSize + splits(i).size < averagesize * 3){
                totalSize += splits(i).size
                endKey = splits(i).getEndRow
            }
            resultsplits.add(new TableSplit(startKey,endKey,*))
        }
    }

Kafka数据的分片

Spark框架在读取Kafka消息时,将Kafka数据抽象为KafkaRDD(SparkStreaming)或者KafkaSourceRDD(StructedStreaming),查看对应RDD的getPartitions方法和定义:

KafkaSourceRDD:

    override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
 }
 offsetRanges的数据结构为
 private[kafka010] case class KafkaSourceRDDOffsetRange(
topicPartition: TopicPartition,
fromOffset: Long,
untilOffset: Long,
preferredLoc: Option[String]) 

可以看出partition个数为对应的TopicPartition的个数

KafkaRDD

override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }
  offsetRanges数据结构为:
  final class OffsetRange private(
        val topic: String,
        val partition: Int,
        val fromOffset: Long,
        val untilOffset: Long) 

可以看出partition个数为对应的partition的个数

总结

在spark框架中,对于输入数据获取RDD的处理:

  • 读取数据时的分片由数据量,数据"存储格式"决定,框架/应用并不能真正决定分片数。
  • 对于通过数据生成的RDD,如makeRDD,parallize等方法生成的RDD,则可以指定相应的RDD的分片数。
  • 对于FileInputFormat格式的数据,可通过设置最小的分片数来扩大RDD分片数,但不能决定最终由多少分片数(最终分片数 >= 设置的最小分片数)
  • 其他类型的数据/文件的分片方法也是通过输入文件格式的getSplit方法来获取分片
  • Split方法直接决定了输入数据的分片数,影响应用并行度,在一些场景下,应用可以定制特定的getSplits方法来实现一些特殊需求。如hive在处理小文件时自定义了combineFileInputForamt,Hbase在以region为单位划分split之后,再跟进每个region数据量来合并/分拆split来优化性能

PS: 其他相关的数据分片

对于输入文件的分片,不同的文件格式使用的分片方法不尽相同。 如hive中使用的parquet,RCFIle格式文件,其getsplits方法直接使用的是FileInputFormat.getSplits, 而orc格式文件的getsplits方法则是继承于InputFormat

在Hive中默认使用的是CombineFileInputFormat,它的作用是在启动map时,会将多个小文件进行合并,已启动较少的map提升应用运行速度。其getsplits方法在合并小文件时会考虑更多的因素,如:

mapreduce.input.fileinputformat.split.minsize,
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack
mapreduce.input.fileinputformat.split.maxsize

经过转换的分片

  • Spark框架中,RDD的分片数决定了对RDD处理时的并发度,因此合理的RDD分片数,对应用的性能有较大影响。

  • RDD的转换通常不会改变RDD的partition数,如map,flatmap,mappartitions等操作并没有传入partition数的API,无法修改新生成的RDD的的分片数。可参考org.apache.spark.rdd.RDD

  • 如果需要强制修改新生成RDD的分片数,可直接调用RDD.repartition,RDD.coalesce强制修改新生成RDD的分片数

  • 对于RDD[KEY,VALUE]类型的RDD的操作如join,reduceByKey,aggregateByKey,combineByKey等接口可通过传入分片数/设置partitioner等方式设置shuffle之后的RDD的partition个数,从而调整后续的stage的并发task个数.可参考org.apache.spark.rdd.PairRDDFunctions

  • 对于需要进行shuffle操作的算子,在变换的过程中,会自动生成shuffledRDD,该RDD的分片数可通过触发shuffle操作的算子调用时设置。如果没有设置时,则会使用默认的分片数。

  • 对于普通的应用shuffle后的默认的分片数由spark.default.parallelism参数决定,默认200 对于sql相关的操作,shuffle后的默认分片数由spark.sql.shuffle.partitions操作决定,默认为200

  • 对于某些特殊的操作,sql的内部优化可能会触发shuffle操作。如使用到treeaggregate会触发shuffle操作,shuffle后的partition数目默认为原始的开方。即原有2000个partition时,shuffle后的partition为44个。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容