之前一直将offset存到zookeeper,由于streaming程序会对zk有大量的读取操作,故将offset存到zk不太好,现已将offset都改到hbase中
kafka版本:0.10.2.0
spark版本:2.0
hbase表结构:groupid名字作为表名,topic名字作为rowkey,列族为info,分区号作为列名
kafka_offset:groupid | info:0 | info:1 | info:2 |
---|---|---|---|
rowkey(topicName) | 10000 | 10000 | 10000 |
- 改完后streaming程序中的代码调用
//先初始化hbase连接对象
HbaseUtil.setConf("zk address", "zk port")
//hbase中存offset的命名空间和表名
val offsetTbName = "kafka_offset:groupId"
HbaseUtil.createTable(offsetTbName, "info")//hbase中不存这个表就创建
//去hbase中获取topic partition范围,hbase中不存在也没关系(第一次用这个groupid的时候)
val fromOffsets: Map[TopicPartition, Long] = OffsetUtil.getFromOffsets
/**
* param offsets :
* offsets to begin at on initial startup. If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
* 引用源码的注释,意思大概就是如果第一次获取不到topicPartition就用auto.offset.reset这个配置来决定是从earliest还是latest开始读取kafka数据,也就是说不用担心fromOffset第一次取为空的情况
*/
//创建kafkaStream
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,//StreamingContext
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams, fromOffsets)
)
kafkaStream.foreachRDD(rdd => {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//忽略逻辑代码
pass...
//提交offset到hbase
OffsetUtil.saveOffsetToHbase(offsetRanges, "groupId")
})
HbaseUtil.close()
- 附上上面用到的HbaseUtil.scala和OffsetUtil.scala
- HbaseUtil.scala
object HbaseUtil {
var conf: Configuration = _
//线程池
lazy val connection: Connection = ConnectionFactory.createConnection(conf)
lazy val admin: Admin = connection.getAdmin
/**
* hbase conf
*
* @param quorum hbase的zk地址
* @param port zk端口2181
* @return
*/
def setConf(quorum: String, port: String): Unit = {
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", quorum)
conf.set("hbase.zookeeper.property.clientPort", port)
this.conf = conf
}
/**
* 如果不存在就创建表
* @param tableName 命名空间:表名
* @param columnFamily 列族
*/
def createTable(tableName: String, columnFamily: String): Unit = {
val tbName = TableName.valueOf(tableName)
if (!admin.tableExists(tbName)) {
val htableDescriptor = new HTableDescriptor(tbName)
val hcolumnDescriptor = new HColumnDescriptor(columnFamily)
htableDescriptor.addFamily(hcolumnDescriptor)
admin.createTable(htableDescriptor)
}
}
/**
* 获取hbase单元格内容
* @param tableName 命名空间:表名
* @param rowKey rowkey
* @return 返回单元格组成的List
*/
def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
val get = new Get(Bytes.toBytes(rowKey))
val table = connection.getTable(TableName.valueOf(tableName))
val result: Result = table.get(get)
import scala.collection.JavaConverters._
result.listCells().asScala
}
/**
* 单条插入
* @param tableName 命名空间:表名
* @param rowKey rowkey
* @param family 列族
* @param qualifier column列
* @param value 列值
*/
def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
//单个插入
val put: Put = new Put(Bytes.toBytes(rowKey)) //参数是行健
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
//获得表对象
val table: Table = connection.getTable(TableName.valueOf(tableName))
table.put(put)
table.close()
}
def close(): Unit = {
admin.close()
connection.close()
}
- OffsetUtil.scala
object OffsetUtil {
//从hbase中获取offset
def getFromOffsets: Map[TopicPartition, Long] ={
var fromOffsets: Map[TopicPartition, Long] = Map()
AnalysisParam.topicSet.foreach(topic => {
val get = new Get(Bytes.toBytes(topic))
val table: Table = HbaseUtil.connection.getTable(TableName.valueOf(s"kafka_offset:groupId"))
if (table.exists(get)) {
val cells = HbaseUtil.getCell(s"kafka_offset:groupId", topic)
cells.foreach(cell => {
val partition = Bytes.toString(CellUtil.cloneQualifier(cell))
val offset = Bytes.toString(CellUtil.cloneValue(cell))
val tp = new TopicPartition(topic, partition.toInt)
fromOffsets += (tp -> offset.toLong)
})
}
})
fromOffsets
}
//将offset存到hbase
def saveOffsetToHbase(offsetRanges:Array[OffsetRange],groupId:String): Unit ={
offsetRanges.foreach(o => {
val topic = o.topic
val partition = o.partition
val offset = o.fromOffset
HbaseUtil.singlePut(s"kafka_offset:$groupId", topic, "info", partition.toString, offset.toString)
})
}
}