Use case: if you need to watch a directory and run some processing logic when files arrive, without building anything heavyweight, watchdog is a good choice.

**Requirement: monitor a directory on Linux and feed newly added files into a processing engine, on Python 3 or later.**

Notes:

Uploading a file into the watched directory fires several watchdog events for the same file, so the handler needs deduplication logic (a minimal debounce sketch follows the demo below).

A simple watchdog demo:
```python
# -*- coding: utf-8 -*-
# Purpose: watch a directory for changes and dispatch the matching handler logic
import time
import logging

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# (No reload(sys)/sys.setdefaultencoding('utf8') here: that was a Python 2
# workaround and reload() is not a builtin on Python 3.)

# Create a logger
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)
# One handler writes to a log file
fh = logging.FileHandler('E:/testLog.log')
fh.setLevel(logging.DEBUG)
# A second handler echoes to the console
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# Shared output format for both handlers
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# Attach the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)


class FileEventHandler(FileSystemEventHandler):

    def on_moved(self, event):
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))

    # Fires whenever something under the watched directory is modified
    def on_modified(self, event):
        # Subdirectories under the watched directory also trigger this
        if event.is_directory:
            logger.info("directory modified:{0}".format(event.src_path))
        else:
            logger.info("file modified:{0}".format(event.src_path))


if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # Watch the directory recursively (raw string so the backslash is literal)
    observer.schedule(event_handler, r"E:\TestMonitor", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```
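As the note at the top warns, a single upload usually fires several events for the same path (a create followed by one or more modifies). Before moving on to the production code, here is a minimal debounce sketch: it remembers when each path was last accepted and drops events that repeat within a short window. The 2-second window and the `/tmp/watch_demo` path are assumptions to adapt to your workload:

```python
# Minimal debounce sketch (assumption: a short time window is enough to
# coalesce the duplicate events fired by a single upload).
import time
import logging

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('DebounceDemo')


class DebouncedHandler(FileSystemEventHandler):
    WINDOW = 2.0  # seconds; hypothetical value, tune for your uploads

    def __init__(self):
        super().__init__()
        self._last_seen = {}  # path -> timestamp of the last accepted event

    def on_modified(self, event):
        if event.is_directory:
            return
        now = time.time()
        last = self._last_seen.get(event.src_path, 0.0)
        if now - last < self.WINDOW:
            # Duplicate trigger from the same upload; drop it
            return
        self._last_seen[event.src_path] = now
        logger.info("accepted modify event: %s", event.src_path)


if __name__ == "__main__":
    observer = Observer()
    observer.schedule(DebouncedHandler(), "/tmp/watch_demo", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```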
The demo above should give you a basic feel for watchdog. Now for the main event: the code as actually used in production, along with explanations of the problems that came up while using it.
```python
# -*- coding: utf-8 -*-
# Purpose: watch a directory for changes and dispatch the matching handler logic
import time
import os
import logging
import zipfile
import json

import requests
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

import DiffPlatDeal  # in-house processing module

# (The Python 2 reload(sys)/sys.setdefaultencoding('utf8') hack is not
# needed on Python 3.)

# Create a logger
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)
# One handler writes to a dated log file
logTime = time.strftime('%Y%m%d', time.localtime(time.time()))
fh = logging.FileHandler('/home/liuqing/logData/MonitorDirLog_' + logTime + '.log')
fh.setLevel(logging.DEBUG)
# A second handler echoes to the console
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# Shared output format for both handlers
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# Attach the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)


class FileEventHandler(FileSystemEventHandler):

    def on_moved(self, event):
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))

    # The main entry point: fires whenever a file under the watched directory changes
    def on_modified(self, event):
        sourcePath = event.src_path
        global lastFile
        global n
        # Subdirectories under the watched directory also trigger this
        if event.is_directory:
            logger.info("directory modified:{0}".format(sourcePath))
        else:
            size = os.path.getsize(sourcePath)
            if size == 0:
                logger.info('file is empty')
            else:
                # DingTalk webhook used for alerting
                url = "xxxxxxxx"
                header = {
                    "Content-Type": "application/json",
                    "charset": "utf-8"
                }
                try:
                    filename, ext = os.path.splitext(sourcePath)
                    # Skip temp files; unpack zip archives first
                    if 'tmp' in ext:
                        logger.info('temporary file, skipping')
                    elif 'zip' in ext:
                        logger.info('zip archive, extracting first')
                        with zipfile.ZipFile(sourcePath, 'r') as zf:
                            zf.extractall(filename + '/zipData')
                    elif 'swp' in ext:
                        logger.info('swp temp file, skipping')
                    elif lastFile == sourcePath:
                        logger.info('duplicate event for the same file, skipping:{0}'.format(sourcePath))
                    else:
                        # The actual processing logic is dispatched from here
                        nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                        if ext != ".txt":
                            lastFile = sourcePath
                            logger.info("file modified:{0}".format(sourcePath))
                            listFile.add(sourcePath)
                            logger.info("list:{0}".format(listFile))
                        # Timing: flush once n exceeds 20 ticks, or immediately
                        # once the set holds 4 or more files
                        if int(n) > 20 and len(listFile) == 0:
                            n = 0
                        if int(n) > 20 and len(listFile) > 0 and len(listFile) < 4:
                            logger.info("n:{0};listFile:{1}".format(n, listFile))
                            # Dispatch the processing
                            DiffPlatDeal.xxxx(logger, listFile)
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))
                            dataSucee1 = {
                                "msgtype": "text",
                                "text": {
                                    "content": "[Success: processed]: [" + nowTime + "]\n" + "; ####\n".join(sendList) + "\n[Total: ]" + str(len(sendList))
                                }
                            }
                            sendData1 = json.dumps(dataSucee1).encode("utf-8")
                            requests.post(url=url, data=sendData1, headers=header)
                            n = 0
                            listFile.clear()
                        # Flush as soon as the set holds 4 or more files
                        if len(listFile) >= 4:
                            # Dispatch the processing, then empty the set
                            DiffPlatDeal.xxxxxx(logger, listFile)
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))
                            dataSucee = {
                                "msgtype": "text",
                                "text": {
                                    "content": "[Success: processed]: [" + nowTime + "]\n" + "; ####\n".join(sendList) + "\n[Total: ]" + str(len(sendList))
                                }
                            }
                            sendData = json.dumps(dataSucee).encode("utf-8")
                            requests.post(url=url, data=sendData, headers=header)
                            listFile.clear()
                except Exception as e:
                    logger.error(e)
                    time.sleep(5)
                    # Read the progress file to find out which files failed,
                    # then send the alert
                    with open("/xxxxx/py/readExcelList/list.txt", 'r') as f:
                        lines = f.readlines()          # read all lines
                        last_line = lines[-1].strip()  # keep only the last one
                    # Handle the failure
                    if len(last_line) != 0 and len(last_line.split("::")) == 2:
                        if "," not in last_line.split("::")[1]:
                            logger.error('every file in the current batch failed')
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))
                            data = {
                                "msgtype": "text",
                                "text": {
                                    "content": "[Fail: files not processed]: [" + nowTime + "]\n" + "; ####\n".join(sendList) + "\n[Error cause]:\n" + str(e) + "\n[Total: ]" + str(len(sendList))
                                }
                            }
                            sendData = json.dumps(data).encode("utf-8")
                            requests.post(url=url, data=sendData, headers=header)
                        else:
                            spline = last_line.split("::")[1].split(",")
                            logger.info('files already processed before the exception:{0}'.format(spline))
                            tmpset = listFile - set(spline)
                            sendList = []
                            for file in tmpset:
                                sendList.append(os.path.basename(file))
                            data = {
                                "msgtype": "text",
                                "text": {
                                    "content": "[Fail: files not processed]: [" + nowTime + "]\n" + "; ####\n".join(sendList) + "\n[Error cause]:\n" + str(e) + "\n[Total: ]" + str(len(sendList))
                                }
                            }
                            sendData = json.dumps(data).encode("utf-8")
                            requests.post(url=url, data=sendData, headers=header)
                    # Append a newline so the next failure starts a fresh line
                    with open('/xxxxx/readExcelList/list.txt', 'a') as f:
                        f.write("\n")
                    listFile.clear()


if __name__ == "__main__":
    # A set collects the files uploaded within a time window (a set, so
    # duplicate triggers do not produce duplicate entries):
    listFile = set()
    # Tick counter; the pending batch is flushed once it passes a threshold
    n = 0
    # Remembers the previous file so duplicate triggers can be filtered out
    lastFile = ""
    observer = Observer()
    event_handler = FileEventHandler()
    observer.schedule(event_handler, "/xxxxxx/importData", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
            n += 1
            # Touch a dummy file so on_modified fires even when no uploads
            # arrive (>= rather than == so a missed reset cannot stall it)
            if n >= 25:
                with open('/xxxxxxx/importData/test.txt', 'w+') as f:
                    f.write("test")
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```
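One quirk of the main loop above is worth calling out: the batch-flush conditions only run inside `on_modified`, so the loop writes a dummy `test.txt` every 25 ticks purely to force an event. A more direct alternative is to run the flush check in the main loop itself. Below is a minimal sketch of that design; the names (`pending`, `flush`, `CollectingHandler`) and the `/tmp/import_demo` path are hypothetical, and a lock guards the set because `on_modified` runs on the observer's thread:

```python
# Sketch: collect events on the observer thread, flush from the main loop.
# All names here (pending, flush, CollectingHandler, /tmp/import_demo) are
# illustrative, not part of the original program.
import os
import threading
import time

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

pending = set()                  # files waiting to be processed
pending_lock = threading.Lock()  # on_modified runs on the observer thread


class CollectingHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            with pending_lock:
                pending.add(event.src_path)


def flush(batch):
    # Placeholder for the real processing call (DiffPlatDeal.* above)
    print("processing:", sorted(batch))


if __name__ == "__main__":
    watch_dir = "/tmp/import_demo"
    os.makedirs(watch_dir, exist_ok=True)
    observer = Observer()
    observer.schedule(CollectingHandler(), watch_dir, recursive=True)
    observer.start()
    n = 0
    try:
        while True:
            time.sleep(1)
            n += 1
            with pending_lock:
                # Same trigger conditions as the original code:
                # 4+ files, or a non-empty batch older than 20 ticks
                if len(pending) >= 4 or (n > 20 and pending):
                    flush(set(pending))
                    pending.clear()
                    n = 0
                elif n > 20:
                    n = 0
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```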
Code walkthrough:
- Watching: `observer.schedule(event_handler, "/xxxxxx/importData", True)` registers the directory to monitor. The program collects uploaded files into the `listFile = set()` set, and the batch is flushed when either `int(n) > 20 and len(listFile) > 0 and len(listFile) < 4` holds (more than 20 ticks have elapsed and fewer than 4 files are pending; a set is used because writing a file into the directory triggers `def on_modified(self, event)` several times, and the set absorbs those duplicates) or `len(listFile) >= 4` holds (4 or more files are pending). The processing call itself, `DiffPlatDeal.xxxxxx`, should be adapted to your own business logic.
- Deduplication: if the file in the current event is the same as in the previous one, nothing is done; this guards against accidental re-uploads and repeated triggers: `elif lastFile == sourcePath: logger.info('duplicate event for the same file, skipping:{0}'.format(sourcePath))`
- Failure handling: if processing fails, the program records which file it got to, then sends the matching DingTalk alert. The exception path is the `except Exception as e:` block; a reusable webhook helper is sketched below.
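Both success branches and both failure branches build the same DingTalk payload inline; pulling the webhook call into one helper keeps `on_modified` readable. A minimal sketch, assuming the standard DingTalk text-message payload shape (`msgtype`/`text`/`content`); the function name and parameters are hypothetical, and the webhook URL stays a placeholder as in the original:

```python
import json
import os

import requests

DINGTALK_URL = "xxxxxxxx"  # placeholder webhook URL, as in the original
HEADERS = {"Content-Type": "application/json", "charset": "utf-8"}


def notify_dingtalk(title, now_time, files, error=None):
    """Send a plain-text DingTalk message summarizing a batch of files."""
    names = [os.path.basename(f) for f in files]
    lines = ["[{0}]: [{1}]".format(title, now_time), "; ####\n".join(names)]
    if error is not None:
        lines.append("[Error cause]:\n{0}".format(error))
    lines.append("[Total: ] {0}".format(len(names)))
    body = {"msgtype": "text", "text": {"content": "\n".join(lines)}}
    requests.post(url=DINGTALK_URL,
                  data=json.dumps(body).encode("utf-8"),
                  headers=HEADERS)
```

With this helper, the success branch collapses to `notify_dingtalk("Success: processed", nowTime, listFile)` and the failure branch to `notify_dingtalk("Fail: files not processed", nowTime, tmpset, error=e)`.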