思考问题
MapReduce总结
MapReduce
MapReduce的定义
MapReduce是一种编程模型, 用于大规模数据集(大于1TB)的并行运算。它将分布式磁盘读写的问题进行抽象,并转换为对一个数据集(由键/值对组成)的计算,该计算由 map 和 reduce 两部分组成。MapReduce编程模型流行的三个技术方面的原因:
- MapReduce采用无共享大规模集群系统。集群系统具有良好的性价比和可伸缩性。
- MapReduce模型简单、易于理解、易于使用。它不仅用于处理大规模数据,而且能够将很多繁琐的细节隐藏起来(比如,自动化并行、负载均衡和灾备管理等),极大地简化了程序员的开发工作,而且大量数据处理问题,包括很多机器学习和数据挖掘算法,都可以使用MapReduce实现。
- 虽然基本的MapReduce模型只提供一个过程性的编程接口,但在海量数据环境、需要保证可伸缩性的前提下,通过使用合适的查询优化和索引技术,MapReduce仍能够提供很好的数据处理性能。
-
MapReduce和关系型数据库的比较
MapReduce和关系型数据库之间的另一个区别在于他们所操作的数据集的结构化的程度,这在第一篇的文章已经讨论过。
需要强调的是,MapReduce输入的键和值并不是数据固有的属性,而是由分析数据的人员来选择的。 MapReduce的特点:
- 软件框架
- 并行处理
- 可靠且容错
- 大规模集群
- 海量数据集
- Map 和 Reduce
Hadoop框架使用Mapper将数据处理成一个个的<key,value>键值对,在网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出。
-
Map 映射的意思,简单来说就是把一个输入映射为一组(多个)全新的数据,而不去改变原始的数据。
Mapper负责的是分。把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:- 数据或计算的规模相对原任务要大大缩小
- 就近计算原则,任务会分配到存放着所需数据的节点上进行计算
- 这些小任务可以并行计算彼此间几乎没有依赖关系
-
Reduce 化简的意思,就是把通过Map得到的一组数据经过某些方法(化简)归一成想要的输出值。
所以说,MapReduce的思想就是:分而治之
MapReduce的工作机制
下面,我们来剖析MapReduce作业的运行机制
- 作业的提交:客户端通过JobClient.runJob()来提交一个作业到jobtracker,JobClient程序逻辑如下:
- 向Jobtracker请求一个新的job id (JobTracker.getNewJobId());
- 检查作业的输出说明,如已存在抛错误给客户端;计算作业的输入分片;
- 将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中以jobid命名的目录下。作业jar副本较多(mapred.submit.replication = 10);
- 告知jobtracker作业准备执行 (submit job)。
简单来说呢,就是
- 通过JobClient提交,与JobTracker通信得到一个jar的存储路径和JobId
- 检查输入输出的路径
- 计算分片的信息
- 将作于所需要的资源(jar,配置文件,计算所得的输入分片)赋值到以作业ID命名的HDFS上
- 告知JobTracker作业准备执行
- 作业的初始化
- job tracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由job scheduler进行调度,并对其进行初始化,包括创建一个正在运行作业的对象(封装任务和记录信息)。
- 为了创建任务运行列表,job scheduler首先从共享文件系统中获取JobClient已计算好的输入分片信息,然后为每个分片创建一个map任务;创建的reduce任务数量由JobConf的mapred.reduce.task属性决定,schedule创建相应数量的reduce任务。任务此时被指定ID。
简单来说呢,就是
- JobTracker配置好Job需要的资源后,JobTracker就会初始化作业
- 初始化主要做的是将Job放入一个内部的队列,让配置好的作业调度器能调度到这个作业,作业调度器会初始化这个job。
- 初始化就是创建一个正在运行的job对象(封装任务和记录信息),以便JobTracker跟踪job的状态和进程。
- 初始化完毕后,作业调度器会获取输入分片信息(input split),每个分片创建一个map任务。
- 任务的分配
- jobtacker应该先选择哪个job来运行?这个由job scheduler来决定
- jobtracker如何选择tasktracker来运行选中作业的任务呢?看下面
- 每个tasktracker定期发送心跳给jobtracker,告知自己还活着,是否可以接受新的任务。
jobtracker以此来决定将任务分配给谁(仍然使用心跳的返回值与tasktracker通信)。
每个tasktracker会有固定数量的任务槽来处理map和reduce(比如2,表示tasktracker可以同时运行两个map和reduce),由机器内核的数量和内存大小来决定。job tracker会先将tasktracker的map槽填满,然后分配reduce任务到tasktracker。 - jobtracker选择哪个tasktracker来运行map任务需要考虑网络位置,它会选择一个离输入分片较近的tasktracker,优先级是数据本地化(data-local)–>机架本地化(rack-local)。
- 对于reduce任务,没有什么标准来选择哪个tasktracker,因为无法考虑数据的本地化。map的输出始终是需要经过整理(切分排序合并)后通过网络传输到reduce的,可能多个map的输出会切分出一部分送给一个reduce,所以reduce任务没有必要选择和map相同或最近的机器上。
- 任务的执行
- tasktracker分配到一个任务后,首先从HDFS中把作业的jar文件复制到tasktracker所在的本地文件系统(jar本地化用来启动JVM)。同时将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。
- 接下来tasktracker为任务新建一个本地工作目录,并把jar文件的内容解压到这个文件夹下。
- tasktracker新建一个taskRunner实例来运行该任务。
TaskRunner启动一个新的JVM来运行每个任务,以便客户的map/reduce不会影响tasktracker守护进程。但在不同任务之间重用JVM还是可能的。
子进程通过umbilical接口与父进程进行通信。任务的子进程每隔几秒便告知父进程的进度,直到任务完成。
- 进度和状态的更新
- 一个作业和每个任务都有一个状态信息,包括:作业或任务的运行状态(running, successful, failed),map和reduce的进度,计数器值,状态消息或描述。
- task会定期向tasktracker汇报执行情况,tasktracker会定期收集所在集群上的所有task信息,并想JobTracker汇报.JobTracker会根据所有tasktracker汇报上来的信息进行汇总。
- 这些信息通过一定的时间间隔由child JVM –> task tracker –> job tracker汇聚。job tracker将产生一个表明所有运行作业及其任务状态的全局视图。你可以通过Web UI查看。同时JobClient通过每秒查询jobtracker来获得最新状态。
- 作业的完成
- JobTracker是在接收到最后一个任务完成后,才将任务标记为"成功"。并将数据结果写入到HDFS上。
- 作业的失败
- JobTracker失败:JobTracker失败这是最为严重的一种任务失败,失败机制--它是一个单节点故障,因此,作业注定失败。(hadoop2.0解决了)。
- tasktracker失败:tasktracker失败崩溃了会停止向jobt发送心跳信息,并且JobTracker会将tasktracker从等待的任务池中移除,将该任务转移到其他的地方执行.obTracker会将tasktracker加入到黑名单。
- task失败:map或reduce运行失败,会向tasktracker抛出异常,任务挂起.
MapReduce工作涉及到的4个对象
- 客户端(client):编写mapreduce程序,配置作业,提交作业,这就是程序员完成的工作。
- JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行。
- TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个(JobTracker只能有一个就和hdfs里namenode一样存在单点故障,我会在后面的mapreduce的相关问题里讲到这个问题的)。
- HDFS:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面
<small>jobtracker的单点故障:
jobtracker和hdfs的namenode一样也存在单点故障,
单点故障一直是hadoop被人诟病的大问题,
为什么hadoop的做的文件系统和mapreduce计算框架都是高容错的,但是最重要的管理节点的故障机制却如此不好,我认为主要是namenode和jobtracker在实际运行中都是在内存操作,而做到内存的容错就比较复杂了,只有当内存数据被持久化后容错才好做,namenode和jobtracker都可以备份自己持久化的文件,但是这个持久化都会有延迟,因此真的出故障,任然不能整体恢复,另外hadoop框架里包含zookeeper框架,zookeeper可以结合jobtracker,用几台机器同时部署jobtracker,保证一台出故障,有一台马上能补充上,不过这种方式也没法恢复正在跑的mapreduce任务。</small>
MapRedece作业的处理流程
按照时间顺序包括:
- 输入分片(input split)
- map阶段
- combiner阶段
- shuffle阶段和
- reduce阶段。
- 输入分片(input split):
在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务
输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切
<small>假如我们设定hdfs的块的大小是64mb,如果我们输入有三个文件,大小分别是3mb、65mb和127mb,那么mapreduce会把3mb文件分为一个输入分片(input split),65mb则是两个输入分片(input split)而127mb也是两个输入分片(input split)
即我们如果在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是mapreduce优化计算的一个关键点。
</small> - Map阶段:
程序员编写好的map函数了,因此map函数效率相对好控制,而且一般map操作都是本地化操作也就是在数据存储节点上进行。 - Combiner阶段:
combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作。
Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作。
<small>例如我们对文件里的单词频率做统计,map计算时候如果碰到一个hadoop的单词就会记录为1,但是这篇文章里hadoop可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,
例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是做平均值计算使用combiner的话,最终的reduce计算结果就会出错。</small> - shuffle阶段:
将map的输出作为reduce的输入的过程就是shuffle了。 - reduce阶段:
和map函数一样也是程序员编写的,最终结果是存储在hdfs上的。
Combiner深入理解
在上述过程中,我们看到至少两个性能瓶颈:
(1)如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
总结:网络带宽严重被占降低程序效率;
(2)假设使用美国专利数据集中的国家一项来阐述数据倾斜这个定义,这样的数据远远不是一致性的或者说平衡分布的,由于大多数专利的国家都属于美国,这样不仅Mapper中的键值对、中间阶段(shuffle)的键值对等等,大多数的键值对最终会聚集于一个单一的Reducer之上,计算任务分配不够均衡,从而大大降低程序的性能。
总结:单一节点承载过重降低程序性能;
在MapReduce编程模型中,在Mapper和Reducer之间有一个非常重要的组件,它解决了上述的性能瓶颈问题,它就是Combiner。
① 与mapper和reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用。
② 并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。
combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求平均值的话,则不适用。
因为每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能。是MapReduce的一种优化手段。
Combiner总结:
在实际的Hadoop集群操作中,我们是由多台主机一起进行MapReduce的,
如果加入规约(Combiner)操作,每一台主机会在reduce之前进行一次对本机数据的规约,
然后在通过集群进行reduce操作,这样就会大大节省reduce的时间,
从而加快MapReduce的处理速度
Partitioner理解
MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。
用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区。
默认的分区函数HashPartitioner
Partitioner小结:分区Partitioner主要作用在于以下两点
- 根据业务需要,产生多个输出文件
- 多个reduce任务并发运行,提高整体job的运行效率
Shuffle理解
Shuffle是什么
针对多个map任务的输出按照不同的分区(Partition)通过网络复制到不同的reduce任务节点上,这个过程就称作为Shuffle。
Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程,这一段应该是Hadoop中最核心的部分,因为涉及到Hadoop中最珍贵的网络资源,所以shuffle过程中会有很多可以调节的参数,也有很多策略可以研究
map过程的输出是写入本地磁盘而不是HDFS,但是一开始数据并不是直接写入磁盘而是缓冲在内存中,缓存的好处就是减少磁盘I/O的开销,提高合并和排序的速度。又因为默认的内存缓冲大小是100M(当然这个是可以配置的),所以在编写map函数的时候要尽量减少内存的使用,为shuffle过程预留更多的内存,因为该过程是最耗时的过程。
再来详细梳理下整个流程
- 在map端首先是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生<K2,V2>的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
- 写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。
- 最后将磁盘中的数据送到Reduce中,图中Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入则来自其他节点的Map输出。
- Copy阶段:Reducer通过Http方式得到输出文件的分区。
reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据 - Merge阶段:如果形成多个磁盘文件会进行合并。
从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中
3.Reducer的参数:最后将合并后的结果作为输入传入Reduce任务中
总结:当Reducer的输入文件确定后,整个Shuffle操作才最终结束。之后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。
Hadoop的压缩
Shuffle过程中看到,map端在写磁盘的时候采用压缩的方式将map的输出结果进行压缩是一个减少网络开销很有效的方法
MapReduce排序分组
排序: 在Hadoop默认的排序算法中,只会针对key值进行排序。
自定义排序:
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
自定义类型MyNewKey实现了WritableComparable的接口,
该接口中有一个compareTo()方法,当对key进行比较时会调用该方法,而我们将其改为了我们自己定义的比较规则,从而实现我们想要的效果
分组:在Hadoop中的默认分组规则中,也是基于Key进行的,会将相同key的value放到一个集合中去。
自定义分组:
public interface RawComparator<T> extends Comparator<T> {
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
public interface Comparator<T> {
int compare(T o1, T o2);
boolean equals(Object obj);
}
自定义了一个分组比较器MyGroupingComparator,该类实现了RawComparator接口,而RawComparator接口又实现了Comparator接口,这两个接口的定义:
Hadoop数据类型
Hadoop MapReduce操作的是键值对,但这些键值对并不是Integer、String等标准的Java类型。为了让键值对可以在集群上移动,Hadoop提供了一些实现了WritableComparable接口的基本数据类型,以便用这些类型定义的数据可以被序列化进行网络传输、文件存储与大小比较。
- 值:仅会被简单的传递,必须实现Writable或WritableComparable接口。
- 键:在Reduce阶段排序时需要进行比较,故只能实现WritableComparable接口。
类 | 描述 |
---|---|
BooleanWritable | 标准布尔变量的封装 |
ByteWritable | 单字节数的封装 |
DoubleWritable | 双字节数的封装 |
FloatWritable | 浮点数的封装 |
IntWritable | 整数的封装 |
LongWritable | Long的封装 |
NullWritable | 无键值时的占位符 |
Text | 使用UTF-8格式的文本封装 |