前言
用spark streaming 的方式消费kafka里的数据
https://spark.apache.org/docs/2.3.4/streaming-kafka-0-10-integration.html
一、机器角色
node1 | node2 | node3 | node4 | |
---|---|---|---|---|
sparkMaster | * | |||
sparkWorker | * | * | * | |
kafka | 1 | 1 | 1 | |
zk | * | * | * |
二、准备kafka
1.部署
# 解压、配置
# node 1 解压配置好再分发到其他node2,3,4
tar -xf kafka_2.11-0.10.0.0.tgz
vi config/server.properties
# 配置如下属性
#每个节点id不通
broker.id=
# 存储路径不要用默认的/tmp
log.dirs=/var/bigdata/kafka
# zk可以指定kafka专用前缀
zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka
#复制 node2,3,4
scp -r ./kafka_2.11-0.10.0.0/ node2:`pwd`
scp -r ./kafka_2.11-0.10.0.0/ node3:`pwd`
scp -r ./kafka_2.11-0.10.0.0/ node4:`pwd`
#node2
vi config/server.properties
broker.id=1
#node3
vi config/server.properties
broker.id=2
#node4
vi config/server.properties
broker.id=3
#node2,3,4
vi /etc/profile
export KAFKA_HOME=/opt/bigdata/kafka_2.11-0.10.0.0
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$HBASE_HOME/bin:$KAFKA_HOME/bin
#刷新
source /etc/profile
2.启动
前提:zk集群已经启动
# node2,node3,node4
cd $KAFKA_HOME;
kafka-server-start.sh -daemon ./config/server.properties
# 验证启动成功zkCli.sh,看到几个broker.id= 1,2,3
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
[1, 2, 3]
3.测试
在node1中 cli测试kafka的消息投递、消费等功能正常。
3.1创建 topic
# node1 kafka-topics.sh
kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181/kafka \
--create --topic "ooxx" \
--partitions 3 --replication-factor 2;
3.2查看topic 列表
#node1 kafka-topics.sh
[root@node1 kafka_2.11-0.10.0.0]# kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181/kafka --list;
ooxx
3.3查看topic 详情
分区、副本情况
#node1 kafka-topics.sh
[root@node1 kafka_2.11-0.10.0.0]# kafka-topics.sh --zookeeper node2:2181,node3:2181,node4:2181/kafka --describe --topic ooxx;
Topic:ooxx PartitionCount:3 ReplicationFactor:2 Configs:
Topic: ooxx Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: ooxx Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: ooxx Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
4.投递/消费
4.1投递
在cli_1里投递消息,
关注key的设计,同一个key的消息,落到同一个分区,分区内有序,取决余业务需求。
#node1 kafka-console-producer.sh
#指定broker节点进行连接(旧版本是zk),可以任意一个节点也可以多个。
#随后阻塞等待输入
kafka-console-producer.sh --broker-list node4:9092 --topic ooxx
4.2消费
在cli_2里消费消息
#node4 kafka-console-consumer.sh
#指定broker节点连接,可以任意一个节点也可以多个。
kafka-console-consumer.sh --new-consumer --bootstrap-server node2:9092 --topic ooxx
#随后阻塞开始打印消费的消息内容,
#此后node1的cli_1 新输入消息,这里会即刻显示
#AUTO_OFFSET_RESET_CONFIG 自适应 必须参考 __consumer_offset_维护的
# 默认 latest 仅消费正在写入的消息,及 log-end-offset往后
# earliest 最近一次消费的offset开始继续消费,即 current-offset 往后,group第一次创建时为0
# consumer.seek最优先,覆盖以上两种
#--from-beginning 从头开始消费
为什么显示不了历史消息?
#node3 kafka-consumer-groups.sh
kafka-consumer-groups.sh --new-consumer --bootstrap-server node3:9092 --list;
kafka-consumer-groups.sh --new-consumer --bootstrap-server node3:9092 --describe --group 'console-consumer-34583';
4.3延时查看
显示如下按group区分的消费进度
Lag:还有多少消息未读取(Lag = log-end-offset - current-offset)
[图片上传失败...(image-4cbcd7-1677995977084)]
5.webUi
https://github.com/obsidiandynamics/kafdrop Kafka (version 0.11.0 or newer)
https://blog.csdn.net/rao991207823/article/details/123489799 国内开源
https://www.jb51.cc/kafka/4057705.html Kafka 0.8以上版本
三、spark Streaming with kafka
demo
package com.msb.bigdata.spark.streaming
import java.util
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
object lesson05_spark_kafka_consumer{
def main(args: Array[String]): Unit = {
/**
(一)初始化配置:sparkConf 和 ConsumerConfig
配置目的:
1.限制每批次拉取kafka数据时不要太多,定量拉取。
2.手动提交offset,防止消费失败数据丢失。
3.数据第一次从头开始消费,后续重启后从中断地方的offset再继续消费。
*/
val conf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("kafka01")
conf.set("spark.streaming.backpressure.enabled","true")
//运行时状态,每个job对每个分区拉几条,没配这个默认尽量多的拉取
//目前每次2,3个分区,每次拉到 6条。
conf.set("spark.streaming.kafka.maxRatePerPartition","2")
//起步状态,冷启动拉几条
// conf.set("spark.streaming.backpressure.initialRate","2")
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
val ssc = new StreamingContext(conf,Duration(1000))
ssc.sparkContext.setLogLevel("ERROR")
//如何得到kakfa的DStream
val map: Map[String, Object] = Map[String, Object](
(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node4:9092"),
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]),
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]),
(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
//earliest 按 CURRENT-OFFSET 特殊状态: group第一创建的时候,0
// latest 按 LOG-END-OFFSET
(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"), //需要手动维护offset 1,kafka 2,第三方
(ConsumerConfig.GROUP_ID_CONFIG, "BULA666")
// (ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1")
)
/**
* 访问数据库
* 取曾经持久化的offset
*
*/
// val mapsql = Map[TopicPartition,Long](
// (new TopicPartition("from mysql topic",0),33),
// (new TopicPartition("from mysql topic",1),32)
// )
// kafka用于提交offset到kafka
// kafka提供的提交API,它内部包含的RDD提供了offset
val kafka: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](List("ooxx"), map)
// ConsumerStrategies.Subscribe[String, String](List("ooxx"), map,mapsql)
)
/**
(二)编写业务逻辑
* 业务实现:
* 1.kafka.map()将record转换成业务逻辑的元素:只提取出key,value,进行业务操作,如下dstream
* 或者
* 2.直接在kafka.foreachRDD回调里对rdd进行业务操作
* 注意:
* 第一个通过kafkautils创建的kafka: DStream 要先去转换一下,不能直接kafka.print(),
* 否则报错 ConsumerRecord Serialization stack:object not serializable
* 其实这个DStream就是 consumer@poll回来的records
*/
//业务代码编写处1
val dstream: DStream[(String, (String, String, Int, Long))] = kafka.map(
record => {
val t: String = record.topic()
val p: Int = record.partition()
val o: Long = record.offset()
val k: String = record.key()
val v: String = record.value()
(k, (v, t, p, o))
}
)
dstream.print()
//完成了业务代码后
//维护offset是为了什么,哪个时间点用起你维护的offset?:application重启的时候,driver重启的时候
//维护offset的另一个语义是什么:持久化
var ranges: Array[OffsetRange] = null;
//正确的,讲提交offset的代码放到dstream对象的接受函数里,那么未来在调度线程里,这个函数每个job有机会调用一次,伴随着,提交offset
kafka.foreachRDD(
rdd=>{
//driver端可以拿到offset
println(s"foreachRDD..fun.......")
ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//闭包,通过kafkautils得到的第一个DStream向上转型,提交offset
//1,维护/持久化offset到kafka
kafka.asInstanceOf[CanCommitOffsets].commitAsync(ranges,new OffsetCommitCallback {
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
if(offsets != null){
//此时提交offset成功
}
}
})
//2,维护/持久化到 mysql,offset和data 在一个事务中
//同步:
//业务代码编写处2
// val local: Array[(String, String)] = rdd.map(r=>(r.key(),r.value())).reduceByKey(_+_).collect()
/**
* 开启事务
* 提交数据
* 提交offset
* commit
*/
}
)
ssc.start()
ssc.awaitTermination()
}
}
注意
- 默认每次job执行,都会尽可能拉取所有增量数据,此外可以限制每次从kafka拉取的数据量。
val conf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("kafka01")
conf.set("spark.streaming.backpressure.enabled","true")
//没配这个默认尽量多的拉取
conf.set("spark.streaming.kafka.maxRatePerPartition","2")
-
driver 会在每个批次的job成功跑完以后,都会去更新kafka 对应topic的每个partition的offset。
(此时会有失败重试导致部分数据重复消费的隐患)
-
怎么处理失败重试导致的数据重复消费的隐患?
解耦:- 不要输出到mysql,而是输出到文件,冥等性,重复输出到同一个文件 可以覆盖。
- 开启mysql事务,且repartition 趋向为接近为1,这样才避免部分失败(多partition并发操作mysql)造成的部分失败重试。(注意避免job延迟太大)
- 开启mysql事务,等collect数据到 driver时,才把data和offset写入mysql,此时在同一个事务中。
driver collect 注意
driver的内存是否足够接收数据。