背景
从某app里获得了一堆日志,需要从日志中过滤信息,存入ES,然后再统计分析。
原始数据
已经通过kafka存入es,内容结构大致如下:
personID,groupID,logType,content
1,20,wx,{"signal":100}
1,20,wx,{"sys":"ios"}
2,20,wx,{"signal":10, "network": "4g"}
1,20,wx,{"signal":80}
日志索引存储了四个字段,用户ID(personID),组ID(groupID),日志类型(logType),日志内容(content)
其中,日志内容是JSON。每条日志里,存储的content内容格式不同,字段也不确定。
目标
从日志数据中,把和signal相关的数据过滤出来,进行统计分析。
技术选型
- 通过logstash快速将es中匹配的数据导入新索引中
- 使用kibana进行目标数据的初步统计分析
思路拆解
【1】
从日志数据中,过滤出content里的signal字段,字段是整形,数值理论上落在-100 ~ 100的区间里。然后用一个索引:person_device_signal存储起来。索引需要存储两个字段:personID, groupID, signal
然后就能用kibana对索引进行关于signal的各种平均值,最值,方差等等的统计计算,也可以personID,groupID进行分桶然后再统计
【2】
logstash运行导入配置的命令是
./bin/logstash -f xxx.conf
【3】
logstash的导入配置.conf文件分为三个部分:
- input
- filter
- output
【4】input,对数据输入源的定义。
在这里,输入源是:es
先确认查询语句,找到所有匹配signal的
{
"query": {
"match": {
"content": "signal"
}
},
"sort": [ "_doc" ]
}
input {
elasticsearch {
hosts => "localhost"
index => "person_device_log"
query => '{"query": {"match": {"content": "signal"}},"sort": [ "_doc" ]}'
size => 500
scroll => "5m"
docinfo => true
}
}
【5】filter,对输入的数据进行处理
filter {
json {
source => "content"
}
prune {
whitelist_names => [ 'personID', 'groupID', 'signal']
}
}
使用prune指定想要的字段才写入新索引,避免json解析出来的其它附带字段
【6】output指定对应索引名称
output {
elasticsearch {
hosts => "http://elasticsearch:9200"
index => "person_device_signal"
document_id => "%{id}"
}
stdout {}
}
注:document_id一定要写上,保证数据不会因为重复执行命令重复写入