1 Environment Setup
1) A working Hadoop-2.2.0 development environment
2) HBase started successfully and verified through the HBase Shell
3) MyEclipse as the development tool
4) Maven to build the project
2 Creating the Project
Project creation is not covered again here; the details are in the HBase Java API exercise, and we reuse the project built in that exercise.
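For reference, here is a minimal sketch of the Maven dependencies such a project might declare. The version numbers are assumptions based on the Hadoop-2.2.0 environment above (HBase 0.96.x was the contemporary release built against Hadoop 2); match them to your actual cluster.

<dependencies>
    <!-- versions below are assumptions; align them with your cluster -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.96.2-hadoop2</version>
    </dependency>
    <!-- TableMapper, TableReducer and TableMapReduceUtil live in hbase-server -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.96.2-hadoop2</version>
    </dependency>
</dependencies>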
3 Creating a Class to Upload Data to HBase
3.1 WordCountUpLoadToHBase.class
package com.szh.hbase;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WordCountUpLoadToHBase extends Configured {

    // Mapper: tokenize each input line and emit (word, 1), using the word
    // bytes as the eventual HBase row key.
    public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer strs = new StringTokenizer(value.toString());
            while (strs.hasMoreTokens()) {
                word.set(strs.nextToken());
                context.write(new ImmutableBytesWritable(Bytes.toBytes(word.toString())), one);
            }
        }
    }
    // Reducer: sum the counts per word and write one Put per word, storing
    // the total under column family "content", qualifier "count".
    public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Put put = new Put(key.get()); // row key = the word itself
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(sum + ""));
            context.write(key, put);
        }
    }
    @SuppressWarnings("all")
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "szh");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // Drop the table if it already exists, then recreate it with a
        // single column family "content".
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
        HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
        HColumnDescriptor columnDescriptor = new HColumnDescriptor("content");
        tableDescriptor.addFamily(columnDescriptor);
        admin.createTable(tableDescriptor);
        admin.close();

        Job job = new Job(conf, "upload to hbase");
        job.setJarByClass(WordCountUpLoadToHBase.class);
        job.setMapperClass(WCHBaseMapper.class);
        // Wire the reducer to the target table; the trailing false skips
        // adding dependency jars, which are assumed to be on the cluster.
        TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job, null, null, null, null, false);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPaths(job, "hdfs://szh:9000/myhbase/hbase-test.txt");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
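To run the job, package the project and submit the jar with the hadoop command. A sketch, assuming the build produces a jar named hbase-demo.jar (the jar name is hypothetical; use whatever artifact your Maven build actually emits):

mvn clean package
hadoop jar target/hbase-demo.jar com.szh.hbase.WordCountUpLoadToHBase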
Check the result with the HBase Shell:
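A full scan of the table should show one row per word, with the total stored in content:count. For example:

$ hbase shell
hbase(main):001:0> scan 'wordcount'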
4 Reading Data from HBase and Writing It to HDFS
4.1 Creating MRReadFromHbase.class
package com.szh.hbase;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRReadFromHbase extends Configured {

    // Mapper: for each HBase row, concatenate the values of all cells in
    // the "content" family and emit (row key, concatenated value).
    public static class WCHBaseMapper extends TableMapper<Text, Text> {

        @Override
        public void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer("");
            for (Map.Entry<byte[], byte[]> value : values.getFamilyMap("content".getBytes()).entrySet()) {
                sb.append(new String(value.getValue()));
            }
            // Emit exactly once per row, after all cells have been appended.
            context.write(new Text(key.get()), new Text(sb.toString()));
        }
    }
    // Reducer: each row key carries a single value, so pass the pairs
    // through unchanged to the HDFS output.
    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text> {

        private Text result = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val);
                context.write(key, result);
            }
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "szh");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = new Job(conf, "read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);
        job.setReducerClass(WCHBaseReducer.class);
        // Scan the whole table; the mapper emits Text keys and Text values.
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://szh:9000/myhbase/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
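This job is submitted the same way as the upload job; a sketch, reusing the hypothetical jar name from above (note that the output directory must not already exist, or FileOutputFormat will fail the job):

hadoop jar target/hbase-demo.jar com.szh.hbase.MRReadFromHbase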
Check the result with HDFS commands:
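For example, to list the output directory configured above and print the reducer output:

hdfs dfs -ls /myhbase/out
hdfs dfs -cat /myhbase/out/part-r-00000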