spark的shuffle 和原理分析

1. 概述

shuffle 就是对数据进行重组,由于分布式计算的特性和要求,在实现细节上更加繁琐和复杂.
在 MapReduce框架,Shuffle 是连接 Map 和 Reduce 之间的桥梁,Map 阶段通过 shuffle 读取数据并输出到对应的 Reduce ;而 Reduce 阶段负责从 Map 端拉取数据并进行计算,在整个 shuffle 过程中,往往伴随着大量的磁盘和网络 I/O.所以 shuffle 性能的高低也直接决定了整个程序的性能高低,Spark 也会有自己的 shuffle 实现过程.


2 .spark 中的shuffle介绍

在 DAG 调度的过程中,Stage 阶段的划分是根据是否有 shuffle 过程,也就是存在 ShuffleDependency 宽依赖的时候,需要进行 shuffle ,这时候会将作业 job 划分成多个 Stage ,并且在划分 Stage 的时候,构建 ShuffleDependency 的时候进行 shuffle 注册,获取后续数据读取所需要的 ShuffleHandle ,最终每一个 Job 提交后都会生成一个 ResultStage 和 若干个 ShuffleMapStage ,其在中 ResultStage 表示生成作业的最终结果所在的Stage.
ResultStage 与 ShuffleMapStage 中的 task 分别对应着 RedultTask 与 ShuffleMapTask.
一个作业,除了最终的 ResultStage 外,其他若干 ShuffleMapStage 中各个 ShuffleMapTask 需要将最终的数据根据相应的 Partitioner 对数据进行分组,然后持久化分区的数据.

2.1 HashShuffle 机制

2.1.1 HashShuffle 概述

在 spark-1.6 版本之前,一直使用 HashShuffle, 在 Spark-1.6 版本之后使用 Sort-Base Shuffle,因为 HashShuffle 存在的不足所以就替换了 HashShuffle.
我们知道,Spark 的运行主要分为 2 部分,一部分是 驱动程序,其核心是 SparkContext; 另一部分是 Worker 节点上 Task ,它是运行 实际任务的,程序运行的时候,Driver 和 Executor 进程相互交互,运行什么任务,即 Driver 会分配 Task 到 Executor, Driver 跟 Executor 进行网络传输,任务数据从哪儿获取,即 Task 要从 Driver 抓取其他上游的 Task 的数据结果,所以有这个过程中就不断的产生网络结果,其中,下一个 Stage 向上一个 Stage 要数据这个过程,我们就称为 Shuffle.

2.1.2 没有优化之前的 HashShuffle 机制


在HashShuffle 没有优化之前,每一个 ShuffleMapTask 会为每一个 ReduceTask 创建一个 bucket 缓存,并且会为每一个 bucket 创建一个文件.这个bucket 存放的数据就是经过 Partitioner 操作(默认是 HashPartitioner)之后,找到对应的 bucket 然后放进去,最后将数据
刷新 bucket 缓存的数据到磁盘上, 即对应的 block file.
然后 ShuffleMapTask 将输出作为 MapStatus 发送到 DAGScheduler 的
MapOutPutTrackerMaster ,每一个 MapStatus 包含了每一个 ResultTask 要拉取的数据的位置和大小.
ResultTask 然后去利用 BlockStoreShuffleFetcher 向 MapOutPutTrackerMaster 获取 MapStatus ,看哪一份数据是属于自己的,然后底层通过 BlockManager 将数据拉取过来.
拉取过来的数据会组成一个 内部的 ShuffleRDD,优先放入内存,内存不够则放入磁盘,然后 ResulTask 开始进行聚合,最后生成我们希望获取的那个 MapPartitionRDD.

存在的问题

如图所示: 在这里有 1 个worker, 2 个 executor ,每一个 executor 运行两个 ShuffleMapTask,有三个 ReduceTask,所以总共就有 4 * 3 = 12 个bucket 和 12 个 block file.
如果数据量较大,将会生成 M * R 个小文件,比如 ShuffleMapTask 有 100 , ResultTask 有 100 个,这就会产生 100 * 100 = 1000000 个小文件.
bucket 缓存很重要,需要将 ShuffleMapTask 所有的数据都写入 bucktet, 才会刷到 磁盘,那么如果 Map 端数据过多,这就很容易造成内存溢出,尽管后面有优化, bucket 写入的数据达到刷新到磁盘的阀值之后,就会将数据一点点的刷新到磁盘,但是这样 磁盘的 I/O 就多了.

$ 2.1.2 优化的 HashShuffle


每一个 Executor 进程根据核数,决定 Task 的并发数量,比如 executor 核心数是2 , 就是可以并发运行两个 task ,如果是一个 则只能运行一个 task.
假设 executor 核心数是1 , ShuffleMapTask 数量是 M, 那么塔依然会根据 ReduceTask 的数量T , 创建 R 个 bucket 缓存,然后对 Key 进行 Hash ,数据进入不同的 bucket 中,每一个 bucket 对应一个 block file ,用于刷新 bucket 缓存里的数据。
然后下一个 task 运行的时候,那么不会再创建新的 bucket 和 block file,而是复用之前的 task 已经创建好的 bucket 和 block file 。即所谓的用一个 Executor 进程里所有的 Task 都会把相同的 Key 放入相同的 bucket 缓冲中。
这样的话,生成文件的数量就是 ( 本地 worker 的 executor 数量 * executor 的 cores * ResultTask 数量) 如上如所示,。即 2 * 1 * 3 = 6 个文件,每一个 Executor 的 shuffleMapTask 数量 100, 那么 未优化的 HasjShuffle 的文件数 2 * 1 * 100 100 = 2090000,优化之后的数量是 2 1 * 100 = 200 文件,相当于少了100 倍.

存在的问题

如果 Reducer 端的并行任务或者是数据分片过多的话 则 Core * Reducer Task 依旧过大,也会产生很多小文件.

2.2 Sort-Based Shuffle 机制

2.2.1 概述

HashShuffle 回顾
HashShuffle 写数据的时候,内存有一个 bucket 缓冲区,同时在本地磁盘有对应的本地文本,如果本地有文件,那么在内存应该也有文件句柄也是需要消耗内存的,也就是说,从内存的角度考虑,即有一部分存储数据,一部分管理文件句柄,如果 Mapper 分片数量为 1000 , Reduce 分片数量为 1000 ,那么总共就需要 1000000 个小文件,所以就会有很多内存的消耗,频繁 IO 以及 GC 频繁 或者 出现内存溢出.

2.2.2 Sorted-Based Shuffle

为了缓解 Shuffle 过程产生文件过多 和 Writer 缓解开销过大的问题, spark 引入了类似于 Hadoop Map-Reduce 的 Shuffle 机制,该机制每一个 ShuffleMapTask 不会为后续的任务创建单独的文件,而是会将所有的 Task 结果写入同一个文件,并且对应生成一个 索引文件,以前的数据是放在内存缓存中,等到数据完了在刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量,一方面文件数量显著减少,另一方面减少 Writer 缓存所占用的内存大小,从而同时避免 GC 的风险和频率.


####### 对于 BypassMergeSortShuffleWriter ,使用这个模式特点
主要用于处理不需要排序和聚合的 Shuffle 操作,所以数据是直接写入文件,数据量较大的时候,网络 I/O 和内存负担较重.
主要适合处理 Reducer 任务数量较少的情况下
将每一个分区写入一个单独的文件,最后将这些文件合并,减少文件数量,但是这种方式需要并发打开多个文件,对内存消耗比较大.
因为 BypassMergeSortShuffleWriter

补充

另外这个 Sort-Based Shuffle 跟 Executor 核心没有关系,即跟并发度没有关系,它是每一个 ShuffleMapTask 都会产生一个 data文件和 index 文件,所谓合并也只是将该 ShuffleMapTask 的各个 partition 对应的分区文件合并到data文件而已,所以这个就需要优化后的 HashShuffle 的区别开来.
比较适合数据量很大的场景或者集群规模很大
引入了外部排序器,可以支持在 Map 端进行本地聚合或者不聚合
如果外部排序器 enable 了 Spill 功能,如果内存不够,可以先将输出溢写 到本地磁盘, 最后将 内存结果和本地磁盘的溢写文件进行合并.

对于 UnsafeShuffleWriter 由于需要谨慎使用,我们暂时不做分析了.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容