Partitioner与自定义Partitioner

通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。在一些集群应用中,例如分布式缓存集群中,缓存的数据大多都是靠哈希函数来进行数据的均匀分布的,在Hadoop中也不例外。

Hadoop内置Partitioner

MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区,鉴于此,Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所示:

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

现在我们来看看HashPartitoner所做的事情,其关键代码就一句:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

这段代码实现的目的是将key均匀分布在Reduce Tasks上,例如:如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int整数。但是,如果string太大的话这个int整数值可能会溢出变成负数,所以和整数的上限值Integer.MAX_VALUE(即0111111111111111)进行与运算,然后再对reduce任务个数取余,这样就可以让key均匀分布在reduce上。

image

自己定制Partitioner

流量汇总程序开发
这里添加了新需求,要求流量汇总统计并按省份区分。也就是说不但计算每个用户(手机号)的上行流量,下行流量,总流量外,还要按照每个手机号所属不同的省份来将计算结果写到不同的文件中(假如共4个省份,那么需要将输出结果写到4个文件中,也就是说有4个分区每个分区对应一个reduce task)。

public class Flowcount {
    /**
     * KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,但是在hadoop中有自己的
     * 更精简的序列化接口(Seria会将类结构都序列化,而实际我们只需要序列化数据),所以不直接用Long,而用LongWritable
     * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
     * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key
     * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value
     * @author 12706
     *
     */
    static class FlowcountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //输入为1234    23455   33333   33333(中间是制表符)
            //第二列为手机号,倒数第二列为下行流量,倒数第三列为上行流量
            String line = value.toString();
            String[] values = line.split("\t");
            //获取手机号
            String phoneNum = values[1];
            //获取上行流量下行流量
            long upFlow = new Long(values[values.length-3]);
            long downFlow = new Long(values[values.length-2]);
            //封装好后写出到输出收集器
            context.write(new Text(phoneNum), new FlowBean(upFlow,downFlow));
        }
    }
    /**
     * KEYIN VALUEIN对应mapper输出的KEYOUT KEYOUT类型对应
     * KEYOUT,VALUEOUT:是自定义reduce逻辑处理结果的输出数据类型
     * KEYOUT
     * VALUEOUT
     * @author 12706
     *
     */
    static class FlowcountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        @Override
        protected void reduce(Text key, Iterable<FlowBean> beans,Context context)
                throws IOException, InterruptedException {
            //传进来的实例<13345677654,beans>,即多个该电话的键值对
            //取出values获得上下行和总流量求和
            long upFlow = 0;
            long downFlow = 0;
            for (FlowBean flowBean : beans) {
                upFlow += flowBean.getUpFlow();
                downFlow += flowBean.getDownFlow();
            }
            context.write(key, new FlowBean(upFlow,downFlow));
        }
    }
    /**
     * 相当于一个yarn集群的客户端
     * 需要在此封装mr程序的相关运行参数,指定jar包
     * 最后提交给yarn
     * @author 12706
     * @param args
     * @throws Exception
     *
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //指定Partitioner
        job.setPartitionerClass(FlowPartitioner.class);
        //设置reduce task数量
        job.setNumReduceTasks(5);

        job.setJarByClass(Flowcount.class);
        //指定本业务job要使用的mapper,reducer业务类
        job.setMapperClass(FlowcountMapper.class);
        job.setReducerClass(FlowcountReducer.class);
        //虽然指定了泛型,以防框架使用第三方的类型
                //指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        //指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //指定job输入原始文件所在位置
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job输入原始文件所在位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        //将job中配置的相关参数以及job所用的java类所在的jar包,提交给yarn去运行
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

public class FlowBean implements Writable{

    private long upFlow;//上行流量

    private long downFlow;//下行流量

    private long totalFlow;//总流量

    //序列化时需要无参构造方法
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    //序列化方法 hadoop的序列化很简单,要传递的数据写出去即可
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }
    //反序列化方法 注意:反序列化的顺序跟序列化的顺序完全一致
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
    //重写toString以便展示
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }
    get,set方法
}

/**
 * Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask
 *默认的分发规则为:根据key的hashcode%reducetask数来分发
 *所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
 *自定义一个CustomPartitioner继承抽象类:Partitioner
 *然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
 * @author 12706
 *
 */
