1.Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
2.Shuffle排序机制
- hadoop排序是使用WritableComparator对象
- 实现排序的方法:
- 1.直接让参与对比的对象实现WritableComparable 接口,并指定泛型,实现compareTo方法,实现比较规则
- 2.自定义一个比较器对象,需要继承WritableComparator类,重写compare的方法。注意在构造器中调用父类对当前要参与比较的对象进行实例化。当前要参与比较的对象必须要实现WritableComparable接口,最后在driver类中指定自定义的比较器对象
//自定义的比较器对象
public class FlowBeanWritableComparator extends WritableComparator {
// 指定当前自定义的比较器对象为谁服务
// 注意在构造器中调用父类对当前要参与比较的对象进行实例化。
public FlowBeanWritableComparator() {
super(FlowBean.class, true);
}
/**
* 自定义比较规则
* @param a
* @param b
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
FlowBean abean = (FlowBean) a;
FlowBean bbean = (FlowBean) b;
System.out.println("aa"+abean);
System.out.println("bb"+bbean);
return -abean.getSumFlow().compareTo(((FlowBean) b).getSumFlow());
}
}
// 指定自定义的比较器对象
job.setSortComparatorClass(FlowBeanWritableComparator.class);
3.Shuffle排序源码分析
// 为当前Job中的对象获取比较器对象
comparator = job.getOutputKeyComparator();
// 获取比较器对象的核心逻辑
public RawComparator getOutputKeyComparator() {
// 在当前Job中获取比较器对象的class文件--> mapreduce.job.output.key.comparator.class
Class<? extends RawComparator> theClass = getClass(
JobContext.KEY_COMPARATOR, null, RawComparator.class);
// 如果通过JobContext.KEY_COMPARATOR 获取到比较器对象
if (theClass != null){
return ReflectionUtils.newInstance(theClass, this);
}
// 如果通过JobContext.KEY_COMPARATOR 获取不到比较器对象
// Hadoop 会默认获取比较器对象 通过调用WritableComparator对象的get方法获取,
// 在获取之前有个前提 判断当前job的MapOutputKeyClass 是否实现了WritableComparable接口
return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
4.hadoop如何给自身的数据类型获取比较器
1). 自身的数据类型已经实现WritableComparable接口
2). 自身的数据类型对象中 已经通过构造函数创建比较器对象
// 以Text为例
public static class Comparator extends WritableComparator {
public Comparator() {
super(Text.class);
}
3). 自身的数据类型对象中 通过静态代码块把 当前对象的class 和 它的比较器对象
放入一个Map进行了维护。
static {
// register this comparator
WritableComparator.define(Text.class, new Comparator());
}
public static void define(Class c, WritableComparator comparator) {
comparators.put(c, comparator);
}
5.Shuffle的combiner流程使用和注意事项
概念:是Shuffle过程中的一个可选流程(优化手段)
可以为Map阶段计算完的数据进行提前汇总,主要考虑到 减少 从Map阶段到
Reduce阶段的数据传输的大小控制以及减少Reduce端的计算压力。
使用场景:当不考虑多个MapTask的整体数据关联关系的时候才使用。