通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。在一些集群应用中,例如分布式缓存集群中,缓存的数据大多都是靠哈希函数来进行数据的均匀分布的,在Hadoop中也不例外。
Hadoop内置Partitioner
MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区,鉴于此,Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所示:
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
现在我们来看看HashPartitoner所做的事情,其关键代码就一句:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
这段代码实现的目的是将key均匀分布在Reduce Tasks上,例如:如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int整数。但是,如果string太大的话这个int整数值可能会溢出变成负数,所以和整数的上限值Integer.MAX_VALUE(即0111111111111111)进行与运算,然后再对reduce任务个数取余,这样就可以让key均匀分布在reduce上。
自己定制Partitioner
流量汇总程序开发
这里添加了新需求,要求流量汇总统计并按省份区分。也就是说不但计算每个用户(手机号)的上行流量,下行流量,总流量外,还要按照每个手机号所属不同的省份来将计算结果写到不同的文件中(假如共4个省份,那么需要将输出结果写到4个文件中,也就是说有4个分区每个分区对应一个reduce task)。
public class Flowcount {
/**
* KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,但是在hadoop中有自己的
* 更精简的序列化接口(Seria会将类结构都序列化,而实际我们只需要序列化数据),所以不直接用Long,而用LongWritable
* VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
* KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key
* VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value
* @author 12706
*
*/
static class FlowcountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//输入为1234 23455 33333 33333(中间是制表符)
//第二列为手机号,倒数第二列为下行流量,倒数第三列为上行流量
String line = value.toString();
String[] values = line.split("\t");
//获取手机号
String phoneNum = values[1];
//获取上行流量下行流量
long upFlow = new Long(values[values.length-3]);
long downFlow = new Long(values[values.length-2]);
//封装好后写出到输出收集器
context.write(new Text(phoneNum), new FlowBean(upFlow,downFlow));
}
}
/**
* KEYIN VALUEIN对应mapper输出的KEYOUT KEYOUT类型对应
* KEYOUT,VALUEOUT:是自定义reduce逻辑处理结果的输出数据类型
* KEYOUT
* VALUEOUT
* @author 12706
*
*/
static class FlowcountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
@Override
protected void reduce(Text key, Iterable<FlowBean> beans,Context context)
throws IOException, InterruptedException {
//传进来的实例<13345677654,beans>,即多个该电话的键值对
//取出values获得上下行和总流量求和
long upFlow = 0;
long downFlow = 0;
for (FlowBean flowBean : beans) {
upFlow += flowBean.getUpFlow();
downFlow += flowBean.getDownFlow();
}
context.write(key, new FlowBean(upFlow,downFlow));
}
}
/**
* 相当于一个yarn集群的客户端
* 需要在此封装mr程序的相关运行参数,指定jar包
* 最后提交给yarn
* @author 12706
* @param args
* @throws Exception
*
*/
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定Partitioner
job.setPartitionerClass(FlowPartitioner.class);
//设置reduce task数量
job.setNumReduceTasks(5);
job.setJarByClass(Flowcount.class);
//指定本业务job要使用的mapper,reducer业务类
job.setMapperClass(FlowcountMapper.class);
job.setReducerClass(FlowcountReducer.class);
//虽然指定了泛型,以防框架使用第三方的类型
//指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定job输入原始文件所在位置
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job输入原始文件所在位置
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//将job中配置的相关参数以及job所用的java类所在的jar包,提交给yarn去运行
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
public class FlowBean implements Writable{
private long upFlow;//上行流量
private long downFlow;//下行流量
private long totalFlow;//总流量
//序列化时需要无参构造方法
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.totalFlow = upFlow + downFlow;
}
//序列化方法 hadoop的序列化很简单,要传递的数据写出去即可
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(totalFlow);
}
//反序列化方法 注意:反序列化的顺序跟序列化的顺序完全一致
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.totalFlow = in.readLong();
}
//重写toString以便展示
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + totalFlow;
}
get,set方法
}
/**
* Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask
*默认的分发规则为:根据key的hashcode%reducetask数来分发
*所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner
*自定义一个CustomPartitioner继承抽象类:Partitioner
*然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class)
* @author 12706
*
*/
public class FlowPartitioner extends Partitioner<Text, FlowBean>{
private static HashMap<String, Integer> map = new HashMap<String, Integer>();
static {
//模拟手机号归属地 0:北京,1:上海,2:广州,3:深圳,4:其它
map.put("135", 0);
map.put("136", 1);
map.put("137", 2);
map.put("138", 3);
}
//返回分区号
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
//进来的数据是<13567898766,flowbean1>,flowbean1中封装了上下行流量,总流量
String phoneNum = key.toString();
//截取手机号前3位
String num = phoneNum.substring(0, 3);
//获取对应的省
Integer provinceId = map.get(num);
return provinceId==null?4:provinceId;
}
}
测试程序
将工程打jar包到本地,上传到linux,启动hadoop集群
数据以及在hdfs下的文件均使用流量汇总程序中的。使用以下命令
[root@mini2 ~]# hadoop jar flowcount.jar com.scu.hadoop.partitioner.Flowcount /flowcount/input /flowcount/output
17/10/09 10:47:49 INFO mapreduce.JobSubmitter: number of splits:1
17/10/09 10:47:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1507516839481_0001
17/10/09 10:47:50 INFO impl.YarnClientImpl: Submitted application application_1507516839481_0001
17/10/09 10:47:50 INFO mapreduce.Job: The url to track the job: http://mini1:8088/proxy/application_1507516839481_0001/
17/10/09 10:47:50 INFO mapreduce.Job: Running job: job_1507516839481_0001
17/10/09 10:47:58 INFO mapreduce.Job: Job job_1507516839481_0001 running in uber mode : false
17/10/09 10:47:58 INFO mapreduce.Job: map 0% reduce 0%
17/10/09 10:48:03 INFO mapreduce.Job: map 100% reduce 0%
17/10/09 10:48:13 INFO mapreduce.Job: map 100% reduce 20%
17/10/09 10:48:14 INFO mapreduce.Job: map 100% reduce 40%
17/10/09 10:48:19 INFO mapreduce.Job: map 100% reduce 100%
17/10/09 10:48:20 INFO mapreduce.Job: Job job_1507516839481_0001 completed successfully
17/10/09 10:48:21 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=863
FILE: Number of bytes written=642893
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2278
HDFS: Number of bytes written=551
HDFS: Number of read operations=18
HDFS: Number of large read operations=0
HDFS: Number of write operations=10
Job Counters
Killed reduce tasks=1
Launched map tasks=1
Launched reduce tasks=5
Data-local map tasks=1
...
从打印信息可以看到切片splits为1,即一个maptask从Job Counters可以看出map tasks=1,reduce tasks=5所以输出文件应该也有5个。
查看输出
[root@mini2 ~]# hadoop fs -ls /flowcount/output
-rw-r--r-- 2 root supergroup 0 2017-10-09 10:48 /flowcount/output/_SUCCESS
-rw-r--r-- 2 root supergroup 84 2017-10-09 10:48 /flowcount/output/part-r-00000
-rw-r--r-- 2 root supergroup 53 2017-10-09 10:48 /flowcount/output/part-r-00001
-rw-r--r-- 2 root supergroup 104 2017-10-09 10:48 /flowcount/output/part-r-00002
-rw-r--r-- 2 root supergroup 22 2017-10-09 10:48 /flowcount/output/part-r-00003
-rw-r--r-- 2 root supergroup 288 2017-10-09 10:48 /flowcount/output/part-r-00004
确实是5个文件
查看每个文件内容
[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00000
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
[root@mini2 ~]# hadoop fs -cat /flowcount/output/part-r-00001
13602846565 1938 2910 4848
13660577991 6960 690 7650
...
按照省份划分了5个文件,每个文件里面有对应省份手机号与计算出的上行流量,下行流量,总流量。