Spark kafka + streaming自适应topic partitions变化

背景

  1. spark streaming + kafka 有两种方案接收kafka数据-基于receiver的方案和direct方案(no receiver方案)。
  • 基于receiver的方案,属于比较老的方案,其采用Kafka’s high-level API通过专门的Rceiver去接收kafka数据。
    采用 KafkaUtils.createStream

  • direct方案,是当前的主流用法,其采用Kafka’s simple consumer API,创建的RDD partitions数与kafka partitions数一致。性能比前者好。
    采用 KafkaUtils.createDirectStream

    具体的介绍可以查看官方介绍:Spark Streaming + Kafka Integration Guide

  1. 对于第二种方案,spark streaming启动后当kafka的topic partition数从A升到B时,新增的分区上的数据会丢失。
    spark streaming只从topic中读取原来的A个分区数据,新增的分区并不能被spark streaming感知到。
    更具体的原因是
DirectKafkaInputDStream#compute生成的KafkaRDD, 其partitions数与spark streaming启动时topic的partitions数一致,topic的partitions和offset保存在currentOffsets map变量中,
这个变量在启动时初始化,后续不会根据topic的partition变化进行更新。所以导致kafka新增的partitions数据
会丢失。

更详细的分析,可以参考下面的文章:
Spark Streaming 自适应上游 kafka topic partition 数目变化

因为spark streaming是24x7运行的,如何让spark streaming不重启的情况下,自适应topic partitions的变化?
且能够符合如下要求:

  1. 不需要重新编译spark源码, 因为源代码不是我们维护的。
  2. 能够灵活的部署

解决方案

思路可以参考
Spark Streaming 自适应上游 kafka topic partition 数目变化

总的解决方案如下:

  1. MTDirectKafkaInputDStream继承DirectKafkaInputDStream,override compute方法,在每次生成KafkaRDD时,更新currentOffsets中的分区信息。
  2. 在org.apache.spark.streaming.kafka路径下,新建一个KafkaUtils.scala文件,里面的代码直接将spark源码中的KafkaUtils源码复制过来。 修改新建的KafkaUtils.scala,将createDirectStream中new DirectKafkaInputDStream,替换为 new MTDirectKafkaInputDStream.

具体的实现步骤如下

  1. 新建maven工程 kafkastreamingadpter, 配置好scala相关的pom配置。

  2. 新建package org.apache.spark.streaming.kafka

    备注:这里需要说明,因为下面创建的MTDirectKafkaInputDStream需要继承DirectKafkaInputDStream,而DirectKafkaInputDStream是private[streaming]修饰的。这里的规避技巧就是,将需要继承DirectKafkaInputDStream的子类对应的package设置为和DirectKafkaInputDStream一致,即可规避private无法继承的问题。

  3. 新建MTDirectKafkaInputDStream继承DirectKafkaInputDStream

