Flume高可靠传输实现

简介

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

image.png

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。

image.png

Flume的核心概念

l Event:是Flume数据传输的基本单元。flume以事件的形式将数据从源头传送到最终的目的。Event由可选的hearders和载有数据的一个byte array构成。

l Clinet:是一个将原始数据包装成events并且发送它们到一个或多个agent的实体

l Agent:一个Agent包含Sources, Channels, Sinks和其他组件,它利用这些组件将events从一个节点传输到另一个节点或最终目的。

l Source:负责接收events或通过特殊机制产生events,并将events批量的放到一个或多个Channels。

l Channel:位于Source和Sink之间,用于缓存进来的events,当Sink成功的将events发送到下一跳的channel或最终目的,events从Channel移除。

l Sink:负责将events传输到下一跳或最终目的,成功完成后将events从channel移除。

其中Event是Flume数据传输的基本单元。flume以事件的形式将数据从源头传送到最终的目的。Event由可选的hearders和载有数据的一个byte array构成。

载有的数据对flume是不透明的

Headers是容纳了key-value字符串对的无序集合,key在集合内是唯一的。

Headers可以在上下文路由中使用扩展

image.png
image.png

Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成

image.png

Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成。当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

image.png

Flume的数据流细化一下则是首先Source捕捉到外部进来的Events,然后把Events提交给ChannelProcessor,ChannelProcessor先走一遍Interceptor(进行一些过滤处理),然后通过ChannelSelector引用对象获得Channel列表,使用事务方式把Events提交到Channel,因此Source的Events提交到Channel实际上是在ChannelProcessor中进行的;而Sink则通过SinkProcessor去Channel中获得Events并消费Events,整个过程就是一个生产者消费者模式。

高可靠传输实现原理

Flume使用事务的办法来保证event的可靠传递。Source和Sink分别被封装在事务中,这些事务由保存event的存储提供或者由 Channel提供。这就保证了event在数据流的点对点传输中是可靠的。在多级数据流中,如下图,上一级的Sink和下一级的Source都被包含在事务中,保证数据可靠地从一个Channel到另一个Channel转移

其次,数据流中 Channel的持久性。Flume中MemoryChannel是可能丢失数据的(当Agent死掉时),而FileChannel是持久性的,提供类似mysql的日志机制,保证数据不丢失。

image.png

当一个正常的Flow运行时,每个Agent中的Channel中的Events数量是均衡(消费速度大于生产速度的情况),而一旦Agent直接出现故障,那么Channel就会暂时持有Events,直到故障恢复(MemoryChannel可能会丢失Events)。

image.png

Flume启动分析

image.png

Flume的Agent启动是从Application的main函数开始的,首先把自己的实例注册到了EventBus,然后通过LifecycleAware模式(类似Tomcat的开始结束模式),创建PollingPropertiesFileConfigurationProvider对象并执行start()函数,在start()函数中,通过线程池启动了一个FileWatcherRunnable任务去不断的检查启动文件是否修改,第一次或者发现文件修改了的时候就去读取配置文件,并通过EvenBus的post()方法响应读取结果,而Application的主线程因为注册过EventBus,handleConfigurationEvent()函数获得post()事件消息后就会执行先stopAllComponents(),然后startAllComponents(conf),当执行startAllComponents函数的时候,就会启动channel、sink和source这三个核心组件,注意启动顺序是先channel,后sink,最后source,这次才不会有消息丢失问题发生。整个启动过程如图所示。

简化来说:

第一步:Application主线程启动,通过PollingPropertiesFileConfigurationProvider获取source、sink、channel等配置信息

第二步:PollingPropertiesFileConfigurationProvider把配置信息通过事件总线广播给Application主线程

第三步:Application主线程重新启动channel、sink和source

image.png

Source分析

image.png

Source的继承关系类图如图所示,所有source均实现自source接口,接口方法只有两个:setChannelProcessor()和getChannelProcessor(),所以所source具体的业务实现(比如把Events发送给Channel以及过程中的事务实现都是在ChannelProcessor中实现的)。

image.png

source又分为两种类型:EventDrivenSource和PollableSource,PollableSource主要用于接收外部驱动程序的Events,比如来自Kafka的消息等,而其他source基本都是实现于EventDrivenSource,这种source不需要外部的驱动程序pollEvents,而是有自己的事件监控获得Events,比如SpoolDirectorySource,它可以从磁盘中某个文件获取文件更新数据。

以SpoolDirectorySource为例,其创建启动过程如下:

第一步:Application主线程启动的时候,通过AbstractConfigurationProvider(前面提到的PollingPropertiesFileConfigurationProvider的父类)获取配置信息

第二步:当判断配置为SpoolDirectorySource时,则通过SourceFactory实例化一个SpoolDirectorySource

第三步:AbstractConfigurationProvider调用实例化的SpoolDirectorySource对象的configure()进行初始化配置

第四步:Application主线程通过SpoolDirectoryRunnable(SpoolDirectorySource的内部类)启动source并且500毫秒执行一次

如下图所示:

image.png

注:Application.startAllComponents()启动source的时候其实最先启动的是SourceRunner,Flume有两类SourceRunner:EventDrivenSourceRunner和PollableSourceRunner,EventDrivenSourceRunner再启动具体类型的source。

SpoolDirectorySource实例的每次执行,则会读取具体目录下的文件,生成Event数据,通过ChannelProcessor把Event放入Channel,同时对文件的读取位置进行标记,下次则从标记位置进行读取。如下图所示。

image.png

Source提交Channel事务处理

image.png

