MapReduce框架结构##
MapReduce是一个用于大规模数据处理的分布式计算模型
MapReduce模型主要有Mapper和Reducer两个抽象类.
Mapper端主要负责对数据的分析处理,最终转化为Key-value的数据结构
Reducer端主要是获取Mapper出来的结果,对结果进行统计
MapReduce实现存储的均衡,未实现计算的均衡
MapReduce框架组成
注意:TaskTracker都需要运行在HDFS的DataNode上
Mapper和Reducer
运行于Hadoop的MapReduce应用程序最基本的组成部分包括:一个Mapper抽象类和一个Reducer抽象类,以及创建JobConf的执行程序,在一些应用中还可以包括Combiner类,Combiner实际也是Reducer的实现
JobTracker
JobTracker是一个master服务,软件启动后JobTracker接收Job,负责调度Job的每一个子任务Task运行于TaskTracker上,并监控它们,如果发现有失败的Task就重新运行它。
一般情况下应该把JobTracker部署在单独的机器上
负责任务的分发和监控
TaskTracker
运行在多个节点上的slave服务,TaskTracker主动与JobTracker通信(与DataNode和NameNode相似,通过心跳来实现)接收作业
负责直接执行每一个任务
JobClient
每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成JAR文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master创建每一个Task(即Map Task和Reduce Task)将它们分发到各个TaskTracker服务中去执行
JobInProgress
JobClient提交Job后,JobTracker会创建一个JobInProgress来跟踪和调度这个Job,并把它添加到Job队列里。
JobInProgress会根据提交的任务JAR中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用户监控和调度MapTask,同时创建指定数目的TaskInProgress用于监控和调度ReduceTask,默认为1个ReduceTask
TaskInProgress
JobTracker启动任务时通过每一个TaskInProgress来运行Task,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress管理,通过TaskRunner对象来运行)
TaskRunner会自动装载任务JAR文件并设置好环境变量后,启动一个独立的Java Child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中
MapTask和ReduceTask
一个完整的Job会自动依次执行Mapper、Combiner(在JobConf指定Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行的,Reducer则由ReduceTask调用
Combiner实际也是Reducer接口类的实现。
MapReduce运行原理
一个MapReduce作业(Job)通常会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式处理它们。框架会对map函数的输出先进行排序,然后把结果输入给Reduce任务
通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起的。
作业的提交
JobClient的runJob()方法用于新建JobClient实例并调用其submitJob()方法,这种便捷方式提交作业后,runJob()每秒轮询作业的进度,如果发现自上次上报后的信息有改动,便把进度报告输出到控制台。
Hadoop运行MapReduce作业的工作原理如图所示:
JobClient的submitJob()方法实现作业提交过程如下:
1.向JobTracker请求一个新的作业ID(通过JobTracker的getNewJobId()获取)
2.检查作业的输出说明。例如,如果没有指定输出目录或者它已经存在,作业就不会被提交,并将错误返回给MapReduce程序
3.计算作业的输出划分.如果划分无法计算,例如,因为输入路径不存在,作业就不会被提交,并将错误返回给MapReduce程序
4.将运行作业所需要的资源(包括作业的JAR文件、配置文件和计算所得的输入划分)复制到一个以作业ID命名的目录中JobTracker的文件系统中。作业JAR的副本较多(由mapred.submit.replication属性控制.默认为10),因此在TaskTracker运行作业任务时,集群能为它们提供许多副本进行访问
5.调用JobTracker的submitJob()方法,告诉JobTracker作业准备执行
6.JobTracker接收到对其submitJob()方法调用后,会把此调用放入一个内部队列中,交由作业调度器进行调度,并对其进行初始化。初始化包括创建一个代表该正在运行的作业对象,它封装任务和记录信息,以便跟踪任务的状态和进程
7.要创建运行任务列表,作业调度器首先从共享文件系统中获取JobClient已经计算好的输入划分信息,然后为每个划分创建一个Map任务。创建的Reduce任务的数量由JobConf的mapred.reduce.tasks属性决定,它是用setNumReduceTasks()方法设定的。然后调度器便创建指定个数的Reduce来运行任务。
8.TaskTracker执行一个简单的循环,定期发送心跳方法调用JobTracker。作业心跳方法调用指TaskTracker会向JobTracker汇报当前的状态,如果正常,JobTracker会为它分配一个任务,并使用心跳方法的返回值与TaskTracker进行通信
现在TaskTracker已经被分配了任务,下面是运行任务步骤
1:本地化作业的JAR文件,将它从共享文件系统复制到TaskTracker所在文件系统。同时,将应用程序所需要的全部文件从分布式缓存复制到本地磁盘
2:为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件夹下
3:新建一个TaskRunner实例来运行任务
TaskRunner启动一个新的Java虚拟机来运行每个任务,使得用户执行任务启动map和reduce函数的任何缺陷都不会影响TaskTracker。但在不同的任务之间重用JVM还是可能的。
作业初始化(map分发策略优化)
Job初始化过程主要是在JobTracker建立一个slave node对Task的映射模型,其他都是附属工作。
首先需要知道的是Task是Job的基本单元,由JobTracker分发到TaskTracker来执行。Task分为以下两类:
Map Task:处理输入数据,它就应该是输入数据、Job相关信息等组成的对象
Reduce Task:汇总Map Task的输出结果,最后生成Job的输出,它也应是Job相关信息的组成
Job将所有输入数据组装成逻辑分片,这些逻辑分片只是在HDFS上物理数据Block的索引以及存储信息。
Map Task依赖于这些信息来决定将Task分发到哪些TaskTracker上。JobTracker可以取到Job相关的metadata信息,然后由这些信息决定如何分发Task,这些分片的相关信息就存放在特定的目录下,jobTracker通过JobId可以访问到
Reduce Task不管在哪个TaskTracker上执行,都得从其他那些执行Map Task的TaskTracker上拉取数据,所以对它的分发JobTracker不需要准备什么,只要在合适的时候放到某台TaskTracker上执行即可
JobTracker主要还是关注Map Task的准备工作(Reduce Task并不是从所有Map Task拉取临时数据。如果有多个Reduce Task,每个Reduce Task只是拉取一部分Map Task的临时数据)
Map Task的执行效率依赖于读取输入数据的效率。
根据数据所处的位置与TaskTracker的距离,有三种本地数据级别
node-local 输入分片就在TaskTracker本地
rack-local 输入分片在TaskTracker所在Rack的其他节点上
off-switch 输入分片在其他的Rack内
>JobTracker在Task分发时应充分考虑本地数据级别。
分发策略对job执行效率的影响很大程度是如何优化Map Task的本地数据
JobTracker可以从Job的metadata中得到并维护这样一种映射关系:
job split--->HDFS Block && slave node
这种映射关系就是生成Map Task的基础。有多少个Split,就会有多少个Map Task
响应心跳而选择Map Task 的处理步骤如下所示:
1:根据TaskTracker的机器,查看JobTracker中是否存在一个Map Task,它关联的Block(假设一个Block划分为一个Split)存储在 TaskTracker的本地磁盘上,那么就优先执行这个Map Task
2:如果没有1可选的Map Task,那么查看是否有Map关联的Block在TaskTracker所在的Rack内
3:如果上面两步都没有选到某个Map Task,那么就根据情况看是否执行跨Rack的Task或其他推测式执行Task
>当用户开启Task推测式执行,推测式执行就会发生在JobTracker意识到某个Task执行效率低的时候,尽量要让推测式Task是node local级别的。
任务的分配
每个TaskTracker定期向JobTracker发送心跳信息,心跳信息包含TaskTracker的状态,是否可以接收新的任务.
JobTracker以此来决定将任务分配给谁(仍然使用心跳的返回值与TaskTracker通信).
每个TaskTracker会有固定数量的任务槽(slot)来处理Map和Reduce(表示TaskTracker可以同时运行两个Map和Reduce),由机器内核的数量和内存大小来决定。
jobTracker会先将TaskTracker的Map槽填满,然后分配Reduce任务到TaskTracker
JobTracker选择哪个TaskTracker来运行Map任务需要考虑网络位置,它会选择一个离输入分片较近的TaskTracker,优先级是数据本地化(data-local) ,然后再到机架本地化(rack-local)
任务的执行
TaskTracker分配到一个任务后,首先从HDFS中把作业的JAR文件复制到TaskTracker所在的本地文件系统(JAR本地化用来启动JVM).
同时将应用程序所需要的全部文件从分布式缓存复制到本地磁盘上。
接下来TaskTracker为任务新建一个本地工作目录work,并把JAR文件的内容解压到这个文件夹下
TaskTracker新建一个TaskRunner实例来运行该任务.TaskRunner启动一个新的JVM来运行每个任务,以便客户的MapReduce不会影响TaskTracker守护进程。
但在不同任务之间重用JVM还是可能的。
进度和状态的更新
一个作业和每个任务都有一个状态信息,包括作业或任务的运行状态(running,successful,failed)、Map和Reduce的进度、计数器值、状态消息和描述等
这些信息通过一定的时间间隔由Child JVM-->TaskTracker-->JobTracker汇聚。
JobTracker将产生一个声明所有运行作业及其任务状态的全局视图
mapreduce的进度组成
MapReduce构成的所有操作如下:
读入一条输入记录(在Mapper或Reducer中)
写入一条输入记录(在Mapper或Reducer中)
在一个Context中设置状态描述
增加计数器Counter
调用progress()方法
任务完成
当JobTracker收到作业最后一个任务已完成的通知后,便把作业的状态设置成"成功”
MapReduce容错
任务失败
最常见的是Map或Reduce任务的失败,发生Map或Reduce失败的时候,子任务JVM进程会在退出之前向其上一级TaskTracker发送错误报告
另一个错误情况就是子进程JVM突然退出,可能由JVM的bug导致,从而导致MapReduce用户代码执行失败
还有一种情况,如果超时设置成0将关闭超时检测,所有长时间运行的任务永远不会被标记为failed
TaskTracker失败
当TaskTracker停止或者很少向JobTracker发送心跳,JobTracker会注意到此TaskTracker发送心跳的情况,从而将此TaskTracker从等待任务调度的TaskTracker池中移除,JobTracker会安排此TaskTracker上一个成功运行的Map任务返回
下面介绍两种TaskTracker失败的情况
1:如果它们属于未完成的作业,Reduce阶段无法获取本地Map输出的文件结果,任务都需要重新调度和执行,只要是Map阶段失败必然是重新执行这个任务
2:如果是Reduce阶段,自然是执行未完成的Reduce任务,因为Reduce只要执行完就会把输出写入到HDFS上
JobTracker失败
JobTracker失败应该说是最严重的失败方式,而且在Hadoop中存在单点故障的情况下是相当严重的,因为这种情况下作业最终失败.
可以通过启动多个JobTracker,在这种情况下只运行一个主JobTracker。使用Zookeeper作为JobTracker的协调机制来决定哪一个是主JobTracker
子任务失败
Map任务和Reduce任务失败的三种情况:
1:当Map或者Reduce子任务中的代码抛出异常,JVM进程会在退出之前向TaskTracker进程发送错误报告,TaskTracker会将此(任务尝试) task attempt标记为failed状态,释放一个槽以便运行另外一个任务
2:对于流任务,如果流进程以非零退出代码退出执行,会标记为failed
3:子JVM突然退出,即JVM错误,这时TaskTracker会注意到进程已经退出,标记为failed
TaskTracker将子任务标记为失败后会将自身计数器减1,为了JobTracker申请新的任务,也是通过心跳告知JobTracker本地的一个任务尝试失败
JobTracker接到任务失败的通知后,会将其重新加入到调度队列重新分配给其他的TaskTracker执行(避免将失败的任务分配给执行失败的TaskTracker),但是这个尝试也是有次数限制的,默认情况下,任务尝试4次仍然没有完成,就不会再重试(JobTracker会将其标记为Killed),此时整个作业就执行失败了
任务失败反复次数的处理方法
当Map Task执行失败后会重试,超过重试次数(由mapred.map.max.attempts指定,默认认为4)整个Job会失败
Hadoop提供配置参数mapred.max.map.failures.percent解决这个问题
如果一个Job有200个Map Task,该参数设置为5,则单个Job最多允许10个Map Task(200*5%=10)失败
把该配置放到mapred-site.xml中即可
Reduce Task也有类似配置mapred.max.reduce.failures.percent属性
Shuffle阶段和sort阶段
当Map开始产生输出时,它并不是简单地把数据写到磁盘上,因为频繁的磁盘操作会导致性能严重下降。它的处理过程更复杂,数据首先写到内存中的一个缓冲区,并做一些预排序,以提升效率
map端的shuffle
每个Map任务都有一个用来写入输出数据的循环内部缓冲区。这缓冲区默认大小是100MB,可以通过io.sort.mb属性来设置具体大小。
当缓冲区的数据量达到一个特定阀值时(io.sort.mb*io.srot.spill.percent,其中io,sort,spill.percent默认是0.8)系统将会启动一个后台线程把缓冲区中的内容spill到磁盘
在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已满,Map就会被阻塞直到spill完成。spill线程在把缓冲区的数据写到磁盘前,会对它进行一个二次快速排序
首先根据数据所属的Partition排序,然后每个partition中再按Key排序.输出包括一个索引文件和数据文件。
如果设定了Combiner,将在排序输出的基础上运行
Combiner就是一个Mini Reducer.它在执行Map任务的节点本身运行,先对Map的输出做一次简单的Reduce,使得Map的输出更紧凑
spill文件保存在由mapred.local.dir指定的目录中,map任务结束后删除
每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在map任务写完它的最后一个输出记录时,可能会有多个spill文件。在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件,这是一个多路归并过程,最大归并路数由io.sort.factor控制(默认是10)
如果设定了Combiner,并且spill文件的数量至少是3(由min.num.spills.for.combine属性控制),Combiner将在输出文件被写入磁盘前运行以压缩数据
默认输出是不被压缩的,但可以很简单的设置mapred.compress.map.output为true启用该功能
压缩所使用的库由mapred.map.output.compression.codec来设定
当spill文件归并完毕后,map将删除所有临时spill文件,并告知TaskTracker任务已完成。
Reduce端通过HTTP获取对应的数据
用来传输partitions数据的工作线程个数由tasktracker.http.threads控制,这个设定是针对每一个TaskTracker的,并不是单个Map,默认值是40
注意:Map输出总是写到本地磁盘,但Reduce输出不是,一般是写到HDFS
Reduce任务的输入数据分布在集群内的多个Map任务的输出中,Map任务可能会在不同的时间内完成,只要有其中的一个map任务完成,Reduce任务就开始复制它的 输出,这个阶段称为Cope阶段
Reduce任务拥有多个cope线程,可以并行的获取Map输出,可以通过设定mapred.reduce.parallel.copies来改变线程数,默认是5
reduce端的shuffle
Recduce端怎么知道从哪些TaskTracker中获取Map端输出呢?
当Map任务完成之后,会通知它们的父TaskTracker,告知状态更新,然后TaskTracker再转告JobTracker。
这些通知信息是通过心跳通信机制传输的
因此针对一个特定的作业,JobTracker知道Map输出与TaskTracker的映射关系
Reduce端中有一个线程会间歇地向JobTracker询问Map输出的地址,直到把所有的数据都获取到。
在Redce取走了Map输出之后,TaskTracker不会立即删除这些数据,因为Reduce可能会失败。它们会在整个作业完成后,JobTracker告知它们要删除的时候才去删除
如果map输出足够小,它们会被复制到Reduce TaskTracker的内存中(缓冲区的大小由mapred.job.shuffle.input.buffer.percent控制,制定了用于此目的的堆内存的百分比);如果缓冲区空间不足,会被复制到磁盘上。当被内存中的缓冲去用量达到一定的比例阀值(由mapred.job.shuffle.merge.threshold)控制,或者达到了Map输出的阀值大小(由mapred.inmem.merge.threshold控制),缓冲区中的数据会被归并然后spill到磁盘
下面分段描述Reduce端的shuffle细节
1:Copy过程
简单地拉取数据。Reduce进程启动一些数据copy线程(fetcher),通过HTTP方式请求Map Task所在的TaskTracker获取Map Task的输出文件
因为Map Task早已结束,这些文件就归TaskTracker管理在本地磁盘上
2:Merge阶段
这里的merge如Map端的Merge动作,只是数组中存放的是不同的Map端的数值
复制来的数据会先放入内存缓冲区,它是基于JVM的heap size设置,因为Shuffle阶段Reduce不运行,所以应该把绝大部分的内存都给shuffle用
Merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘
默认情况下第一种形式不启用,第二种merge方式一直在运行,直到没有Map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件
3:Reducer的输入文件
不断地Merge操作后,最后会生成一个“最终文件”,因为该文件可能存在于磁盘之上,也可能存在内存中
Yarn介绍
Yarn架构
Yarn最基本的设计思想是将JobTracker的两个主要功能,即资源管理和作业调度、监控分成两个独立的进程。
在该解决方案中包含两个组件:
全局的ResourceManager(RM)、与每一个应用相关的ApplicationManager(AM)
这里,“应用”指的是一个单独的MapReduce 作业或DAG作业。RM与NM(NodeManager)共同组成整个计算框架。
RM是系统中将资源分配给各个应用的最终决策者
AM实际上是一个具体的框架库,它的任务是与RM协商获取应用所需资源和与NM合作,以完成执行和监控Task的任务
ResourceManager
ResourceManager有点类似于JobTracker,它有两个主要的组件:调度器(Scheduler)和应用程序管理器(ApplicationManager)
Scheduler负责分配资源到各个正在运行的应用程序中.
调度器不执行监控和应用程序,从这个意义上来,它是纯粹的调度器。此外,它也不保证重启失败的任务。
调度器是基于资源请求来执行它的调度功能,它是基于资源容器的抽象概念
调度器支持可插入的策略
ApplicationManager
ApplicationManager负责接送提交的作业,协商第一个执行该任务的容器,并提供失败作业的重启。
NodeManager是每个节点的框架代理,它负责监控资源的使用情况,并报告给ResourceManager.
每个应用的ApplicationMaster负责与调度器谈判资源占用的Container数量,追踪状态和监控进程
NodeManager
NodeManager类似于TaskTracker,它负责启动应用程序Container(类似于JVM),监控Container的资源(cpu,内存,磁盘,网络等),并将信息上报给ResourceManager.调度器就是根据应用程序的Container进行调度的
Yarn工作流程
首先说的概念是"Application Submission Client",它负责将“Application”提交到Yarn的Resourcemanager.客户端通过ClientRMProtocol协议与ResourceManager联系,如果需要Client,会通过ClientRPProtocol::getNewApplication来获取新的ApplicationId,然后通过ClientRMProtocol::submitApplication将应用提交运行
Yarn上的ResourceManager会在一个获得的Container上启动ApplicationMaster.
ApplicationMaster通过AMRMProtocol协议与ResourceManager通信,首先ApplicationMaster需要将自身注册到ResourceManager.
ApplicationMaster为了完成交给它的任务,会通过AMRMProtocol::allocate申请Container.
如果获得了Container,ApplicationMaster会通过ContainerManager::startContainer和NodeManager联系为任务启动一个Container.
作为启动Container的一部分,ApplicationManager需要指定ContainerLauchContext
ContainerLauchhContext和ApplicationSubmissionContext相似,包括一些启动时需要的信息,如命令行命令,环境变量等
一旦任务完成,ApplicationManager会通过AMRProtocol::finishApplicationMaster通知ResourceManager任务完成。
同时Client可以通过查询ResourceManager来获取Application的状态信息,或者如果ApplicationMaster支持也可以直接从ApplicationMaster查询信息。如果需要,Client可以通过ClientRMProtocol::forceKillApplication来kill掉Application