flume的配置详解

Flume是一种分布式的、可靠的、可用的服务,可以有效地收集、聚合和移动大量的日志数据。
它有一个基于流数据的简单而灵活的体系结构。
它具有健壮性和容错能力,具有可调的可靠性机制和许多故障转移和恢复机制。
它使用一个简单的可扩展数据模型,允许在线分析应用程序。

source:源    
    对channel而言,相当于生产者,通过接收各种格式数据发送给channel进行传输

channel:通道
    相当于数据缓冲区,接收source数据发送给sink

sink:沉槽
    对channel而言,相当于消费者,通过接收channel数据通过指定数据类型发送到指定位置

Event:
flume传输基本单位:
head + body

flume使用:

//flume可以将配置文件写在zk上
agent:    a1
source:    s1
channel:c1
sink:    n1

使用方法:
    1、编写配置文件r_nc.conf
        # 将agent组件起名
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # 配置source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 8888

        # 配置sink
        a1.sinks.k1.type = logger

        # 配置channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # 绑定channel-source, channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

    2、启动flume,指定配置文件
        flume-ng agent -n a1 -f r_nc.conf

    3、启动另一个会话,进行测试
        nc localhost 8888

//用户手册
http://flume.apache.org/FlumeUserGuide.html

后台运行程序:

ctrl + z :将程序放在后台运行 =====> [1]+  Stopped                 flume-ng agent -n a1 -f r_nc.conf

通过 bg %1 的方式将程序后台运行

通过jobs查看后台任务

通过  fg %1 的方式将程序放在前台运行

flume:
海量日志数据的收集、聚合和移动

flume-ng agent -n a1 -f xxx.conf
source
    相对于channel是生产者    //netcat
channel
    类似于缓冲区        //memory
sink
    相对于channel是消费者    //logger

Event:
header + body
k v data

source:

1、序列(seq)源:多用作测试
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = seq
    # 总共发送的事件个数
    a1.sources.r1.totalEvents = 1000    

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2、压力(stress)源:多用作负载测试
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = org.apache.flume.source.StressSource
    # 单个事件大小,单位:byte
    a1.sources.r1.size = 10240
    # 事件总数
    a1.sources.r1.maxTotalEvents = 1000000

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3、滚动目录(Spooldir)源:监听指定目录新文件产生,并将新文件数据作为event发送
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = spooldir
    # 设置监听目录
    a1.sources.r1.spoolDir = /home/centos/spooldir

    # 通过以下配置指定消费完成后文件后缀
    #a1.sources.r1.fileSuffix = .COMPLETED 

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


4、exec源    //通过执行linux命令产生新数据
        //典型应用 tail -F (监听一个文件,文件增长的时候,输出追加数据)
        //不能保证数据完整性,很可能丢失数据

    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = exec
    # 配置linux命令
    a1.sources.r1.command = tail -F /home/centos/readme.txt

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

5、Taildir源        //监控目录下文件
            //文件类型可通过正则指定
            //有容灾机制

    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = TAILDIR
    # 设置source组 可设置多个
    a1.sources.r1.filegroups = f1
    # 设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件
    a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

    # 设置定位文件的位置
    # a1.sources.r1.positionFile     ~/.flume/taildir_position.json

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

sink:

1、fileSink    //多用作数据收集
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # 配置sink
    a1.sinks.k1.type = file_roll
    # 配置目标文件夹
    a1.sinks.k1.sink.directory = /home/centos/file
    # 设置滚动间隔,默认30s,设为0则不滚动,成为单个文件
    a1.sinks.k1.sink.rollInterval = 0

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2、hdfsSink        //默认以seqFile格式写入
            //k:LongWritable
            //v: BytesWritable
            //
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    
    # 配置sink
    a1.sinks.k1.type = hdfs
    # 配置目标文件夹
    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/
    # 配置文件前缀
    a1.sinks.k1.hdfs.filePrefix = events-
    # 滚动间隔,秒
    a1.sinks.k1.hdfs.rollInterval = 0
    # 触发滚动文件大小,byte
    a1.sinks.k1.hdfs.rollSize = 1024
    # 配置使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    # 配置输出文件类型,默认SequenceFile
    # DataStream文本格式,不能设置压缩编解码器
    # CompressedStream压缩文本格式,需要设置编解码器
    a1.sinks.k1.hdfs.fileType = DataStream


    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

