这篇文章如果对你有帮助,记得点赞哦!有问题也可以给我评论~
一、背景
公司的日志希望能够同一到一个Kibana去做一个同一的展示,那就需要将任务的日志写到kafka。
Flink1.12开始默认的日志框架就是log4j2,那么配置的方式跟之前log4j的方式有了一些区别,这边也踩了一些坑才解决。
二、需要解决的痛点
- 如何区分JobManager和TaskManager的日志
- 如何将jobName信息添加到每条日志中,为后期的日志聚合提供方便
三、详细配置介绍
1、log4j.properties完整配置如下:
# This affects logging for both user code and Flie
rootLogger.appenderRef.rolling.ref = RollingFile
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.level = INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.pupu.name = com.pupu
logger.pupu.level = DEBUG
# Log all infos in the given rolling file
appender.rolling.type = RollingFile
appender.rolling.name = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 200MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
# kafka appender config
appender.kafka.type=Kafka
appender.kafka.name=Kafka
appender.kafka.topic=flink_job_logs
appender.kafka.property.type=Property
appender.kafka.property.name=bootstrap.servers
appender.kafka.property.value=xxxxxxxxxxxx:9092
## kafka的输出的日志pattern
appender.kafka.layout.type=PatternLayout
appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} ${sys:log.file} ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## 输出json格式的日志
#appender.kafka.layout.type=JSONLayout
#appender.kafka.layout.compact=true
#appender.kafka.layout.complete=false
#appender.kafka.layout.includeTimeMillis=true
#appender.kafka.layout.additionalField1.type=KeyValuePair
#appender.kafka.layout.additionalField1.key=logdir
#appender.kafka.layout.additionalField1.value=${sys:log.file}
#appender.kafka.layout.additionalField2.type=KeyValuePair
#appender.kafka.layout.additionalField2.key=flink_per_job_name
#appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}
2、hdfs的/flink/lib包目录下添加相关jar包
因为log4j2自带 kafka-log4j-appender,所以不需要添加这个包。
如果你的jar包没有包含kafka-client包,最好添加跟你其他地方一个版本的,我这里添加是kafka-clients-2.6.0.jar
3、启动任务时需要添加对应的flink_per_job_name到环境变量中
"env.java.opts":"-Dflink_per_job_name=mytestyjb"
这里有两种方式:
- flink run的方式:指定-yD
- 通过将配置项放到flink-conf.yaml
四、问题及解决
1、log4j2的properties配置方式
- 官网配置
[http://logging.apache.org/log4j/2.x/manual/configuration.html#Properties]
[http://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender]
- Stack Overflow上的解决
[https://stackoverflow.com/questions/56252787/configure-log4j-properties-for-kafka-appender-error-when-parsing-property-boots]
2、Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
flink lib缺少了kafka-clients-2.6.0.jar 包导致,详细参考跳转链接 。
之前刚看到这个报错以为是依赖冲突,后来排查半天发现没有依赖冲突,然后各种百度,有的说需要在new producer前面加一个Thread.currentThread().setContextClassLoader(null)
,然而并没有解决,想了解跳转链接。
3、打到kafka的日志如何区分不同的jobname、JM和TM的日志?
(1)区分不同的jobname的日志
精髓 :就是想办法把环境变量传到flink运行时输出日志时,能够被log4j2识别到。
步骤:
- 想传入被识别到,目前经过实践,通过env.java.opts这个参数可以达到我们的目的。见官网跳转链接
还有另外几个参数可以尝试一下:env.java.opts.jobmanager、env.java.opts.taskmanager;containerized.master.env.和containerized.taskmanager.env.这个试了不行。
- log4j想要识别到这个环境变量,他的格式是:${sys: 环境变量名}
(2)区分JM和TM的日志
这边真的是踏遍铁鞋无匿处,得来全不费功夫!
flink 的配置的RollingFile已经用这个参数${sys:log.file}
指明了文件名,然后web UI上面能够取到对应的数据,那么把${sys:log.file}这个参数放到我的layout里面,那kafka那边就可以直接区分!真是完美!
4、打到kafka的layout日志支持json格式
参考这个跳转链接,上面也有具体的配置