需求:
做一个收集Nginx的access log和error log并绘制图表
解决方案
采用filebeat6.6.2
在Nginx应用服务器上收集日志,经过kafka2.1.1
(zookeeper
集群)消息队列中间件传入到logstash
进行过滤解析,然后存储到elasticsearch
,最终由kibana
进行查询和制图。并且elk
、filebeat
和kafka
都采用docker
的方式进行部署,采用docker-compose
进行编排方便线上的维护
使用filebeat的原因:对比logstash,filebeat更加轻量,且消耗资源更少
采用kafka作为中间件原因:避免直接传入logstash引起的io瓶颈,同时具有较高吞吐量,而且比较稳定,处理消息的效率很高
PS:当看到kibana正常load数据的时候,有几点感触
1.一定要好好看elk和filebeat的官方文档,知道filebeat的output和logstash的input的各个参数使用
2.使用docker部署kafka & zookeeper会有个坑,可能也是楼主对docker网络和kafka不熟悉的原因造成的
实际场景
应用程序 | 虚拟机IP |
---|---|
Nginx | 10.150.33.123 |
部署情况
应用程序 | 虚拟机IP | 备注 | 端口 |
---|---|---|---|
Nginx | 10.150.33.123 | 日志文件位于/var/log/nginx | |
filebeat | 10.150.33.123 | ||
kafka & zookeeper | 10.150.33.126 | zookeeper集群,单机kafka | 9092、2181 |
elk | 10.150.33.126 | 需与kafka处于同一network | 9200,5601,9300,5000,9600 |
Nginx
/var/log/nginx下有文件,Nginx中的access和error日志格式是不同的,需要通过logstash进行处理
- www.test.com.access.log
- www.test.com.error.log
- api.test.com.access.log
- api.test.com.error.log
- openapi.test.com.access.log
- openapi.test.com.error.log
filebeat部署
先看filebeat的目录结构如图
第一步:编辑Dockerfile
FROM docker.elastic.co/beats/filebeat:6.6.2
##enable Nginx modules,注意这里需要开启filebeat的Nginx模块
RUN /usr/share/filebeat/filebeat modules enable nginx
COPY filebeat.yml /usr/share/filebeat/filebeat.yml
USER root
RUN chown root:filebeat /usr/share/filebeat/filebeat.yml
USER filebeat
第二步:编辑docker-compose.yml
version: '2.3'
services:
beat:
build:
context: ${PWD}/.
user: root
environment:
- BEAT_STRICT_PERMS=false
restart: always
volumes:
#filebeat.yml作为配置文件,ro表示只读
- ${PWD}/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
#为了filebeat的module.d配置nginx.yml
- ${PWD}/modules.d:/usr/share/filebeat/modules.d
- /var/lib/docker/containers:/var/lib/docker/containers:ro
# We launch docker containers to test docker autodiscover:
- /var/run/docker.sock:/var/run/docker.sock
#把logs和data共享出来
- ${PWD}/logs:/usr/share/filebeat/logs
- ${PWD}/data:/usr/share/filebeat/data
##为了读取外部的日志文件,开启共享,下面举例为读取外部Nginx的/var/logs/nginx里的日志
- /var/log/nginx:/var/log/nginx
extra_hosts:
## 这里是为了解决kafka的一个坑,需要在filebeat上作hosts的映射
- "kafka:10.150.33.126"
第三步:编辑module.d的nginx.yml文件
可以通过下载filebeat的安装包然后把里面的module.d文件夹复制过来
然后把nginx.yml.disabled
修改名为nginx.yml
,然后编辑如下:
# Module: nginx
# Docs: https://www.elastic.co/guide/en/beats/filebeat/master/filebeat-module-nginx.html
- module: nginx
# Access logs
access:
enabled: true
# Set custom paths for the log files. If left empty,
# Filebeat will choose the paths depending on your OS.
#var.paths:
var.paths: ["/var/log/nginx/*.test.com.access.log"]
# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
var.convert_timezone: true
# Error logs
error:
enabled: true
# Set custom paths for the log files. If left empty,
# Filebeat will choose the paths depending on your OS.
var.paths: ["/var/log/nginx/*.test.com.error.log"]
# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
var.convert_timezone: true
具体参考官方文档 Nginx Filebeat module
第四步:编辑filebeat.yml
###################### Filebeat Configuration Example #########################
# This file is an example configuration file highlighting only the most common
# options. The filebeat.reference.yml file from the same directory contains all the
# supported options with more comments. You can use it as a reference.
#
# You can find the full configuration reference here:
# https://www.elastic.co/guide/en/beats/filebeat/index.html
# For more available modules and options, please see the filebeat.reference.yml sample
# configuration file.
#=========================== Filebeat inputs =============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
# - type: log
# # Change to true to enable this input configuration.
# enabled: false
# # Paths that should be crawled and fetched. Glob based paths.
# paths:
# - /var/log/*.log
#- c:\programdata\elasticsearch\logs\*
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# matching any regular expression from the list.
#exclude_lines: ['^DBG']
# Include lines. A list of regular expressions to match. It exports the lines that are
# matching any regular expression from the list.
#include_lines: ['^ERR', '^WARN']
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
# are matching any regular expression from the list. By default, no files are dropped.
#exclude_files: ['.gz$']
# Optional additional fields. These fields can be freely picked
# to add additional information to the crawled log files for filtering
#fields:
# level: debug
# review: 1
### Multiline options
# Multiline can be used for log messages spanning multiple lines. This is common
# for Java Stack Traces or C-Line Continuation
# The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
#multiline.pattern: ^\[
# Defines if the pattern set under pattern should be negated or not. Default is false.
#multiline.negate: false
# Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
# that was (not) matched before or after or as long as a pattern is not matched based on negate.
# Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
#multiline.match: after
#============================= Filebeat modules ===============================
filebeat.config.modules:
# Glob pattern for configuration loading
##加载modules的配置
path: ${path.config}/modules.d/*.yml
# Set to true to enable config reloading
reload.enabled: true
# Period on which files under path should be checked for changes
#reload.period: 10s
#==================== Elasticsearch template setting ==========================
setup.template.settings:
index.number_of_shards: 1
#index.codec: best_compression
#_source.enabled: false
#================================ General =====================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
##filebeat.name配不配都行
filebeat.name: "nginx"
# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
##filbeat.tags配不配都行
filebeat.tags: ["nginx", 'access', 'log', 'error']
# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging
#============================== Dashboards =====================================
# These settings control loading the sample dashboards to the Kibana index. Loading
# the dashboards is disabled by default and can be enabled either by setting the
# options here, or by using the `-setup` CLI flag or the `setup` command.
#setup.dashboards.enabled: false
# The URL from where to download the dashboards archive. By default this URL
# has a value which is computed based on the Beat name and version. For released
# versions, this URL points to the dashboard archive on the artifacts.elastic.co
# website.
#setup.dashboards.url:
#============================== Kibana =====================================
# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
#setup.kibana:
# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"
# Kibana Space ID
# ID of the Kibana Space into which the dashboards should be loaded. By default,
# the Default Space will be used.
#space.id:
#============================= Elastic Cloud ==================================
# These settings simplify using filebeat with the Elastic Cloud (https://cloud.elastic.co/).
# The cloud.id setting overwrites the `output.elasticsearch.hosts` and
# `setup.kibana.host` options.
# You can find the `cloud.id` in the Elastic Cloud web UI.
#cloud.id:
# The cloud.auth setting overwrites the `output.elasticsearch.username` and
# `output.elasticsearch.password` settings. The format is `<user>:<pass>`.
#cloud.auth:
#================================ Outputs =====================================
# Configure what output to use when sending the data collected by the beat.
#-------------------------- Elasticsearch output ------------------------------
# 这里我们不输出到es,果断注释掉
# output.elasticsearch:
# # Array of hosts to connect to.
# hosts: ["localhost:9200"]
# Enabled ilm (beta) to use index lifecycle management instead daily indices.
#ilm.enabled: false
# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"
#----------------------------- Logstash output --------------------------------
# 这里我们不输出到logstash,果断注释掉
# output.logstash:
# # The Logstash hosts
# hosts: ["0.0.0.0:5044"]
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
#----------------------------- Redis output --------------------------------
# 这里我们不用redis作为中间件,所以注释掉,可以参考博主的配置,但最好看回官方文档了解每个参数的作用
# output.redis:
# ## The redis hsots
# hosts: ["10.150.33.126"]
# ## redis password
# ## password: "my_password"
# datatype: "list"
# key: "nginx_log"
# # 存储的key
# keys:
# - key: "nginx_www_access_log"
# when.equals:
# source: "/var/log/nginx/logs/www.test.com.access.log"
# - key: "nginx_openapi_access_log"
# when.equals:
# source: "/var/log/nginx/logs/openapi.test.com.access.log"
# - key: "nginx_api_access_log"
# when.equals:
# source: "/var/log/nginx/logs/api.test.com.access.log"
# - key: "nginx_www_error_log"
# when.equals:
# source: "/var/log/nginx/logs/www.test.com.error.log"
# - key: "nginx_openapi_error_log"
# when.equals:
# source: "/var/log/nginx/logs/openapi.test.com.error.log"
# - key: "nginx_api_error_log"
# when.equals:
# source: "/var/log/nginx/logs/api.test.com.error.log"
# ##存储的db
# db: 0
# ##超时
# timeout: 60
#----------------------------- Kafka output --------------------------------
output.kafka:
##kafka位于10.150.33.126上,这里由于在docker-compose.yml上做了映射,所以配置上kafka即可
hosts: ["kafka:9092"]
##key: "nginx_log"
#存储的key
#为了针对不同的source文件分配不同的topic,这样后面可以在es上分配不同的index
topics:
- topic: "nginx-www-access-log"
when.equals:
source: "/var/log/nginx/www.test.com.access.log"
- topic: "nginx-openapi-access-log"
when.equals:
source: "/var/log/nginx/openapi.test.com.access.log"
- topic: "nginx-api-access-log"
when.equals:
source: "/var/log/nginx/api.test.com.access.log"
- topic: "nginx-www-error-log"
when.equals:
source: "/var/log/nginx/www.test.com.error.log"
- topic: "nginx-openapi-error-log"
when.equals:
source: "/var/log/nginx/openapi.test.com.error.log"
- topic: "nginx-api-error-log"
when.equals:
source: "/var/log/nginx/api.test.com.error.log"
##超时
timeout: 60
##partition策略必须为random、round_robin或者hash的其中一个
##在分区程序选择下一个分区之前,设置要发布到同一分区的事件数。默认值为1,表示每次事件后将选择下一个分区。
partition.round_robin:
reachable_only: false
##ACK可靠性级别要求,0:无响应,1:等待本地提交,-1:等待所有副本提交,默认值为1
required_acks: 1
## none,snappy,lz4或gzip其中一个
compression: gzip
##超过1000000字节的事件会被丢弃
max_message_bytes: 100000000
#================================ Processors =====================================
# Configure processors to enhance or manipulate events generated by the beat.
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
#================================ Logging =====================================
# Sets log level. The default log level is info.
# Available log levels are: error, warning, info, debug
#logging.level: debug
# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
#============================== Xpack Monitoring ===============================
# filebeat can export internal metrics to a central Elasticsearch monitoring
# cluster. This requires xpack monitoring to be enabled in Elasticsearch. The
# reporting is disabled by default.
# Set to true to enable the monitoring reporter.
#xpack.monitoring.enabled: false
# Uncomment to send the metrics to Elasticsearch. Most settings from the
# Elasticsearch output are accepted here as well. Any setting that is not set is
# automatically inherited from the Elasticsearch output configuration, so if you
# have the Elasticsearch output configured, you can simply uncomment the
# following line.
#xpack.monitoring.elasticsearch:
具体参考官方文档如下:
filebeat.yml
filebeat.output for kafka
第五步:运行
在项目根目录中运行docker-compose up -d --build
部署kafka2.1.1
kafka & zookeeper则采用 wurstmeister/kafka-docker
把代码拉下来
第一步:编辑docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
restart: always
kafka1:
image: kafka_kafka1
ports:
##防止容器被销毁后端口发生变化,而且部署的是kafka单机
- "9092:9092"
depends_on:
##确保在zookeeper后运行
- zookeeper
restart: always
environment:
##固定broker_id为1,防止发生变化把以前的数据丢失了
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_PORT: 9092
##保存数据的时间为168小时,配不配都行
KAFKA_LOG_RETENTION_HOURS: "168"
##超过10000000字节的事件会被丢弃,配不配都行
KAFKA_LOG_RETENTION_BYTES: "100000000"
KAFKA_ADVERTISED_HOST_NAME: 172.19.0.3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
##预先设好topic,1个leader和一个副本
KAFKA_CREATE_TOPICS: "nginx-www-access-log:1:1,nginx-openapi-access-log:1:1,nginx-api-access-log:1:1,nginx-www-error-log:1:1,nginx-openapi-error-log:1:1,nginx-api-error-log:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./logs:/kafka
extra_hosts:
##这里必须注意,要采用映射的方式,且172.19.0.3表示为kafka在docker网络里的ip,可以通过docker exec kafka1 bash ifconfig查看eth0中得到
- "kafka:172.19.0.3"
第二步:运行
在项目根目录中运行docker-compose up -d --build
部署elk6.6.1
采用的是 deviantony/docker-elk
把代码拉下来
第一步:编辑docker-compose.yml
version: '2.3'
services:
elasticsearch:
build:
context: elasticsearch/
args:
ELK_VERSION: $ELK_VERSION
container_name: elasticsearch
volumes:
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
ports:
- "9200:9200"
- "9300:9300"
restart: always
environment:
##可用配置为docker的1/4,因为要和logstash平分docker的一半堆内存
##默认为docker的1/8,注意,设置太高会使得es的影响其他字段存储
##这里因为126机器为24g内存,所以设为3g
ES_JAVA_OPTS: -Xms3g -Xmx3g
networks:
- elk
logstash:
build:
context: logstash/
args:
ELK_VERSION: $ELK_VERSION
container_name: logstash
volumes:
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro
- ./logstash/template:/usr/share/logstash/template
ports:
- "5000:5000"
- "9600:9600"
- "5044:5044" ##这是为了filebeat直接传入logstash,可以关闭该端口
restart: always
environment:
##可用配置为docker的1/4,因为要和es平分docker的一半堆内存
##默认为docker的1/8,注意,设置太高会使得es的影响其他字段存储
##这里因为126机器为24g内存,所以设为3g
LS_JAVA_OPTS: "-Xmx3g -Xms3g"
networks:
- elk
depends_on:
- elasticsearch
##这里做host的映射
extra_hosts:
- "kafka:172.19.0.3"
kibana:
build:
context: kibana/
args:
ELK_VERSION: $ELK_VERSION
container_name: kibana
volumes:
- ./kibana/config/:/usr/share/kibana/config:ro
ports:
- "5601:5601"
restart: always
networks:
- elk
depends_on:
- elasticsearch
##让elk能够和kafka处于同一网段,这样可以读取docker内部的kafka服务
networks:
elk:
external:
name: kafka_default ##kafka_default是kakfa的默认网络
第二步:修改Dockerfile
ARG ELK_VERSION
# https://github.com/elastic/logstash-docker
FROM docker.elastic.co/logstash/logstash-oss:${ELK_VERSION}
##这是为了个让logstash支持kafka输入
##RUN logstash-plugin install logstash-input-beats
##RUN logstash-plugin install logstash-input-redis
RUN logstash-plugin install logstash-input-kafka
# Add your logstash plugins setup here
# Example: RUN logstash-plugin install logstash-filter-json
第三步:修改logstash/pipeline/logstash.conf
input {
##这是支持filebeat input的配置
##filebeat input
# beats {
# port => 5044
# ##codec => "json"
# }
##支持Redis input的配置
##Redis input
# redis {
# key => "nginx_www_access_log"
# host => ["10.150.33.126"]
# port => 6379
# data_type => "list"
# db => 0
# tags => ["nginx", "www", "access"]
# }
##支持kafka input的配置
##kafka input
kafka {
client_id => "beats" ##由于filebeat默认为beats,必须要填写才能有数据进来
bootstrap_servers => "kafka:9092" ##填写对应的kafka所在机器的ip地址和端口,即对应在docker里的ip地址
topics => ["nginx-www-access-log", "nginx-api-access-log", "nginx-openapi-access-log", "nginx-www-error-log", "nginx-api-error-log", "nginx-openapi-error-log"] ##要订阅kafka的对应topic
group_id => "beats" ##设不设的问题都不大,最好设了,毕竟走通了
consumer_threads => 3
codec => "json" ##数据json化才能读取到message数据
decorate_events => true ##decorate_events开启后用mutate才可以拿到topic字段
}
}
## Add your filters / logstash plugins configuration here
filter {
mutate {
##拿到@metadata嵌套里topic字段放到topic字段里
copy => { "@metadata" => "meta" }
add_field => {
"topic" => "%{[meta][topic]}"
}
}
##针对Nginx的access log进行解析
if [fileset][name] == "access" {
dissect {
##匹配,dissect比grok更快更方便使用
mapping => {
"message" => '[%{logtime}] %{remote_addr} %{http_x_forwarded_for} %{remote_user} "%{request}" %{status_code} %{request_time} %{upstream_response_time} %{request_length} %{body_bytes_sent} "%{http_referer}" "%{http_user_agent}"'
}
##不在output中输出以下字段,节省空间
remove_field => [
"beat",
"@version",
"host",
"meta",
"message",
"input",
"offset",
"source",
"fileset",
"event",
"prospector",
"log"
]
##字段类型转化,作为int可以在绘制图表的时候进行聚簇
convert_datatype => {
"request_length" => "int"
"body_bytes_sent" => "int"
"status_code" => "int"
}
}
##日志生成的时间替换掉@timestamp(原本是入es的时间),这里替换掉进行日志按日切割
date {
##这里的yyyy/MM/dd HH:mm:ss是因为error日志里的时间格式是这样,而不是你想要配置成这样的格式
"match" => ["logtime", "dd/MMM/yyyy:HH:mm:ss Z"]
"target" => "@timestamp"
}
}
##针对Nginx的error log进行解析
if [fileset][name] == "error" {
grok {
##匹配error日志格式
match => [ "message" , "(?<logtime>%{YEAR}[./-]%{MONTHNUM}[./-]%{MONTHDAY}[- ]%{TIME}) \[%{LOGLEVEL:severity}\] %{POSINT:pid}#%{NUMBER}: %{GREEDYDATA:errormessage}(?:, client: (?<remote_addr>%{IP}|%{HOSTNAME}))(?:, server: %{IPORHOST:server}?)(?:, request: %{QS:request})?(?:, upstream: (?<upstream>\"%{URI}\"|%{QS}))?(?:, host: %{QS:request_host})?(?:, referrer: \"%{URI:referrer}\")?"]
##不在output中输出以下字段,节省空间
remove_field => [
"beat",
"@version",
"host",
"meta",
"message",
"input",
"offset",
"source",
"fileset",
"event",
"prospector",
"log"
]
}
##日志生成的时间替换掉@timestamp(原本是入es的时间),这里替换掉进行日志按日切割
date {
##这里的yyyy/MM/dd HH:mm:ss是因为error日志里的时间格式是这样,而不是你想要配置成这样的格式
"match" => ["logtime", "yyyy/MM/dd HH:mm:ss"]
"target" => "@timestamp"
}
}
}
output {
##根据filebeat传过来的topic确定对应不同的access文件或者error文件,然后生成es的index
if [topic] == "nginx-www-access-log" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx_www_access_%{+YYYY.MM.dd}" ##根据@timestamp进行index分割,即根据日志的生成时间进行分割
manage_template => true
template => "/usr/share/logstash/template/nginx_mapping.json"
template_name => "nginx-access-log"
}
}
if [topic] == "nginx-api-access-log" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx_api_access_%{+YYYY.MM.dd}"
manage_template => true
template => "/usr/share/logstash/template/nginx_mapping.json"
template_name => "nginx-access-log"
}
}
if [topic] == "nginx-openapi-access-log" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx_openapi_access_%{+YYYY.MM.dd}"
manage_template => true
template => "/usr/share/logstash/template/nginx_mapping.json"
template_name => "nginx-access-log"
}
}
if [topic] == "nginx-www-error-log" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx_www_error_%{+YYYY.MM.dd}"
manage_template => false
}
}
if [topic] == "nginx-api-error-log" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx_api_error_%{+YYYY.MM.dd}"
manage_template => false
}
}
if [topic] == "nginx-openapi-error-log" {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "nginx_openapi_error_%{+YYYY.MM.dd}"
manage_template => false
}
}
##输出到控制台
##if [topic] == "nginx-www-error-log" {
## stdout {
## codec => rubydebug
## }
##}
}
第四步:新增logstash/template/nginx_mapping.json模板(这里只是为了玩一下模板,如果不需要,则在logstash.conf去掉对应配置)
{
"index_patterns": ["nginx-access-log"],
"mappings": {
"doc": {
"properties": {
"logtime" : {
"type" : "date",
"format": "dd/MMM/yyyy:HH:mm:ss Z"
},
"http_x_forwarded_for": {
"type": "ip",
"doc_values": true
},
"remote_addr": {
"type": "ip",
"doc_values": true
},
"remote_user": { "type": "keyword" },
"request": { "type": "text" },
"status_code": { "type": "integer" },
"request_time": { "type": "double" },
"upstream_response_time": { "type": "keyword" },
"request_length": { "type": "integer" },
"body_bytes_sent": { "type": "integer" }
}
}
}
}
第五步:运行
在项目根目录中运行docker-compose up -d --build
结束
完美运行!
后面采用kibana绘制图标就不写了,意义不大
期间遇到的一些其他问题
1.由于在项目中运行拉取kafka的镜像失败,所以在其他机子上把kafka的镜像拉取下来,然后运行docker save -o images.tar openjdk kafka_kafka wurstmeister/zookeeper
,把镜像导出来
然后在线上运行docker load images.tar
参考资料
logstash reference官方文档
logstash介绍
logstash解析Nginx error日志格式
logstash把message中日志时间替换到@timestamp
logstash实用介绍
kafka官方文档
学会kafka shell脚本测试kafka
kibana绘图教程1
kibana绘图教程2
kibana绘图教程3
mac搭建kafka集群参考文章
elk+redis集群参考文章
让外网访问docker里的kafka服务参考文章1
让外网访问docker里的kafka服务参考文章2
让外网访问docker里的kafka服务参考文章3
让外网访问docker里的kafka服务参考文章4