python watchdog 实现文件目录的监控

使用: 如果想监控目录,做相应逻辑处理(不想大费周章),相信这会是你很好的选择

**需求: 实现linux 目录监控,将新增的文件放入处理引擎中执行, python 3以上 **

注意事项:

由于文件上传到指定目录时,会触发多重监控事件,需要做逻辑处理

watchdog 简单demo 使用

# -*- coding: utf-8 -*-
# 作用: 用于监控目录的变化,调用对应的处理逻辑

from watchdog.observers import Observer
from watchdog.events import *
import time
import sys
import os
import logging
import zipfile

# 设置系统编码格式
reload(sys)
sys.setdefaultencoding('utf8')

# 创建一个logger,设置日志
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)

# 创建一个handler,用于写入日志文件
fh = logging.FileHandler('E:/testLog.log')
fh.setLevel(logging.DEBUG)

# 再创建一个handler,用于输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# 定义handler的输出格式
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# 给logger添加handler
logger.addHandler(fh)
logger.addHandler(ch)

class FileEventHandler(FileSystemEventHandler):

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))
    # 主要监控目录下有文件修改
    def on_modified(self, event):
        # 监控目录下面的目录
        if event.is_directory:
            logger.info("directory modified:{0}".format(event.src_path))
        else:
            logger.info("file modified:{0}".format(event.src_path))

if __name__ == "__main__":
    observer = Observer()
    event_handler = FileEventHandler()
    # 监控目录
    observer.schedule(event_handler, "E:\TestMonitor", True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

通过上面的学习,想必已经对watchdog 有个简单的了解,下面进入正题,直接上实际使用中的代码,也会讲解使用中用到的一些问题,多的不说,正式上代码开始

# -*- coding: utf-8 -*-
# 作用: 用于监控目录的变化,调用对应的处理逻辑

from watchdog.observers import Observer
from watchdog.events import *
import time
import sys
import os
import logging
import zipfile
import DiffPlatDeal
import json
import requests

# 设置系统编码格式
# reload(sys)
# sys.setdefaultencoding('utf8')

# 创建一个logger
logger = logging.getLogger('MonitorDir')
logger.setLevel(logging.DEBUG)

# 创建一个handler,用于写入日志文件
logTime = time.strftime('%Y%m%d', time.localtime(time.time()))
fh = logging.FileHandler('/home/liuqing/logData/MonitorDirLog_' + logTime + '.log')
fh.setLevel(logging.DEBUG)

# 再创建一个handler,用于输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# 定义handler的输出格式
formatter = logging.Formatter(
    '[%(asctime)s] [%(thread)d] [%(filename)s] [line: %(lineno)d][%(levelname)s] ## %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)

# 给logger添加handler
logger.addHandler(fh)
logger.addHandler(ch)


class FileEventHandler(FileSystemEventHandler):

    def __init__(self):
        FileSystemEventHandler.__init__(self)

    def on_moved(self, event):
        if event.is_directory:
            logger.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
        else:
            logger.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))

    def on_created(self, event):
        if event.is_directory:
            logger.info("directory created:{0}".format(event.src_path))
        else:
            logger.info("file created:{0}".format(event.src_path))

    def on_deleted(self, event):
        if event.is_directory:
            logger.info("directory deleted:{0}".format(event.src_path))
        else:
            logger.info("file deleted:{0}".format(event.src_path))

    # 主要监控目录下有文件修改
    def on_modified(self, event):
        sourcePath = event.src_path
        global lastFile
        global n
        # 监控目录下面的目录
        if event.is_directory:
            logger.info("directory modified:{0}".format(sourcePath))
        else:
            size = os.path.getsize(sourcePath)
            if size == 0:
                logger.info('文件是空的')
            else:
                # 用于钉钉监控
                url = "xxxxxxxx"
                header = {
                    "Content-Type": "application/json",
                    "charset": "utf-8"
                }
                try:
                    filename, type = os.path.splitext(sourcePath)
                    # 处理临时文件以及压缩文件
                    if 'tmp' in type:
                        logger.info('文件是临时文件,不做处理')
                    elif 'zip' in type:
                        logger.info('需要先解压文件')
                        f = zipfile.ZipFile(sourcePath, 'r')
                        f.extractall(filename + '/zipData')
                    elif 'swp' in type:
                        logger.info('文件是swp临时文件,不做处理')
                    elif lastFile == sourcePath:
                        logger.info('处理文件重复的问题:{0}'.format(sourcePath))
                    else:
                        # 在这里面调用处理逻辑
                        nowTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                        if type != ".txt":
                            lastFile = sourcePath
                            logger.info("file modified:{0}".format(sourcePath))
                            listFile.add(sourcePath)
                            logger.info("list:{0}".format(listFile))
                        # 时间处理,20s 执行一次或则list 文件中数量大于4个直接处理
                        if int(n) > 20 and len(listFile) == 0:
                            n = 0
                        if int(n) > 20 and len(listFile) > 0 and len(listFile) < 4:
                            logger.info("n:{0};listFile:{1}".format(n, listFile))
                            # 调用处理的方法
                            DiffPlatDeal.xxxx(logger, listFile)
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))

                            dataSucee1 = {
                                "msgtype": "text",
                                "text": {
                                    "content": "【Success: 处理成功】:【"+nowTime+"】\n"+ ";  ####\n".join(sendList)+"\n 【总条数:】"+str(len(sendList))
                                }
                            }
                            sendData1 = json.dumps(dataSucee1).encode("utf-8")
                            requests.post(url=url, data=sendData1, headers=header)
                            n = 0
                            listFile.clear()
                        # 当list中的值大于等于4个后在处理
                        if len(listFile) >= 4:
                            # 调用处理逻辑然后清空list
                            DiffPlatDeal.xxxxxx(logger, listFile)
                            # 清空list
                            sendList = []
                            for file in listFile:
                                sendList.append(os.path.basename(file))
                            dataSucee = {
                                "msgtype": "text",
                                "text": {
                                    "content": "【Success: 处理成功】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n 【总条数:】"+str(len(sendList))
                                }
                            }
                            sendData = json.dumps(dataSucee).encode("utf-8")
                            requests.post(url=url, data=sendData, headers=header)
                            listFile.clear()

                except Exception as e:
                    logger.error(e)
                    time.sleep(5)
                    #  读取文件,获取异常文件名发出通知
                    with open("/xxxxx/py/readExcelList/list.txt", 'r') as f:  # 打开文件
                        lines = f.readlines()  # 读取所有行
                        last_line = lines[-1].strip()  # 取最后一行
                        # 处理异常
                        if len(last_line) != 0:
                            if len(last_line) != 0 and len(last_line.split("::")) == 2:
                                if "," not in last_line.split("::")[1]:
                                    logger.error('当前批次文件全部有问题')
                                    sendList = []
                                    for file in listFile:
                                        sendList.append(os.path.basename(file))
                                    data = {
                                        "msgtype": "text",
                                        "text": {
                                            "content": "【Fail: 文件没有处理】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n【错误原因】:\n"+str(e)+"\n 【总条数:】"+str(len(sendList))
                                        }
                                    }
                                    sendData = json.dumps(data).encode("utf-8")
                                    requests.post(url=url, data=sendData, headers=header)
                                else:
                                    spline = last_line.split("::")[1].split(",")
                                    logger.info('Except当前批次处理的文件:{0}'.format(spline))
                                    tmpset = listFile - set(spline)
                                    sendList = []
                                    for file in tmpset:
                                        sendList.append(os.path.basename(file))
                                    data = {
                                        "msgtype": "text",
                                        "text": {
                                            "content": "【Fail: 文件没有处理】:【"+nowTime+"】\n"+  ";  ####\n".join(sendList)+"\n【错误原因】:\n"+str(e)+"\n 【总条数:】"+str(len(sendList))
                                        }
                                    }
                                    sendData = json.dumps(data).encode("utf-8")
                                    requests.post(url=url, data=sendData, headers=header)
                            # 当文件异常时写入换行符
                            with open('/xxxxx/readExcelList/list.txt', 'a') as f:
                                f.write("\n")
                    listFile.clear()


