Logstash
数据搜集处理引擎,支持 200 多个插件。
Logstash | 架构简介
Codec(Code / Decode):将原始数据 decode 成 Event,再将 Event encode 成目标数据。
Logstash | 相关概念
Pipeline
- 包含了 input-filter-output 三个阶段的处理流程;
- Logstash 支持 200 多个插件,并支持插件的生命周期管理;
- Logstash 有自己的队列;
Event
- Event 是数据在内部流转时的具体表现形式,数据在 input 阶段被转换为 Event,在 output 被转换成目标格式数据;
- Event 其实是一个 Java Object,在 filter 阶段可以对 Event 的属性进行增删改查;
Input Plugins
一个 Pipeline 可以有多个 input 插件:
- Stdin / File
- Beats / Log4J / Elasticsearch / JDBC / Kafka / Rabbitmq / Redis
- JMX / HTTP / Websocket / UDP / TCP
- Google Cloud Storage / S3
- Github / Twitter
Input Plugin | File
- 支持从文件中读取数据;
- 文件读取需要解决的问题:文件只能被读取一次,重启后,需要从上次读取的位置继续,通过 sincedb 实现;
- 读取到文件新内容,发现新文件;
- 文件发生归档操作(文档位置发生变化,日志 Rotation),不能影响当前的内容读取;
Output Plugin
将 Event 发送到特定的目的地,是 Pipeline 的最后一个阶段:
- ElasticSearch
- Email / Pageduty
- Influxdb / Kafka / MongoDB / Opensdb / Zabbix
- HTTP / TCP / Websocket
Codec Plugin
将原始数据 decode 成 Event;将 Event encode 成目标数据:
- Line / Multiline
- JSON / Avro / Cef(ArcSight Common Event Format)
- Dots / Rubydebug
Filter Plugin
Filter Plugin 可以对 Logstash Event 进行各种处理,例如解析字段、删除字段、类型转换:
- Date - 日期解析;
- Dissect - 分隔符解析;
- Grok - 正则匹配解析;
- Mutate - 处理字段、重命名、删除、替换;
- Metircs - 聚合 Metrics;
- Ruby - 利用 Ruby 代码来动态修改 Event;
Filter Plugin | Mutate
对字段做各种操作:
- Convert - 类型转换;
- Gsub - 字符串替换;
- Split / Join / Merge - 切割 / 数组合并成字符串 / 数组合并;
- Rename - 字段重命名;
- Update / Replace - 字段内容更新替换;
- Remove_field - 字段删除;
Queue
In Memory Queue
进程 Crash,机器宕机都会引起数据的丢失。
Persistent Queue
进程 Crash,机器宕机也不会丢失数据;数据保证会被消费,可以替代 Kafka 等消息队列缓冲区的作用。
Codec Plugins - Single Line | 举几个栗子
sudo bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"
sudo bin/logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"
sudo bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> dots}}"
Codec Plugin - Multiline | 举个栗子
运行 multiline-exception.conf
sudo bin/logstash -f multiline-exception.conf
multiline-exception.conf 中的内容
- 以字母开头的输入会被匹配到;
- 匹配到的内容属于上一个 Event,需要再输入一个以字母开头的文本,Logstash 才会有输出;
input {
stdin {
codec => multiline {
pattern => "^\s"
what => "previous"
}
}
}
filter {}
output {
stdout { codec => rubydebug }
}
输入一段 Java 的堆栈异常作为多行数据
Exception in thread "main" java.lang.NullPointerException
at com.example.myproject.Book.getTitle(Book.java:16)
at com.example.myproject.Author.getBookTitles(Author.java:25)
at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
再输入以字母开头的文本
hello word
输出上一段匹配到的 Java 的堆栈信息;
拿个 logstash.conf 出来分析一下
- 没有用到 sincedb,所以
sincedb_path => "/dev/null"
;
input {
file {
path => "/home/lixinlei/data/movielens/ml-latest-small/movies.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
- movies.csv 大概长这样:
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
...
- 每行的数据用
,
分割,分割出来的内容,分别归属到列 id,content,genre 下; - 列 genre 下的数据用
|
分割; - 列 content 下的数据用
(
分割,分割出来的第 1 个字符串存到新的列下,新的列叫 title;分割出来的第 2 个字符串存到新的列下,新的列叫 year; - 把列 year 的数据类型转为 integer;
- 去掉列 title 中的空格;
filter {
csv {
separator => ","
columns => ["id","content","genre"]
}
mutate {
split => { "genre" => "|" }
remove_field => ["path", "host","@timestamp","message"]
}
mutate {
split => ["content", "("]
add_field => { "title" => "%{[content][0]}"}
add_field => { "year" => "%{[content][1]}"}
}
mutate {
convert => {
"year" => "integer"
}
strip => ["title"]
remove_field => ["path", "host","@timestamp","message","content"]
}
}
- 把读取到的,转换过的信息写入 ElasticSearch 中;
output {
elasticsearch {
hosts => "http://localhost:9200"
index => "movies"
document_id => "%{id}"
}
stdout {}
}