Notes on the key steps for load-balancing Flume with HAProxy. The pipeline: application servers ship logs via Rsyslog (RELP) to a forwarding node, which writes local files, publishes to Kafka, and relays the stream over TCP to HAProxy; HAProxy round-robins the connections across the Flume agents, which sink the data to HDFS.
Rsyslog configuration
Application (sender) side:
module(load="omrelp")
$SystemLogRateLimitInterval 0
$WorkDirectory /tmp
$EscapeControlCharactersOnReceive on
$ActionQueueType LinkedList
$ActionQueueFileName da_queue
$ActionResumeRetryCount -1
$ActionQueueSaveOnShutdown on
$ActionQueueSize 100000000
$ActionQueueMaxFileSize 1000M
$ActionQueueMaxDiskSpace 100G
if($pri-text == "local5.info") then {
action(type="omrelp" target="rsyslog-proxy" port="5140")
}
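To sanity-check the sender side, you can emit a test record to the local5 facility with logger and watch it arrive on the forwarder; the tag and payload below are made-up placeholders:
# send a test message at local5.info (tag and bill-style payload are placeholders)
logger -p local5.info -t billtest "123456|field1|field2"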
Forwarder side:
Be sure to add the RebindInterval="1000" parameter to the omfwd action: after every 1000 messages the current connection is closed and a new one is opened. Without it a single long-lived TCP connection would stick to one backend, and HAProxy could not round-robin the traffic across the Flume nodes.
$template billfile,"datadir/%$year%%$month%%$day%/%$hour%/%msg:R,ERE,1,ZERO:([0-9]{5,6})\|.*--end%"
$template flume,"<%pri%> %timereported:::date-rfc3339% %hostname% %msg:2:$:drop-last-lf%"
$template kafka,"%msg:2:$:drop-last-lf%"
if($pri-text == "local5.info") then {
action(type="omfile" dynaFile="billfile" template="verbose" dirCreateMode="0755")
action(type="omkafka" topic="kafka topic" broker="kafka broker list" template="kafka")
action(type="omfwd" Target="haproxy" Port="4444" Protocol="tcp" Template="flume"
       RebindInterval="1000"
       queue.type="LinkedList" queue.filename="queueData" queue.spoolDirectory="/tmp"
       queue.size="100000000" queue.dequeuebatchsize="1000" queue.maxfilesize="100m"
       queue.saveonshutdown="on" queue.workerthreads="4")
}
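A quick way to confirm the omkafka output is flowing is to attach a console consumer to the same topic. The broker address and topic name below are placeholders for whatever you configured in the action above (older Kafka versions use --zookeeper instead of --bootstrap-server):
# read a few records from the topic the forwarder publishes to (placeholder broker/topic)
kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic your_topic --from-beginning --max-messages 10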
HAProxy configuration
global
log 127.0.0.1 local2 info # HAProxy writes its logs via syslog (facility local2)
chroot /data/haproxy/chroot
pidfile /data/haproxy/haproxy.pid
maxconn 10000
user root
defaults
mode tcp
log global
option tcplog
option redispatch
timeout connect 10000
timeout client 300000
timeout server 300000
maxconn 60000
retries 3
listen rsyslog_balance
bind *:4444
mode tcp
balance roundrobin
server tcp-1 flume1:4444 check port 4444 inter 5000 fall 5
server tcp-2 flume2:4444 check port 4444 inter 5000 fall 5
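Before (re)loading, the configuration can be syntax-checked with HAProxy itself; the config path below is an assumption:
# validate the configuration file without starting the proxy
haproxy -c -f /etc/haproxy/haproxy.cfg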
HAProxy writes its log files through Rsyslog, so the Rsyslog configuration on the HAProxy server needs the following entries:
# Keep local2 out of /var/log/messages by appending local2.none to the existing selector
# for that file (on RHEL-style systems the default line looks like this)
*.info;mail.none;authpriv.none;cron.none;local2.none    /var/log/messages
# Write all local2 messages to the dedicated HAProxy log file
local2.*    /path/to/haproxy/log
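Since the "log 127.0.0.1 local2 info" directive sends HAProxy's log lines over UDP syslog to the local Rsyslog, that Rsyslog instance also has to accept UDP input; a minimal sketch (default port 514):
# enable the UDP syslog listener so HAProxy's log lines are received
module(load="imudp")
input(type="imudp" port="514")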
Flume configuration
a1.sources = syslogtcp
a1.channels = hdfschannel
a1.sinks = hdfssink
# source
a1.sources.syslogtcp.channels = hdfschannel
a1.sources.syslogtcp.type = syslogtcp
a1.sources.syslogtcp.bind = 0.0.0.0
a1.sources.syslogtcp.port = 4444
a1.sources.syslogtcp.eventSize = 10000
# sink
a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://nameservice1:8020/data/flume/%Y%m%d/%H
a1.sinks.hdfssink.hdfs.fileType = DataStream
a1.sinks.hdfssink.hdfs.writeFormat = Text
a1.sinks.hdfssink.hdfs.useLocalTimeStamp = true
a1.sinks.hdfssink.hdfs.filePrefix = FlumeData.1
a1.sinks.hdfssink.hdfs.inUsePrefix = tmp.
a1.sinks.hdfssink.hdfs.inUseSuffix = .tmp
a1.sinks.hdfssink.hdfs.rollInterval = 60
a1.sinks.hdfssink.hdfs.rollSize = 0
a1.sinks.hdfssink.hdfs.rollCount = 0
a1.sinks.hdfssink.hdfs.round = true
a1.sinks.hdfssink.hdfs.roundValue = 1
a1.sinks.hdfssink.hdfs.roundUnit = minute
a1.sinks.hdfssink.hdfs.batchSize = 2000
a1.sinks.hdfssink.hdfs.callTimeout = 180000
a1.sinks.hdfssink.hdfs.retryInterval = 0
a1.sinks.hdfssink.hdfs.closeTries = 1
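# channel (memory channel buffering events between the syslog source and the HDFS sink)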
a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity = 200000000
a1.channels.hdfschannel.transactionCapacity = 10000
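With the properties file in place, each Flume node is started the usual way; the directory and file paths below are assumptions:
# start the agent named a1 with the configuration above
flume-ng agent --name a1 --conf /path/to/flume/conf --conf-file /path/to/flume/conf/a1.properties -Dflume.root.logger=INFO,console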