filebeat+redis+ELK收集Tomcat的access日志并分析

一、背景

  • Tomcat的access日志记录的是部署在tomcat中的应用被访问的链接信息等。下如图。下面将记录一下我将access日志用filebeat收集,存入redis,然后logstash过滤后存储到es中,并且用kibana可视化展示的过程。


    Tomcat access日志

二、配置filebeat收集access日志

  1. 编辑filebeat.yml
vi /etc/filebeat/filebeat.yml
###################### Filebeat Configuration Example #########################

# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html

# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.

#=========================== Filebeat inputs =============================

filebeat.inputs:

- type: log

  enabled: true

  paths:
    - /var/log/nginx/access-json.log   #指明读取文件的位置
  tags: ["nginx-access"]       #用于logstash过滤


- type: log

  enabled: true

  paths:
    - /var/log/nginx/error.log   #指明读取文件的位置
  tags: ["nginx-error"]      #用于logstash过滤


- type: log

  enabled: true

  paths:
    - /root/server/apache-tomcat-aic/logs/localhost_access_log.*.txt   #指明tomcat-access日志文件的位置
  tags: ["tomcat-access"]      #用于logstash过滤

#============================= Filebeat modules ===============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: false

  # Period on which files under path should be checked for changes
  #reload.period: 10s

#==================== Elasticsearch template setting ==========================

setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#-------------------------- Redis output ------------------------------
output.redis:
   hosts: ["192.168.1.110:6379"]   #输出到redis的机器
   password: "123456"
   key: "filebeat:test16"   #redis中日志数据的key值ֵ
   db: 0
   timeout: 5


#================================ Processors =====================================

# Configure processors to enhance or manipulate events generated by the beat.

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~


三、配置logstash过滤access日志

  1. 新建一个logstash的配置文件
vi /etc/logstash/conf.d/tomcat110-access.conf
  1. 输入以下内容
input {
    redis {
        data_type =>"list"
        key =>"filebeat:test16"
        host =>"192.168.1.110"
        port => 6379
        password => "123456"
        threads => "8"
        db => 0
        #codec => json
        }
}

filter {
    
    if "tomcat-access" in [tags]{
        grok {
            match => ["message", "%{IPORHOST:client} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} %{NUMBER:bytes}"]
        }
        date{
            match=>["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
            target=>"@timestamp"
        }

        mutate {
            split => [ "request" ,"?" ]
            add_field => ["url" , "%{request[0]}"]
        }
        
    }
    geoip {
        source => "client" 
        database => "/opt/GeoLite2-City/GeoLite2-City.mmdb"
        remove_field => ["[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]", "[geoip][ip]"]
        target => "geoip"
     }
    mutate {
        convert => [ "[geoip][coordinates]", "float" ] 
        remove_field => "log"
        remove_field => "beat"
        remove_field => "meta"
        remove_field => "prospector"
    }
}


output {
    if "tomcat-access" in [tags]{
        elasticsearch {
            hosts => ["192.168.1.110:9200"]      
            index => "logstash-test16-tomcat-access-%{+yyyy.MM.dd}"      
        }
    }
    
}

  • 这里简单解释一下logstash的内容
    input节点的内容代表,从redis中获取key为filebeat:test16的值。
input {
    redis {
        data_type =>"list"
        key =>"filebeat:test16"
        host =>"192.168.1.110"
        port => 6379
        password => "123456"
        threads => "8"
        db => 0
        #codec => json
        }
}
  • filter节点
filter {
    # 当遇到tags中含有tomcat-access,执行以下逻辑。这个tags是在filebeat.yml中设置的。
    if "tomcat-access" in [tags]{
        #使用grok过滤,匹配日志
        grok {
            match => ["message", "%{IPORHOST:client} (%{USER:ident}|-) (%{USER:auth}|-) \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} %{NUMBER:bytes}"]
        }
       #将  11/Apr/2019:09:22:28 +0800 时间转换为: 2019-04-11T01:22:28.000Z
        date{
            match=>["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
            target=>"@timestamp"
        }
       # 将request字段用问号分隔,并且增加一个字段为url,将分隔后的第一个字段赋值给它。此处的作用是为了取接口地址,去掉访问参数。如:/api/publicScreen?shopId=12&deviceNo=123456&status=1。则url为:/api/publicScreen
        mutate {
            split => [ "request" ,"?" ]
            add_field => ["url" , "%{request[0]}"]
        }
        
    }
   #使用geoip库定位ip
    geoip {
        source => "client" #日志中外部访问ip对应字段
        database => "/opt/GeoLite2-City/GeoLite2-City.mmdb"
        #去掉显示geoip显示的多余信息
        remove_field => ["[geoip][latitude]", "[geoip][longitude]", "[geoip][country_code]", "[geoip][country_code2]", "[geoip][country_code3]", "[geoip][timezone]", "[geoip][continent_code]", "[geoip][region_code]", "[geoip][ip]"]
        target => "geoip"
     }
    mutate {
        convert => [ "[geoip][coordinates]", "float" ] 
        remove_field => "log"
        remove_field => "beat"
        remove_field => "meta"
        remove_field => "prospector"
    }
}
  • 另外grok 可以在kibana中测试自己的正则语法。


    image.png
  1. 重启logstash,查看启动日志是否报错。
systemctl restart logstash   #重启
tail -f /var/log/logstash/logstash-plain.log      #查看运行日志

三、配置Kibana展示access日志

  • 上述配置正确后。可以在kibana中看到我们建立的日志索引


    image.png
  1. 建立kibana日志索引,可视化日志


    image.png

    image.png
  2. 查看日志


    image.png
  3. 为这个索引创建一个Dashboard
  • 先为这个Dashboard添加需要的Visualize。此处先创建排名前10的访问链接。


    image.png

    image.png

    image.png

    image.png

    image.png
  • 点击save后,保存数据名字即可。接下来再创建一个访问IP的可视化地图,接下来就不贴那么多图了,步骤类似上面。注意类型选择 Coordinate Map
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容