例子实现目标
该代码实现的是在输入的数据对中,先以第一列由小到大排序,如果第一列值相等,以第二列由小到大排序。即:
添加cp.txt文件到input文件夹
$vim cp.txt
$hadoop fs -put cp.txt /input/
5,1
3,2
1,3
4,3
2,3
1,4
1,2
2,5
输出结果
1,2
1,3
1,4
2,3
2,5,
3,2
4,3
5,1
附图:
实践例子
1.终端执行>start-all.sh
2.input文件夹下增加cp.txt文件
3.打开eclipse
4.新建mapreduce项目,新建包(命名mr),新建类(命名MySortClass )类代码如下:
5.右键,选择run as hadoop
6.右键refresh一下hadoop文件,成功后output下会出现成功排序的结果文件
package mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import mr.MyWordCount.MyMapper;
import mr.MyWordCount.MyReduce;
public class MySortClass {
static class MySortMapper extends Mapper<LongWritable, Text, A, NullWritable>{
public void map(LongWritable k1, Text v1, Context context)
throws java.io.IOException, java.lang.InterruptedException
{
String[] lines= v1.toString().split(",");
A a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));
context.write(a1, NullWritable.get());
System.out.println("map......");
}
}
static class MySortReduce extends Reducer<A, NullWritable, Text, Text>{
public void reduce(A k2, Iterable<NullWritable> v2, Context context) throws java.io.IOException, java.lang.InterruptedException
{
context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));
System.out.println("reduce......");
}
}
private static class A implements WritableComparable<A> {
long firstNum;
long secondNum;
public A() {
}
public A(long first, long second) {
firstNum = first;
secondNum = second;
}
public void write(DataOutput out) throws IOException {
out.writeLong(firstNum);
out.writeLong(secondNum);
}
public void readFields(DataInput in) throws IOException {
firstNum = in.readLong();
secondNum = in.readLong();
}
/*
* 当key进行排序时会调用以下这个compreTo方法
*/
@Override
public int compareTo(A anotherKey) {
long min = firstNum - anotherKey.firstNum;
if (min != 0) {
// 说明第一列不相等,则返回两数之间小的数
return (int) min;
} else {
return (int) (secondNum - anotherKey.secondNum);
}
}
}
private static String INPUT_PATH="hdfs://master:9000/input/cp.txt";
private static String OUTPUT_PATH="hdfs://master:9000/output/c/";
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
if(fs.exists(new Path(OUTPUT_PATH)))
fs.delete(new Path(OUTPUT_PATH));
Job job=new Job(conf,"myjob");
job.setJarByClass(MySortClass.class);
job.setMapperClass(MySortMapper.class);
job.setReducerClass(MySortReduce.class);
//job.setCombinerClass(MySortReduce.class);
job.setMapOutputKeyClass(A.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
/*如果map和reduce的<key,value>类型是一样的,
则仅设置job.setOutputKeyClass();job.setOutputValueClass();即可*/
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
}
}
部分代码理解
类A实现了WritableComparable,设置了两个属性firstNum; secondNum;
String[] lines= v1.toString().split(",");
读取一行(5,1)以逗号分隔,两个元素(5)(1)存入数组lines
A a1=new A(Long.parseLong(lines[0]),Long.parseLong(lines[1]));
Long.parseLong(lines[0])将string类型的“5”转化为long类型,a1.firstNum=5;a1.secondNum=1;
context.write(a1, NullWritable.get());
写入上下文,设置map的输出为<key,空>,不能使用new NullWritable()来定义,获取空值只能NullWritable.get()来获取
context.write(new Text(new Long(k2.firstNum).toString()), new Text(new Long(k2.secondNum).toString()));
reduce生成新的键值对,如:将<(5,1),null>转化为<5,1>
以map->reduce集群处理流程理解该例子(假设文件庞大)
1.首先对输入文件分片(inputSplit),假设分片大小为三行,那么分为三片:
5,1
3,2
1,3
4,3
2,3
1,4
1,2
2,5
2.三片交由三个map进程处理,生成键值对<a1,null>,为减少带宽负荷,在本地节点上做了排序,分区(partitioner,数据做了分区标记)输出结果:
(如果有需要在分区之前还可以进行combiner(本地reduce操作,详情请见文章《了解MapReduce》底部对combiner的解释),这里分区之前不需要combiner)
<(1,3),null>
<(3,2),null>
<(5,1),null>
<(1,4),null>
<(2,3),null>
<(4,3),null>
<(1,2),null>
<(2,5),null>
- 然后就是所有节点洗牌(shuffle),将各个节点上同个分区的数据放置到一个节点中,放置过去后做了排序:
<(1,2),null>
<(1,3),null>
<(1,4),null>
<(2,3),null>
<(2,5),null>
<(3,2),null>
<(4,3),null>
<(5,1),null>
- 最后就是reduce,生成新键值对并生成最后排序结果
(1,2)
(1,3)
(1,4)
(2,3)
(2,5)
(3,2)
(4,3)
(5,1)
总的来说就是:map(本地)->combiner(本地)->partitioner(本地)->shuffle(集群)->reduce(新本地),各部分又还有细节操作,combiner和partitioner属于map阶段的,shuffle属于reduce阶段的。
附图理解: