以下文章来源于:https://blog.csdn.net/u014720624/article/details/89308649
作者:CSDN博主「jeff_yxj」
目前生产环境的日志基本都是采用 Flask 文件日志写入 + Gunicorn 文件日志写入 + logrotate做按日切割,日志文件保留30天的数量,这样的方案比较适合简单的场景,长时间下来就有以下几个问题:
服务程序都是分布式部署,所以每个服务器都有自己的本地日志,如果有问题需要查看日志,需要连接到每个服务器来查看
日志是按日切割的,所以排查日志跨度比较大,则需要查看多个日志文件切换排查
日志下载则受限服务器出口带宽下载慢,在线查看vim的搜索机制比较单一
所以根据资料查询,目前确定将日志都统一使用 ELK 收集, 对于以上的问题都有比较好的解决,除此之外还可以保留更长时间的日志及美观的可视化操作,ElasticSearch提供比较优秀的开源关键字搜索,而且这个方案已经被很多人验证,又有商业公司专门维护开源版本,所以是一个很好的选择。
-
服务器版本是 Centos 7.2 版本,ELK 则直接使用 Docker 来安装,并且可以通过Docker直接托管进程,非常的方便。我们使用的是一个开源的 ELK 镜像配置项目,该项目要求 Docker 17.05+
yum自带的Docker 版本比较低,需要安装更新版本的 Docker,如果已经安装了旧版本的Docker,需要先手动卸载
$ sudo yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-selinux \
docker-engine-selinux \
docker-engine
然后安装Docker 必须的依赖库及、添加Dokcer官方的yum源及安装 Docker
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
$ sudo yum install -y docker-ce
安装完成后输入docker -v 可以查看 Docker 的版本
$ docker -v
Docker version 18.09.4-rc1, build 9d6081e
将 Docker 配置为开机自动启动
$ systemctl enable docker
$ systemctl daemon-reload
$ systemctl start docker
- 安装完成后需要开始安装 ELK,这里我们使用ELK的一个开源Docker 配置项目,该项目提供 Docker-compose 的编排脚本方便统一管理,github地址是:https://github.com/deviantony/docker-elk#how-can-i-tune-the-elasticsearch-configuration
可以下载也可以直接使用git将此项目clone到本地,另外项目启动需要安装docker-compose,这个是由python编写的基于Docker官方API的一个编排工具,我已经安装了python3环境,所以直接使用pip3安装了docker-compose这个库即可使用docker-compose的相关命令。
$ pip3 install docker-compose
$ yum install -y git
$ git clone https://github.com/deviantony/docker-elk.git
$ cd docker-elk
$ docker-compose up
第一次使用 docker-compose up 命令后会直接开始自动安装 ELK 各自的镜像文件,完成安装后即可开始工作,如果需要以deamon的形式运行可以加入 -d 参数
如果下载 Docker 镜像很慢可以使用能科学上网的代理或者使用阿里云的容器镜像服务里面的镜像加速器
完成安装之后可以在项目中直接修改ELK的运行配置,例如直接在 docker-compose.yml 中修改 ElasticSearch 的数据库目录,但是这样修改后需要把文件目录授权给用户 1000,这里默认使用1000这个用户来启动 ELK 的服务的
elasticsearch:
build:
context: elasticsearch/
args:
ELK_VERSION: $ELK_VERSION
volumes:
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml:ro
- /data/elasticsearch:/usr/share/elasticsearch/data
ports:
- "9200:9200"
- "9300:9300"
environment:
ES_JAVA_OPTS: "-Xmx256m -Xms256m"
networks:
- elk
logstash中修改下pipe配置,编码使用json类型,修改 logstash/pipeline/logstash.conf 下配置文件
input {
tcp {
port => 5000
codec => json
}
}
然后 docker-compose 重启服务
- 到目前为止,已经完成了 ELK 日志收集的搭建,然后需要在 Flask 中的logger中加入一个将日志数据导入 logstash 的日志文件,这里我们使用 python-logstash 这个库
class MyLogstashFormatter(LogstashFormatterBase):
def __init__(self, message_type='Logstash', tags=None, fqdn=False, status=None):
super(MyLogstashFormatter, self).__init__(message_type, tags, fqdn)
self.status = status if status else {}
def format(self, record):
message = {
'@timestamp': self.format_timestamp(record.created),
'@version': '1',
'message': self.format_exception(record.exc_info) if self.format_exception(
record.exc_info) else record.getMessage(),
'host': local_ip,
'hostId': HOST_ID,
'path': '[line:{}]{}'.format(record.lineno, record.pathname),
'tags': self.tags,
'process': record.processName,
'msgType': self.message_type,
'app': 'api/' + env,
# Extra Fields
'level': record.levelname,
'logger_name': self.host,
}
# Add extra fields
message.update(self.get_extra_fields(record))
# delete random item if length of status over 1000
[self.status.popitem() for i in range(len(self.status)-1000) if len(self.status) >= 1000]
return self.serialize(message)
class LogHandler(TCPLogstashHandler):
def __init__(self, host, port, message_type='flask', tags=None, fqdn=False, version=0):
super(LogHandler, self).__init__(host, port, message_type, tags, fqdn, version)
self.formatter = MyLogstashFormatter(message_type, tags, fqdn)
def init(app):
app.logger.addHandler(LogHandler(LOG_STASH_HOST, LOG_STASH_PORT, tags=''))
LOG_STASH_HOST 和 LOG_STASH_PORT 就是部署 ELK 的地址及端口,然后通过 app.logger 输出的日志会直接导入到 logstash中,然后可以在 kibana 中查看到
- 已经完成了 Flask 的日志导入,接下去还可以继续完成 Gunicorn 的日志导出,Gunicorn 我们使用配置文件的形式来配置参数
import logging
import multiprocessing
import os
from app.config import LOG_STASH_HOST, LOG_STASH_PORT, LOG_PATH
from app.utils.logger import LogHandler
debug = False
deamon = False
loglevel = 'info'
bind = '0.0.0.0:12345'
max_requests = 50000
worker_connections = 50000
x_forwarded_for_header = "X-Real-IP"
# 启动的进程数
workers = multiprocessing.cpu_count()
# workers = 3
worker_class = "gevent"
accesslog = os.path.join(LOG_PATH, 'access.log')
access_log_format = '%({X-Real-IP}i)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
errorlog = os.path.join(LOG_PATH, 'error.log')
timeout = 60
err_handler = LogHandler(LOG_STASH_HOST, LOG_STASH_PORT, 'error', tags='')
access_handler = LogHandler(LOG_STASH_HOST, LOG_STASH_PORT, 'access', tags='')
err_handler.setLevel(logging.ERROR)
access_handler.setLevel(logging.INFO)
logging.getLogger('gunicorn.error').addHandler(err_handler)
logging.getLogger('gunicorn.access').addHandler(access_handler)
- 然后我们的负载均衡使用 Haproxy,而 Kibana 的登录认证需要使用付费的模块,所以我们目前先使用基于负载均衡的简单的http认证来实现需要登录才能访问 Kibana的功能,Haproxy 和 Nginx都有提供相关的功能
listen api
bind *:80
mode http
acl log_page hdr_beg(host) -i logs.xxxxxx.com
acl kibana-admin http_auth(kibana-auth-list)
use_backend kibana if log_page
default_backend api-backend
userlist kibana-auth-list
group admin
user jeff password $5$ySIPTQpg$9tk7BU7DF3tdZhUk8JztPH65Sfxnr.XZWDc5TLD1/3B
groups admin
backend kibana
acl kibana-admin http_auth(kibana-auth-list)
http-request auth realm ibeelinkLogger unless kibana-admin
mode http
server kibana_1 172.19.19.39:5601 check
具体的配置方式可以参考:
https://blog.sleeplessbeastie.eu/2018/03/08/how-to-define-basic-authentication-on-haproxy/
以上就完成了日志到ELK的整个流程的部署