批处理引擎MapReduce
MapReduce是一个经典的分布式批处理计算引擎,被广泛应用于搜索引擎索引构建、大规模数据处理等场景中,具有易于编程、良好的扩展性与容错性以及高吞吐率等特点。
它主要由两部分组成:编程模型和运行时环境。其中,编程模型为用户提供了非常易用的编程接口,用户只需像编写串行程序一样实现几个简单的函数即可实现一个分布式程序,而其他比较复杂的工作,如节点间的通信、节点失效、数据切分等,全部由MapReduce运行时环境完成,用户无需关心这些细节。
在本章中,我们将从产生背景、设计目标、编程模型和基本架构等方面对MapReduc引擎进行介绍。
编程思想
MapReduce模型是对大量分布式处理问题的总结和抽象,它的核心思想是分而治之,即将一个分布式计算过程拆解成两个阶段:
第一阶段:Map阶段,由多个可并行执行的Map Task构成,主要功能是,将待处理数据集按照数据量大小切分成等大的数据分片,每个分片交由一个任务处理。
第二阶段:Reduce阶段,由多个可并行执行的Reduce Task构成,主要功能是,对前一阶段中各任务产生的结果进行规约,得到最终结果。
MapReduce的出现,使得用户可以把主要精力放在设计数据处理算法上,至于其他的分布式问题,包括节点间的通信、节点失效、数据切分、任务并行化等,全部由MapReduce运行时环境完成,用户无需关心这些细节。
以前面的wordcount为例,用户只需编写map()和reduce()两个函数,即可完成分布式程序的设计,这两个函数作用如下:
❑ map()函数:获取给定文件中一行字符串,对其分词后,依次输出这些单词。
❑ reduce()函数:将相同的词聚集在一起,统计每个词出现的总频率,并将结果输出。
以上两个函数与“回调函数”类似,MapReduce框架将在合适的时机主动调用它们,并处理与之相关的数据切分、数据读取、任务并行化等复杂问题。
MapReduce编程组件
为了简化程序设计,MapReduce首先对数据进行了建模。MapReduce将待处理数据划分成若干个InputSplit(简称split),它是一个基本计算单位。考虑到HDFS以固定大小的block(默认是128MB)为基本单位存储数据,split与block存在一定的对应关系,具体如下图所示。
split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等,它的划分方法完全受用户程序控制,默认情况下,每个split对应一个block。但需要注意的是,split的多少决定了map task的数目,因为每个split会交由一个map task处理。
数据在MapReduce引擎中是以<key, value>形式流动的:
首先,每个split中的数据会被转换成一系列<key, value>,交由用户的map()函数处理,该函数进一步产生另外一系列<key, value>,之后,经(按照key)排序分组后,交给用户编写的reduce()函数处理,最终产生结果。总结起来,MapReduce编程模型实际上是一种包含5个步骤的分布式计算方法:
- 迭代(iteration)遍历输入数据,并将之解析成<key, value>对。
- 将输入<key, value>对映射(map)成另外一些<key, value>对。
- 依据key对中间数据进行分组(grouping)。
- 以组为单位对数据进行归约(reduce)。
- 迭代(iteration)将最终产生的<key, value>保存到输出文件中。
MapReduce将计算过程分解成以上5个步骤带来的最大好处是组件化与并行化。
为了实现MapReduce编程模型,Hadoop设计了一系列对外编程接口,用户可通过实现这些接口完成应用程序的开发。
Hadoop MapReducer对外提供了5个可编程组件,分别是InputFormat、Mapper、Partitioner、Reducer和OutputFormat,其中Mapper和Reducer跟应用程序逻辑相关,因此必须由用户编写(一个MapReduce程序可以只有Mapper没有Reducer),至于其他几个组件,MapReduce引擎内置了默认实现,如果这些默认实现能够满足用户需求,则可以直接使用。
Mapper
Mapper中封装了应用程序的数据处理逻辑,为了简化接口,MapReduce要求所有存储在底层分布式文件系统上的数据均要解释成<key, value>的形式,并以迭代方式依次交给Mapper中的map函数处理,产生另外一些<key, value>。
在MapReduce中,key/value对象可能被写入磁盘,或者通过网络传输到不同机器上,因此它们必须是可序列化的。为简化用户开发工作量,MapReduce对常用的基本类型进行了封装,使其变得可序列化,包括IntWritable、FloatWritable、LongWritable、BytesWritable、Text等。用户可以通过继承Writable类实现自己的可序列化类。
Reducer
Reducer主要作用是,基于Mapper产生的结果进行规约操作,产生最终结果。Map阶段产生的数据,按照key分片后,被远程拷贝给不同的Reduce Task。Reduce Task按照key对其排序,进而产生一系列以key为划分单位的分组,它们迭代被Reducer函数处理,进而产生最终的<key, value>对。
用户编写完MapReduce程序后,按照一定的规则指定程序的输入和输出目录,并提交到Hadoop集群中。作业在Hadoop中执行过程如图所示,Hadoop会将输入数据切分成若干个split,并将每个split交给一个Map Task处理:Map Task以迭代方式从对应的split中解析出一系列<key, value>,并调用map()函数处理。待数据处理完后,Reduce Task将启动多线程远程拷贝各自对应的数据,然后使用基于排序的方法将key相同的数据聚集在一起,并调用reduce()函数处理,将结果输出到文件中。
InputFormat
InputFormat主要用于描述输入数据的格式,它提供以下两个功能:
❑ 数据切分:按照某个策略将输入数据切分成若干个split,以便确定Map Task个数以及对应的split。
❑ 为Mapper提供输入数据:给定某个split,能将其解析成一系列<key, value>对。为了方便用户编写MapReduce程序,Hadoop自带了一些针对数据库和文件的InputFormat实现。
Partitioner
Partitioner的作用是对Mapper产生的中间结果进行分片,以便将同一组的数据交给同一个Reducer处理,它直接影响Reduce阶段的负载均衡。
MapReduce默认采用了HashPartitioner,它实现了一种基于哈希值的分片方法,HashPartitioner能够将key相同的所有<key, value>交给同一个Reduce Task处理,适用于绝大部分应用场景,用户也可按照自己的需求定制Partitioner。
OutputFormat
OutputFormat主要用于描述输出数据的格式,它能够将用户提供key/value对写入特定格式的文件中。Hadoop自带了很多OutputFormat实现,它们与InputFormat实现相对应,所有基于文件的OutputFormat实现的基类为FileOutputFormat,并由此派生出一些基于文本文件格式、二进制文件格式的或者多输出的实现。
Combiner
除了前面讲的5个可编程组件,MapReduce还允许用户定制另外一个组件:Combiner,它是一个可选的性能优化组件,可看作Map端的local reducer,如图所示,它通常跟Reducer的逻辑是一样的,运行在Map Task中,主要作用是,对Mapper输出结果做一个局部聚集,以减少本地磁盘写入量和网络数据传输量,并减少Reducer计算压力。
MapReduce作业生命周期
MapReduce作业作为一种分布式应用程序,可直接运行在Hadoop资源管理系统YARN之上(MapReduce On YARN)。每个MapReduce应用程序由一个MRAppMaster以及一系列MapTask和ReduceTask构成,它们通过ResourceManager获得资源,并由NodeManager启动运行。
当用户向YARN中提交一个MapReduce应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MRAppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行成功。
YARN的工作流程分为以下几个步骤:
- 用户向YARN集群提交应用程序,该应用程序包括以下配置信息:MRAppMaster所在jar包、启动MRAppMaster的命令及其资源需求(CPU、内存等)、用户程序jar包等。
- ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的MRAppMaster。
- MRAppMaster启动后,首先向ResourceManager注册(告之所在节点、端口号以及访问链接等),这样,用户可以直接通过ResourceManager查看应用程序的运行状态,之后,为内部Map Task和Reduce Task申请资源并运行它们,期间监控它们的运行状态,直到所有任务运行结束,即重复步骤4~7。
- MRAppMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
- 一旦MRAppMaster申请到(部分)资源后,则通过一定的调度算法将资源分配给内部的任务,之后与对应的NodeManager通信,要求它启动这些任务。
- NodeManager为任务准备运行环境(包括环境变量、jar包、二进制程序等),并将任务执行命令写到一个shell脚本中,并通过运行该脚本启动任务。
- 启动的Map Task或Reduce Task通过RPC协议向MRAppMaster汇报自己的状态和进度,以让MRAppMaster随时掌握各个任务的运行状态,从而可以在任务失败时触发相应的容错机制。在应用程序运行过程中,用户可随时通过RPC向MRAppMaster查询应用程序的当前运行状态。
8.应用程序运行完成后,MRAppMaster通过RPC向ResourceManager注销,并关闭自己。
ResourceManager、NodeManager、MRAppMaster以及MapTask/ReduceTask管理关系如下图所示。ResourceManager为MRAppMaster分配资源,并告之NodeManager启动它,MRAppMaster启动后,会通过心跳维持与ResourceManager之间的联系;MRAppMaster负责为MapTask/ReduceTask申请资源,并通知NodeManager启动它们,MapTask/ReduceTask启动后,会通过心跳维持与MRAppMaster之间的联系,基于以上设计机制,接下来介绍MapReduce On YARN架构的容错性。
❑ YARN:YARN本身具有高度容错性
❑ MRAppMaster:MRAppMaster由ResourceManager管理,一旦MRAppMaster因故障挂掉,ResourceManager会重新为它分配资源,并启动之。重启后的MRAppMaster需借助上次运行时记录的信息恢复状态,包括未运行、正在运行和已运行完成的任务。
❑ MapTask/ReduceTask:任务由MRAppMaster管理,一旦MapTask/ReduceTask因故障挂掉或因程序bug阻塞住,MRAppMaster会为之重新申请资源并启动之。
MapTask与ReduceTask
Map Task可以分解成Read、Map、Collect、Spill和Combine五个阶段;
Reduce Task可以分解成Shuffle、Merge、Sort、Reduce和Write五个阶段。
在MapReduce计算框架中,一个应用程序被划成Map和Reduce两个计算阶段,它们分别由一个或者多个Map Task和Reduce Task组成。
其中,每个Map Task处理输入数据集合中的一片数据(split),产生若干数据片段,并将之写到本地磁盘上;
而Reduce Task则从每个MapTask上远程拷贝一个数据片段,经分组聚集和规约后,将结果写到HDFS中。
Map Task与Reduce Task之间的数据传输采用了pull模型。为了提高容错性,Map Task将中间计算结果存放到本地磁盘上,而Reduce Task则通过HTTP协议从各个Map Task端拉取(pull)相应的待处理数据。为了更好地支持大量Reduce Task并发从MapTask端拷贝数据,Hadoop采用了Netty作为高性能网络服务器。
MapTask详细流程
Map Task的整体计算流程如图所示,共分为5个阶段,分别是:
- Read阶段:Map Task通过InputFormat,从split中解析出一系列<key, value>。
- Map阶段:将解析出的<key, value>依次交给用户编写的map()函数处理,并产生一系列新的<key, value>。
- Collect阶段:在map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果,在该函数内部,它将<key, value>划分成若干个数据分片(通过调用Partitioner),并写入一个环形内存缓冲区中。
- Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
Map Task为何将处理结果写入本地磁盘?
该问题实际上包含两层含义,即处理结果为何不写入内存,或者直接发送给Reduce Task?
首先,Map Task不能够将数据写入内存,因为一个集群中可能会同时运行多个作业,且每个作业可能分多批运行Map Task,显然,将计算结果直接写入内存会耗光机器的内存;
其次,MapReduce采用的是动态调度策略,这意味着,一开始只有Map Task执行,而Reduce Task则处于未调度状态,因此无法将Map Task计算结果直接发送给Reduce Task。
将Map Task写入本地磁盘,使得Reduce Task执行失败时可直接从磁盘上再次读取各个Map Task的结果,而无需让所有Map Task重新执行。
总之,MapTask将处理结果写入本地磁盘主要目的是减少内存存储压力和容错。
- Combine(整合,合并)阶段:当所有数据处理完成后,Map Task对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
每个Map Task为何最终只产生一个数据文件?
如果每个Map Task产生多个数据文件(比如每个Map Task为每个Reduce Task产生一个文件),则会生成大量中间小文件,这将大大降低文件读取性能,并严重影响系统扩展性(M个Map Task和R个Reduce Task可能产生M*R个小文件)。
ReduceTask详细流程
Reduce Task的整体计算流程如图所示,共分为5个阶段,分别是:
- Shuffle阶段:也称为Copy阶段,Reduce Task从各个Map Task上远程拷贝一片数据,并根据数据分片大小采取不同操作,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge阶段:在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用量过多或磁盘上文件数目过多。
- Sort阶段:按照MapReduce语义,用户编写的reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略,由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此Reduce Task只需对所有数据进行一次归并排序即可。
- Reduce阶段:在该阶段中,Reduce Task将每组数据依次交给用户编写的reduce()函数处理。
- Write阶段:将reduce()函数输出结果写到HDFS上。
小结
MapReduce最初源自于Google,主要被用于搜索引擎索引构建,之后在Hadoop中得到开源实现。随着开源社区的推进和发展,已经成为一个经典的分布式批处理计算引擎,被广泛应用于搜索引擎索引构建、大规模数据处理等场景中,具有易于编程、良好的扩展性与容错性以及高吞吐率等特点。它为用户提供了非常易用的编程接口,用户只需像编写串行程序一样实现几个简单的函数即可实现一个分布式程序,而其他比较复杂的工作,如节点间的通信、节点失效、数据切分等,全部由MapReduce运行时环境完成,用户无需关心这些细节。MapReduce为用户提供了InputFormat、Mapper、Partitioner、Reducer和OutputFormat等可编程组件,用户可通过实现这些组件完成分布式程序设计。为了方便非Java程序员编写程序,MapReduce提供了Hadoop Streaming工具,用户可使用任意语言开发Mapper和Reducer,大大提高了程序开发效率。