Flink任务日志写到kafka【最新1.12,1.13】

这篇文章如果对你有帮助,记得点赞哦!有问题也可以给我评论~

一、背景

公司的日志希望能够同一到一个Kibana去做一个同一的展示,那就需要将任务的日志写到kafka。
Flink1.12开始默认的日志框架就是log4j2,那么配置的方式跟之前log4j的方式有了一些区别,这边也踩了一些坑才解决。

二、需要解决的痛点

    - 如何区分JobManager和TaskManager的日志
    - 如何将jobName信息添加到每条日志中,为后期的日志聚合提供方便

三、详细配置介绍

1、log4j.properties完整配置如下:

# This affects logging for both user code and Flie
rootLogger.appenderRef.rolling.ref = RollingFile
rootLogger.appenderRef.kafka.ref = Kafka
rootLogger.level = INFO


# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.pupu.name = com.pupu
logger.pupu.level = DEBUG

# Log all infos in the given rolling file
appender.rolling.type = RollingFile
appender.rolling.name = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 200MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

# kafka appender config
appender.kafka.type=Kafka
appender.kafka.name=Kafka
appender.kafka.topic=flink_job_logs
appender.kafka.property.type=Property
appender.kafka.property.name=bootstrap.servers
appender.kafka.property.value=xxxxxxxxxxxx:9092
## kafka的输出的日志pattern
appender.kafka.layout.type=PatternLayout
appender.kafka.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  ${sys:log.file}  ${sys:flink_per_job_name} %-5p %-60c %x - %m%n
## 输出json格式的日志
#appender.kafka.layout.type=JSONLayout
#appender.kafka.layout.compact=true
#appender.kafka.layout.complete=false
#appender.kafka.layout.includeTimeMillis=true
#appender.kafka.layout.additionalField1.type=KeyValuePair
#appender.kafka.layout.additionalField1.key=logdir
#appender.kafka.layout.additionalField1.value=${sys:log.file}
#appender.kafka.layout.additionalField2.type=KeyValuePair
#appender.kafka.layout.additionalField2.key=flink_per_job_name
#appender.kafka.layout.additionalField2.value=${sys:flink_per_job_name}

2、hdfs的/flink/lib包目录下添加相关jar包

因为log4j2自带 kafka-log4j-appender,所以不需要添加这个包。
如果你的jar包没有包含kafka-client包,最好添加跟你其他地方一个版本的,我这里添加是kafka-clients-2.6.0.jar

3、启动任务时需要添加对应的flink_per_job_name到环境变量中

"env.java.opts":"-Dflink_per_job_name=mytestyjb"

这里有两种方式:
    - flink run的方式:指定-yD
    - 通过将配置项放到flink-conf.yaml

四、问题及解决

1、log4j2的properties配置方式

     - 官网配置
         [http://logging.apache.org/log4j/2.x/manual/configuration.html#Properties]
         [http://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender]
     - Stack Overflow上的解决
         [https://stackoverflow.com/questions/56252787/configure-log4j-properties-for-kafka-appender-error-when-parsing-property-boots]

2、Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.

   flink lib缺少了kafka-clients-2.6.0.jar 包导致,详细参考跳转链接
   之前刚看到这个报错以为是依赖冲突,后来排查半天发现没有依赖冲突,然后各种百度,有的说需要在new producer前面加一个Thread.currentThread().setContextClassLoader(null) ,然而并没有解决,想了解跳转链接

3、打到kafka的日志如何区分不同的jobname、JM和TM的日志?

(1)区分不同的jobname的日志

  精髓 :就是想办法把环境变量传到flink运行时输出日志时,能够被log4j2识别到。
  步骤:
     - 想传入被识别到,目前经过实践,通过env.java.opts这个参数可以达到我们的目的。见官网跳转链接
还有另外几个参数可以尝试一下:env.java.opts.jobmanager、env.java.opts.taskmanager;containerized.master.env.和containerized.taskmanager.env.这个试了不行。
     - log4j想要识别到这个环境变量,他的格式是:${sys: 环境变量名}

(2)区分JM和TM的日志

     这边真的是踏遍铁鞋无匿处,得来全不费功夫!
     flink 的配置的RollingFile已经用这个参数${sys:log.file}指明了文件名,然后web UI上面能够取到对应的数据,那么把${sys:log.file}这个参数放到我的layout里面,那kafka那边就可以直接区分!真是完美!

4、打到kafka的layout日志支持json格式

参考这个跳转链接,上面也有具体的配置

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容