3、hiveSink:        //hiveserver帮助:hive --service help
            //1、hive --service metastore 启动hive的metastore服务,metastore地址:thrift://localhost:9083
            //2、将hcatalog的依赖放在/hive/lib下,cp hive-hcatalog* /soft/hive/lib    (位置/soft/hive/hcatalog/share/hcatalog)
            //3、创建hive事务表
            //SET hive.support.concurrency=true;                                  
              SET hive.enforce.bucketing=true;                                    
              SET hive.exec.dynamic.partition.mode=nonstrict;                     
              SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
              SET hive.compactor.initiator.on=true;                               
              SET hive.compactor.worker.threads=1;
              
            //create table myhive.weblogs(id int, name string, age int)
              clustered by(id) into 2 buckets                                         
              row format delimited                                                          
              fields terminated by '\t'                                                     
              stored as orc                                                                 
              tblproperties('transactional'='true');                                        


    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # 配置sink
    a1.sinks.k1.type = hive
    a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
    a1.sinks.k1.hive.database = myhive
    a1.sinks.k1.hive.table = weblogs
    a1.sinks.k1.useLocalTimeStamp = true
    #输入格式,DELIMITED和json
    #DELIMITED    普通文本
    #json        json文件
    a1.sinks.k1.serializer = DELIMITED
    #输入字段分隔符,双引号
    a1.sinks.k1.serializer.delimiter = ","
    #输出字段分隔符,单引号
    a1.sinks.k1.serializer.serdeSeparator = '\t'
    #字段名称,","分隔,不能有空格
    a1.sinks.k1.serializer.fieldnames =id,name,age

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

4、hbaseSink            //SimpleHbaseEventSerializer将rowKey和col设置了默认值,不能自定义
                //RegexHbaseEventSerializer可以手动指定rowKey和col字段名称

    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    
    # 配置sink
    a1.sinks.k1.type = hbase
    a1.sinks.k1.table = flume_hbase
    a1.sinks.k1.columnFamily = f1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

    
    # 配置col正则手动指定
    # rowKeyIndex手动指定rowKey,索引以0开头
    a1.sinks.k1.serializer.colNames = ROW_KEY,name,age
    a1.sinks.k1.serializer.regex = (.*),(.*),(.*)
    a1.sinks.k1.serializer.rowKeyIndex=0

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


5、asynchbaseSink        //异步hbaseSink
                //异步机制,写入速度快
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    
    # 配置sink
    a1.sinks.k1.type = asynchbase
    a1.sinks.k1.table = flume_hbase
    a1.sinks.k1.columnFamily = f1
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

channel:缓冲区

1、memorychannel
    a1.channels.c1.type = memory
    # 缓冲区中存留的最大event个数
    a1.channels.c1.capacity = 1000
    # channel从source中每个事务提取的最大event数
    # channel发送给sink每个事务发送的最大event数
    a1.channels.c1.transactionCapacity = 100

2、fileChannel:    //检查点和数据存储在默认位置时,当多个channel同时开启
            //会导致文件冲突,引发其他channel会崩溃
    
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels = c1
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/centos/flume/checkpoint
    a1.channels.c1.dataDirs = /home/centos/flume/data

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


memoryChannel:快速,但是当设备断电,数据会丢失

FileChannel:  速度较慢,即使设备断电,数据也不会丢失

Avro

source
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4444

    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

***********************************************************************************************    
*启动avro客户端,发送数据:                                      *
*    flume-ng avro-client -H localhost -p 4444 -R ~/avro/header.txt -F ~/avro/user0.txt    *
*                 指定ip                   指定端口 指定header文件      指定数据文件          *
***********************************************************************************************


sink
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

    # 配置sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.bind = 192.168.23.101
    a1.sinks.k1.port = 4444


    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Flume跃点:

1、将s101的flume发送到其他节点
    xsync.sh /soft/flume
    xsync.sh /soft/apache-flume-1.8.0-bin/

2、切换到root用户,分发环境变量文件
    su root
    xsync.sh /etc/profile
    exit

