大数据学习之(十八)spark streaming kafka

前言

用spark streaming 的方式消费kafka里的数据

https://spark.apache.org/docs/2.3.4/streaming-kafka-0-10-integration.html

一、机器角色

node1 node2 node3 node4
sparkMaster *
sparkWorker * * *
kafka 1 1 1
zk * * *

二、准备kafka

1.部署

# 解压、配置
# node 1 解压配置好再分发到其他node2,3,4
tar -xf kafka_2.11-0.10.0.0.tgz
vi config/server.properties
# 配置如下属性
#每个节点id不通
broker.id=
# 存储路径不要用默认的/tmp
log.dirs=/var/bigdata/kafka
# zk可以指定kafka专用前缀
zookeeper.connect=node2:2181,node3:2181,node4:2181/kafka

#复制 node2,3,4
scp -r ./kafka_2.11-0.10.0.0/ node2:`pwd`
scp -r ./kafka_2.11-0.10.0.0/ node3:`pwd`
scp -r ./kafka_2.11-0.10.0.0/ node4:`pwd`

#node2
vi config/server.properties
broker.id=1
#node3
vi config/server.properties
broker.id=2
#node4
vi config/server.properties
broker.id=3

#node2,3,4 
vi /etc/profile
export KAFKA_HOME=/opt/bigdata/kafka_2.11-0.10.0.0
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin:$HIVE_HOME/bin:$HBASE_HOME/bin:$KAFKA_HOME/bin
#刷新
source  /etc/profile


2.启动

前提:zk集群已经启动

# node2,node3,node4
cd $KAFKA_HOME;
kafka-server-start.sh -daemon ./config/server.properties
# 验证启动成功zkCli.sh,看到几个broker.id= 1,2,3
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
[1, 2, 3]

3.测试

node1中 cli测试kafka的消息投递、消费等功能正常。

3.1创建 topic

# node1 kafka-topics.sh
kafka-topics.sh   --zookeeper node2:2181,node3:2181,node4:2181/kafka  \
--create --topic "ooxx"  \
--partitions 3 --replication-factor  2;

3.2查看topic 列表

#node1 kafka-topics.sh
[root@node1 kafka_2.11-0.10.0.0]# kafka-topics.sh   --zookeeper node2:2181,node3:2181,node4:2181/kafka  --list;
ooxx

3.3查看topic 详情

分区、副本情况

#node1 kafka-topics.sh
[root@node1 kafka_2.11-0.10.0.0]# kafka-topics.sh   --zookeeper node2:2181,node3:2181,node4:2181/kafka  --describe --topic ooxx;
Topic:ooxx  PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: ooxx Partition: 0    Leader: 1   Replicas: 1,3   Isr: 1,3
    Topic: ooxx Partition: 1    Leader: 2   Replicas: 2,1   Isr: 2,1
    Topic: ooxx Partition: 2    Leader: 3   Replicas: 3,2   Isr: 3,2

4.投递/消费

4.1投递

在cli_1里投递消息,

关注key的设计,同一个key的消息,落到同一个分区,分区内有序,取决余业务需求。

#node1 kafka-console-producer.sh
#指定broker节点进行连接(旧版本是zk),可以任意一个节点也可以多个。
#随后阻塞等待输入
kafka-console-producer.sh --broker-list  node4:9092   --topic  ooxx

4.2消费

在cli_2里消费消息

#node4 kafka-console-consumer.sh
#指定broker节点连接,可以任意一个节点也可以多个。
kafka-console-consumer.sh   --new-consumer --bootstrap-server  node2:9092  --topic  ooxx
#随后阻塞开始打印消费的消息内容,
#此后node1的cli_1 新输入消息,这里会即刻显示
#AUTO_OFFSET_RESET_CONFIG 自适应  必须参考  __consumer_offset_维护的
#   默认 latest 仅消费正在写入的消息,及 log-end-offset往后
#   earliest 最近一次消费的offset开始继续消费,即 current-offset 往后,group第一次创建时为0
#   consumer.seek最优先,覆盖以上两种 
#--from-beginning 从头开始消费

为什么显示不了历史消息?

#node3 kafka-consumer-groups.sh
kafka-consumer-groups.sh    --new-consumer --bootstrap-server node3:9092  --list;
kafka-consumer-groups.sh    --new-consumer --bootstrap-server node3:9092  --describe --group 'console-consumer-34583';


4.3延时查看

显示如下按group区分的消费进度

Lag:还有多少消息未读取(Lag = log-end-offset - current-offset)

[图片上传失败...(image-4cbcd7-1677995977084)]

5.webUi

https://github.com/obsidiandynamics/kafdrop Kafka (version 0.11.0 or newer)

https://blog.csdn.net/rao991207823/article/details/123489799 国内开源

https://www.jb51.cc/kafka/4057705.html Kafka 0.8以上版本

三、spark Streaming with kafka

demo

spark_stream_kafka_1.png
package com.msb.bigdata.spark.streaming

import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

object lesson05_spark_kafka_consumer{


