要是你是个Java开发,那么你肯定听说过MapReduce,下面就来看看这个东东吧
一、简介
- MapReduce:一个分布式计算框架;
- MapReduce存在意义:大大减小用户开发分布式计算的开发难度,从而提高分布式计算的开发效率。因为MapReduce把分布式业务的各种任务调度操作和计算程序与HDFS程序的衔接工作封装了起来,开发人员只需要关注自己的业务逻辑;
- MapReduce结构:
- MRAppMaster:MapReduce Application Master,负责整个程序的过程中MapTask和ReduceTask调度和状态协调的工作;
- MapTask:负责Map阶段的整个数据处理流程;
- ReduceTask:负责Reduce阶段的整个数据处理流程;
二、MapReduce运行流程
- 当我们通过Hadoop运行jar,运行到最后的submit/waitForCompletion之后,Job底层会去获取我们要处理的文件的规模(FileInputFormat中的getSplits方法:获取所有的文件,累加所有的文件大小),根据我们在Job上的参数配置,计算形成这个任务执行规划(任务分片规划:job.split(一个切片对应一个MapTask),规划元信息:job.splitmetainfo,输入输出配置:job.xml),然后把这个规划内容除去原信息和我们上传的jar提交给YARN;
- YARN根据ResourceManager对各个NodeManager的记录情况,选出一个比较清闲的NodeManager作为MRAppMaster,YARN就把规划发送给MRAppMaster,MRAppMaster就会根据规划启动相应个数的MapTask进程,进程启动后,这些进程会使用FileInputFormat打开HDFS的流读取文件数据,同时把读取到的数据传入Mapper的map方法中,map方法处理后会把结果交给OutPutCollector,OutPutCollector会把这些数据写入本地文件;文件中的数据是排序且分区的,有几个ReduceTask就有几个分区,分区是由[Hash]Partitioner组件完成的;
- 当MRAppMaster检测到所有的MapTask完成之后,ResourceManager就会根据规划启动(有可能提前启动)相应台数的ReduceTask,这些ReduceTask就会去获取MapTask输出文件的特定分区数据,把所有MapTask的特定分区全部获取到之后进行归并排序,然后按照相同key的KV为一个组,调用Reducer的reduce()方法进行逻辑运算;其中,每个MapTask的内容都会按照分区被对应的ReduceTask读取;Reduce的结果就会到OutputFormat对象中,OutputFormat会根据当前的对接口的实现决定把文件做怎样的输出,可能是网络IO,也可能是HDFS等等,要看OutputFormat的实现是什么样子;
三、集成过程
- 引入Maven:Common/CommonLib/HDFS/HDFSLib/MapReduce/MapReduceLib/YARN/YARNLib
- 编写MapTask程序:实现类org.apache.hadoop.mapreduce.Mapper子类的map方法
- Mapper泛型:
- KEYIN: 默认情况下,是mr框架所读到的一行Long类型文本的起始偏移量,但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable;
- VALUEIN:默认情况下,是mr框架所读到的一行String类型文本的内容,用Text;
- KEYOUT:是用户自定义逻辑处理完成之后输出String类型数据中的key,在此处是单词,用Text;
- VALUEOUT:是用户自定义逻辑处理完成之后输出Integer类型数据中的value,在此处是单词次数,用IntWritable;
- MapTask会对每一行输入数据调用一次我们自定义的map()方法;
- map阶段的业务逻辑就写在自定义的map()方法中;
- 在整个MapTask运行完毕之前,MapTask会把每次的结果保存在一个临时文件中;
- Mapper泛型:
- 编写ReduceTask程序:实现类org.apache.hadoop.mapreduce.Reducer子类的reduce方法
- Reducer泛型:
- KEYIN, VALUEIN对应Mapper输出的KEYOUT,VALUEOUT类型;
- KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型;
- KEYOUT是单词;
- VLAUEOUT是总次数;
- Reducer泛型:
- 编写YARN程序来采集程序运行相关的参数,将下面代码写在main方法中:
if (args == null || args.length == 0) { args = new String[2]; args[0] = "hdfs://master:9000/wordcount/input/wordcount.txt"; args[1] = "hdfs://master:9000/wordcount/output8"; } Configuration conf = new Configuration(); /** // 设置Hadoop用户名,在win上开发需要设置 //conf.set("HADOOP_USER_NAME", "hadoop"); //conf.set("dfs.permissions.enabled", "false"); **/ /*conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resoucemanager.hostname", "mini1");*/ Job job = Job.getInstance(conf); /*job.setJar("/home/shreker/wordcount.jar");*/ //指定本程序的jar包所在的本地路径 job.setJarByClass(WordcountDriver.class); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定Reduce最终输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定需要缓存一个文件到所有的maptask运行节点工作目录 /* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中 /* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中 /* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录 /* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录 //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path(args[1])); //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行 /*job.submit();*/ boolean successful = job.waitForCompletion(true); System.exit(successful ? 0 : 1);
- 把这几个类达成一个jar;
- 启动HDFS和YARN集群;
- 上传到集群的任何一个节点上;
- 在HDFS上创建一个文件夹,把要处理的文件上传到这个文件夹下;
- 运行jar(-cp:-classpath)
# java程序依赖运行 java -cp <jar-path>[:<jar-path>[:<jar-path>...]] <main-package-class> <main-class-params> # hadoop中jar的使用 hadoop <jar-path> <main-package-class> <main-class-params>
四、MapReduce数据流向
- HDFS数据流
- InputFormat:主要定义读取数据的方式,如一次读取多少字节,还是一次读取一行等
- RecordReader:通过read方法开始从数据流中读取数据;
- Mapper:数据流经我们编写的map方法;
- OutPutCollector:map方法输出数据收集器;
- MapOutputBuffer(环形缓冲区)
- mr.sort.size:100;
- Partitioner:默认使用HashPartitioner,即按照key的Hash值分区;
- SpillRecord:
- 每次溢出产生一个文件,每个文件都是分好区的;
- 数据处理完成后,合并所有的文件,结果文件也是分区合并的(使用归并排序);
- Combiner:本质还是一个Reducer,作用是减少网络IO,即提前进行reduce操作;但是注意,不是所有的操作都适合使用Combiner,只有在不影响业务结果的情况下才能使用;
- Reducer:对所有的数据进行汇总统计工作,注意:每个ReduceTask都是去获取所有Map输出文件的同一个分区获取数据;
- OutputFormat:定义目标文件;
- RecordWriter:根据目标文件进行写入;
五、自定义MapTask分区
- 实现一个类继承自
org.apache.hadoop.mapreduce.Partitioner
,封装分区的业务逻辑; - 在Job中设置分区个数(
job.setNumReduceTasks
),和分区的实现类(job.setPartitionerClass
);
六、MapTask任务切片规划的机制
- 任务切片规划讲的是,任务的切片的生成过程
- 切片的原则:
- 简单地按照文件的内容长度进行切片;
- 切片大小,默认等于block大小;
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片;
- 当程序运行到submit/waitForCompletion的时候,InputFormat类中的getSplit()切片简要过程如下:
- 获取到要处理的数据所在文件夹;
- 遍历这个文件夹下的每个文件,对每个文件做切片运算,主要是欺骗大小的计算,如:
computeSplitSize::Math.max(minSize, Math.min(goalSize, blockSize))
,并把切片运算结果记录在job.split文件中,并计算所有的文件并做切片运算,都记录在job.split文件中; - 需要注意的是,在做切片的时候还有很多的细节,比如,在切片的最后的时候,还要判断剩下的是不是只有一点(block的1.1倍),如果只有一点,那么根据简单切片原则的最后这个切片就直接归在倒数第二个切片上,这算是对切片的一些优化内容,如果想知道的更详细可以跟踪源码查看,这里就不再赘述了;
七、序列化反序列化
- 我们在编写Java程序的时候序列化一般都是实现接口
java.io.Serializable
,但是这个接口有个缺点,就是会把集成体系结构中所有的数据都会进行序列化; - 在MapReduce体系中,有自己一套序列化和反序列化的工具
- String → Text
- int → IntWritable
- long → LongWritable
- ...
- 自定义类 → 实现接口
org.apache.hadoop.io.WritableComparable
,参考这里;- Comparable接口主要用来比对Key是否是同一个Key;
八、小文件处理
- 在Job上设置InputFormatClass,这个仅仅只是把多个小文件看做是一个切片,没有进行物理的合并,在数据读取的时候进行:
job.setInputFormatClass(CombineTextInputFormat.class);
- 同时限制切片的上线和下限
CombineTextInputFormat.setMaxInputSplitSize(<job>, <max-bytes-size>); CombineTextInputFormat.setMinInputSplitSize(<job>, <min-bytes-size>);
- MapTask的个数与CPU的核心个数一样的时候,处理效率最高
九、超大文件处理
- 如果要处理的文件比较大,那么就可以通过设置提高blocksize的大小
十、并行度
- MapTask的并行度:由Job切片数决定
- ReduceTask的并行度:可以手动设置,默认值是1:
job.setNumReduceTasks(3);
十一、Combiner
- Combiner是一种特殊的Reducer,因为事先Combiner组件要继承Reducer,该组件位于Map和Reduce之间;
- Combiner的意义:局部Reduce,以减少网络IO,提供系统性能;
- Combiner的实现:
- 实现一个继承Reducer的类,这个类就是一个Combiner;
- 在Job对象上设置Combiner类:
job.setCombinerClass(<Combiner>.class)
- 注意:
- 不是任何地方都能够使用Combiner,前提是不能影响业务逻辑;
- Combiner输出的类型要与Reduce输入一致,输入与Map输出一致;
十二、Shuffle
- shuffle:指的是Map把数据传递给Reduce的这个流程,其核心就是:数据分区和排序;
- shuffle流程再现:
- OutPutCollector收集MapTask输出的KV对,放到环形缓冲区中;
- 环形缓冲区数据的不断流入而溢出本地磁盘文件,每次溢出形成一个文件,其中调用组件Partitioner对数据进行分区以及针对key的排序,多个溢出文件会被合并成大的溢出文件,如果设置了Combiner组件就会进行Combine操作;
- ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据;
- ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会利用归并排序将这些文件再进行合并;
- 合并成大文件后,Shuffle的过程也就结束了,后面进入Reduce流程;
十三、MapReduce运行模式
- 本地运行:如果我们在win上运行程序,默认情况下吧程序提交到一个本地的模拟器上,使用线程模拟YarnChild;如果需要指定的话,在Configuration对象上设置
mapreduce.framework.name
值可以是:"local","yarn"等,还有就是fs.defaultFS
,用来指定输入输出的数据在哪; - 集群运行模式,设置以下参数(当然,如果是打jar在Linux上执行,倒也没有必要设置,因为Hadoop会自动设置):
-
mapreduce.framework.name
为yarn
-
yarn.resourcemanager.hostname
为对应的IP或者主机名 -
fs.defaultFS
为hdfs://<host>:9000
-
- 本地运行集群
- 这样的方式直接运行时会报错的,需要修改源码,因为在本地运行集群的时候,Job的相关参数设置的时候是win形式的配置,当这些配置在运行的时候解析就会出现问题。
可以在其中完成一些SQL完成的示例比如JOIN的逻辑
* 疑问
- Mapper的泛型和Reducer的泛型可以不对应吗?
- Mapper的输出和Reducer的输入必须对应;
- MapTask完成后才分区还是一边执行MapTask一边分区?
- MapOutputBuffer的每次溢出都会产生一个文件,这个文件是分好区的,当所有的数据都完成处理的时候,就会把这些文件合并;所以,是一边执行MapTask一边分区;
- ReduceTask获取所有的MapTask数据之后进行归并排序吗?如果每个MapTask的输出文件都很大,每个数据分区也都很大呢。。。