Flume+Kafka+Spark2.3.3 实战 之 序列化类-StringDeserializer 异常处理

  上周搭起了公司的测试集群环境,本人使用的是 apache 版本的,在测试flume+kafka+spark的时候,在 Idea 上运行 spark 程序是没问题的,但是在把程序打成 jar 包之后却出现了问题,百度了两天,请教了几个大神,也没解决问题,这就有点不淡定了,周末都没有心情喝酒了,哈哈。

  这次测试中总共出现了三大问题  》》》
一个是 :
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
...
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.StringDeserializer
...

第二个是:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/KafkaUtils$
...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaUtils$
...

第三个是:
java.lang.ClassNotFoundException: com.hw.stream.FlumeKafkaStream
问题的解决方案:

先看下我的集群组件:

image.png

官网链接文档的api为:http://spark.apache.org/docs/2.3.3/streaming-kafka-0-10-integration.html
文档中提到的mvn如下:
image.png

  刚开始,看到这里我也是感觉集群中的 kafka 版本不对,后来发现与kafka的版本无关,只要大于等于 10 版本就可以了,之所以打包后在集群上报 第一个问题 和 第二个问题 的错误主要是因为 pom.xml 中 包的引入不全 以及 build 设置问题,这里经过修改之后的 pom.xml 文件中所涉及到的依赖应如下:
 <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <!-- 注:以下两个包很关键,这是spark操作 kafka 的关键包,必须要引入,否则会包 问题一 和 问题二 的错误 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.3</version>
        </dependency>

        <!-- 这里要和上面的依赖包的版本一致 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
    </dependencies>

  接下来这个 build 属性的设置很关键:
    <build>
        <sourceDirectory>src</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- maven 打包集成插件 -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <descriptorRefs>
                        <!-- 将依赖一起打包到 JAR -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- 配置主程序 java -jar 默认Class,这里设置为自己的主类的绝对路径 -->
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.hw.stream.FlumeKafkaStream</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>


  主程序的代码我们完全可以根据官网案例进行编写,创建一个 FlumeKafkaStream 的scala 的 object 文件:
package com.hw.stream

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

/**
  * @author feiniu
  * @create 2019-06-30 23:15
  */
object FlumeKafkaStream {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[*]").setAppName("kafkaStream6")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))

    //设置kafka的参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop101:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "666",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    //注意:这里的主题要和你监控的主题一致
    val topics = Array("topic_ccc")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(x => x.value())
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_+_)

    //直接输出,当然这一步完全可以放入到数据库中去
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }

}

   别忘了在 resources 文件夹中放入log4j.properties文件,并进入 Project structure 做如下设置(否则即使妳在 pom.xml 中设置了主类路径以及 在 submit 指定了 --class 主类之后 同样还是报 第三个问题的错误,这个小事情有时候会搞死人的):
image.png
  接下来可以在 flume 的 conf 目录下设置监控的文件了 file-flume-kafka-producer.conf :
a1.sources= r1
a1.channels= c1

#配置数据源
a1.sources.r1.type=exec
#配置需要监控的日志输出文件或目录
a1.sources.r1.command=tail -F /opt/module/flume/mytestdata/yin.txt
a1.sources.r1.channels = c1

#配置数据通道
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_ccc
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

  接下来,在 hadoop 集群以及 zookeeper 已经启动的情况下,启动 flume 和 kafka 服务:
#在采集服务器上启动 flume 
bin/flume-ng agent \
--name a1 \
--conf conf  \
--conf-file conf/file-flume-kafka-producer.conf  \
-Dflume.root.logger=INFO,console

#三台服务器都启动 kafka
bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
  此时,在 Idea 中启动程序,再在被监控的日记中添加数据即可看到控制台输出统计的数据了,添加数据命令如下:
echo "spark spark flume kafka kafka kafka spark flink" >> yin.txt
  一般来说,运行本地的 Idea 是不会出错的,除非没导入完整的依赖,那么,接下来在我们打包的时候如果不注意可能会出现 问题一 和 问题二 那种情况,因此我们打包的时候必须要带上有关 kafka 的依赖包:
image.png

  最后我们再上传生成的 jar 包到服务去,并在 spark 中提交指令即可看到效果:

bin/spark-submit \
--class com.hw.stream.FlumeKafkaStream \
--master spark://hadoop101:7077 \
--executor-memory 4G \
--total-executor-cores 6  \
/opt/myjar/XXX.jar 

  到此为止,我们已经成功的完成了对集群组件以及相关的依赖包的简单测试;原本是不想写这篇文章的,只是这次耗的时候有点久,因此想记录一下,希望能帮到后来者。。。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343