override compute方法,在每次生成KafkaRDD时,更新currentOffsets中的分区信息。
  1. 在org.apache.spark.streaming.kafka路径下,新建一个KafkaUtils.scala文件,里面的代码直接复制spark源码中的KafkaUtils源码。修改新建的KafkaUtils.scala,将createDirectStream中new DirectKafkaInputDStream,替换为 new MTDirectKafkaInputDStream

    备注:streaming代码中是通过 KafkaUtils.createDirectStream来创建stream的。因此希望KafkaUtils.createDirectStream返回的是MTDirectKafkaInputDStream。所以要修改KafkaUtils代码,将createDirectStream中new DirectKafkaInputDStream,修改为 new MTDirectKafkaInputDStream。
    我们不希望修改spark源码,所以这里用来个小技巧,直接将KafkaUtils代码copy一份出来进行修改,需要保持KafkaUtils的package路径一致

    2,4中用到的小技巧参考了

    [2016中国云计算技术大会-腾讯林立伟-Spark-Streaming在腾讯广点通的应用.pdf]

    Spark Streaming 自适应上游 kafka topic partition 数目变化

  2. 新建spark streaming + kafka应用实例DirectKafkaWordCount1
    实例需要配置 spark、kafka streaming相关的依赖。

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        
  1. mvn clean package 打包 生成kafka-streaming-adpter-1.0.jar
    对应的配置文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sensetime.iva</groupId>
    <artifactId>kafka-streaming-adpter</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <properties>
        <scala.version>2.11.8</scala.version>
        <commons.codec.version>1.8</commons.codec.version>
        <grizzled.version>1.0.1</grizzled.version>
        <slf4j-log4j12.version>1.7.5</slf4j-log4j12.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>${commons.codec.version}</version>
        </dependency>
        <dependency>
            <groupId>org.clapper</groupId>
            <artifactId>grizzled-slf4j_2.10</artifactId>
            <version>${grizzled.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j-log4j12.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
  1. 上传jar包到spark on k8s的根路径下的jars,进行镜像打包
node 73节点
cd /home/huangchuibi/k8s/spark-2.2.0-k8s-0.5.0-bin-2.7.3
./sbin/build-push-docker-images.sh -r register.sensetime.sz.test-portus.com/spark-on-k8s -t 0.1 build
./sbin/build-push-docker-images.sh -r register.sensetime.sz.test-portus.com/spark-on-k8s -t 0.1 push

  1. 在k8s各个节点上pull 镜像,因为spark on k8s,配置镜像拉取策略为本地有就不再拉取。所以需要手动进行拉取。
docker pull register.sensetime.sz.test-portus.com/spark-on-k8s/spark-driver:0.1
docker pull register.sensetime.sz.test-portus.com/spark-on-k8s/spark-executor:0.1
  1. 创建topic
ssh node91
kubectl -n snappydata exec -it kafka-0 /bin/bash
cd /opt/kafka_2.11-0.10.2.0
bin/kafka-topics.sh --create --zookeeper zookeeper-0.zookeeper.snappydata.svc.cluster.local:2181,zookeeper-1.zookeeper.snappydata.svc.cluster.local:2181,zookeeper-2.zookeeper.snappydata.svc.cluster.local:2181/kafka --replication-factor 1 --partitions 1 --topic test8

  1. 运行streaming 应用
bin/spark-submit \
  --deploy-mode cluster \
  --class org.apache.spark.examples.streaming.DirectKafkaWordCount1 \
  --master k8s://http://172.20.2.91:8001 \
  --kubernetes-namespace snappydata \
  --conf spark.executor.instances=3 \
  --conf spark.app.name=spark-streaming \
  --conf spark.kubernetes.driver.docker.image=register.sensetime.sz.test-portus.com/spark-on-k8s/spark-driver:0.1 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.executor.docker.image=register.sensetime.sz.test-portus.com/spark-on-k8s/spark-executor:0.1 \
  --conf spark.kubernetes.initcontainer.docker.image=register.sensetime.sz.test-portus.com/spark-on-k8s/spark-init:0.1 \
  --conf "spark.driver.extraJavaOptions=-Xss10m" \
  --conf "spark.executor.extraJavaOptions=-Xss10m" \
  --conf "spark.driver.extraClassPath=/opt/spark/examples/jars/kafka-streaming-adpter-1.0.jar" \
  --conf "spark.executor.extraClassPath=/opt/spark/examples/jars/kafka-streaming-adpter-1.0.jar" \
  --conf "spark.executor.userClassPathFirst=true" \
  --conf "spark.driver.userClassPathFirst=true" \
  local:///opt/spark/examples/jars/kafka-streaming-adpter-1.0.jar kafka-0.kafka.snappydata.svc.cluster.local:9092,kafka-1.kafka.snappydata.svc.cluster.local:9092,kafka-2.kafka.snappydata.svc.cluster.local:9092 test8

备注 这里增加的四个conf配置,是为了让kafka-streaming-adpter-1.0.jar里的KafkaUtils优先级高于spark源码中的KafkaUtils。但是实际测试发现,应用还是以同一个jar包中的KafkaUtils为优先级最高。所以下面的四个配置也可以去掉。

--conf "spark.driver.extraClassPath=/opt/spark/examples/jars/kafka-streaming-adpter-1.0.jar" \
--conf "spark.executor.extraClassPath=/opt/spark/examples/jars/kafka-streaming-adpter-1.0.jar" \
--conf "spark.executor.userClassPathFirst=true" \
--conf "spark.driver.userClassPathFirst=true" \
  1. 动态增加分区
kafka-topics.sh --zookeeper zookeeper-0.zookeeper.snappydata.svc.cluster.local:2181,zookeeper-1.zookeeper.snappydata.svc.cluster.local:2181,zookeeper-2.zookeeper.snappydata.svc.cluster.local:2181/kafka -alter -partitions 2 --topic test8
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335