1.文档编写目的
主要是为了给大家提供一种监控某个服务运行情况或者是针对该服务日志进行一些其他分析功能的技术选型方式。本篇文档将针对大数据集群的hue组件,用户在使用hue查询hive数据后会有一个下载数据的操作,这个操作将严重影响hue服务的稳定性。所以我们决定通过logstash+kafka的方式分析hue日志,监控下载数据的操作。
- 内容概述
- 环境描述
- 利用logstash收集hue日志并发送到kafka
- 编写python代码消费Kafka消息,并分析日志,最后发送钉钉报警
- 验证报警机制
- 前提条件
- logstash已安装并能正常运行
- Kafka已创建相关topic
2.环境描述
- hue的日志都分布在...、...、...*上的/var/log/hue/目录下。
- logstash版本logstash-6.2.3(里面有个坑:这个版本必须用java8才能使用,由于生产环境都是jdk1.7的所以这里要单独配置环境:编辑bin/logstash 添加jdk1.8的路径)
- kafka地址:...,...,...**,端口都是9092(由于版本较新不需要zookeeper的相关信息)。topic信息:hue-download-group。
3.logstash收集hue日志
- 配置logstash文件
input {
file {
path => ["/var/log/hue/access.log","/var/log/hue/access.log.1"]
stat_interval => 5
}
}
output {
kafka {
topic_id => "hue-access-log"
bootstrap_servers => "**.*.**.***:9092;**.*.**.***:9092;**.*.**.***:9092"
}
}
- 启动logstash服务:
这里要修改一下启动脚本logstash:添加环境变量export JAVA_HOME=/opt/soft/java/jdk1.8.0_111/
启动命令:nohup bin/logstash -f confs/hue-download-monitor.conf >>hue_download.log 2&1 &
- 可以开启一个Kafka console comsumer看看是否收到日志消息:
进到kafka服务器,bin/kafka-console-consumer.sh --topic hue-access-log --bootstrap-server localhost:9092
4.代码编写
from kafka import KafkaConsumer
import re
import requests
import json
dingTalkProdURL = '*****'
def findDownload(inputMessage):
#一条日志中包含‘download’就是在下载数据
if 'download' in inputMessage:
print inputMessage
elements = re.split(r"[ ]+", inputMessage)
# server ip , time , user ip , user name
return (elements[1],elements[3],elements[6],elements[7])
def postToDingTalk(monitorMessage):
#封装发送的消息
messageBody = {}
messageBody["msgtype"] = "markdown"
messageBody["markdown"] = {}
messageBody["markdown"]["title"] = "Hue Download Monitor"
messageBody["markdown"]["text"] = "### Download User Name: " + monitorMessage[3] + "\n" + \
"1. User IP: " + monitorMessage[2] + "\n" + \
"2. Download Time: " + monitorMessage[1] + "\n" + \
"3. Hue Server: " + monitorMessage[0] + "\n"
messageBody["at"] = {}
messageBody["at"]["atMobiles"] = []
messageBody["at"]["isAtAll"] = 'false'
header = {}
header["Content-Type"] = "application/json; charset=utf-8"
print json.dumps(messageBody)
r = requests.post(dingTalkProdURL, json.dumps(messageBody), headers=header)
print r.status_code
print r.reason
if __name__ == '__main__':
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('hue-access-log',
group_id='hue-download-group',
bootstrap_servers=['**.*.**.***:9092', '**.*.**.***:9092', '**.*.**.***:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
# print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
# message.offset, message.key,
# message.value))
elementTuple = findDownload(message.value)
if elementTuple != None:
#将报警信息封装好后post到钉钉的接口
postToDingTalk(elementTuple)
5.验证报警机制的准确性
- 登录到hue页面,查询hive数据,然后点击下载ecxl表格
- 钉钉出现报警,验证完毕!
6.总结
- 利用logstash收集日志,将日志打到kafka,最后再在代码中消费kafka消息是大数据中解决实时报警的基本方式。同时这也是解决一些需要处理实时数据的重要手段。
- 这里采用logstash而不是flume收集日志主要是logstash配置相对简单,适合轻量级的应用。