if __name__ == "__main__":
    # 定义一个集合,将一段时间内的目录放入这个集合中:
    listFile = set()
    # 定义一个变量,如果变量达到某个值就执行处理操作
    n = 0
    # 用于获取上次监控的文件
    lastFile = ""
    observer = Observer()
    event_handler = FileEventHandler()
    observer.schedule(event_handler, "/xxxxxx/importData", True)
    observer.start()
    try:
        while True:
            time.sleep(1)
            n += 1
            # 为了触发 def on_modified(self, event) 操作
            if n == 25:
                with open('/xxxxxxx/importData/test.txt', 'w+') as f:
                    f.write("test")
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

代码讲解:

  1. 作用: 用于监控目录 observer.schedule(event_handler, "/xxxxxx/importData", True) , 程序会将上传文件写入listFile = set() 集合当中,触发逻辑处理条件是: f int(n) > 20 and len(listFile) > 0 and len(listFile) < 4【大于20秒,并且set当中文件数量需要小于4,为什么要用set,因为文件写入目录的过程当作会多次触发def on_modified(self, event),避免文件重复】 或则 len(listFile) >= 4【文件数量大于4个】; 处理执行逻辑DiffPlatDeal.xxxxxx,根据自己业务需求做变化
  1. 文件处理的过程中【如果上次传入文件和这次相同,不做任何操作,也是为了避免文件误传,多次触发】 elif lastFile == sourcePath:
    logger.info('处理文件重复的问题:{0}'.format(sourcePath))
  1. 如果文件处理失败,会记录下处理到那个文件了,然后做出对应钉钉告警信息
    异常处理逻辑: except Exception as e: xxxxxx
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容