高吞吐量Flume Agent调优小结

前言

所有电商企业在一年一度的双11都要迎来大促与大考,我司也不例外(所以最近真是前所未有的忙乱)。前段时间在配合执行全链路压测的过程中,发现平时不太关注的Flume配置可能存在瓶颈。Flume在笔者负责的实时计算平台里用于收集所有后端访问日志和埋点日志,其效率和稳定性比较重要。除了及时扩容之外,也有必要对Flume进行调优。今天在百忙之中挤出一点时间来写写。

Flume系统以一个或多个Flume-NG Agent的形式部署,一个Agent对应一个JVM进程,并且由三个部分组成:Source、Channel和Sink,示意图如下。

Source

Flume有3种能够监听文件的Source,分别是Exec Source(配合tail -f命令)、Spooling Directory Source和Taildir Source。Taildir Source显然是最好用的,在我们的实践中,需要注意的参数列举如下。

  • filegroups
    如果需要监听的日志文件较多,应该将它们分散在不同的目录下,并配置多个filegroup来并行读取。注意日志文件的正则表达式要写好,防止日志滚动重命名时仍然符合正则表达式造成重复。示例:
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/logs/ng1/access.log
a1.sources.r1.headers.f1.headerKey1 = ng1
a1.sources.r1.filegroups.f2 = /data/logs/ng2/.*log
a1.sources.r1.headers.f2.headerKey1 = ng2
  • batchSize
    该参数控制向Channel发送数据的批次大小,默认值为100。如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制(后文再说)。示例:
a1.sources.r1.batchSize = 1000
  • maxBatchCount
    该参数控制从同一个文件中连续读取的最大批次数量,默认不限制。如果Flume同时监听多个文件,并且其中某个文件的写入速度远快于其他文件,那么其他文件有可能几乎无法被读取,所以强烈建议设定此参数。示例:
a1.sources.r1.maxBatchCount = 100
  • writePosInterval
    该参数控制向记录读取位置的JSON文件(由positionFile参数指定)写入inode和偏移量的频率,默认为3000ms。当Agent重新启动时,会从JSON文件中获取最近记录的偏移量开始读取。也就是说,适当降低writePosInterval可以减少Agent重启导致的重复读取的数据量。
a1.sources.r1.writePosInterval = 1000

Channel

Flume内置了多种Channel的实现,比较常用的有Memory Channel、File Channel、JDBC Channel、Kafka Channel等。我们的选择主要针对Memory Channel和File Channel两种,对比一下:

  • Memory Channel将staging事件数据存储在Agent堆内存中,File Channel则将它们存储在指定的文件中;
  • 如果Agent失败,Memory Channel会丢失所有缓存的staging事件,File Channel则可以通过额外记录的checkpoint信息恢复,保证断点续传;
  • Memory Channel能够容纳的数据量受堆内存的影响,而File Channel不受此限制。

鉴于我们下游业务的主要痛点在吞吐量与实时性,且可以容忍数据少量丢失,日志服务器的磁盘压力也已经比较大了,故Memory Channel更加合适。需要注意的参数如下。

  • capacity、transactionCapacity
    这两个参数分别代表Channel最多能容纳的事件数目,以及每个事务(即Source的一次put或者Sink的一次take)能够包含的最多事件数目。显然,必须满足batchSize <= transactionCapacity <= capacity的关系。适当调大capacity和transactionCapacity可以使得Channel的吞吐量增高,且能够保证不会出现The channel is full or unexpected failure的异常。示例:
a1.channels.c1.type = memory
a1.channels.c1.transactionCapacity = 5000
a1.channels.c1.capacity = 10000
  • byteCapacity
    该参数代表Memory Channel中缓存的事件消息的最大总大小,以字节为单位,默认是Flume Agent最大堆内存的80%。此值不建议更改为固定的,而是建议通过改变Agent的JVM参数来影响,后面再提。

  • byteCapacityBufferPercentage
    Memory Channel中缓存的事件消息头占byteCapacity的比例,默认是20%。如果事件的header信息很少,可以适当减小(我们没有更改)。

  • keep-alive
    向Channel中put或take一条事件的超时时间,默认为3秒,对于Memory Channel一般不用更改。如果业务数据是由很多突发流量组成(也就是说Channel经常处于时满时空的状态),那么建议适当调大。示例:

a1.channels.c1.keep-alive = 15

当然File Channel也很常用,其参数就不再赘述,看官可参考官方文档。

Sink

我们实时数仓接入层的起点是Kafka,自然要利用Kafka Sink。需要注意的参数列举如下。

  • kafka.flumeBatchSize
    从Channel取出数据并发送到Kafka的批次大小,与Source的batchSize同理。

  • kafka.producer.acks
    该参数的含义就留给看官去回想(很基础的),一般设为折衷的1即可。设为-1的可靠性最高,但是相应地会影响吞吐量。

  • kafka.producer.linger.ms
    Kafka Producer检查批次是否ready的超时时间,超时即发送(与producer.batch.size共同作用)。一般设为数十到100毫秒,可以在时效性和吞吐量之间取得比较好的平衡。

  • kafka.producer.compression.type
    Producer消息压缩算法,支持gzip/snappy/lz4,如果希望降低消息的体积可以配置。

示例:

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 50
a1.sinks.k1.kafka.producer.compression.type = snappy

Kafka Sink也支持其他Producer参数,可以按需配置。

还有一点需要注意的是,Flume默认引用的Kafka Client版本为0.9,其产生的消息在较高版本的Kafka Broker上没有时间戳,因此非常建议手动将$FLUME_HOME/lib目录下的kafka-client JAR包替换成0.10.2或更高的版本。

Interceptor

拦截器方面就比较简单粗暴,在注重吞吐量的场合一定不要使用或者自定义规则复杂的拦截器(比如自带的Regex Interceptor、Search and Replace Interceptor),最好是不使用任何拦截器,把数据清洗的任务交给下游去处理(Flink它不香嘛

Agent Process

在flume-env.sh中添加JVM参数,避免默认堆内存太小导致OOM。

export JAVA_OPTS="-Xms8192m -Xmx8192m -Xmn3072m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+HeapDumpOnOutOfMemoryError"

另外,Taildir Source会积极地使用堆外内存,如果发现Flume消耗的总内存量过大,可以适当限制直接内存的用量,如:-XX:MaxDirectMemorySize=4096m

Flume原生并没有传统意义上的“高可用”配置(Sink Group Failover不算)。为了防止Agent进程因为各种原因静默地挂掉,需要用一个“保姆脚本”(nanny script)定期检测Agent进程的状态,并及时拉起来。当然也可以在下游采用两级Collector的架构增强鲁棒性,本文不表。Cloudera Community上有一个关于Flume HA的提问,参见这里

The End

经过上述适当的调优过程,我们的单个Flume-NG Agent能够轻松承受高达5W+ RPS的持续流量高峰,比较令人满意了。

民那晚安晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,311评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,339评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,671评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,252评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,253评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,031评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,340评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,973评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,466评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,937评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,039评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,701评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,254评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,259评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,497评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,786评论 2 345

推荐阅读更多精彩内容