【Flume采集日志】

Flume安装部署

安装地址

(1)Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

安装部署

(1)将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
(3)修改apache-flume-1.9.0-bin的名称为flume
(4)修改配置文件

tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

cp flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=jdk目录

# 配置环境变量
vi ~/.bashrc 
#FLUME
export FLUME_HOME=flume目录
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc 

flume-ng version # 查看flume版本号

Flume Source 测试

常见类型: spooling directory, exec, syslog, avro, netcat,Taildir等
无论是Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,TaildirSource可以。

SpoolingDirSource(监控一个目录)

spoolingDirsource是安全的,不会丢失数据,但采集文件时不可以被修改,且文件不能重名。

mkdir /opt/module/flume/job
vi job/file_to_logger_spooling.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/opt/module/flume/testdata/log/hi

#配置Channel组件
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000 
a1.channels.c1.transactionCapacity=100  

#配置Sink组件
a1.sinks.k1.type=logger  
a1.sinks.k1.maxBytesToLog=100

#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1   

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_spooling.conf -Dflume.root.logger=INFO,console

可以看到采集完的日志文件都加上了.COMPLETED后缀

TAILDIR Souce

在/opt/module/flume/job目录下vi file_to_logger_taildir.conf

a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*json
a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*json
a1.sources.r1.positionFile=./taildir_position.json

#配置Channel组件
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000 
a1.channels.c1.transactionCapacity=100  

#配置Sink组件
a1.sinks.k1.type=logger  
a1.sinks.k1.maxBytesToLog=100

#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1   

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console

上面两种方式都不能监控多级目录

为解决监控多级目录的问题,我们下载flume的源码,修改TaildirSource的代码,将修改好的TaildirSource模块打包 ,将 flume-taildir-source-1.9.0.jar 上传到flume的lib目录下替换原有的 flume-taildir-source-1.9.0.jar

修改 file_to_logger_taildir.conf 配置

a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/opt/module/flume/testdata/log/hi/.*
a1.sources.r1.filegroups.f2=/opt/module/flume/testdata/log/test/.*
a1.sources.r1.positionFile=./taildir_position.json

#配置Channel组件
a1.channels.c1.type=memory 
a1.channels.c1.capacity=1000 
a1.channels.c1.transactionCapacity=100  

#配置Sink组件
a1.sinks.k1.type=logger  
a1.sinks.k1.maxBytesToLog=100

#将三大组件绑定到一起
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1   

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/file_to_logger_taildir.conf -Dflume.root.logger=INFO,console

经测试,成功监控到多级目录下的文件变化

项目经验

修改/opt/module/flume/conf/flume-env.sh文件,配置如下参数(测试环境暂不配置)

export JAVA_OPTS="-Xms4096m -Xmx4096m -Dcom.sun.management.jmxremote"
export JAVA_HOME=

可选择TaildirSource和KafkaChannel搭配,并配置日志校验拦截器。
选择TailDirSource和KafkaChannel的原因如下:

TailDirSource相比ExecSource的优势
TailDirSource:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
ExecSource可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
采用Kafka Channel,省去了Sink,提高了效率。

在Flume的job目录下创建file_to_kafka.conf

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# a1.sources.r1.interceptors =  i1
# a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder

#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
Flume从Kafka同步数据到HDFS

在Flume的job目录下创建kafka_to_hdfs_log.conf

#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
#a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.bootstrap.servers = hadoop2:9092

a1.sources.r1.kafka.topics=topic_log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder

#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

编写 flume TimestampInterceptor,将日志时间戳放到header当中的timestamp字段当中,这样hdfs会按日期分目录

public class TimestampInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //1 获取body和header
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        Map<String, String> headers = event.getHeaders();

        //2 将log当中的ts字段解析出来
        JSONObject jsonObject = JSONObject.parseObject(log);
        String ts = jsonObject.getString("ts");

        //3 将ts字段 放到header当中的timestamp字段当中
        headers.put("timestamp", ts);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

把上面代码打成flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar放到/opt/module/flume/lib下

启动 flume job

bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf -Dflume.root.logger=info,console

往kafka推条数据测试

   @Test
    public void testProducerSend() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop2:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(properties);

        String value = "{\"database\":\"gmall\",\"table\":\"cart_info\",\"type\":\"update\",\"ts\":1670340710200,\"xid\":13090,\"xoffset\":1573,\"data\":{\"id\":100924,\"user_id\":\"93\",\"sku_id\":16,\"cart_price\":4488.00,\"sku_num\":1,\"img_url\":\"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg\",\"sku_name\":\"华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机\",\"is_checked\":null,\"create_time\":\"2020-06-14 09:28:57\",\"operate_time\":null,\"is_ordered\":1,\"order_time\":\"2021-10-17 09:28:58\",\"source_type\":\"2401\",\"source_id\":null},\"old\":{\"is_ordered\":0,\"order_time\":null}}";
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_log", value);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset());
            }
        });

        // 所有的通道打开都需要关闭
        producer.close();
    }

然后访问发现有文件生成了
http://hadoop2:9870/explorer.html#/origin_data/gmall/log/topic_log

截屏2022-12-06 下午11.44.07.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容