// ===== Mapper =====
/**
 * Mapper template: consumes (LongWritable, Text) records and is meant to emit
 * (Text, IntWritable) pairs. All three hooks are currently no-op placeholders,
 * so nothing is emitted.
 *
 * <p>NOTE(review): RunnerModule configures {@code KeyValueTextInputFormat},
 * which (per the Hadoop docs) yields {@code Text} keys rather than
 * {@code LongWritable}. Confirm the input key type before wiring this class into that job.
 */
class MapperModule extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Overrides {@code Mapper#cleanup}; no-op placeholder. */
    @Override
    protected void cleanup(Context ctx) throws IOException, InterruptedException {
        // TODO: release per-task resources, if any
    }

    /** Overrides {@code Mapper#setup}; no-op placeholder. */
    @Override
    protected void setup(Context ctx) throws IOException, InterruptedException {
        // TODO: per-task initialisation
    }

    /**
     * Overrides {@code Mapper#map}; currently a stub that writes nothing.
     *
     * @param inKey   the input record key
     * @param inValue the input record value
     * @param ctx     the task context, to be used for {@code ctx.write(...)} once implemented
     */
    @Override
    protected void map(LongWritable inKey, Text inValue, Context ctx)
            throws IOException, InterruptedException {
        // TODO: transform inValue and emit (Text, IntWritable) pairs
    }
}
// ===== Reducer =====
/**
 * Reducer template: receives a {@code Text} key with its {@code IntWritable}
 * values and is meant to write (Text, IntWritable) pairs. All three hooks are
 * currently no-op placeholders, so nothing is written.
 */
class ReducerModule extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Overrides {@code Reducer#cleanup}; no-op placeholder. */
    @Override
    protected void cleanup(Context ctx) throws IOException, InterruptedException {
        // TODO: release per-task resources, if any
    }

    /** Overrides {@code Reducer#setup}; no-op placeholder. */
    @Override
    protected void setup(Context ctx) throws IOException, InterruptedException {
        // TODO: per-task initialisation
    }

    /**
     * Overrides {@code Reducer#reduce}; currently a stub that writes nothing.
     *
     * @param groupKey    the key shared by every value in {@code groupValues}
     * @param groupValues the values delivered for {@code groupKey}
     * @param ctx         the task context, to be used for {@code ctx.write(...)} once implemented
     */
    @Override
    protected void reduce(Text groupKey, Iterable<IntWritable> groupValues, Context ctx)
            throws IOException, InterruptedException {
        // TODO: combine groupValues and emit a (Text, IntWritable) pair
    }
}
// ===== Runner =====
public class RunnerModule implements Tool {
private Configuration conf = null;
// setConf
public void setConf(Configuration conf) {
conf.set("fs.defaultFS", "hdfs://hive.liangxw.CentOS.com:9000");
conf.setInt("RUN_TIMES", 1);
// 设置系统变量,解决hdfs权限问题
System.setProperty("HADOOP_USER_NAME", "liangxw");
// 设置分隔符
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "#");
// 设置map输出压缩
conf.set("mapreduce.map.output.compress", "true");
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
this.conf = conf;
}
// getConf
public Configuration getConf() {
return this.conf;
}
// run
public int run(String[] args) throws Exception {
Configuration conf = this.getConf();
FileSystem fs = FileSystem.get(conf);
// set job
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
job.setJarByClass(WCRunner.class);
// 输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 如果输出路径存在则删除
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
// 指定输入文件格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// mapper
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// ======================shuffle====================
// 1: partitioner
job.setPartitionerClass(partitioner.class);
// 2: sort
job.setSortComparatorClass(sorter.class);
// 3: combiner
job.setCombinerClass(WCReducer.class);
// 4: compress
// 在conf处进行设置
// 5: group
job.setGroupingComparatorClass(grouper.class);
// ======================shuffle====================
// reducer
job.setReducerClass(WCReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置reduce输出压缩
FileOutputFormat.setCompressionOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// 设置顺序文件输出压缩
SequenceFileOutputFormat.setCompressOutput(job,true);
SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
// submit job
boolean isSucces = job.waitForCompletion(true);
return isSucces ? 0 : 1;
}
// main
public static void main(String[] args) throws Exception {
args = new String[]{
"/user/liangxw/wcinput",
"/user/liangxw/wcoutput"
};
// run job
int status = ToolRunner.run(new WCRunner(), args);
System.exit(status);
}
}