我尽量用好懂的方式总结一下,踩过坑的前辈多提意见,同时也希望为各位同学提供帮助。
简单的打个比方,shuffle就是顺丰快递,将map端的数据经过排序分类等一些列的操作,到reduce端的过程。reduce端并不是等map端执行完后将结果传来,而是直接去map端去Copy输出文件。 我住长江头,你住长江尾,我吃火锅,你吃火锅底料 ... =.=
map的shuffle过程
【1.Partition分区】
map端处理完数据后,当key/value被写入缓冲区之前,都会被序列化为字节流。mapreduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reducetask处理(分区)。默认对key hash后再以reducetask数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
注意:虽然Partitioner接口会计算出一个值来决定某个输出会交给哪个reduce去处理,但是在缓冲区中并不会实现物理上的分区,而是将结果加载key-value后面。物理上的分区实在磁盘上进行的。
【2.环形缓冲区】
map在内存中有一个环形缓冲区(字节数组实现),用于存储任务的输出。默认是100M,这其中80%的容量用来缓存,当这部分容量满了的时候会启动一个溢出线程进行溢出操作,写入磁盘形成溢写文件;在溢出的过程中剩余的20%对新生产的数据继续缓存。【简单来说就是别读边写】但如果再次期间缓冲区被填满,map会阻塞直到写磁盘过程完成。
阈值是可以设置的,但一般默认就可以了。1)环形缓冲区大小:mapred-site.xml中设置mapreduce.task.io.sort.mb的值 2)环形缓冲区溢写的阈值:mapred-site.xml中设置mapreduce.map.sort.spill.percent的值
作用:为什么要分区呢??由于map()处理后的数据量可能会非常大,所以如果由一个reduce()处理效率不高,为了解决这个问题可以用分布式的思想,一个reduce()解决不了,就用多个reduce节点。一般来说有几类分区就对应有几个reduce节点,把相同分区交给一个reduce节点处理。
【3.Spill溢写sort排序】
缓冲区的数据写到磁盘前,会对它进行一个二次快速排序,首先根据数据所属的partition (分区)排序,然后每个partition中再按Key 排序。输出包括一个索引文件和数据文件。如果设定了Combiner,将在排序输出的基础上运行。【Combiner】就是一个简单Reducer操作,它在执行Map 任务的节点本身运行,先对Map 的输出做一次简单Reduce,使得Map的输出更紧凑,更少的数据会被写入磁盘和传送到Reducer。临时文件会在map任务结束后删除。
【4.merg文件合并】
每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。【merge就是多个溢写文件合并到一个文件】所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。
map端就处理完了,接下来就是reduce端了。reduce端的shuffle开始工作,而不是reduce操作开始执行,在shuffle阶段reduce不会运行。map完成后,会通过心跳将信息传给tasktracker,其进而通知jobtracker,然后reducetask开始shuffle工作
reduce的shuffle过程
【 copy复制 】:reduce端默认有5个数据复制线程从map端复制数据,其通过Http方式得到Map对应分区的输出文件。reduce端并不是等map端执行完后将结果传来,而是直接去map端去Copy输出文件。
【Merge合并】:reduce端的shuffle也有一个环形缓冲区,它的大小要比map端的灵活(由JVM的heapsize设置),由Copy阶段获得的数据,会存放的这个缓冲区中,同样,当到达阀值时会发生溢写到磁盘操作,这个过程中如果设置了Combiner也是会执行的,这个过程会一直执行直到所有的map输出都被复制过来,如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。
【reduce执行】当Reducer的输入文件确定后,整个Shuffle操作才最终结束。之后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。
欢迎各位大佬补充批评。