shuffle过程:
map task:
1.首先每个输入分片(input split)会让一个map任务处理。默认情况下,以HDFS的一个块的大小为一个分片。
2.map输出的结果会暂时存放在一个环形内存缓冲区(buffer in memory)中,(该缓冲区的大小默认为100M,由io.sort.mb属性配置)。
3.当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据进行分区(partition:根据reduce任务的数目划分数目,一个reuduce任务对应一个分区。目的是为了避免有些reduce任务分配不均的情况),排序(sort)后,如果设置了combiner,将排序后的结果进行组合(combine:目的是尽可能的数据写入到磁盘,减少之后的map端到reduce端的传输),再溢写入这个文件(spill to disk)。
4.在磁盘上合并数据(merge on disk)。当数据都溢出到磁盘后,需要对溢出文件进行合并。合并的过程中会不断地进行排序(sort)和组合(combine)操作。目的有两个:1.尽量减少每次写入磁盘的数据量;2.尽量减少下一复制阶段(copy phase)网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,还可以将数据压缩(mapred.compress.map.out=true)。
5.将分区中的数据拷贝给相对应的reduce任务。
reduce task:
1.Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。
2.随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作。
3.合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。
概念:
分块:
在HDFS系统中,为了便于文件的管理和备份,引入分块概念(block)。这里的 块 是HDFS存储系统当中的最小单位,HDFS默认定义一个块的大小为64MB(hadoop2为128M)。当有文件上传到HDFS上时,则该文件会按照分块大小被切分存储为多个块,多个块可以存放在不同的DataNode上,整个过程中 HDFS系统会保证一个块存储在一个datanode上 。 如果某文件大小没有到达64MB,该文件并不会占据整个块空间。
HDFS中的NameNode会记录在上述文件分块中文件的各个块都存放在哪个dataNode上,这些信息一般也称为 元信息(MetaInfo) 。元信息的存储位置由dfs.name.dir
指定。
分片
当一个作业提交到Hadoop运行的时候,其中的核心步骤是MapReduce,Hadoop会将MapReduce的输入数据划分为等长的小数据块,称为输入分片。hadoop为每个分片构建一个map任务。
hadoop计算的分片大小不小于blockSize,并且不小于mapred.min.split.size。默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,即分片大小等于分块大小。当某个分块分成均等的若干分片时,会有最后一个分片大小小于定义的分片大小,则该分片独立成为一个分片。