kafka自动管理
enable.auto.commit=true
为了更好地理解这一章节中提到的内容,我们先来做一些铺垫。如果是使用spark-streaming-kafka-0-10,那么我们建议将enable.auto.commit设为false。这个配置只是在这个版本生效,enable.auto.commit如果设为true的话,那么意味着offsets会按照auto.commit.interval.ms中所配置的间隔来周期性自动提交到Kafka中
enable.auto.commit
Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
默认值是true。
auto.commit.interval.ms
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
checkpoints
对Kafka Stream 执行checkpoint操作使得offset保存在checkpoint中,如果是应用挂掉的话,那么SparkStreamig应用功能可以从保存的offset中开始读取消息。但是,如果是对Spark Streaming应用进行升级的话,那么很抱歉,不能checkpoint的数据没法使用,所以这种机制并不可靠,特别是在严格的生产环境中,我们不推荐这种方式。
自动提交到kafka
新版消费者不再保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中“__consumer_offsets”,该主题默认有50个分区,每个分区3个副本,分区数量有参数offset.topic.num.partition设置。通过消费者组ID的哈希值和该参数取模的方式来确定某个消费者组已消费的偏移量保存到__consumer_offsets主题的哪个分区中
def createKafkaRDD(ssc: StreamingContext, config: Source) = {
var SparkDStream: InputDStream[ConsumerRecord[String, String]] = null
try {
SparkDStream = {
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> config.servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> config.group,
"auto.offset.reset" -> config.offset
)
/*
"enable.auto.commit" -> config.getString("kafkaSource.enable.auto.commit"))*/
// val subscribeTopics = config.getStringList("kafkaSource.topics").toIterable
import scala.collection.JavaConversions._
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](config.topic.toList, kafkaParams)
)
kafkaStream
}
} catch {
case e: Throwable => {
throw new Exception("Couldn't init Spark stream processing", e)
}
}
SparkDStream
}
var inputDStream: InputDStream[ConsumerRecord[String, String]] = createKafkaRDD()
inputDStream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 更新 Offset 值
inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
存储到redis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import scala.collection.JavaConverters._
import scala.util.Try
/**
* Created by chouyarn of BI on 2018/8/21
*/
object KafkaUtilsRedis {
/**
* 根据groupId保存offset
* @param ranges
* @param groupId
*/
def storeOffset(ranges: Array[OffsetRange], groupId: String): Unit = {
for (o <- ranges) {
val key = s"bi_kafka_offset_${groupId}_${o.topic}_${o.partition}"
val value = o.untilOffset
JedisUtil.set(key, value.toString)
}
}
/**
* 根据topic,groupid获取offset
* @param topics
* @param groupId
* @return
*/
def getOffset(topics: Array[String], groupId: String): (Map[TopicPartition, Long], Int) = {
val fromOffSets = scala.collection.mutable.Map[TopicPartition, Long]()
topics.foreach(topic => {
val keys = JedisUtil.getKeys(s"bi_kafka_offset_${groupId}_${topic}*")
if (!keys.isEmpty) {
keys.asScala.foreach(key => {
val offset = JedisUtil.get(key)
val partition = Try(key.split(s"bi_kafka_offset_${groupId}_${topic}_").apply(1)).getOrElse("0")
fromOffSets.put(new TopicPartition(topic, partition.toInt), offset.toLong)
})
}
})
if (fromOffSets.isEmpty) {
(fromOffSets.toMap, 0)
} else {
(fromOffSets.toMap, 1)
}
}
/**
* 创建InputDStream,如果auto.offset.reset为latest则从redis读取
* @param ssc
* @param topic
* @param kafkaParams
* @return
*/
def createStreamingContextRedis(ssc: StreamingContext, topic: Array[String],
kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
val groupId = kafkaParams.get("group.id").get
val (fromOffSet, flag) = getOffset(topic, groupId.toString)
val offsetReset = kafkaParams.get("auto.offset.reset").get
if (flag == 1 && offsetReset.equals("latest")) {
kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topic, kafkaParams, fromOffSet))
} else {
kafkaStreams = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topic, kafkaParams))
}
kafkaStreams
}
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("offSet Redis").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(60))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"group.id" -> "binlog.test.rpt_test_1min",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"session.timeout.ms" -> "20000",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer]
)
val topic = Array("binlog.test.rpt_test", "binlog.test.hbase_test", "binlog.test.offset_test")
val groupId = "binlog.test.rpt_test_1min"
val lines = createStreamingContextRedis(ssc, topic, kafkaParams)
lines.foreachRDD(rdds => {
if (!rdds.isEmpty()) {
println("##################:" + rdds.count())
}
storeOffset(rdds.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
})
ssc.start()
ssc.awaitTermination()
}
}
import java.util
import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool, JedisPoolConfig}
object JedisUtil {
private val config = ConfigFactory.load("realtime-etl.conf")
private val redisHosts: String = config.getString("redis.server")
private val port: Int = config.getInt("redis.port")
private val hostAndPortsSet: java.util.Set[HostAndPort] = new util.HashSet[HostAndPort]()
redisHosts.split(",").foreach(host => {
hostAndPortsSet.add(new HostAndPort(host, port))
})
private val jedisConf: JedisPoolConfig = new JedisPoolConfig()
jedisConf.setMaxTotal(5000)
jedisConf.setMaxWaitMillis(50000)
jedisConf.setMaxIdle(300)
jedisConf.setTestOnBorrow(true)
jedisConf.setTestOnReturn(true)
jedisConf.setTestWhileIdle(true)
jedisConf.setMinEvictableIdleTimeMillis(60000l)
jedisConf.setTimeBetweenEvictionRunsMillis(3000l)
jedisConf.setNumTestsPerEvictionRun(-1)
lazy val redis = new JedisCluster(hostAndPortsSet, jedisConf)
def get(key: String): String = {
try {
redis.get(key)
} catch {
case e: Exception => e.printStackTrace()
null
}
}
def set(key: String, value: String) = {
try {
redis.set(key, value)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
}
def hmset(key: String, map: java.util.Map[String, String]): Unit = {
// val redis=pool.getResource
try {
redis.hmset(key, map)
}catch {
case e:Exception => e.printStackTrace()
}
}
def hset(key: String, field: String, value: String): Unit = {
// val redis=pool.getResource
try {
redis.hset(key, field, value)
} catch {
case e: Exception => {
e.printStackTrace()
}
}
}
def hget(key: String, field: String): String = {
try {
redis.hget(key, field)
}catch {
case e:Exception => e.printStackTrace()
null
}
}
def hgetAll(key: String): java.util.Map[String, String] = {
try {
redis.hgetAll(key)
} catch {
case e: Exception => e.printStackTrace()
null
}
}
}
存储ZK
在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。
注意: Kafka offset在ZooKeeper中的存储路径为/consumers/[groupId]/offsets/topic/[partitionId], 存储的值为offset
此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。
val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkUrl, sessionTimeout, connectionTimeout)
val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)
Method for retrieving the last offsets stored in ZooKeeper of the consumer group and topic list.
def readOffsets(topics: Seq[String], groupId:String):
Map[TopicPartition, Long] = {
val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
val partitionMap = zkUtils.getPartitionsForTopics(topics)
// /consumers/<groupId>/offsets/<topic>/
partitionMap.foreach(topicPartitions => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
topicPartitions._2.foreach(partition => {
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
try {
val offsetStatTuple = zkUtils.readData(offsetPath)
if (offsetStatTuple != null) {
LOGGER.info("retrieving offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)),
offsetStatTuple._1.toLong)
}
} catch {
case e: Exception =>
LOGGER.warn("retrieving offset details - no previous node exists:" + " {}, topic: {}, partition: {}, node path: {}", Seq[AnyRef](e.getMessage, topicPartitions._1, partition.toString, offsetPath): _*)
topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
}
})
})
topicPartOffsetMap.toMap
}
def persistOffsets(offsets: Seq[OffsetRange], groupId: String, storeEndOffset: Boolean): Unit = {
offsets.foreach(or => {
val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic);
val acls = new ListBuffer[ACL]()
val acl = new ACL
acl.setId(ANYONE_ID_UNSAFE)
acl.setPerms(PERMISSIONS_ALL)
acls += acl
val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition;
val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/"
+ or.partition, offsetVal + "", JavaConversions.bufferAsJavaList(acls))
LOGGER.debug("persisting offset details - topic: {}, partition: {}, offset: {}, node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
})
}
使用zk获取的offset来初始化direct dstream
val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics, kafkaParams, fromOffsets))