初学Hadoop对学习的东西作一个总结。
Partitioner(分区)
我们知道Hadoop中默认使用的InputFormat是TextInputFormat,它的默认切片方式是一个文件一个切片。如果我们不设置Partitioner,默认只有一个分区,那么即使是不同Mapper按照各自切片Map出来的的KV信息,最终都会归并到一个分区,由于分区和Reducer是一对一的关系,最终所有数据都会由一个Reducer统一处理。
这样有两个坏处:
1.首先分区只有一个,所以Reducer只有一个,那么在KV数据仍然巨大的情况下由一个Reducer执行任务是效率低下的。
2.其次在我们由某些特殊要求的时候,我们可能需要某部分数据有自己的输出文件,由于分区和输出文件也是一对一的关系,我们必须自定义分区。
首先理解分区和Shuffle的关系
这是尚硅谷Hadoop网课的shuffle流程图。我先总结一下数据的处理过程。
这里假设我们有三个文件:
三个文件分别:1.1703.txt 包含1703班的学生成绩,2.1703-1704.txt 1703部分同学和1704部分同学的成绩,3.1704.txt 1704部分同学的成绩
文件格式:班级\t学号\t姓名\t成绩
现在我们要读取三个文件输出1703和1704两个班同学的总成绩。
结合上面流程图:
假设我们的Mapper的 key为一个包含属性:班级+学号的WritableComparable类型的Bean,value为一个包含 姓名+成绩的Writable类型的Bean;
1.提交Job
2.Job按照InputFormat的getSplits方法进行切片,这里假设我们没有设置InputFormat,那么默认的TextInputFormat的切片方式是按照文件切,也就是一个文件一个切片,一共会有三个切片,切片信息被传给Yarn。
3.Yarn会启动三个MapTask分别对每个切片进行处理,三个Map执行完成后会把读取出来的信息按照KV提交。
4.提交的信息会储存在一个环形缓冲区,环形缓冲区会在内存使用80%的时候进行溢写,也就是说一个文件MapTask可能会产生多个溢写小文件。
5.当MapTask完成后,会将它所产生的小文件进行归并,归并的同时会进行分区和排序。
(此次归并为将同一个MapTask溢写的多个小文件归并成一个大文件)
6.(假设我们之前按照班级自定义了分区),归并的时候会把所有小文件归并成一个大文件,大文件会按照我们设置的分区,将文件内容分成几块,这里会产生两个分区,分别为1703班的分区和1704班的分区,并且1703和1704分区中的key会按照compareTo方法进行排序,每个分区的排序互不干扰。(注意此时虽然产生分区,但是仍然在一个文件中)。
排序后结果应如下图:
同一块数据不同分区的文件各自排序,互不影响
7.最后每个Reducer来拉去自己分区的数据,拉去时按分区再次进行归并,将不同MapTask产生的同一分区的数据放在一起,且归并过程会把不同MapTask同一分区的key再次排序这次排序是归并排序,因为不同MapTask中每个分区的key已经有序),把最终该分区的数据传递给该分区的Reducer。
(这里我们看一下其中一个分区的模拟归并结果,文件只是模拟过程,实际过程是在内存中)
(此次归并是将不同MapTask归并产生的大文件再次进行归并,产生一个更大的最终结果文件)
注意它的排序不可能是6712345,而肯定是1234567,因为这里它会进行一次归并排序(把两个有序的文件进行排序的排序算法)。
8.最后将数据写入磁盘等待Reducer读取,Reducer按照分区读取数据。
我之前也有过疑问,为什么这么麻烦排两次,直接先合并在一起再排序一次不就行了吗?
要考虑hadoop的集群环境,我们要知道第一次排序它的数据就是在数据所处的Map下,也就是负责第一个切片的Mapper负责第一个文件也就是第一个切片的排序,把数据再提交给另一个服务器汇总的时候不由它管,进行汇总的服务器因为拿到的是已经排过序的许多小文件,所以排序比较轻松,但如果直接提交给另一个服务器,那么那个服务器可能需要对所有数据进行统一排序(非归并排序),可能会导致工作量巨大,所以两次排序其实也是为了减少第二次排序时服务器的压力。
combine过程这里没有讲,因为在shuffle中不是必须步骤。
代码实现:
非源码解析
我们这里简单的实现分区的使用,不统计总成绩,但是把不同班级的学生信息按照班级分区,并把同一班级的学生按照学号进行排序。
1.序列化类ClassAndIdBean 储存学生的班级和学号,因为它作为key要实现默认排序,所以这里我们实现WritableComparable接口,实现它的compareTo方法,按照学号进行升序排序。并实现了它的序列化和反序列化方法,重写了toString。
2.序列化类StudentBean,储存了学生的姓名和分数,因为它不作Key,我们实现Writable接口,实现它的序列化和反序列化方法,并重写toString。
3.Mapper类,简单的切割封装。
4.Reducer,因为我们的Map结果就是我们想要的结果所以直接写出就好。
5.Driver
6.Partitioner,按照班级进行分区,1703一个分区,1704一个分区。其他班级一个分区
结果:
先把Driver中的Parttition设置注释掉:
运行结果为一个文件,按照学号进行排序了,但是并没有区分班级:
把注释取消:
三个文件,分别对应1703,1704和其他班级,因为无其他班级,所以这里为空。
写到这里出现了新的疑惑,观察注释掉Partitioner的分区,MapReduce是如何判断两个key相同呢?我之前认为是通过compareTo方法判断,但是若是这样,那么1703班和1704班相同学号的同学应被认为是同一个key,为什么两个key又都输出出来了呢?还是得好好研究一下。