#读
input {
file {
path => "文件全路径"
type => "任意名字最好有意义"#自定义日志区分类型
start_position => "beginning" #从文件开始处读写
}
}
#过滤
filter {
grok {
#切割后日期名字叫logdate
match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
}
date {
#logdate 从上面过滤后取到的字段名,yyyy-MM-dd HH:mm:ss.SSS 日期格式条件
match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS"]
#match => ["logdate", "yyyyMMdd","yyyy-MM-dd"]
#赋值给那个key
target => "@timestamp"
#删除不需要的字段
remove_field => ["logdate"]
}
#合并错误日志
multiline {
pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}"
negate => true
what => "previous"
}
}
#输出
output{
#输出到ES
elasticsearch{
hosts=>["127.0.0.1:9200"]
#es的index名字,默认就是这个,可以更改
index => "logstash-%{+YYYY.MM.dd}"
}
#输出到控制台
stdout{codec => rubydebug}
}
Logstash将日志产生时间替换@timestamp
https://blog.csdn.net/zmx729618/article/details/80893763
2018年07月03日 09:40:21 零度anngle 阅读数:5581
<article class="baidu_pl" style="box-sizing: inherit; outline: 0px; margin: 0px; padding: 16px 0px 0px; display: block; position: relative; color: rgb(51, 51, 51); font-family: "Microsoft YaHei", "SF Pro Display", Roboto, Noto, Arial, "PingFang SC", sans-serif; font-size: 14px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">
一、跟着官网学习一下date插件
日期过滤器用于从字段中解析日期,然后使用该日期或时间戳作为事件的logstash时间戳。例如,syslog事件通常具有这样的时间戳:
Bash
"Apr 17 09:32:01"
你可以使用日期格式MMM dd HH:mm:ss来解析这个。日期过滤器对于排序事件和回填旧数据尤为重要。 如果您在活动中没有找到正确的日期,那么稍后搜索它们可能会排序错乱。在没有这个过滤器的情况下,logstash会根据第一次看到事件(在输入时间),如果时间戳还没有在事件中设置,选择一个时间戳。 例如,在文件输入的情况下,时间戳被设置为每次读取的时间。
1.1 Date过滤器配置选项
该插件支持以下配置选项和稍后介绍的通用选项。
locale:
值类型是字符串,这个设置没有默认值。使用IETF-BCP47或POSIX语言标记指定用于日期分析的区域设置。 简单的例子是en,美国的BCP47或者en_US的POSIX。大多数情况下需要设置语言环境来解析月份名称(MMM模式)和星期几名称(EEE模式)。如果未指定,则将使用平台默认值,但对于非英文平台默认情况下,英文解析器也将用作回退机制。
match:
值类型是数组,默认值是[]。首先有字段名称的数组,格式模式如下,[field,formats ...]如果你的时间字段有多种可能的格式,你可以这样做:
Bash
match => [ "logdate", "MMM dd yyyy HH:mm:ss", "MMM d yyyy HH:mm:ss", "ISO8601" ]
#以上将匹配一个系统日志(rfc3164)或iso8601时间戳。
有一些特殊的例外。 以下格式文字可帮助您节省时间并确保日期解析的正确性。
Bash
ISO8601 - 应解析任何有效的ISO8601时间戳,如2011-04-19T03:44:01.103ZUNIX - 将解析float或int值,表示自1346149001.132以及1326149001.132以来的秒数(以秒为单位)UNIX_MS - 将分析int值表示unix时间(以毫秒为单位),如1366125117000TAI64N - 将解析tai64n时间值
例如,如果您有一个字段logdate,值类似于Aug 13 2010 00:03:44,您可以使用此配置:
Bash
filter { date { match => [ "logdate", "MMM dd yyyy HH:mm:ss" ] }}
如果您的字段嵌套在结构中,则可以使用嵌套语法[foo] [bar]来匹配其值。 有关更多信息,请参阅字段参考[<u style="box-sizing: border-box; outline: 0px; margin: 0px; padding: 0px; word-wrap: break-word;">https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#logstash-config-field-references</u>](https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#logstash-config-field-references)
有关语法的更多细节 :
用于解析日期和时间文本的语法使用字母来指示时间值(月,分等)的类型,以及重复的字母来表示该值的形式(2位月份,全月份名称等)。以下是可用于解析日期和时间的内容:
y year:
Bash
yyyy #全年号码。 例如:2015。yy #两位数年份。 例如:2015年的15。
M month of the year :
Bash
M #最小数字月份。 例如:1 for January and 12 for December.。MM #两位数月份。 如果需要,填充零。 例如:01 for January and 12 for DecembeMMM #缩短的月份文本。 例如: Jan for January。 注意:使用的语言取决于您的语言环境。 请参阅区域设置以了解如何更改语言。MMMM #全月文本,例如:January。 注意:使用的语言取决于您的语言环境。
d day of the month :
Bash
d #最少数字的一天。 例如:1月份的第一天1。dd #两位数的日子,如果需要的话可以填零.例如:01 for the 1st of the month。
H hour of the day (24-hour clock):
Bash
H #最小数字小时。 例如:0表示午夜。HH #两位数小时,如果需要填零。 例如:午夜00。
m minutes of the hour (60 minutes per hour):
Bash
m #最小的数字分钟。 例如:0。mm #两位数分钟,如果需要填零。 例如:00。
s seconds of the minute (60 seconds per minute) :
Bash
s #最小数字秒。 例如:0。ss #两位数字,如果需要填零。 例如:00。
S 秒的小数部分最大精度是毫秒(SSS)。 除此之外,零附加。
Bash
S #十分之一秒。例如:0为亚秒值012SS #百分之一秒 例如:01为亚秒值01SSS #千分之一秒 例如:012为亚秒值012
Z 时区偏移或身份
Bash
Z #时区偏移,结构为HHmm(Zulu/UTC的小时和分钟偏移量)。例如:-0700。ZZ #时区偏移结构为HH:mm(小时偏移和分钟偏移之间的冒号)。 例如:-07:00。ZZZ #时区身份。 例如:America/Los_Angeles。 注意:有效的ID在列表中列出http://joda-time.sourceforge.net/timezones.html
z 时区名称。 时区名称(z)不能被分析。
w 一年中的一周
Bash
w #最小数字周。 例如:1。ww #两位数周,如果需要填零。例如:01。
D 一年中的一天
e 星期几(数量)
E 一周中的天(文本)
Bash
E,EE,EEE #星期几缩写。 例如:Mon, Tue, Wed, Thu, Fri, Sat, Sun。 注意:这个的实际语言将取决于您的语言环境。EEEE #一周中的全文。 例如:Monday, Tuesday,...注意:这个的实际语言将取决于您的语言环境。
对于非格式化语法,您需要在值的周围放置单引号字符。 例如,如果你解析ISO8601时间,“2015-01-01T01:12:23”这个小小的“T”不是一个有效的时间格式,而你想说“从字面上看,一个T”,你的格式将是 这个:“yyyy-MM-dd’T'HH:mm:ss”
其他不太常见的日期单位,如时代(G),世纪(C),上午/下午(一),和更多,可以了解joda-time文档:<u style="box-sizing: border-box; outline: 0px; margin: 0px; padding: 0px; word-wrap: break-word;">http://www.joda.org/joda-time/key_format.html</u>
tag_on_failure :
值类型是数组,默认值是[“_dateparsefailure”]。没有成功匹配时,将值追加到标签字段
target :
值类型是字符串,默认值是“@timestamp”。将匹配的时间戳存储到给定的目标字段中。 如果未提供,则默认更新事件的@timestamp字段。
timezone:
值类型是字符串,这个设置没有默认值。指定要用于日期分析的时区标准ID。有效的ID列在Joda.org可用时区页面上。如果时区无法从数值中提取,那么这非常有用,并且不是平台默认值。 如果没有指定,将使用平台默认值。 Canonical ID是很好的,因为它为您节省了夏令时。例如,America/Los_Angeles或Europe/Paris是有效的ID。该字段可以是动态的,并使用%{field}语法包含事件的一部分
2.2 常规选项
前面翻译好多次了这里就不翻译了。
博文来自:www.51niux.com
二、来走一下date的小例子
2.1 先不加date过滤插件查看一下logstash的输出:
这里搞了个三月份的旧日志来测试一下
下面是stdout { codec => rubydebug}的输出:
Bash
"@timestamp" => 2017-12-20T08:39:07.982Z, "request_time" => "0.040", "nginx.access.method" => "GET", "nginx.access.time" => "14/Mar/2017:00:00:02 +0800",
如果不用 date 插件,那么 Logstash 将处理时间作为时间戳。时间戳字段是 Logstash 自己添加的内置字段 @timestamp,也就是说如果你没有设置date过滤器,时间是因为这个时间是UTC时间,那么timestamp时间呢就是比北京时间采集时间少8个小时,也就是说如果我们的采集时间是16:39:07秒,那么timestamp的时间就是08:39:07。
下面是es上面的查看:
可以看到Kibana显示的信息跟ES里面记录的时间是不一致的,因为Logstash采集到数据之后插入到ES中,所以ES中保留的时间也是UTC的时间,创建索引也是按照Logstash里面的时间创建的。但是Kibana呢是根据你当前浏览器的时区然后再在timestamp基础上加减时间的,所以kibana显示的时间是我们以为正确的。
博文来自:www.51niux.com
3.2 加上date插件之后再来试一试
[14/Mar/2017:06:57:12 +0800] #怎么在logstash里面把这个转换成"2017-11-20 12:00:00",直接这种格式我是真不会......。在nginx里面还好直接指定日志格式'{"timestamp":"$time_iso8601",',然后logstash里面通过match => ["timestamp", "ISO8601"]匹配到。
我这里将日志传送到ES集群里面也不是主要靠filebeat采集数据到Logstash,然后Logstash再进行grok转换成json格式。如果数据很大的话grok转换成json格式挺耗资源的,所以都是事先将日志通过程序转换成json格式。
Bash
{"agent":"Windows 7","client_time":"2017-11-20 12:00:00","client_ip":"123.10.91.106"}
类似于上面这种格式当然里面的内容要很多啊,我这里特意把日志时间往前调了一个月方便对比哈。
然后logstash直接通过file的codec => "json"格式采集就可以了,信息都是以json格式存储在logstash本地的。
$ vim /home/elk/logstash/conf/date-test.conf
Bash
input { file { path => "/usr/local/nginx/logs/access.*" codec => "json" }}filter { date { match => ["client_time", "yyyy-MM-dd HH:mm:ss"] #这里是如果client_time跟后面的格式匹配上了就会去替换,替换什么呢?target默认指的就是@timestamp,所以就是以client_time的时间更新@timestamp的时间 }}output { stdout { codec => rubydebug } elasticsearch { hosts => ["192.168.14.60","192.168.14.61","192.168.14.62","192.168.14.63","192.168.14.64"] index => "logstash-nginx-access-test-51niux-com-%{+YYYY.MM}" }}
$ /home/elk/logstash/bin/logstash -f /home/elk/logstash/conf/date-test.conf #启动logstash,然后插入一条测试数据类似于我上面展示的那种json格式到被采集文件中去
logstash打印到屏幕的输出查看:
Bash
{ "path" => "/usr/local/nginx/logs/access.log", "agent" => "Windows 7", "@timestamp" => 2017-11-20T04:00:00.000Z, #从屏幕端的数据可以看到timestamp时间已经不是采集时间而是采集数据里面的日志时间了,只不过将时间转变成了UTC时间。 "client_time" => "2017-11-20 12:00:00", "@version" => "1", "host" => "localhost.localdomain", "client_ip" => "123.10.91.106"}
Kibana上面的查看:
注意我特意标红了啊,因为当前是12月份所以你得把时间调整到十一月,也就是上图右上角的地方可以选择时间范围。
ES上面的索引查看:
这其实挺有用的,因为你日志排序的时候包括分析的时候以及创建索引的时候如果你不用date转换的话,默认都是走的采集时间,如果你是实时日志那倒是差别不是太大,但是采集的日志如果不是很实时呢,比如说是离线转换格式日志,还比如说filebeat会把日志存会再发送等等,那你日志产生的时间戳就不对了,然后呢实现的思路肯定有很多,重点就是date插件这个知识点,了解掌握住就哦了,剩下的就是看需求去嗨了。
</article>
Logstash详解之——filter模块
ritit 2017-08-01 17:57:58 浏览18869 评论0
摘要: Logstash三个组件的第二个组件,也是真个Logstash工具中最复杂,最蛋疼的一个组件,当然,也是最有作用的一个组件。 1、grok插件 grok插件有非常强大的功能,他能匹配一切数据,但是他的性能和对资源的损耗同样让人诟病。
Logstash三个组件的第二个组件,也是真个Logstash工具中最复杂,最蛋疼的一个组件,当然,也是最有作用的一个组件。
1、grok插件 grok插件有非常强大的功能,他能匹配一切数据,但是他的性能和对资源的损耗同样让人诟病。
filter{
grok{
#只说一个match属性,他的作用是从message 字段中吧时间给抠出来,并且赋值给另个一个字段logdate。
#首先要说明的是,所有文本数据都是在Logstash的message字段中中的,我们要在过滤器里操作的数据就是message。
#第二点需要明白的是grok插件是一个十分耗费资源的插件,这也是为什么我只打算讲解一个TIMESTAMP_ISO8601正则表达式的原因。
#第三点需要明白的是,grok有超级多的预装正则表达式,这里是没办法完全搞定的,也许你可以从这个大神的文章中找到你需要的表达式
#http://blog.csdn.net/liukuan73/article/details/52318243
#但是,我还是不建议使用它,因为他完全可以用别的插件代替,当然,对于时间这个属性来说,grok是非常便利的。
match => ['message','%{TIMESTAMP_ISO8601:logdate}']
}
}
2、mutate插件 mutate插件是用来处理数据的格式的,你可以选择处理你的时间格式,或者你想把一个字符串变为数字类型(当然需要合法),同样的你也可以返回去做。可以设置的转换类型 包括: "integer", "float" 和 "string"。
filter {
mutate {
#接收一个数组,其形式为value,type
#需要注意的是,你的数据在转型的时候要合法,你总是不能把一个‘abc’的字符串转换为123的。
convert => [
#把request_time的值装换为浮点型
"request_time", "float",
#costTime的值转换为整型
"costTime", "integer"
]
}
}
3、ruby插件 官方对ruby插件的介绍是——无所不能。ruby插件可以使用任何的ruby语法,无论是逻辑判断,条件语句,循环语句,还是对字符串的操作,对EVENT对象的操作,都是极其得心应手的。
filter {
ruby {
#ruby插件有两个属性,一个init 还有一个code
#init属性是用来初始化字段的,你可以在这里初始化一个字段,无论是什么类型的都可以,这个字段只是在ruby{}作用域里面生效。
#这里我初始化了一个名为field的hash字段。可以在下面的coed属性里面使用。
init => [field={}]
#code属性使用两个冒号进行标识,你的所有ruby语法都可以在里面进行。
#下面我对一段数据进行处理。
#首先,我需要在把message字段里面的值拿到,并且对值进行分割按照“|”。这样分割出来的是一个数组(ruby的字符创处理)。
#第二步,我需要循环数组判断其值是否是我需要的数据(ruby条件语法、循环结构)
#第三步,我需要吧我需要的字段添加进入EVEVT对象。
#第四步,选取一个值,进行MD5加密
#什么是event对象?event就是Logstash对象,你可以在ruby插件的code属性里面操作他,可以添加属性字段,可以删除,可以修改,同样可以进行树脂运算。
#进行MD5加密的时候,需要引入对应的包。
#最后把冗余的message字段去除。
code => "
array=event。get('message').split('|')
array.each do |value|
if value.include? 'MD5_VALUE'
then
require 'digest/md5'
md5=Digest::MD5.hexdigest(value)
event.set('md5',md5)
end
if value.include? 'DEFAULT_VALUE'
then
event.set('value',value)
end
end
remove_field=>"message"
"
}
}
4、date插件 这里需要合前面的grok插件剥离出来的值logdate配合使用(当然也许你不是用grok去做)。
filter{
date{
#还记得grok插件剥离出来的字段logdate吗?就是在这里使用的。你可以格式化为你需要的样子,至于是什么样子。就得你自己取看啦。
#为什什么要格式化?
#对于老数据来说这非常重要,应为你需要修改@timestamp字段的值,如果你不修改,你保存进ES的时间就是系统但前时间(+0时区)
#单你格式化以后,就可以通过target属性来指定到@timestamp,这样你的数据的时间就会是准确的,这对以你以后图表的建设来说万分重要。
#最后,logdate这个字段,已经没有任何价值了,所以我们顺手可以吧这个字段从event对象中移除。
match=>["logdate","dd/MMM/yyyy:HH:mm:ss Z"]
target=>"@timestamp"
remove_field => 'logdate'
#还需要强调的是,@timestamp字段的值,你是不可以随便修改的,最好就按照你数据的某一个时间点来使用,
#如果是日志,就使用grok把时间抠出来,如果是数据库,就指定一个字段的值来格式化,比如说:"timeat", "%{TIMESTAMP_ISO8601:logdate}"
#timeat就是我的数据库的一个关于时间的字段。
#如果没有这个字段的话,千万不要试着去修改它。
}
}
5、json插件,这个插件也是极其好用的一个插件,现在我们的日志信息,基本都是由固定的样式组成的,我们可以使用json插件对其进行解析,并且得到每个字段对应的值。
filter{
#source指定你的哪个值是json数据。
json {
source => "value"
}
#注意:如果你的json数据是多层的,那么解析出来的数据在多层结里是一个数组,你可以使用ruby语法对他进行操作,最终把所有数据都装换为平级的。
}
json插件还是需要注意一下使用的方法的,下图就是多层结构的弊端:
[图片上传失败...(image-a3c912-1556457723389)]
对应的解决方案为:
ruby{
code=>"
kv=event.get('content')[0]
kv.each do |k,v|
event.set(k,v)
end"
remove_field => ['content','value','receiptNo','channelId','status']
}
Logstash filter组件的插件基本介绍到这里了,这里需要明白的是:
add_field、remove_field、add_tag、remove_tag 是所有 Logstash 插件都有。相关使用反法看字段名就可以知道。不如你也试试吧。。。。