简介
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。
Flume的核心概念
l Event:是Flume数据传输的基本单元。flume以事件的形式将数据从源头传送到最终的目的。Event由可选的hearders和载有数据的一个byte array构成。
l Clinet:是一个将原始数据包装成events并且发送它们到一个或多个agent的实体
l Agent:一个Agent包含Sources, Channels, Sinks和其他组件,它利用这些组件将events从一个节点传输到另一个节点或最终目的。
l Source:负责接收events或通过特殊机制产生events,并将events批量的放到一个或多个Channels。
l Channel:位于Source和Sink之间,用于缓存进来的events,当Sink成功的将events发送到下一跳的channel或最终目的,events从Channel移除。
l Sink:负责将events传输到下一跳或最终目的,成功完成后将events从channel移除。
其中Event是Flume数据传输的基本单元。flume以事件的形式将数据从源头传送到最终的目的。Event由可选的hearders和载有数据的一个byte array构成。
载有的数据对flume是不透明的
Headers是容纳了key-value字符串对的无序集合,key在集合内是唯一的。
Headers可以在上下文路由中使用扩展
Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成
Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
Flume的数据流细化一下则是首先Source捕捉到外部进来的Events,然后把Events提交给ChannelProcessor,ChannelProcessor先走一遍Interceptor(进行一些过滤处理),然后通过ChannelSelector引用对象获得Channel列表,使用事务方式把Events提交到Channel,因此Source的Events提交到Channel实际上是在ChannelProcessor中进行的;而Sink则通过SinkProcessor去Channel中获得Events并消费Events,整个过程就是一个生产者消费者模式。
高可靠传输实现原理
Flume使用事务的办法来保证event的可靠传递。Source和Sink分别被封装在事务中,这些事务由保存event的存储提供或者由 Channel提供。这就保证了event在数据流的点对点传输中是可靠的。在多级数据流中,如下图,上一级的Sink和下一级的Source都被包含在事务中,保证数据可靠地从一个Channel到另一个Channel转移
其次,数据流中 Channel的持久性。Flume中MemoryChannel是可能丢失数据的(当Agent死掉时),而FileChannel是持久性的,提供类似mysql的日志机制,保证数据不丢失。
当一个正常的Flow运行时,每个Agent中的Channel中的Events数量是均衡(消费速度大于生产速度的情况),而一旦Agent直接出现故障,那么Channel就会暂时持有Events,直到故障恢复(MemoryChannel可能会丢失Events)。
Flume启动分析
Flume的Agent启动是从Application的main函数开始的,首先把自己的实例注册到了EventBus,然后通过LifecycleAware模式(类似Tomcat的开始结束模式),创建PollingPropertiesFileConfigurationProvider对象并执行start()函数,在start()函数中,通过线程池启动了一个FileWatcherRunnable任务去不断的检查启动文件是否修改,第一次或者发现文件修改了的时候就去读取配置文件,并通过EvenBus的post()方法响应读取结果,而Application的主线程因为注册过EventBus,handleConfigurationEvent()函数获得post()事件消息后就会执行先stopAllComponents(),然后startAllComponents(conf),当执行startAllComponents函数的时候,就会启动channel、sink和source这三个核心组件,注意启动顺序是先channel,后sink,最后source,这次才不会有消息丢失问题发生。整个启动过程如图所示。
简化来说:
第一步:Application主线程启动,通过PollingPropertiesFileConfigurationProvider获取source、sink、channel等配置信息
第二步:PollingPropertiesFileConfigurationProvider把配置信息通过事件总线广播给Application主线程
第三步:Application主线程重新启动channel、sink和source
Source分析
Source的继承关系类图如图所示,所有source均实现自source接口,接口方法只有两个:setChannelProcessor()和getChannelProcessor(),所以所source具体的业务实现(比如把Events发送给Channel以及过程中的事务实现都是在ChannelProcessor中实现的)。
source又分为两种类型:EventDrivenSource和PollableSource,PollableSource主要用于接收外部驱动程序的Events,比如来自Kafka的消息等,而其他source基本都是实现于EventDrivenSource,这种source不需要外部的驱动程序pollEvents,而是有自己的事件监控获得Events,比如SpoolDirectorySource,它可以从磁盘中某个文件获取文件更新数据。
以SpoolDirectorySource为例,其创建启动过程如下:
第一步:Application主线程启动的时候,通过AbstractConfigurationProvider(前面提到的PollingPropertiesFileConfigurationProvider的父类)获取配置信息
第二步:当判断配置为SpoolDirectorySource时,则通过SourceFactory实例化一个SpoolDirectorySource
第三步:AbstractConfigurationProvider调用实例化的SpoolDirectorySource对象的configure()进行初始化配置
第四步:Application主线程通过SpoolDirectoryRunnable(SpoolDirectorySource的内部类)启动source并且500毫秒执行一次
如下图所示:
注:Application.startAllComponents()启动source的时候其实最先启动的是SourceRunner,Flume有两类SourceRunner:EventDrivenSourceRunner和PollableSourceRunner,EventDrivenSourceRunner再启动具体类型的source。
SpoolDirectorySource实例的每次执行,则会读取具体目录下的文件,生成Event数据,通过ChannelProcessor把Event放入Channel,同时对文件的读取位置进行标记,下次则从标记位置进行读取。如下图所示。
Source提交Channel事务处理
之前也有说明,source是通过channelProcessor来提交Events的,如下图ChannelProcessor.class所示,channelprocessor先获得一个事务,然后开启事务,之后才进行event的提交操作,最后提交事务,如果中间出现异常则进行事务回滚。(finally中还有一个事务关闭)
ChannelProcessor.class
FileChannel内部类FileBackedTransaction处理put代码参考
FileChannel处理Event的时序
Channel有两种:MemoryChannel和FileChannel,这里以FileChannel为例,其调用时序如下图所示:
BasicTransactionSemantics.put()->BasicTransactionSemantics.put()->FileBackedTransaction.doPut(),在FileBackedTransaction执行doPut()操作的时候执行了两步操作:
1、 调用Log.put(),把Event写入实体文件
2、 调用FlumeEventQueue.addWithoutCommit(),把Event写入队列以便sink获取
Sink分析
Sink从Channel消费Event,然后进行转移到收集/聚合层或存储层,它的启动过程和source类似是从Application的main主线程开始的,通过AbstractConfigurationProvider获取配置信息,然后通过SinkFactory实例化具体Sink,然后调用sink实例的configure进行实例的初始化配置,最后通过SinkRunner启动Sink实例。
和Source不同的是SinkRunner不直接启动Sink实例,而是通过SinkProcessor异步启动的。
SinkProcessor主要有三种:
l DefaultSinkProcessor:默认实现,用于单个Sink的场景使用
l FailoverSinkProcessor:故障转移实现
l LoadBalanceSinkProcessor:用于实现Sink的负载均衡
其类的继承关系如图所示:
多个Sink可以构成一个SinkGroup。一个Sink Processor负责从一个指定的Sink Group中激活一个Sink。Sink Processor可以通过组中所有Sink实现负载均衡;也可以在一个Sink失败时转移到另一个。
• Flume通过Sink Processor实现负载均衡(Load Balancing)和故障转移(failover)
• 内建的SinkProcessors:
• Load Balancing Sink Processor – 使用RANDOM, ROUND_ROBIN或定制的选择算法
• Failover Sink Processor
• Default Sink Processor(单Sink)
• 所有的Sink都是采取轮询(polling)的方式从Channel上获取events。这个动作是通过SinkRunner激活的
• Sink Processor充当Sink的一个代理
Sink Group
Groups配置可以实现sink的负载均衡和失败重试机制
• 负载均衡配置示例
a1.sinkgroups =g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= load_balance
a1.sinkgroups.g1.processor.backoff= true
a1.sinkgroups.g1.processor.selector= random
• 失败重试配置示例
a1.sinkgroups =g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= failover
a1.sinkgroups.g1.processor.priority.k1= 5
a1.sinkgroups.g1.processor.priority.k2= 10
a1.sinkgroups.g1.processor.maxpenalty= 10000
Sink事务实现
以HBaseSink.class为例,首先从channel获取一个事务,然后事务开启后进行take操作,即从channel获取Event,然后对Event进行消费处理(putEventsAndCommit即提交事务),处理完成后关闭事务。
引用美团的使用场景
参考自:http://www.aboutyun.com/thread-8317-1-1.html
a. 整个系统分为三层:Agent层,Collector层和Store层。其中Agent层每个机器部署一个进程,负责对单机的日志收集工作;Collector层部署在中心服务器上,负责接收Agent层发送的日志,并且将日志根据路由规则写到相应的Store层中;Store层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。
b. Agent到Collector使用LoadBalance策略,将所有的日志均衡地发到所有的Collector上,达到负载均衡的目标,同时并处理单个Collector失效的问题。
c. Collector层的目标主要有三个:SinkHdfs,SinkKafka和SinkBypass。分别提供离线的数据到Hdfs,和提供实时的日志流到Kafka和Bypass。其中SinkHdfs又根据日志量的大小分为SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三个Sink,以提高写入到Hdfs的性能,具体见后面介绍。
d. 对于Store来说,Hdfs负责永久地存储所有日志;Kafka存储最新的7天日志,并给Storm系统提供实时日志流;Bypass负责给其它服务器和应用提供实时日志流。
a. 模块命名规则:所有的Source以src开头,所有的Channel以ch开头,所有的Sink以sink开头;
b. Channel统一使用美团开发的DualChannel,具体原因后面详述;对于过滤掉的日志使用NullChannel,具体原因后面详述;
c. 模块之间内部通信统一使用Avro接口;
(DualChannel:基于 MemoryChannel和 FileChannel开发。当堆积在Channel中的events数小于阈值时,所有的events被保存在MemoryChannel中,Sink从MemoryChannel中读取数据;当堆积在Channel中的events数大于阈值时,所有的events被自动存放在FileChannel中,Sink从FileChannel中读取数据。这样当系统正常运行时,我们可以使用MemoryChannel的高吞吐特性;当系统有异常时,我们可以利用FileChannel的大缓存的特性。)
参考:(基于Flume的美团日志收集系统(一)架构和设计)http://www.aboutyun.com/thread-8317-1-1.html
参考:(基于Flume的美团日志收集系统(二)改进和优化)http://www.aboutyun.com/thread-8318-1-1.html