1. Add the dependencies to pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.yinbodotcc</groupId>
  <artifactId>countwords</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>countwords</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>3.0.3</hadoop.version>
    <hive.version>0.13.1</hive.version>
    <hbase.version>0.98.6-hadoop2</hbase.version>
  </properties>

  <dependencies>
    <!--dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency-->

    <!-- MRUnit (hadoop2 classifier) for unit testing MapReduce jobs -->
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.1.0</version>
      <classifier>hadoop2</classifier>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
    <!-- Keep all Hadoop artifacts on the same version via the hadoop.version property -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- hive client -->
    <!--dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
    </dependency-->

    <!-- hbase client -->
    <!--dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency-->
  </dependencies>
</project>
2. Java files
2.1 The test case
package chapter3;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

public class WordCountWithToolsTest {

    MapDriver<Object, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

    @Before
    public void setUp() {
        WordCountWithTools.TokenizerMapper mapper = new WordCountWithTools.TokenizerMapper();
        WordCountWithTools.IntSumReducer reducer = new WordCountWithTools.IntSumReducer();
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    // Mapper alone: each input line should emit (word, 1) for every token.
    @Test
    public void testWordCountMapper() throws IOException {
        IntWritable inKey = new IntWritable(0);
        mapDriver.withInput(inKey, new Text("Test Quick"));
        mapDriver.withInput(inKey, new Text("Test Quick"));
        mapDriver.withOutput(new Text("Test"), new IntWritable(1));
        mapDriver.withOutput(new Text("Quick"), new IntWritable(1));
        mapDriver.withOutput(new Text("Test"), new IntWritable(1));
        mapDriver.withOutput(new Text("Quick"), new IntWritable(1));
        mapDriver.runTest();
    }

    // Reducer alone: the values [1, 2] for each key should sum to 3.
    @Test
    public void testWordCountReduce() throws IOException {
        ArrayList<IntWritable> reduceInList = new ArrayList<IntWritable>();
        reduceInList.add(new IntWritable(1));
        reduceInList.add(new IntWritable(2));
        reduceDriver.withInput(new Text("Quick"), reduceInList);
        reduceDriver.withInput(new Text("Test"), reduceInList);

        ArrayList<Pair<Text, IntWritable>> reduceOutList = new ArrayList<Pair<Text, IntWritable>>();
        reduceOutList.add(new Pair<Text, IntWritable>(new Text("Quick"), new IntWritable(3)));
        reduceOutList.add(new Pair<Text, IntWritable>(new Text("Test"), new IntWritable(3)));
        reduceDriver.withAllOutput(reduceOutList);
        reduceDriver.runTest();
    }

    // Mapper and reducer together: two identical lines should yield a count of 2 per word.
    @Test
    public void testWordCountMapReduce() throws IOException {
        IntWritable inKey = new IntWritable(0);
        mapReduceDriver.withInput(inKey, new Text("Test Quick"));
        mapReduceDriver.withInput(inKey, new Text("Test Quick"));

        ArrayList<Pair<Text, IntWritable>> reduceOutList = new ArrayList<Pair<Text, IntWritable>>();
        reduceOutList.add(new Pair<Text, IntWritable>(new Text("Quick"), new IntWritable(2)));
        reduceOutList.add(new Pair<Text, IntWritable>(new Text("Test"), new IntWritable(2)));
        mapReduceDriver.withAllOutput(reduceOutList);
        mapReduceDriver.runTest();
    }
}
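Besides runTest(), the MRUnit drivers also provide a run() method that returns the collected output, which lets you make assertions with plain JUnit instead of declaring every expected record up front. A minimal sketch of a hypothetical extra test method for the same class (it additionally assumes imports of java.util.List and the static org.junit.Assert.assertEquals):

// Hypothetical alternative to runTest(): capture the mapper output and assert on it directly.
@Test
public void testWordCountMapperWithRun() throws IOException {
    mapDriver.withInput(new IntWritable(0), new Text("Test Quick"));
    List<Pair<Text, IntWritable>> output = mapDriver.run();
    assertEquals(2, output.size());
    assertEquals(new Text("Test"), output.get(0).getFirst());
    assertEquals(new IntWritable(1), output.get(0).getSecond());
}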
2.2 The MapReduce class under test
package chapter3;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountWithTools extends Configured implements Tool {

    /**
     * <p>
     * The mapper extends the org.apache.hadoop.mapreduce.Mapper class. When Hadoop runs,
     * it passes each line of the input files to the mapper. The map function
     * tokenizes the line and, for each token (word), emits (word, 1) as the output.
     * </p>
     */
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    /**
     * <p>The reduce function receives all the values that have the same key as its input,
     * and it outputs the key together with the number of occurrences of that key.</p>
     */
    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("chapter3.WordCountWithTools <inDir> <outDir>");
            ToolRunner.printGenericCommandUsage(System.out);
            System.out.println("");
            return -1;
        }
        String inputPath = args[0];
        String outPath = args[1];
        Job job = prepareJob(inputPath, outPath, getConf());
        job.waitForCompletion(true);
        return 0;
    }

    public Job prepareJob(String inputPath, String outPath, Configuration conf)
            throws IOException {
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountWithTools.class);
        job.setMapperClass(TokenizerMapper.class);
        // Uncomment this to also use the reducer as a combiner:
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        return job;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new WordCountWithTools(), args);
        System.exit(res);
    }
}
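Because the class goes through Configured and ToolRunner, generic Hadoop options (for example -D property overrides) can be passed on the command line when the job is submitted for real, outside the unit tests. Assuming the jar produced by the pom above, the submission would look roughly like: hadoop jar target/countwords-0.0.1-SNAPSHOT.jar chapter3.WordCountWithTools <inDir> <outDir>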
3. Running the test case (using Eclipse as an example)
Right-click the test class and choose Run As -> JUnit Test.
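If you are not using Eclipse, the tests should also run from the command line through Maven's Surefire plugin, for example mvn test -Dtest=WordCountWithToolsTest (the -Dtest filter is optional; a plain mvn test runs all tests).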
Another example:
http://www.cnblogs.com/zimo-jing/p/8647113.html
https://www.cnblogs.com/zimo-jing/p/8650588.html