之前也有说明,source是通过channelProcessor来提交Events的,如下图ChannelProcessor.class所示,channelprocessor先获得一个事务,然后开启事务,之后才进行event的提交操作,最后提交事务,如果中间出现异常则进行事务回滚。(finally中还有一个事务关闭)

image.png

ChannelProcessor.class

image.png

FileChannel内部类FileBackedTransaction处理put代码参考

FileChannel处理Event的时序

Channel有两种:MemoryChannel和FileChannel,这里以FileChannel为例,其调用时序如下图所示:

image.png

BasicTransactionSemantics.put()->BasicTransactionSemantics.put()->FileBackedTransaction.doPut(),在FileBackedTransaction执行doPut()操作的时候执行了两步操作:

1、 调用Log.put(),把Event写入实体文件

2、 调用FlumeEventQueue.addWithoutCommit(),把Event写入队列以便sink获取

Sink分析

Sink从Channel消费Event,然后进行转移到收集/聚合层或存储层,它的启动过程和source类似是从Application的main主线程开始的,通过AbstractConfigurationProvider获取配置信息,然后通过SinkFactory实例化具体Sink,然后调用sink实例的configure进行实例的初始化配置,最后通过SinkRunner启动Sink实例。

和Source不同的是SinkRunner不直接启动Sink实例,而是通过SinkProcessor异步启动的。

image.png

SinkProcessor主要有三种:

l DefaultSinkProcessor:默认实现,用于单个Sink的场景使用

l FailoverSinkProcessor:故障转移实现

l LoadBalanceSinkProcessor:用于实现Sink的负载均衡

其类的继承关系如图所示:

image.png

多个Sink可以构成一个SinkGroup。一个Sink Processor负责从一个指定的Sink Group中激活一个Sink。Sink Processor可以通过组中所有Sink实现负载均衡;也可以在一个Sink失败时转移到另一个。

• Flume通过Sink Processor实现负载均衡(Load Balancing)和故障转移(failover)

• 内建的SinkProcessors:

• Load Balancing Sink Processor – 使用RANDOM, ROUND_ROBIN或定制的选择算法

• Failover Sink Processor

• Default Sink Processor(单Sink)

• 所有的Sink都是采取轮询(polling)的方式从Channel上获取events。这个动作是通过SinkRunner激活的

• Sink Processor充当Sink的一个代理

Sink Group

     Groups配置可以实现sink的负载均衡和失败重试机制

• 负载均衡配置示例

a1.sinkgroups =g1

a1.sinkgroups.g1.sinks= k1 k2

a1.sinkgroups.g1.processor.type= load_balance

a1.sinkgroups.g1.processor.backoff= true

a1.sinkgroups.g1.processor.selector= random

• 失败重试配置示例

a1.sinkgroups =g1

a1.sinkgroups.g1.sinks= k1 k2

a1.sinkgroups.g1.processor.type= failover

a1.sinkgroups.g1.processor.priority.k1= 5

a1.sinkgroups.g1.processor.priority.k2= 10

a1.sinkgroups.g1.processor.maxpenalty= 10000

Sink事务实现

image.png

以HBaseSink.class为例,首先从channel获取一个事务,然后事务开启后进行take操作,即从channel获取Event,然后对Event进行消费处理(putEventsAndCommit即提交事务),处理完成后关闭事务。

image.png

引用美团的使用场景

参考自:http://www.aboutyun.com/thread-8317-1-1.html

image.png

a. 整个系统分为三层:Agent层,Collector层和Store层。其中Agent层每个机器部署一个进程,负责对单机的日志收集工作;Collector层部署在中心服务器上,负责接收Agent层发送的日志,并且将日志根据路由规则写到相应的Store层中;Store层负责提供永久或者临时的日志存储服务,或者将日志流导向其它服务器。

b. Agent到Collector使用LoadBalance策略,将所有的日志均衡地发到所有的Collector上,达到负载均衡的目标,同时并处理单个Collector失效的问题。

c. Collector层的目标主要有三个:SinkHdfs,SinkKafka和SinkBypass。分别提供离线的数据到Hdfs,和提供实时的日志流到Kafka和Bypass。其中SinkHdfs又根据日志量的大小分为SinkHdfs_b,SinkHdfs_m和SinkHdfs_s三个Sink,以提高写入到Hdfs的性能,具体见后面介绍。

d. 对于Store来说,Hdfs负责永久地存储所有日志;Kafka存储最新的7天日志,并给Storm系统提供实时日志流;Bypass负责给其它服务器和应用提供实时日志流。

image.png

a. 模块命名规则:所有的Source以src开头,所有的Channel以ch开头,所有的Sink以sink开头;

b. Channel统一使用美团开发的DualChannel,具体原因后面详述;对于过滤掉的日志使用NullChannel,具体原因后面详述;

c. 模块之间内部通信统一使用Avro接口;

(DualChannel:基于 MemoryChannel和 FileChannel开发。当堆积在Channel中的events数小于阈值时,所有的events被保存在MemoryChannel中,Sink从MemoryChannel中读取数据;当堆积在Channel中的events数大于阈值时,所有的events被自动存放在FileChannel中,Sink从FileChannel中读取数据。这样当系统正常运行时,我们可以使用MemoryChannel的高吞吐特性;当系统有异常时,我们可以利用FileChannel的大缓存的特性。)

参考:(基于Flume的美团日志收集系统(一)架构和设计)http://www.aboutyun.com/thread-8317-1-1.html
参考:(基于Flume的美团日志收集系统(二)改进和优化)http://www.aboutyun.com/thread-8318-1-1.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容