最近要进行大数据相关的任务,牛刀小试,先将最基础的HBase搭建并实践起来。本文借用docker,快速搭建HBase基础环境,并使用go结合thrift调用相关API进行数据操作。
0、方便起见,需要一个docker环境,安装配置略过不表。
1、拉取image
docker pull harisekhon/hbase
2、修改entrypoint.sh,启用thrift2
/hbase/bin/hbase-daemon.sh start thrift2
3、挂载修改后的启动文件,启动hbase,并暴露thrift2端口
docker run -d -p 9090:9090 -v `pwd`/entrypoint.sh:/entrypoint.sh --name hbase harisekhon/hbase
4、运行hbase shell,建表
docker exec -it hbase bash
hbase shell
// 建表
create 'elvizlai_test',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}
// 删除表
disable 'elvizlai_test'
drop 'elvizlai_test
4、thrift for mac安装,感谢brew
brew install thrift
5、下载hbase thrift2对应的hbase.thrift文件,生成go package
wget https://raw.githubusercontent.com/apache/hbase/master/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift2/hbase.thrift
thrift -r -out . --gen go *.thrift
6、撰写main.go一探究竟吧
package main
import (
"encoding/binary"
"fmt"
"hbase"
"reflect"
"strconv"
"time"
"git.apache.org/thrift.git/lib/go/thrift"
)
const HOST = "127.0.0.1"
const PORT = "9090"
const TESTRECORD = 10
func main() {
startTime := currentTimeMillis()
logformatstr_ := "----%s\n"
logformatstr := "----%s 用时:%d-%d=%d毫秒\n\n"
logformattitle := "建立连接"
table := "elvizlai_test"
rowkey := "1"
family := "f1"
protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
transport, err := thrift.NewTSocket(HOST + ":" + PORT)
if err != nil {
panic(err)
}
client := hbase.NewTHBaseServiceClientFactory(transport, protocolFactory)
if err := transport.Open(); err != nil {
panic(err)
}
tmpendTime := currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, startTime, (tmpendTime - startTime))
defer transport.Close()
//--------------Exists
logformattitle = "调用Exists方法"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime := currentTimeMillis()
isexists, err := client.Exists([]byte(table), &hbase.TGet{Row: []byte(rowkey)})
fmt.Printf("rowkey{%s} in table{%s} Exists:%t\n", rowkey, table, isexists)
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//--------------Put
logformattitle = "调用Put方法写数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
cvarr := []*hbase.TColumnValue{
{
Family: []byte(family),
Qualifier: []byte("idoall.org"),
Value: []byte("welcome idoall.org"),
},
}
temptput := hbase.TPut{Row: []byte(rowkey), ColumnValues: cvarr}
err = client.Put([]byte(table), &temptput)
if err != nil {
fmt.Printf("Put err:%s\n", err)
} else {
fmt.Println("Put done")
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//------------Get---------------
logformattitle = "调用Get方法获取新增加的数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
result, err := client.Get([]byte(table), &hbase.TGet{Row: []byte(rowkey)})
if err != nil {
fmt.Printf("Get err:%s\n", err)
} else {
fmt.Println("Rowkey:" + string(result.Row))
for _, cv := range result.ColumnValues {
printscruct(cv)
}
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//--------------put update
logformattitle = "调用Put update方法'修改'数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
cvarr = []*hbase.TColumnValue{
{
Family: []byte(family),
Qualifier: []byte("idoall.org"),
Value: []byte("welcome idoall.org---update"),
},
}
temptput = hbase.TPut{Row: []byte(rowkey), ColumnValues: cvarr}
err = client.Put([]byte(table), &temptput)
if err != nil {
fmt.Printf("Put update err:%s\n", err)
} else {
fmt.Println("Put update done")
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//------------Get update---------------
logformattitle = "调用Get方法获取'修改'后的数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
//
result, err = (client.Get([]byte(table), &hbase.TGet{Row: []byte(rowkey)}))
if err != nil {
fmt.Printf("Get update err:%s\n", err)
} else {
fmt.Println("update Rowkey:" + string(result.Row))
for _, cv := range result.ColumnValues {
printscruct(cv)
}
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//------------DeleteSingle------------
logformattitle = "调用DeleteSingle方法删除一条数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
tdelete := hbase.TDelete{Row: []byte(rowkey)}
err = client.DeleteSingle([]byte(table), &tdelete)
if err != nil {
fmt.Printf("DeleteSingle err:%s\n", err)
} else {
fmt.Print("DeleteSingel done\n")
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//-------------PutMultiple----------------
logformattitle = "调用PutMultiple方法添加" + strconv.Itoa(TESTRECORD) + "条数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
var tputArr []*hbase.TPut
for i := 0; i < TESTRECORD; i++ {
putrowkey := strconv.Itoa(i)
tputArr = append(tputArr, &hbase.TPut{
Row: []byte(putrowkey),
ColumnValues: []*hbase.TColumnValue{
{
Family: []byte(family),
Qualifier: []byte("idoall.org"),
Value: []byte(time.Now().String()),
},
}})
}
err = client.PutMultiple([]byte(table), tputArr)
if err != nil {
fmt.Printf("PutMultiple err:%s\n", err)
} else {
fmt.Print("PutMultiple done\n")
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//------------------GetMultiple-----------------------------
logformattitle = "调用GetMultiple方法获取" + strconv.Itoa(TESTRECORD) + "数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
//
var tgets []*hbase.TGet
for i := 0; i < TESTRECORD; i++ {
putrowkey := strconv.Itoa(i)
tgets = append(tgets, &hbase.TGet{
Row: []byte(putrowkey)})
}
results, err := client.GetMultiple([]byte(table), tgets)
if err != nil {
fmt.Printf("GetMultiple err:%s", err)
} else {
fmt.Printf("GetMultiple Count:%d\n", len(results))
for _, k := range results {
fmt.Println("Rowkey:" + string(k.Row))
for _, cv := range k.ColumnValues {
printscruct(cv)
}
}
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//-------------------TMutation
//TMutation包含一个TGet一个TPut,就不做测试了
//可以和MutateRow结合使用
//
//-------------------OpenScanner
logformattitle = "调用OpenScanner方法"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
startrow := make([]byte, 4)
binary.LittleEndian.PutUint32(startrow, 1)
stoprow := make([]byte, 4)
binary.LittleEndian.PutUint32(stoprow, 10)
scanresultnum, err := client.OpenScanner([]byte(table), &hbase.TScan{
StartRow: startrow,
StopRow: stoprow,
// FilterString: []byte("RowFilter(=, 'regexstring:00[1-3]00')"),
// FilterString: []byte("PrefixFilter('1407658495588-')"),
Columns: []*hbase.TColumn{
{
Family: []byte(family),
Qualifier: []byte("idoall.org"),
},
},
})
if err != nil {
fmt.Printf("OpenScanner err:%s\n", err)
} else {
fmt.Printf("OpenScanner %d done\n", scanresultnum)
scanresult, err := client.GetScannerRows(scanresultnum, 100)
if err != nil {
fmt.Printf("GetScannerRows err:%s\n", err)
} else {
fmt.Printf("GetScannerRows %d done\n", len(scanresult))
for _, k := range scanresult {
fmt.Println("scan Rowkey:" + string(k.Row))
for _, cv := range k.ColumnValues {
printscruct(cv)
}
}
}
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//--closescanner
logformattitle = "调用CloseScanner方法"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
err = client.CloseScanner(scanresultnum)
if err != nil {
fmt.Printf("CloseScanner err:%s\n", err)
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//-------------------GetScannerResults
logformattitle = "调用GetScannerResults方法"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis() //
gsr, err := client.GetScannerResults([]byte(table), &hbase.TScan{
StartRow: startrow,
StopRow: stoprow,
// FilterString: []byte("RowFilter(=, 'regexstring:00[1-3]00')"),
// FilterString: []byte("PrefixFilter('1407658495588-')"),
Columns: []*hbase.TColumn{
{
Family: []byte(family),
Qualifier: []byte("idoall.org"),
},
}}, 100)
if err != nil {
fmt.Printf("GetScannerResults err:%s\n", err)
} else {
fmt.Printf("GetScannerResults %d done\n", len(gsr))
for _, k := range gsr {
fmt.Println("scan Rowkey:" + string(k.Row))
for _, cv := range k.ColumnValues {
printscruct(cv)
}
}
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
//---------------DeleteMultiple--------------
logformattitle = "调用DeleteMultiple方法删除" + strconv.Itoa(TESTRECORD) + "数据"
fmt.Printf(logformatstr_, logformattitle)
tmpstartTime = currentTimeMillis()
var tdelArr []*hbase.TDelete
for i := 0; i < TESTRECORD; i++ {
putrowkey := strconv.Itoa(i)
tdelArr = append(tdelArr, &hbase.TDelete{
Row: []byte(putrowkey)})
}
r, err := client.DeleteMultiple([]byte(table), tdelArr)
if err != nil {
fmt.Printf("DeleteMultiple err:%s\n", err)
} else {
fmt.Printf("DeleteMultiple %d done\n", TESTRECORD)
fmt.Println(r)
}
tmpendTime = currentTimeMillis()
fmt.Printf(logformatstr, logformattitle, tmpendTime, tmpstartTime, (tmpendTime - tmpstartTime))
endTime := currentTimeMillis()
fmt.Printf("\nGolang调用总计用时:%d-%d=%d毫秒\n", endTime, startTime, (endTime - startTime))
}
func currentTimeMillis() int64 {
return time.Now().UnixNano() / 1000000
}
func printscruct(cv interface{}) {
switch reflect.ValueOf(cv).Interface().(type) {
case *hbase.TColumnValue:
s := reflect.ValueOf(cv).Elem()
typeOfT := s.Type()
//获取Thrift2中struct的field
for i := 0; i < s.NumField(); i++ {
f := s.Field(i)
fileldformatstr := "\t%d: %s(%s)= %v\n"
switch f.Interface().(type) {
case []uint8:
fmt.Printf(fileldformatstr, i, typeOfT.Field(i).Name, f.Type(), string(f.Interface().([]uint8)))
case *int64:
var tempint64 int64
if f.Interface().(*int64) == nil {
tempint64 = 0
} else {
tempint64 = *f.Interface().(*int64)
}
fmt.Printf(fileldformatstr, i, typeOfT.Field(i).Name, f.Type(), tempint64)
default:
fmt.Print("I don't know")
}
}
default:
fmt.Print("I don't know")
fmt.Print(reflect.ValueOf(cv))
}
}