背景
在项目开发中对流式数据使用Spark Structured Streaming进行处理,处理流程为:消息中间件(source) -> Spark Structured Streaming(work) -> AFS(sink)。
在source->work这个过程中,消息以protobuf形式存储,其中 Spark Structured Streaming接受到的数据格式为Array[Byte],所以我们需要将Array[Byte]形式的数据通过protobuf反序列化出来,得到最终完整的String。
PS:一开始比较懒,直接使用new String(Array[Byte])的方式企图将数据粗暴的转成String,最后果然只能看到其中一部分数据,剩下的另外很大一部分数据全都是乱码,或者直接是空格,导致我拿不到我想要的字段内容,所以最后还是老老实实使用protobuf进行反序列化。
心酸历程
废话少说,过程如下:
需准备的东西
- 一个可以拿到protoBuf格式数据的消息中间件;
- Spark Structured Streaming运行环境,我是用的是Hadoop环境;
- 消息中间件中protobuf数据对应的.proto文件;
- 本地可以执行protoc命令的protobuf编译器;
protobuf及本地环境相关准备
protobuf相关知识、proto语法等proto相关基础知识见proto官网:https://developers.google.cn/protocol-buffers/ 或者可以自己去搜一搜一些别人的博客之类的。
我的test.proto文件如下:
syntax = "proto2";
//语言种类枚举,后续可持续补充
enum LanguageTypes {
CH = 0; //中文
ENGLISH = 1; //英文
//以下补充标准列表
}
//日志字符编码枚举
enum CodeType {
CODE_TYPE_UNKNOWN = 0; //为止类型
UTF8 = 1; //utf8
GBK = 2; //gbk
}
message Log {
//以上ID小于129的字段为预留字段,不能添加,用户自有字段ID从130开始
optional int64 connection_code = 130; //用户连接号
optional string action_json = 131; //用户行为数据
required string send_time = 132; //请求发送时间
}
自己电脑安装proto编译器,Window、Mac环境的安装教程一搜一大堆,自己去下载,安装就行。
因为我的程序运行环境问题,我在本机上安装了2.4.1版本,下载路径如下:https://github.com/protocolbuffers/protobuf/releases/tag/v2.4.1。
我的系统是MAC,所以安装完成后,在终端输入命令
protoc --version
结果显示为
libprotoc 2.4.1
就成功了。
protobuf编译成Java类
在本机终端输入命令:
protoc -I=proto文件存储目录 --java_out=最终Java类想要存放目标目录的绝对路径 proto文件的绝对路径
就会在--java_out
参数指定的路径下生成目标Java类,我使用test.proto生成了Test.java类;将生成的Test.java类移到代码中指定目录下。
Maven依赖
我的项目是Scala项目,所以需要在maven项目中需添加protobuf-java依赖才可以使用上面的Java类,我的POM依赖如下:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.4.1</version>
</dependency>
Maven依赖的坑
- Maven依赖版本一定要和本地生成Java类的protoc编译器版本保持一致,否则将造成proto对应的Java类不可用;
- 我们的Hadoop环境中依赖的protobuf版本为2.5.0,但是引入的spark-core依赖,使用的是protobuf2.4.1,其中,2.4.1和2.5.0不可兼容。
- 可在项目路径下使用命令
mvn dependency:tree
来查看maven依赖自身还依赖了哪些包及具体版本; - 一开始我安装了最新的3.5.1版本,但是编译运行时,报错信息为:
- 可在项目路径下使用命令
java.lang.NoSuchMethodError
但是我在本地跳转却可以跳转到相应的方法中去,造成这种情况是因为包冲突,所以我将protoc版本降到了2.5.0,但是报错信息为:
java.lang.VerifyError:class com.XX.XX.Test$Log overrides final method......
这是因为虽然Hadoop环境中用到了2.5.0,但在运行Spark程序的时候,还是会去调用2.4.1,所以我又将版本降成了2.4.1。在降为2.4.1后,对一些与2.4.1版本无法兼容的其他依赖的版本做了相应修改。
调用成功!
Spark代码
val inputStream = spark.readStream
......
.load
.as[Array[Byte]]
.map(row => {
val log = Test.Log.parseFrom(row)
val action_json = log.getActionJson
action_json
})
.toDF("value")
[图片上传失败...(image-b2d825-1627808612072)]