3、配置文件
    1)配置s101    //hop.conf
        设置source:avro
        设置sink: hdfs

        # 将agent组件起名
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # 配置source
        a1.sources.r1.type = avro
        a1.sources.r1.bind = 0.0.0.0
        a1.sources.r1.port = 4444

        # 配置sink
        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /flume/hop/%y-%m-%d/
        a1.sinks.k1.hdfs.filePrefix = events-
        a1.sinks.k1.hdfs.rollInterval = 0
        a1.sinks.k1.hdfs.rollSize = 1024
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        a1.sinks.k1.hdfs.fileType = DataStream

        # 配置channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # 绑定channel-source, channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1


    2)配置s102-s104        //hop2.conf
        设置source:taildir
        设置sink: avro

        # 将agent组件起名
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # 配置source
        a1.sources.r1.type = TAILDIR
        a1.sources.r1.filegroups = f1
        a1.sources.r1.filegroups.f1 = /home/centos/taildir/.*

        # 配置sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = 192.168.23.101
        a1.sinks.k1.port = 4444


        # 配置channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # 绑定channel-source, channel-sink
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

4、在s102-s104创建~/taildir文件夹
    xcall.sh "mkdir ~/taildir"


5、启动s101的flume
    flume-ng agent -n a1 -f /soft/flume/conf/hop.conf

6、分别启动s102-s104的flume,并将其放在后台运行
    flume-ng agent -n a1 -f /soft/flume/conf/hop2.conf &


7、进行测试,分别在s102-s104的taildir中创建数据,观察hdfs数据情况
    s102]$ echo 102 > taildir/1.txt 
    s103]$ echo 103 > taildir/1.txt
    s104]$ echo 104 > taildir/1.txt

interceptor:拦截器

是source端组件:负责修改或删除event
每个source可以配置多个拦截器    ===> interceptorChain



1、Timestamp Interceptor    //时间戳拦截器    + header

    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    # 给拦截器起名
    a1.sources.r1.interceptors = i1
    # 指定拦截器类型
    a1.sources.r1.interceptors.i1.type = timestamp


    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    
2、Static Interceptor    //静态拦截器    + header

3、Host Interceptor    //主机拦截器    + header

4、设置拦截器链:
    
    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888

    a1.sources.r1.interceptors = i1 i2 i3
    a1.sources.r1.interceptors.i1.type = timestamp
    a1.sources.r1.interceptors.i2.type = host
    a1.sources.r1.interceptors.i3.type = static
    a1.sources.r1.interceptors.i3.key = location
    a1.sources.r1.interceptors.i3.value = NEW_YORK


    # 配置sink
    a1.sinks.k1.type = logger

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

channel selector:通道挑选器

是source端组件:负责将event发送到指定的channel,相当于分区
    
当一个source设置多个channel时,默认以副本形式向每个channel发送一个event拷贝


1、replication副本通道挑选器    //默认挑选器,source将所有channel发送event副本
                //设置source x 1, channel x 3, sink x 3 
                //    nc       memory    file

    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3

    # 配置source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 8888
    a1.sources.r1.selector.type = replicating

    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100

    a1.channels.c3.type = memory
    a1.channels.c3.capacity = 1000
    a1.channels.c3.transactionCapacity = 100

    
    # 配置sink
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /home/centos/file1
    a1.sinks.k1.sink.rollInterval = 0

    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /home/centos/file2
    a1.sinks.k2.sink.rollInterval = 0

    a1.sinks.k3.type = file_roll
    a1.sinks.k3.sink.directory = /home/centos/file3
    a1.sinks.k3.sink.rollInterval = 0

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3



