业务背景:使用Spark 、streaming从kafka读取数据后写入HBase。kafkaDStream
是从kafka读到的一个批次的数据流。
遍历直接写入HBase
最最基础写法是直接遍历并一条一条写入hbase。
第一版的核心代码如下:
kafkaDStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
println("kafkaRDD get some data.")
rdd.foreachPartition(partitionRecords => {
// 获取HBase连接
val hbaseConnection: Connection = getHBaseConn()
partitionRecords.foreach(line => {
// 连接HBase表
val tableName: TableName = TableName.valueOf(ConfigLoader.getString("hbase.table.name"))
val table: Table = hbaseConnection.getTable(tableName)
// 将kafka的每一条消息解析为JSON格式数据
val jsonObj: Option[Any] = JSON.parseFull(line.value())
val uuid: String = UUID.randomUUID().toString
// println(line.value())
val data: Map[String, Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
val a: String = data("a").asInstanceOf[String]
val b: String = data("b").asInstanceOf[String]
val c: String = data("c").asInstanceOf[String]
val put = new Put(Bytes.toBytes(uuid))
val tableColumnFamily = ConfigLoader.getString("hbase.table.column.family")
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("a"), Bytes.toBytes(a))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("b"), Bytes.toBytes(b))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("c"), Bytes.toBytes(c))
// 将数据写入HBase,若出错关闭table
Try(table.put(put)).getOrElse(table.close())
})
hbaseConnection.close()
})
} else {
println("kafkaRDD is Empty!!")
}
})
这种情况下实测消费数据入库速度约2600条每秒。(每条kafka消息约1KiB)
分批写入HBase
主要变化:创建一个List[Put],在foreach前创建一个计数器,不再每条数据提交写一次,而是计数器每10000时写一次。
具体多少条提交写一次,根据业务情况改变。
主要是开头和结尾提交时变化,伪代码如下:
var listPut = new ArrayList[Put]()
var count = 0
kafkaDStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
println("kafkaRDD get some data.")
rdd.foreachPartition(partitionRecords => {
// 获取HBase连接
val hbaseConnection: Connection = getHBaseConn()
partitionRecords.foreach(line => {
// 连接HBase表
val tableName: TableName = TableName.valueOf(ConfigLoader.getString("hbase.table.name"))
val table: Table = hbaseConnection.getTable(tableName)
// 将kafka的每一条消息解析为JSON格式数据
val jsonObj: Option[Any] = JSON.parseFull(line.value())
val uuid: String = UUID.randomUUID().toString
// println(line.value())
val data: Map[String, Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
val a: String = data("a").asInstanceOf[String]
val b: String = data("b").asInstanceOf[String]
val c: String = data("c").asInstanceOf[String]
val put = new Put(Bytes.toBytes(uuid))
val tableColumnFamily = ConfigLoader.getString("hbase.table.column.family")
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("a"), Bytes.toBytes(a))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("b"), Bytes.toBytes(b))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("c"), Bytes.toBytes(c))
// 每次计数+1
count +=1
listPut.add(put)
if(count % 10000 == 0){
Try(table.put(listPut)).getOrElse(table.close())
listPut.clear()
count = 0
}
})
Try(table.put(listPut)).getOrElse(table.close())
hbaseConnection.close()
})
} else {
println("kafkaRDD is Empty!!")
}
})
这个可以提高入库速度,具体没有测。
使用原生批量写入方法saveAsHadoopDataset
val input = kafkaDStream.flatMap(line=>{
Some(line.value.toString)
})
input.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
println("kafkaRDD get some data.")
if(args(0).toInt == 0){
val spark1 = SparkSession.builder().getOrCreate()
val df = spark1.read.json(rdd)
df.createOrReplaceTempView("temp")
val ans = spark1.sql("select a,b,c from temp").rdd.map(x => {
(UUID.randomUUID.toString, x.getString(0), x.getString(1),x.getString(2),)
})
ans.map(line=>{
val put = new Put(Bytes.toBytes(line._1))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("a"), Bytes.toBytes(line._2))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("b"), Bytes.toBytes(line._3))
put.addColumn(Bytes.toBytes(tableColumnFamily), Bytes.toBytes("c"), Bytes.toBytes(line._4))
(new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)
} else {
println("接受到:"+rdd.count())
}
} else {
println("kafkaRDD is Empty!!")
}
})
这相当于调用RDD.saveAsHadoopDataset(jobConf)
,这就不需要自己去处理每多少条数据提交写一次了,后台使用直接写Hadoop File的方式。
实测速度提升到13500条数据每秒,速度是原来的5.1倍。
点:spark streaming、Hbase、写入速度调优、流计算
线:Spark
面:内存计算