这里讲的hadoop1.0版本主要还是学习mr思想
大家都知道,当我们需要编写一个简单的MapReduce作业时,只需实现map()和reduce()两个函数即可,一旦将作业提交到集群上后,Hadoop内部会将这两个函数封装到Map Task和Reduce Task中,同时将它们调度到多个节点上并行执行,而任务执行过程中可能涉及的数据跨节点传输,记录按key分组等操作均由Task内部实现好了,用户无须关心。
为了深入了解Map Task和Reduce Task内部实现原理,我们将Map Task分解成Read、Map、Collect、Spill和Combine五个阶段,将Reduce Task分解成Shuffle、Merge、Sort、Reduce和Write五个阶段,并依次详细剖析每个阶段的内部实现细节。
Task运行过程概述
在MapReduce计算框架中,一个应用程序被划分成Map和Reduce两个计算阶段,它们分别由一个或者多个Map Task和Reduce Task组成。其中,每个Map Task处理输入数据集合中的一片数据(InputSplit),并将产生的若干个数据片段写到本地磁盘上,而Reduce Task则从每个Map Task上远程拷贝相应的数据片段,经分组聚集和归约后,将结果写到HDFS上作为最终结果,具体如图8-1所示。总体上看,Map Task与Reduce Task之间的数据传输采用了pull模型。为了能够容错,Map Task将中间计算结果存放到本地磁盘上,而Reduce Task则通过HTTP请求从各个Map Task端拖取(pull)相应的输入数据。为了更好地支持大量Reduce Task并发从Map Task端拷贝数据,Hadoop采用了Jetty Server作为HTTP Server处理并发数据读请求。
对于Map Task而言,它的执行过程可概述为:首先,通过用户提供的InputFormat将对应的InputSplit解析成一系列key/value,并依次交给用户编写的map()函数处理;接着按照指定的Partitioner对数据分片,以确定每个key/value将交给哪个Reduce Task处理;之后将数据交给用户定义的Combiner进行一次本地规约(用户没有定义则直接跳过);最后将处理结果保存到本地磁盘上。
对于Reduce Task而言,由于它的输入数据来自各个Map Task,因此首先需通过HTTP请求从各个已经运行完成的Map Task上拷贝对应的数据分片,待所有数据拷贝完成后,再以key为关键字对所有数据进行排序,通过排序,key相同的记录聚集到一起形成若干分组,然后将每组数据交给用户编写的reduce()函数处理,并将数据结果直接写到HDFS上作为最终输出结果。
基本数据结构和算法
在Map Task和Reduce Task实现过程中用到了大量数据结构和算法,我们选取其中几个非常核心的部分进行介绍。
前面提到,用户可通过InputFormat和OuputFormat两个组件自定义作业的输入输出格式,但并不能自定义Map Task的输出格式(也就是Reduce Task的输入格式)。考虑到Map Task的输出文件需要到磁盘上并被Reduce Task远程拷贝,为尽可能减少数据量以避免不必要的磁盘和网络开销,Hadoop内部实现了支持行压缩的数据存储格式——IFile。
按照MapReduce语义,Reduce Task需将拷贝自各个Map Task端的数据按照key进行分组后才能交给reduce()函数处理,为此,Hadoop实现了基于排序的分组算法。但考虑到若完全由Reduce Task进行全局排序会产生性能瓶颈,Hadoop采用了分布式排序策略:先由各个Map Task对输出数据进行一次局部排序,然后由Reduce Task进行一次全局排序。
在任务运行过程中,为了能够让JobTracker获取任务执行进度,各个任务会创建一个进度汇报线程Reporter,只要任务处理一条新数据,该线程将通过RPC告知TaskTracker,并由TaskTracker通过心跳进一步告诉JobTracker。
IFile存储格式
IFile是一种支持行压缩的存储格式。通常而言,Map Task中间输出结果和Reduce Task远程拷贝结果被存放在IFile格式的磁盘文件或者内存文件中。为了尽可能减少Map Task写入磁盘数据量和跨网络传输数据量,IFile支持按行压缩数据记录。当前Hadoop提供了Zlib(默认压缩方式)、BZip2等压缩算法。如果用户想启用数据压缩功能,则需为作业添加以下两个配置选项。
- mapred. compress.map.output:是否支持中间输出结果压缩,默认为false。
- mapred. map.output.compression.codec:压缩器(默认是基于Zlib算法的压缩器DefaultCodec)。任何一个压缩器需实现CompressionCodec接口以提供压缩输出流和解压缩输入流。
一旦启用了压缩机制,Hadoop会为每条记录的key和value值进行压缩。IFile定义的文件格式非常简单,整个文件顺次保存数据记录,每条数据记录格式为:
<key-len, value-len, key, value>
由于Map Task会按照key值对输出数据进行排序,因此IFile通常保存的是有序数据集。
IFile文件读写操作由类IFile实现,该类中包含两个重要内部类:Writer和Reader,分别用于Map Task生成IFile和Reduce Task读取一个IFile(对于内存中的数据读取,则使用InMemoryReader)。此外,为了保证数据一致性,Hadoop分别为Writer和Reader提供了IFileOutputStream和IFileInputStream两个支持CRC32校验的类,具体如图8-2所示。
排序
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据以IFile文件形式写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。
对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。
在Map Task和Reduce Task运行过程中,缓冲区数据排序使用了Hadoop自己实现快速排序算法,而IFile文件合并则使用了基于堆实现的优先队列。
Reporter
前面文章提到,所有Task需周期性向TaskTracker汇报最新进度和计数器值,而这正是由Reporter组件实现的。在Map/Reduce Task中,TaskReporter类实现了Reporter接口,并且以线程形式启动。TaskReporter汇报的信息中包含两部分:任务执行进度和任务计数器值。
1.任务执行进度
任务执行进度信息被封装到类Progress中,且每个Progress实例以树的形式存在。Hadoop采用了简单的线性模型计算每个阶段的进度值:如果一个大阶段可被分解成若干个子阶段,则可将大阶段看作一棵树的父节点,而子阶段可看作父节点对应的子节点,且大阶段的进度值可被均摊到各个子阶段中;如果一个阶段不可再分解,则该阶段进度值可表示成已读取数据量占总数据量的比例。
对于Map Task而言,它作为一个大阶段不可再分解,为了简便,我们直接将已读取数据量占总数据量的比例作为任务当前执行进度值。
对于Reduce Task而言,我们可将其分解成三个阶段:Shuffle、Sort和Reduce,每个阶段占任务总进度的1/3。考虑到在Shuffle阶段,Reduce Task需从M(M为Map Task数目)个Map Task上读取一片数据,因此,可被分解成M个阶段,每个阶段占Shuffle进度的1/M,具体如图8-5所示。
对于TaskReporter线程而言,它并不会总是每隔一段时间汇报进度和计数器值,而是仅当发现以下两种情况之一时才会汇报。
- 任务执行进度发生变化;
- 任务的某个计数器值发生变化。
在某个时间间隔内,如果任务执行进度和计数器值均未发生变化,则Task只会简单地通过调用RPC函数ping探测TaskTracker是否活着。在一定时间内,如果某个任务的执行进度和计数器值均未发生变化,则TaskTracker认为它处于悬挂(hang up)状态,直接将其杀掉。为了防止某条记录因处理时间过长导致被杀,用户可采用以下两种方法:
- 每隔一段时间调用一次TaskReporter.progress()函数,以告诉TaskTracker自己仍然活着。
- 增大任务超时参数mapred.task.timeout(默认是10 min)对应的值。
2.任务计数器
任务计数器(Counter)是Hadoop提供的,用于实现跟踪任务运行进度的全局计数功能。任务计数器(Counter)是Hadoop提供的,用于实现跟踪任务运行进度的全局计数功能。用户可在自己的应用程序中添加计数器。任务计数器由两部分组成:<name, value>,其中,name表示计数器名称,value表示计数器值(long类型)。计数器通常以组为单位管理,一个计数器属于一个计数器组(CounterGroup)。此外,Hadoop规定一个作业最多包含120个计数器(可通过参数mapreduce.job.counters.limit设定),50个计数器组。
对于同一个任务而言,所有任务包含的计数器相同,每个任务更新自己的计数器值,然后汇报给TaskTracker,并由TaskTracker通过心跳汇报给JobTracker,最后由JobTracker以作业为单位对所有计数器进行累加。作业的计数器分为两类:MapReduce内置计数器和用户自定义计数器。
(1)MapReduce内置计数器
MapReduce框架内部为每个任务添加了三个计数器组,分别位于File Input Format Counters, File Output Format Counters和Map-Reduce Framework中。它们包含的计数器分别见表8-1,表8-2和表8-3。
(2)用户自定义计数器
Map Task内部实现
前面提到,Map Task分为4种,分别是Job-setup Task、Job-cleanup Task、Task-cleanup Task和Map Task。
其中,Job-setup Task和Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录;而Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务。本节主要讲解第四种任务——普通的Map Task。它需要处理数据,并将计算结果存到本地磁盘上。
Map Task整体流程
Map Task的整体计算流程如图8-6所示,共分为5个阶段,分别是:
- Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。
- Map阶段:该阶段主要是将解析出的key/value交给用户编写的map()函数处理,并产生一系列新的key/value。
- Collect阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分片(通过调用Partitioner),并写入一个环形内存缓冲区中。
- Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
-
Combine阶段:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
MapReduce框架提供了两套API,默认情况下采用旧API,用户可通过设置参数mapred.mapper.new-api为true启用新API。新API在封装性和扩展性等方面优于旧API,但性能上并没有改进。这里主要以旧API为例进行讲解。
在Map Task中,最重要的部分是输出结果在内存和磁盘中的组织方式,具体涉及Collect、Spill和Combine三个阶段,也就是用户调用OutputCollector.collect()函数之后依次经历的几个阶段。我们将在下面几小节深入分析这几个阶段。
Collect过程分析
待map()函数处理完一对key/value,并产生新的key/value后,会调用OutputCollector.collect()函数输出结果。本小节重点剖析该函数内部实现机制。
跟踪进入Map Task的入口函数run(),可发现,如果用户选用旧API,则会调用runOldMapper函数处理数据。该函数根据实际的配置创建合适的MapRunnable以迭代调用用户编写的map()函数,而map()函数的参数OutputCollector正是MapRunnable传入的OldOutputCollector对象。
OldOutputCollector根据作业是否包含Reduce Task封装了不同的MapOutputCollector实现,如果Reduce Task数目为0,则封装DirectMapOutputCollector对象直接将结果写入HDFS中作为最终结果,否则封装MapOutputBuffer对象暂时将结果写入本地磁盘上以供Reduce Task进一步处理。本小节主要分析Reduce Task数目非0的情况。
用户在map()函数中调用OldOutputCollector.collect(key, value)后,在该函数内部,首先会调用Partitioner.getPartition()函数获取记录的分区号partition,然后将三元组<key, value, partition>传递给MapOutputBuffer.collect()函数做进一步处理。
MapOutputBuffer内部使用了一个缓冲区暂时存储用户输出数据,当缓冲区使用率达到一定阈值后,再将缓冲区中的数据写到磁盘上。数据缓冲区的设计方式直接影响到Map Task的写效率,而现有多种数据结构可供选择,最简单的是单向缓冲区,生产者向缓冲区中单向写入输出,当缓冲区写满后,一次性写到磁盘上,就这样,不断写缓冲区,直到所有数据写到磁盘上。单向缓冲区最大的问题是性能不高,不能支持同时读写数据。双缓冲区是对单向缓冲区的一个改进,它使用两个缓冲区,其中一个用于写入数据,另一个将写满的数据写到磁盘上,这样,两个缓冲区交替读写,进而提高效率。实际上,双缓冲区只能一定程度上让读写并行,仍会存在读写等待问题。一种更好的缓冲区设计方式是采用环形缓冲区:当缓冲区使用率达到一定阈值后,便开始向磁盘上写入数据,同时,生产者仍可以向不断增加的剩余空间中循环写入数据,进而达到真正的读写并行。三种缓冲区结构如图8-7所示。
MapOutputBuffer正是采用了环形内存缓冲区保存数据,当缓冲区使用率达到一定阈值后,由线程SpillThread将数据写到一个临时文件中,当所有数据处理完毕后,对所有临时文件进行一次合并以生成一个最终文件。环形缓冲区使得Map Task的Collect阶段和Spill阶段可并行进行。
MapOutputBuffer内部采用了两级索引结构(见图8-8),涉及三个环形内存缓冲区,分别是kvoffsets、kvindices和kvbuffer,这三个缓冲区所占内存空间总大小为io.sort.mb(默认是100 MB)。下面分别介绍这三个缓冲区的含义。
(1)kvoffsets
kvoffsets即偏移量索引数组,用于保存key/value信息在位置索引kvindices中的偏移量。考虑到一对key/value需占用数组kvoffsets的1个int(整型)大小,数组kvindices的3个int大小(分别保存所在partition号、key开始位置和value开始位置),所以Hadoop按比例1:3将大小为{io.sort.mb}的内存空间分配给数组kvoffsets和kvindices,其间涉及的缓冲区分配方式见图8-9,计算过程如下:
private static final int ACCTSIZE=3;//每对key/value占用kvindices中的三项
private static final int RECSIZE=(ACCTSIZE+1)*4;//每对key/value共占用
kvoffsets和kvindices中的4个字节(4*4=16 byte)
final float recper=job.getFloat("io.sort.record.percent",(float)0.05);
final int sortmb=job.getInt("io.sort.mb",100);
int maxMemUsage=sortmb<<20;//将内存单位转化为字节
int recordCapacity=(int)(maxMemUsage*recper);
recordCapacity-=recordCapacity%RECSIZE;//保证recordCapacity是4*4的整数倍
recordCapacity/=RECSIZE;//计算内存中最多保存key/value数目
kvoffsets=new int[recordCapacity];//kvoffsets占用1:3中的1
kvindices=new int[recordCapacity*ACCTSIZE];//kvindices占用1:3中的3
当该数组使用率超过io.sort.spill.percent后,便会触发线程SpillThread将数据写入磁盘。
(2)kvindices
kvindices即位置索引数组,用于保存key/value值在数据缓冲区kvbuffer中的起始位置。
(3)kvbuffer
kvbuffer即数据缓冲区,用于保存实际的key/value值,默认情况下最多可使用io.sort.mb中的95%,当该缓冲区使用率超过io.sort.spill.percent后,便会触发线程SpillThread将数据写入磁盘。
以上几个缓冲区读写采用了典型的单生产者消费者模型,其中,MapOutputBuffer的collect方法和MapOutputBuffer.Buffer的write方法是生产者,spillThread线程是消费者,它们之间同步是通过可重入的互斥锁spillLock和spillLock上的两个条件变量(spillDone和spillReady)完成的。生产者主要的伪代码如下:
//取得下一个可写入的位置
spillLock.lock();
if(缓冲区使用率达到阈值){
//唤醒SpillThread线程,将缓冲区数据写入磁盘
spillReady.signal();
}
if(缓冲区满){
//等待SpillThread线程结束
spillDone.wait();
}
spillLock.lock();
//将数据写入缓冲区
下面分别介绍环形缓冲区kvoffsets和kvbuffer的数据写入过程。
(1)环形缓冲区kvoffsets
通常用一个线性缓冲区模拟实现环形缓冲区,并通过取模操作实现循环数据存储。下面介绍环形缓冲区kvoffsets的写数据过程。该过程由指针kvstart/kvend/kvindex控制,其中kvstart表示存有数据的内存段初始位置,kvindex表示未存储数据的内存段初始位置,而在正常写入情况下,kvend=kvstart,一旦满足溢写条件,则kvend=kvindex,此时指针区间[kvstart, kvend)为有效数据区间。具体涉及的操作如下。
操作1:写入缓冲区。
直接将数据写入kvindex指针指向的内存空间,同时移动kvindex指向下一个可写入的内存空间首地址,kvindex移动公式为:kvindex=(kvindex+1)%kvoffsets.length。由于kvoffsets为环形缓冲区,因此可能涉及两种写入情况。
情况1:kvindex>kvend,如图8-10所示。在这种情况下,指针kvindex在指针kvend后面,如果向缓冲区中写入一个字符串,则kvindex指针后移一位。
情况2:kvindex<=kvend,如图8-11所示。在这种情况下,指针kvindex位于指针kvend前面,如果向缓冲区中写入一个字符串,则kvindex指针后移一位。
操作2:溢写到磁盘。
当kvoffsets内存空间使用率超过io.sort.spill.percent(默认是80%)后,需将内存中数据写到磁盘上。为了判断是否满足该条件,需先求出kvoffsets已使用内存。如果kvindex>kvend,则已使用内存大小为kvindex-kvend;否则,已使用内存大小为kvoffsets.length-(kvend-kvindex)。
(2)环形缓冲区kvbuffer
环形缓冲区kvbuffer的读写操作过程由指针bufstart/bufend/bufvoid/bufindex/bufmark控制,其中,bufstart/bufend/bufindex含义与kvstart/kvend/kvindex相同,而bufvoid指向kvbuffer中有效内存结束为止,kvbuffer表示最后写入的一个完整key/value结束位置,具体写入过程中涉及的状态和操作如下:
情况1:初始状态。
初始状态下,bufstart=bufend=bufindex=bufmark=0,bufvoid=kvbuffer.length,如图8-12所示。
情况2:写入一个key。
写入一个key后,需移动bufindex指针到可写入内存初始位置,如图8-13所示。
情况3:写入一个value。
写入key对应的value后,除移动bufindex指针外,还要移动bufmark指针,表示已经写入一个完整的key/value,具体如图8-14所示。
情况4:不断写入key/value,直到满足溢写条件,即kvoffsets或者kvbuffer空间使用率超过io.sort.spill.percent(默认值为80%)。此时需要将数据写到磁盘上,如图8-15所示。
情况5:溢写。
如果达到溢写条件,则令bufend←bufindex,并将缓冲区[bufstart, bufend)之间的数据写到磁盘上,具体如图8-16所示。
溢写完成之后,恢复正常写入状态,令bufstart←bufend,如图8-17所示。
在溢写的同时,Map Task仍可向kvbuffer中写入数据,如图8-18所示。
情况6:某个key或者value太大,以至于整个缓冲区不能容纳它。
如果一条记录的key或value太大,整个缓冲区都不能容纳它,则Map Task会抛出MapBufferTooSmallException异常,并将该记录单独输出到一个文件中。
前面提到,Map Task将可用的缓冲区空间io.sort.mb按照一定比例(由参数io.sort.record.percent决定)静态分配给了kvoffsets、kvindices和kvbuffer三个缓冲区,而正如条件1所述,只要任何一个缓冲区的使用率达到一定比例,就会发生溢写现象,即使另外的缓冲区使用率非常低。因此,设置合理的io.sort.record.percent参数,对于充分利用缓冲区空间和减少溢写次数,是十分必要的。考虑到每条数据(一个key/value对)需占用索引大小为16 B,设置io.sort.record.percent:
io.sort.record.percent=16/(16+R)
其中R为平均每条记录的长度。
【实例】假设一个作业的Map Task输入数据量和输出数据量相同,每个Map Task输入数据量大小为128 MB,且共有1 342 177条记录,每条记录大小约为100 B,则需要索引大小为16*1 342 177=20.9 MB。根据这些信息,可设置参数如下:
根据这些信息,可设置参数如下:
- io. sort.mb:128 MB+20.9 MB=148.9 MB
- io. sort.record.percent:16/(16+100)=0.138
- io. sort.spill.percent:1.0
这样配置可保证数据只“落”一次地,效率最高!当然,实际使用时可能很难达到这种情况,比如每个Map Task输出数据量非常大,缓冲区难以全部容纳它们,但你至少可以设置合理的io.sort.record.percent以更充分地利用io.sort.mb并尽可能减少中间文件数目。
Spill过程分析
Spill过程由SpillThread线程完成。在前一小节中已经提到,SpillThread线程实际上是缓冲区kvbuffer的消费者,其主要代码如下:
spillLock.lock();
while(true){
spillDone.signal();
while(kvstart==kvend){
spillReady.await();
}
spillLock.unlock();
sortAndSpill();//排序,然后将缓冲区kvbuffer中的数据写到磁盘上
spillLock.lock();
//重置各个指针,以便为下一次溢写做准备
if(bufend<bufindex&&bufindex<bufstart){
bufvoid=kvbuffer.length;
}
vstart=kvend;
bufstart=bufend;
}
spillLock.unlock();
线程SpillThread调用函数sortAndSpill()将环形缓冲区kvbuffer中区间[bufstart, bufend)内的数据写到磁盘上。函数sortAndSpill()内部工作流程如下:
步骤1 利用快速排序算法对缓冲区kvbuffer中区间[bufstart, bufend)内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2 按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3 将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存中索引大小超过1 MB,则将内存索引写到文件output/spillN.out.index中。
Combine过程分析
当所有数据处理完后,Map Task会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,Map Task以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式:每轮合并io.sort.factor(默认为100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。让每个Map Task最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销.
Reduce Task内部实现
与Map Task一样,Reduce Task也分为四种,即Job-setup Task, Job-cleanup Task, Task-cleanup Task和Reduce Task。本节中重点介绍第四种——普通Reduce Task。Reduce Task要从各个Map Task上读取一片数据,经排序后,以组为单位交给用户编写的reduce()函数处理,并将结果写到HDFS上。本节将深入剖析Reduce Task内部各个阶段的实现原理。
Reduce Task整体流程
Reduce Task的整体计算流程如图8-22所示,共分为5个阶段。
- Shuffle阶段:也称为Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- Sort阶段:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个Map Task已经实现对自己的处理结果进行了局部排序,因此,Reduce Task只需对所有数据进行一次归并排序即可。
- Reduce阶段:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
- Write阶段:reduce()函数将计算结果写到HDFS上。
在接下来几小节中,我们将详细介绍Shuffle、Merge、Sort和Reduce四个阶段。考虑到Write阶段比较简单,我们不再介绍。
Shuffle和Merge阶段分析
在Reduce Task中,Shuffle阶段和Merge阶段是并行进行的。当远程拷贝数据量达到一定阈值后,便会触发相应的合并线程对数据进行合并。这两个阶段均是由类ReduceCopier实现的,如图8-23所示,总体上看,Shuffle&Merge阶段可进一步划分为三个子阶段。
(1)准备运行完成的Map Task列表
GetMapEventsThread线程周期性通过RPC从TaskTracker获取已完成Map Task列表,并保存到映射表mapLocations(保存了TaskTracker Host与已完成任务列表的映射关系)中。为防止出现网络热点,Reduce Task通过对所有TaskTracker Host进行“混洗”操作以打乱数据拷贝顺序,并将调整后的Map Task输出数据位置保存到scheduledCopies列表中。
(2)远程拷贝数据
Reduce Task同时启动多个MapOutputCopier线程,这些线程从scheduledCopies列表中获取Map Task输出位置,并通过HTTP Get远程拷贝数据。对于获取的数据分片,如果大小超过一定阈值,则存放到磁盘上,否则直接放到内存中。
(3)合并内存文件和磁盘文件
为了防止内存或者磁盘上的文件数据过多,Reduce Task启动了LocalFSMerger和InMemFSMergeThread两个线程分别对内存和磁盘上的文件进行合并。
接下来,我们将详细剖析每个阶段的内部实现细节。
(1)准备运行完成的Map Task列表
我们知道TaskTracker启动了MapEventsFetcherThread线程。该线程会周期性(周期为心跳时间间隔)通过RPC从JobTracker上获取已经运行完成的Map Task列表,并保存到TaskCompletionEvent类型列表allMapEvents中。
而对于Reduce Task而言,它会启动GetMapEventsThread线程。该线程周期性通过RPC从TaskTracker上获取已运行完成的Map Task列表,并将成功运行完成的Map Task放到列表mapLocations中,具体如图8-24所示。
为了避免出现数据访问热点(大量进程集中读取某个TaskTracker上的数据),Reduce Task不会直接将列表mapLocations中的Map Task输出数据位置交给MapOutputCopier线程,而是事先进行一次预处理:将所有TaskTracker Host进行混洗操作(随机打乱顺序),然后保存到scheduledCopies列表中,而MapOutputCopier线程将从该列表中获取待拷贝的Map Task输出数据位置。需要注意的是,对于一个TaskTracker而言,曾拷贝失败的Map Task将优先获得拷贝机会。
(2)远程拷贝数据
Reduce Task同时启动mapred.reduce.parallel.copies(默认是5)个数据拷贝线程MapOutputCopier。该线程从scheduledCopies列表中获取Map Task数据输出描述对象,并利用HTTP Get从对应的TaskTracker远程拷贝数据,如果数据分片大小超过一定阈值,则将数据临时写到工作目录下,否则直接保存到内存中。不管是保存到内存中还是磁盘上,MapOutputCopier均会保存一个MapOutput对象描述数据的元信息。如果数据被保存到内存中,则将该对象添加到列表mapOutputsFilesInMemory中,否则将该对象保存到列表mapOutputFilesOnDisk中。
在Reduce Task中,大部分内存用于缓存从Map Task端拷贝的数据分片,这些内存占到JVM Max Heap Size(由参数-Xmx指定)的mapred.job.shuffle.input.buffer.percent(默认是0.70)倍,并由类ShuffleRamManager管理。Reduce Task规定,如果一个数据分片大小未超过该内存的0.25倍,则可存放到内存中。如果MapOutputCopier线程要拷贝的数据分片可存放到内存中,则它先要向ShuffleRamManager申请相应的内存,待同意后才会正式拷贝数据,否则需要等待它释放内存。
由于远程拷贝数据可能需要跨网络读取多个节点上的数据,期间很容易由于网络或者磁盘等原因造成读取失败,因此提供良好的容错机制是非常有必要的。当出现拷贝错误时,Reduce Task提供了以下几个容错机制:
如果拷贝数据出错次数超过abortFailureLimit,则杀死该Reduce Task(等待调度器重新调度执行),其中,abortFailureLimit计算方法如下:
abortFailureLimit=max{30,numMaps/10}如果拷贝数据出错次数超过maxFetchFailuresBeforeReporting(可通过参数mapreduce.reduce.shuffle.maxfetchfailures设置,默认是10),则进行一些必要的检查以决定是否杀死该Reduce Task。
如果前两个条件均不满足,则采用对数回归模型推迟一段时间后重新拷贝对应MapTask的输出数据,其中延迟时间delayTime的计算方法如下:
delayTime=10 000×1. 3noFailedFetches
其中noFailedFetches为拷贝错误次数。
(3)合并内存文件和磁盘文件
前面提到,Reduce Task从Map Task端拷贝的数据,可能保存到内存或者磁盘上。随着拷贝数据的增多,内存或者磁盘上的文件数目也必将增加,为了减少文件数目,在数据拷贝过程中,线程LocalFSMerger和InMemFSMergeThread将分别对内存和磁盘上的文件进行合并。
对于磁盘上文件,当文件数目超过(2*ioSortFactor-1)后(ioSortFactor值由参数io.sort.factor指定,默认是10),线程LocalFSMerger会从列表mapOutputFilesOnDisk中取出最小的ioSortFactor个文件进行合并,并将合并后的文件再次写到磁盘上。
对于内存中的文件,当满足以下几个条件之一时,InMemFSMergeThread线程会将内存中所有数据合并后写到磁盘上:
- 所有数据拷贝完毕后,关闭ShuffleRamManager。
- ShuffleRamManager中已使用内存超过可用内存的mapred.job.shuffle.merge.percent(默认是66%)倍且内存文件数目超过2个。
- 内存中的文件数目超过mapred.inmem.merge.threshold(默认是1 000)。
Sort和Reduce阶段分析
当所有数据拷贝完成后,数据可能存放在内存中或者磁盘上,此时还不能将数据直接交给用户编写的reduce()函数处理。根据MapReduce语义,Reduce Task需将key值相同的数据聚集到一起,并按组将数据交给reduce()函数处理。为此,Hadoop采用了基于排序的数据聚集策略。前面提到,各个Map Task已经事先对自己的输出分片进行了局部排序,因此,Reduce Task只需进行一次归并排序即可保证数据整体有序。为了提高效率,Hadoop将Sort阶段和Reduce阶段并行化。在Sort阶段,Reduce Task为内存和磁盘中的文件建立了小顶堆,保存了指向该小顶堆根节点的迭代器,且该迭代器保证了以下两个约束条件:
- 磁盘上文件数目小于io.sort.factor(默认是10)。
- 当Reduce阶段开始时,内存中数据量小于最大可用内存(JVM Max Heap Size)的mapred.job.reduce.input.buffer.percent(默认是0)
在Reduce阶段,Reduce Task不断地移动迭代器,以将key相同的数据顺次交给reduce()函数处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程,这样,Sort和Reduce可并行进行。
Map/Reduce Task优化
参数调优
由于参数调优与应用程序的特点直接相关,因此本小节仅列出了Map Task和Reduce Task中直接影响任务性能的一些可调整参数(见表8-4和表8-5),具体调整为何值需由用户根据作业特点自行决定。
考虑到Hadoop中用户可配置参数非常多,为了简化参数配置,一些研究机构尝试自动调优参数。
系统优化
Shuffle阶段内部优化
(1)Map端——用Netty代替Jetty
1.0.0版本中,TaskTracker采用了Jetty服务器处理来自各个Reduce Task的数据读取请求。由于Jetty采用了非常简单的网络模型,因此性能比较低。在Apache Hadoop 2.0.0版本中,Hadoop改用Netty,它是另一种开源的客户/服务器端编程框架。由于它内部采用了Java NIO技术,相比Jetty更加高效,且Netty社区更加活跃,其稳定性比Jetty好。
(2)Reduce端——批拷贝
1.0.0版本中,在Shuffle过程中,Reduce Task会为每个数据分片建立一个专门的HTTP连接(One-connection-per-map),即使多个分片同时出现在一个TaskTracker上,也是如此。为了提高数据拷贝效率,Apache Hadoop 2.0.0尝试采用批拷贝技术:不再为每个Map Task建立一个HTTP连接,而是为同一个TaskTracker上的多个Map Task建立一个HTTP连接,进而能够一次读取多个数据分片,具体如图8-25所示。
将Shuffle阶段从Reduce Task中拆分出来
前面提到,对于一个作业而言,当一定比例(默认是5%)的Map Task运行完成后,Reduce Task才开始被调度,且仅当所有Map Task运行完成后,Reduce Task才可能运行完成。在所有Map Task运行完成之前,已经启动的Reduce Task将始终处于Shuffle阶段,此时它们不断从已经完成的Map Task上远程拷贝中间处理结果,由于随着时间推移,不断会有新的Map Task运行完成,因此Reduce Task会一直处于“等待—拷贝—等待—拷贝……”的状态。待所有Map Task运行完成后,Reduce Task才可能将结果全部拷贝过来,这时候才能够进一步调用用户编写的reduce()函数处理数据。从以上Reduce Task内部运行流程分析可知,Shuffle阶段会带来两个问题:slot Hoarding和资源利用率低下。
(1)Slot Hoarding现象
Slot Hoarding是一种资源囤积现象,具体表现是:对于任意一个MapReduce作业而言,在所有Map Task运行完成之前,已经启动的Reduce Task将一直占用着slot不释放。Slot Hoarding可能会导致一些作业产生饥饿现象。下面给出一个例子进行说明。
【实例】如图8-26所示,整个集群中有三个作业,分别是job1、job2和job3,其中,job1的Map Task数目非常多,而其他两个作业的Map Task相对较少。在t0时刻,job1和job2的Reduce Task开始被调度;在t3时刻,job2的所有Map Task运行完成,不久之后(t3'时刻),job2的第一批Reduce Task运行完成;在t4'时刻,job2所有Reduce Task运行完成;在t4时刻,job3的Map Task开始运行并在t7时刻运行完成,但由于此时所有Reduce slot均被job1占用着,因此,除非job1的所有Map Task运行完成,否则job3的Reduce Task永远不可能得到调度。
(2)资源利用率低下
从资源利用率角度看,为了保证较高的系统资源利用率,所有Task都应充分使用一个slot所隐含的资源,包括内存、CPU、I/O等资源。然而,对单个Reduce Task而言,在整个运行过程中,它的资源利用率很不均衡,总体上看,刚开始它主要使用I/O资源(Shuffle阶段),之后主要使用CPU资源(Reduce阶段)。如图8-27所示,t4时刻之前,所有已经启动的Reduce Task处于Shuffle阶段,此时主要使用网络I/O和磁盘I/O资源,而在t4时刻之后,所有Map Task运行完成,则第一批Reduce Task逐渐开始进入Reduce阶段,此时主要消耗CPU资源。由此可见,Reduce Task运行过程中使用的资源依次以I/O、CPU为主,并没有重叠使用这两种资源,这使得系统整体资源利用率低下。
经过以上分析可知,I/O密集型的数据拷贝(Shuffle阶段)和CPU密集型的数据计算(Reduce阶段)紧耦合在一起是导致“Slot Hoarding”现象和系统资源利用率低下的主要原因。为了解决该问题,一种可行的解决方案是将Shuffle阶段从Reduce Task中分离出来,当前主要有以下两种具体的实现方案。
Copy-Compute Splitting:这是Berkeley的一篇论文提出的解决方案。该方案从逻辑上将Reduce Task拆分成“Copy Task”和“Compute Task”,其中,Copy Task用于数据拷贝,而Compute Task用于数据计算(调用用户编写的reduce()函数处理数据)。当一个Copy Task运行完成后,它会触发一个Compute Task进行数据计算,同时另外一个Copy Task将被启动拷贝另外的数据,从而实现I/O和CPU资源重叠使用。
将Shuffle阶段变为独立的服务:将Shuffle阶段从Reduce Task处理逻辑中出来变成为一个独立的服务,不再让其占用Reduce slot,这样也可达到I/O和CPU资源重叠使用的目的。“百度”曾采用了这一方案。
小结
本文将Map Task分解成Read、Map、Collect、Spill和Combine五个阶段,并详细介绍了后三个阶段:map()函数处理完结果后,Map Task会将处理结果存放到一个内存缓冲区中(Collect阶段),待缓冲区使用率达到一定阈值后,再将数据溢写到磁盘上(Spill阶段),而当所有数据处理完后,Map Task会将磁盘上所有文件合并成一个大文件(Combine阶段)。这几个阶段形成的流水线如图8-28所示。
本章将Reduce Task分解成Shuffle、Merge、Sort、Reduce和Write五个阶段,且重点介绍了前三个阶段:Reduce Task首先进入Shuffle阶段,在该阶段中,它会启动若干个线程,从各个完成的Map Task上拷贝数据,并将数据放到磁盘上或者内存中,待文件数目超过一定阈值后进行一次合并(Merge阶段),当所有数据拷贝完成后,再对所有数据进行一次排序(Sort阶段),并将key相同的记录分组依次交给reduce()函数处理。这几个阶段形成的流水线如图8-29所示。
Hadoop MapReduce shuffle的特性:
Reducer从Map端拉取属于自己Partition的数据时,该Partition的数据已经在Map端排好序。Reducer将属于它的所有的partition拉取过去后,进行Reducer端的归并排序(归并排序的原因是Reducer会从多个Mapper拉取相应的Partition,Reducer需要将所有这些Partition进行排序)
如果客户端定义了Combiner,那么在数据在排好序后,会调用CombinerClass对数据已经combine,然后才spill到磁盘。这就是说Sort操作在Combine操作之前执行,而Partititon操作在Sort之前执行,也就是Parttion->Sort->Combine的过程
reducer如何得知map已经产生了一个分区的输出文件?在Hadoop2中,mapper直接通知ApplicationMaster。在Hadoop1中,mapper通知TaskTracker,任务已经执行完成,而TaskTracker则通知JobTracker,那么JobTracker则会通知Reducer已经有Mapper任务执行完成并且数据的位置在什么地方(会通知吗?难道不是自己去TaskTracker查询?)(此处可见,JobTracker确实承担了很多的职责)
reducer拉取分区数据后,如果拉过来的数据量较小,那么直接加载到内存;如果较大,则存放到磁盘上。这跟Mapper端的处理过程类似,此时Reducer的内存大小是50M,随着拉取的数据越来越多,内存容不下,Reducer开启Spill到磁盘操作
每个partition经过Map后得到一个排序的文件,那么这个文件中的数据只被一个Reducer消费还是被所有的Reducer消费?
是被所有的Reducer消费,也就是说,一个Map输出文件包含了很多个Partition,Reducer只关心属于自己的Partition。比如一个Map产生的最终输出文件包含了3个Partition,而每个Partition由对应的reducer进行消费。
Mapreduce中Map与Reduce任务的个数
1、Map任务的个数
读取数据产生多少个Mapper??
Mapper数据过大的话,会产生大量的小文件,过多的Mapper创建和初始化都会消耗大量的硬件资源,Mapper数太小,并发度过小,Job执行时间过长,无法充分利用分布式硬件资源
Mapper数量由什么决定??
1)输入文件数目(2)输入文件的大小(3)配置参数 这三个因素决定的。
输入的目录中文件的数量决定多少个map会被运行起来,应用针对每一个分片运行一个map,一般而言,对于每一个输入的文件会有一个map split。如果输入文件太大,超过了hdfs块的大小(128M)那么对于同一个输入文件我们会有多余2个的map运行起来。
涉及参数:
mapreduce.input.fileinputformat.split.minsize //启动map最小的split size大小,默认0
mapreduce.input.fileinputformat.split.maxsize //启动map最大的split size大小,默认256M
dfs.block.size//block块大小,默认128M
计算公式:splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
下面是FileInputFormat class 的getSplits()的伪代码:
num_splits = 0
for each input file f:
remaining = f.length
while remaining / split_size > split_slope:
num_splits += 1
remaining -= split_size
where:
split_slope = 1.1 分割斜率
split_size =~ dfs.blocksize 分割大小约等于hdfs块大小
会有一个比例进行运算来进行切片,为了减少资源的浪费
例如一个文件大小为260M,在进行MapReduce运算时,会首先使用260M/128M,得出的结果和1.1进行比较
大于则切分出一个128M作为一个分片,剩余132M,再次除以128,得到结果为1.03,小于1.1
则将132作为一个切片,即最终260M被切分为两个切片进行处理,而非3个切片。
2、reduce任务的个数
Reduce任务是一个数据聚合的步骤,数量默认为1。而使用过多的Reduce任务则意味着复杂的shuffle,并使输出文件的数量激增。
一个job的ReduceTasks数量是通过mapreduce.job.reduces参数设置
也可以通过编程的方式,调用Job对象的setNumReduceTasks()方法来设置
一个节点Reduce任务数量上限由mapreduce.tasktracker.reduce.tasks.maximum设置(默认2)。
可以采用以下探试法来决定Reduce任务的合理数量:
1.每个reducer都可以在Map任务完成后立即执行:
0.95 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)
2.较快的节点在完成第一个Reduce任务后,马上执行第二个:
1.75 * (节点数量 * mapreduce.tasktracker.reduce.tasks.maximum)