使用Filebeat + ES + Kibina的组合进行日志收集的一个优点就是轻量级,因为去掉了笨重的logstash, 占用资源更少。但这也引入了一个问题,即filebeat并没有logstash那样强大的日志解析能力,往往只能把整条日志当成一个整体扔到ES中。好消息是,ES从5.x版本开始引入了Ingest Node
,即允许你在对文档进行索引之前进行预处理,且支持logstash的Grok语法。因此我们可以在ES中针对我们的日志格式创建一个预处理pipeline, 通过配置pipeline中的processor完成日志解析。
以下面这条日志为基础举例:
[2019-02-19 17:04:28:017] http-nio-8050-exec-2 INFO c.b.o.xxx.ms.api.TaskController - response = {"jobId":"123","ms":10}
我们期望能够将这条日志中的时间2019-02-19 17:04:28:017
、线程http-nio-8050-exec-2
、日志级别INFO
、Java类名c.b.o.xxx.ms.api.TaskController
和日志正文response = {"jobId":"123","ms":10}
分别提取出来方便我们日后在kibana中做筛选统计, 同时时间要以日志中打印的时间为基准而不是filebeat发送消息时的时间。为了实现这一目标,我们可以向ES发一个HTTP请求创建一个名为xxx-log
的pipeline:
PUT /_ingest/pipeline/xxx-log HTTP/1.1
Host: localhost:8200
Content-Type: application/json
{
"description" : "xxx-log",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["\\[%{TIMESTAMP_ISO8601:log_time}\\] %{NOTSPACE:thread} %{NOTSPACE:log_level} %{NOTSPACE:java_class} - %{GREEDYDATA:content}"]
}
},
{
"date": {
"field": "log_time",
"formats": ["yyyy-MM-dd HH:mm:ss:SSS"],
"timezone": "Asia/Shanghai",
"target_field": "@timestamp"
}
}
]
}
在这里我们定义了两个processor
,第一个为grok
处理器,用于解析日志字符串提取其中关键字段; 第二个是日期处理器,功能为把log_time
字段以yyyy-MM-dd HH:mm:ss:SSS
格式解析成日期,然后将结果保存到@timestamp
字段中。
创建完processor以后,我们只需要配置filebeat在输出日志到ES时使用这个名为xxx-log
的预处理器即可:
这样就完成了所有的工作。这时启动filebeat, 如果如出以下错误信息
ERROR pipeline/output.go:92 Failed to publish events: temporary bulk send failure
大概率是因为你发送的日志格式无法与grok表达式匹配,修改processor定义json即可。也可以在启动filebeat时添加-d "*"
参数来查看具体的错误原因。
下图是日志在kibana中的展示效果:
可以看到主要的字段都已经被正确解析。