2、Multiplexing 多路复用通道挑选器    //选择avro源发送文件
                    
                    
                    
                    

    # 将agent组件起名
    a1.sources = r1
    a1.sinks = k1 k2 k3
    a1.channels = c1 c2 c3
    
    # 配置source
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 4444
    # 配置通道挑选器
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = country
    a1.sources.r1.selector.mapping.CN = c1
    a1.sources.r1.selector.mapping.US = c2
    a1.sources.r1.selector.default = c3
    
    # 配置channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    a1.channels.c2.type = memory
    a1.channels.c2.capacity = 1000
    a1.channels.c2.transactionCapacity = 100

    a1.channels.c3.type = memory
    a1.channels.c3.capacity = 1000
    a1.channels.c3.transactionCapacity = 100

    
    # 配置sink
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /home/centos/file1
    a1.sinks.k1.sink.rollInterval = 0

    a1.sinks.k2.type = file_roll
    a1.sinks.k2.sink.directory = /home/centos/file2
    a1.sinks.k2.sink.rollInterval = 0

    a1.sinks.k3.type = file_roll
    a1.sinks.k3.sink.directory = /home/centos/file3
    a1.sinks.k3.sink.rollInterval = 0

    # 绑定channel-source, channel-sink
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2
    a1.sinks.k3.channel = c3


    1、创建file1 file2 file3文件夹,家目录
        mkdir file1 file2 file3

    2、创建文件夹country,并放入头文件和数据
        创建头文件CN.txt、US.txt、OTHER.txt 
            CN.txt ===> country CN              
            US.txt ===> country US              
            OTHER.txt ===> country OTHER   
        
        创建数据 1.txt 
            1.txt ====> helloworld

    3、运行flume
        flume-ng agent -n a1 -f /soft/flume/selector_multi.conf

    4、运行Avro客户端
        flume-ng avro-client -H localhost -p 4444 -R ~/country/US.txt -F ~/country/1.txt    ===> 查看file2
        flume-ng avro-client -H localhost -p 4444 -R ~/country/CN.txt -F ~/country/1.txt    ===> 查看file1
        flume-ng avro-client -H localhost -p 4444 -R ~/country/OTHER.txt -F ~/country/1.txt    ===> 查看file3

sinkProcessor

sink Runner 运行一个 sink Group

sink Group 是由一个或多个 sink 构成

sink Runner 告诉 sink Group 处理下一批 event

sink Group 含有一个 sink Processor , 负责指定一个 sink 来处理这批数据


2、failover 容灾    //将所有sink设置一个优先级
            //数量越大,优先级越高
            //当数据传入时,优先级最高的sink负责处理
            //当sink挂掉,次高优先级的sink被激活,继续处理数据
            //channel和sink必须一对一

    a1.sources = r1
    a1.sinks = s1 s2 s3
    a1.channels = c1 c2 c3

    # Describe/configure the source
    a1.sources.r1.type = seq

    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = s1 s2 s3
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.s1 = 5
    a1.sinkgroups.g1.processor.priority.s2 = 10
    a1.sinkgroups.g1.processor.priority.s3 = 15
    a1.sinkgroups.g1.processor.maxpenalty = 10000

    # Describe the sink
    a1.sinks.s1.type = file_roll
    a1.sinks.s1.sink.directory = /home/centos/file1
    a1.sinks.s2.type = file_roll
    a1.sinks.s2.sink.directory = /home/centos/file2
    a1.sinks.s3.type = file_roll
    a1.sinks.s3.sink.directory = /home/centos/file3

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c2.type = memory
    a1.channels.c3.type = memory

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1 c2 c3
    a1.sinks.s1.channel = c1
    a1.sinks.s2.channel = c2
    a1.sinks.s3.channel = c3

Event事件是由Source端封装输入数据的字节数组得来的
Event event = EventBuilder.withBody(body);

Sink中的process方法返回两种状态:
1、READY //一个或多个event成功分发
2、BACKOFF //channel中没有数据提供给sink

flume中事务的生命周期:

tx.begin()    //开启事务,之后执行操作
tx.commit()    //提交事务,操作完成后由此提交
tx.rollback()    //回滚事务,出现异常可以采取回滚措施
tx.close()    //关闭事务,最后一定要关闭事务

本文章来源自 https://www.cnblogs.com/zyde/p/8946069.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 11,437评论 13 34
  • title: Flume构建日志采集系统date: 2018-02-03 19:45tags: [flume,k...
    溯水心生阅读 16,122评论 3 25
  • 简介 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume...
    达微阅读 667评论 0 2
  • 本文是Flume官方开发者文档的翻译。 Flume 1.8.0开发者指南 简介 概览 Apache Flume是一...
    悠扬前奏阅读 2,260评论 0 3
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 3,512评论 0 13