  def main(args: Array[String]): Unit = {
    /**
    (一)初始化配置:sparkConf 和 ConsumerConfig
      配置目的:
      1.限制每批次拉取kafka数据时不要太多,定量拉取。
      2.手动提交offset,防止消费失败数据丢失。
      3.数据第一次从头开始消费,后续重启后从中断地方的offset再继续消费。
     */
    val conf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("kafka01")
    conf.set("spark.streaming.backpressure.enabled","true")
    //运行时状态,每个job对每个分区拉几条,没配这个默认尽量多的拉取
    //目前每次2,3个分区,每次拉到 6条。
    conf.set("spark.streaming.kafka.maxRatePerPartition","2")
    //起步状态,冷启动拉几条
//    conf.set("spark.streaming.backpressure.initialRate","2")
    conf.set("spark.streaming.stopGracefullyOnShutdown","true")

    val ssc = new StreamingContext(conf,Duration(1000))

    ssc.sparkContext.setLogLevel("ERROR")
    //如何得到kakfa的DStream
    val map: Map[String, Object] = Map[String, Object](
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node4:9092"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]),
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
      //earliest    按   CURRENT-OFFSET   特殊状态:  group第一创建的时候,0
      // latest     按   LOG-END-OFFSET
      (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"),  //需要手动维护offset 1,kafka    2,第三方
      (ConsumerConfig.GROUP_ID_CONFIG, "BULA666")
//      (ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1")
    )

    /**
     * 访问数据库
     * 取曾经持久化的offset
     *
     */
//    val mapsql = Map[TopicPartition,Long](
//      (new TopicPartition("from mysql topic",0),33),
//      (new TopicPartition("from mysql topic",1),32)
//    )

//    kafka用于提交offset到kafka
//    kafka提供的提交API,它内部包含的RDD提供了offset
    val kafka: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("ooxx"), map)

//      ConsumerStrategies.Subscribe[String, String](List("ooxx"), map,mapsql)
    )

    /**
    (二)编写业务逻辑
     * 业务实现:
     * 1.kafka.map()将record转换成业务逻辑的元素:只提取出key,value,进行业务操作,如下dstream
     * 或者
     * 2.直接在kafka.foreachRDD回调里对rdd进行业务操作
     * 注意:
     * 第一个通过kafkautils创建的kafka: DStream 要先去转换一下,不能直接kafka.print(),
     * 否则报错 ConsumerRecord Serialization stack:object not serializable
     * 其实这个DStream就是 consumer@poll回来的records
     */
      //业务代码编写处1
    val dstream: DStream[(String, (String, String, Int, Long))] = kafka.map(
      record => {
        val t: String = record.topic()
        val p: Int = record.partition()
        val o: Long = record.offset()
        val k: String = record.key()
        val v: String = record.value()


        (k, (v, t, p, o))
      }
    )
    dstream.print()

    //完成了业务代码后

    //维护offset是为了什么,哪个时间点用起你维护的offset?:application重启的时候,driver重启的时候
    //维护offset的另一个语义是什么:持久化
    var ranges: Array[OffsetRange] = null;
    //正确的,讲提交offset的代码放到dstream对象的接受函数里,那么未来在调度线程里,这个函数每个job有机会调用一次,伴随着,提交offset
    kafka.foreachRDD(
      rdd=>{
        //driver端可以拿到offset
        println(s"foreachRDD..fun.......")
        ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //闭包,通过kafkautils得到的第一个DStream向上转型,提交offset
        //1,维护/持久化offset到kafka
        kafka.asInstanceOf[CanCommitOffsets].commitAsync(ranges,new OffsetCommitCallback {
          override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
              if(offsets != null){
                  //此时提交offset成功
              }
          }
        })
        //2,维护/持久化到  mysql,offset和data 在一个事务中
        //同步:
        //业务代码编写处2
        // val local: Array[(String, String)] = rdd.map(r=>(r.key(),r.value())).reduceByKey(_+_).collect()

        /**
        
         * 开启事务
         * 提交数据
         * 提交offset
         * commit
         */
      }
    )

    ssc.start()

    ssc.awaitTermination()
  }
}

注意

  1. 默认每次job执行,都会尽可能拉取所有增量数据,此外可以限制每次从kafka拉取的数据量。
val conf: SparkConf = new SparkConf().setMaster("local[8]").setAppName("kafka01")
conf.set("spark.streaming.backpressure.enabled","true")
//没配这个默认尽量多的拉取
conf.set("spark.streaming.kafka.maxRatePerPartition","2")
  1. driver 会在每个批次的job成功跑完以后,都会去更新kafka 对应topic的每个partition的offset。

    (此时会有失败重试导致部分数据重复消费的隐患

  2. 怎么处理失败重试导致的数据重复消费的隐患
    解耦:

    1. 不要输出到mysql,而是输出到文件,冥等性,重复输出到同一个文件 可以覆盖。
    2. 开启mysql事务,且repartition 趋向为接近为1,这样才避免部分失败(多partition并发操作mysql)造成的部分失败重试。(注意避免job延迟太大)
    3. 开启mysql事务,等collect数据到 driver时,才把data和offset写入mysql,此时在同一个事务中。
  3. driver collect 注意

  4. driver的内存是否足够接收数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容