Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,常用于日志处理、或一些具有一定格式的数据导入到ES的处理。
Logstash 的安装使用步骤:
1、下载安装包
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.1.tar.gz
2、解压安装包 : tar -zxvf logstash-5.6.1.tar.gz
部分数据集较大,需要在/bin/logstash目录下添加: exportLS_HEAP_SIZE=10g
3、在config路径下创建配置文件,如 test.conf
4、进入 logstash-5.6.1目录,执行bin/logstash -f test.conf
Logstash 的三个阶段:
input 数据输入端,接收源数据,如:
file:从文件中读取
syslog:监听系统日志信息,并解析成RFC3164格式。
Filter 数据规范,进行格式处理,如:
grok: 通过正则解析和结构化任何文本日志。
mutate: 对字段进行转换,如重命名、删除、替换字段等。
drop: 丢弃字段(日志)
output 将数据输出到指定位置,如:
file: 将事件数据写入到文件内;
elasticsearch: 发送事件数据到 Elasticsearch;
logstash导入数据时常用的参数:
参数 | 类型 | 描述 | 示例 |
---|---|---|---|
path | array | 用来设置导入目标文件的文件路径 | path => [ "/data/test.log", "/data/.txt" ] path => "/data/.txt" |
exclude | array | 排除文件,支持glob 展开 | exclude => "/data/*.txt" |
codec | codec | 用于对规范数据数据进行解码 | codec => json |
discover_interval | number | 设置logstash读取新文件的时间间隔,默认值是15秒 | discover_interval => 15 |
max_open_files | number | 配置当前input可以监控的文件的最大值 | max_open_files => 10 |
close_older | number | 设置结束时间,即如果在限制时间段内没有更新内容,就关闭监听它的文件句柄,默认是3600秒 | close_older => 3600 |
delimiter | string | 设置行与行之间的分隔符,默认使用\n | delimiter => "\n" |
sincedb_path | string | 记录数据读取的位置,配置存放该位置的文件路径 | sincedb_path => "/data/sincedb_test.txt" |
sincedb_write_interval | number | logstash 每隔多久写一次sincedb文件,默认是15秒 | sincedb_write_interval => 15 |
start_position | string | 设置logstash是从“beginning”还是“end”去读取文件,默认是“end” | start_position => "beginning" |
stat_interval | number | logstash每隔多久检查被监听文件是否有更新,默认是1秒 | stat_interval => 1 |
tags | array | 添加一个任意的数字作为当前事件的标签 | tags => 12 |
add_field | hash | 在事件中加入一个field | add_field => { test } |
type | string | type属性主要在filter场景中使用 | type => "testlog" |
从elasticsearch导出数据至文件 demo :
input {
elasticsearch {
hosts => "192.168.31.130:9200" #配置 elasticsearch的地址及index
index => "test"
query => '{ "query": {"match_all" : {} } }' #配置匹配模式
size => 10000 #配置遍历数据
scroll => "5m" # 配置遍历时间间隔
docinfo => true
}
}
output {
file {
#配置文件导出路径
path => "/data01/test.txt"
}
}
将json文件导入elasticsearch配置demo:
input {
file {
path => "/data/*.log" #配置读取的文件
discover_interval => 5
start_position => "beginning" #从文件开始位置读取
sincedb_path => "/data/sincedb_test.txt" #记录读取的位置
sincedb_write_interval => 15
codec => json { #配置文本类型
charset => "UTF-8"
}
}
}
output {
elasticsearch {
hosts => "192.168.31.130:9200"
index => "test"
}
}
从elasticsearch导入到另一个elasticsearch配置demo:
input {
elasticsearch {
hosts => "192.168.31.130:9200"
index => "test*"
query => '{ "query": {"match_all" : {} } }'
size => 10000
scroll => "5m"
docinfo => true
}
}
output {
elasticsearch {
hosts => "192.168.31.130:9200"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
}
}
持续更新。。。