public class FlowPartitioner extends Partitioner<Text, FlowBean>{

    private static HashMap<String, Integer> map = new HashMap<String, Integer>();

    static {
        //模拟手机号归属地 0:北京,1:上海,2:广州,3:深圳,4:其它
        map.put("135", 0);
        map.put("136", 1);
        map.put("137", 2);
        map.put("138", 3);
    }
    //返回分区号
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        //进来的数据是<13567898766,flowbean1>,flowbean1中封装了上下行流量,总流量
        String phoneNum = key.toString();
        //截取手机号前3位
        String num = phoneNum.substring(0, 3);
        //获取对应的省
        Integer provinceId = map.get(num);
        return provinceId==null?4:provinceId;
    }

}

测试程序
将工程打jar包到本地,上传到linux,启动hadoop集群
数据以及在hdfs下的文件均使用流量汇总程序中的。使用以下命令

[root@mini2 ~]# hadoop jar flowcount.jar com.scu.hadoop.partitioner.Flowcount /flowcount/input /flowcount/output
17/10/09 10:47:49 INFO mapreduce.JobSubmitter: number of splits:1
17/10/09 10:47:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1507516839481_0001
17/10/09 10:47:50 INFO impl.YarnClientImpl: Submitted application application_1507516839481_0001
17/10/09 10:47:50 INFO mapreduce.Job: The url to track the job: http://mini1:8088/proxy/application_1507516839481_0001/
17/10/09 10:47:50 INFO mapreduce.Job: Running job: job_1507516839481_0001
17/10/09 10:47:58 INFO mapreduce.Job: Job job_1507516839481_0001 running in uber mode : false
17/10/09 10:47:58 INFO mapreduce.Job:  map 0% reduce 0%
17/10/09 10:48:03 INFO mapreduce.Job:  map 100% reduce 0%
17/10/09 10:48:13 INFO mapreduce.Job:  map 100% reduce 20%
17/10/09 10:48:14 INFO mapreduce.Job:  map 100% reduce 40%
17/10/09 10:48:19 INFO mapreduce.Job:  map 100% reduce 100%
17/10/09 10:48:20 INFO mapreduce.Job: Job job_1507516839481_0001 completed successfully
17/10/09 10:48:21 INFO mapreduce.Job: Counters: 50
        File System Counters
                FILE: Number of bytes read=863
                FILE: Number of bytes written=642893
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=2278
                HDFS: Number of bytes written=551
                HDFS: Number of read operations=18
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=10
        Job Counters 
                Killed reduce tasks=1
                Launched map tasks=1
                Launched reduce tasks=5
                Data-local map tasks=1
    ...

从打印信息可以看到切片splits为1,即一个maptask从Job Counters可以看出map tasks=1,reduce tasks=5所以输出文件应该也有5个。
查看输出

[root@mini2 ~]# hadoop fs -ls /flowcount/output
-rw-r--r--   2 root supergroup          0 2017-10-09 10:48 /flowcount/output/_SUCCESS
-rw-r--r--   2 root supergroup         84 2017-10-09 10:48 /flowcount/output/part-r-00000
-rw-r--r--   2 root supergroup         53 2017-10-09 10:48 /flowcount/output/part-r-00001
-rw-r--r--   2 root supergroup        104 2017-10-09 10:48 /flowcount/output/part-r-00002
-rw-r--r--   2 root supergroup         22 2017-10-09 10:48 /flowcount/output/part-r-00003
-rw-r--r--   2 root supergroup        288 2017-10-09 10:48 /flowcount/output/part-r-00004

确实是5个文件
查看每个文件内容

[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00000
13502468823     7335    110349  117684
13560436666     1116    954     2070
13560439658     2034    5892    7926
[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00001
13602846565     1938    2910    4848
13660577991     6960    690     7650
...

按照省份划分了5个文件,每个文件里面有对应省份手机号与计算出的上行流量,下行流量,总流量。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,980评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,178评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,868评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,498评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,492评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,521评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,910评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,569评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,793评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,559评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,639评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,342评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,931评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,904评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,144评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,833评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,350评论 2 342

推荐阅读更多精彩内容