【原创】用xapian打造一个文件“行”索引系统

  • 对于大量的非格式化文本数据(如txt、xls、xlsx、csv、sql、html等),某些情况下我们需要针对文件的某一行进行检索。这些文件往往难以格式化后存入结构化数据库中,特别是单个文件体量较大的情况,难以使用常规方式进行检索,即使使用一些特殊的工具,在没有进行索引的情况下,检索速速往往难以忍受。
  • 笔者利用开源的全文检索引擎xapian编写了一个完整的文件行索引程序,能自动对txt、csv、sql、html、xls,xlsx格式的文本文件按行建立索引,程序采用中文分词工具“结巴分词”工具对文本行内容进行分词。
  • 笔者对50余个总量约200G的文件按行进行索引,完成索引后,检索响应时间在300ms以内。
#coding:utf8
#######################################################################################################
#                             自动索引系统 by DaDaLuLa                                                #
#######################################################################################################
#   程序功能:自动对txt,xls,xlsx,csv,sql,html格式的文本文件按行建立索引                                   #
#   程序要求:1.需索引的文件要放到一个固定的文件夹内,注意文件扩展名要符合要求                              #                        
#            2.文件编码方式须为utf8                                                                    #
#            3.文件名不能有中文                                                                        #
#   执行流程:实例化filesIndex对象-->从上次断点处继续索引-->遍历文件目录检查未索引的文件-->逐个建立索引      #
#   其他:    1.默认的日志文件为/var/log/auto_index.log(自动建立)                                       #
#            2.每个待索引文件的名字,md5值,断点位置等信息保存在mongodb数据库中                             #
#            3.索引文件所在目录默认为/var/lib/xapian/                                                  #
######################################################################################################

#!/bin/python
import os
import sys
import time
import datetime
import hashlib
import string
import re
import io
import logging
import pymongo
import xapian
import jieba
import chardet
#from multiprocessing import Pool
import types
from pyexcel_xls import XLBook
reload(sys)
sys.setdefaultencoding("utf-8")

SG_FILES    = "/var/lib/files/"                                 #存放待索引文件的目录
INDEX_DIR   = "/var/lib/xapian/"                                #存放索引文件的目录
LOG_FILE    = "/var/log/auto_index.log"                         #日志文件
POOL_SIZE   = 4                                                 #进程池大小

class filesIndex(object):
    def __init__(self):
        self.all_files = set()                             #存放遍历出来的所有文件
        self.need_indexed_files = set()
        self.bad_chars = [' ',',',u',','"',"'","(",")",u"(",u")","/","~","^","-",u".",u"\r",u"\n",u"\t","NULL","null",u'[',u']',u'{',u'}']   #结巴分词需要过滤的坏字符
    
        logging.basicConfig(
                    level = logging.INFO,
                                    format ='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                                    datefmt = '%a, %d %b %Y %H:%M:%S',
                                    filename = LOG_FILE,
                                    filemode = 'a'
                                   )
        self.log = logging                                 #日志模块
    
        #self.pool = Pool(processes = POOL_SIZE)           #多进程
    
        self.conn = pymongo.MongoClient("localhost",27017) #连接mongodb
        self.col = self.conn.sg_db.files                   #打开sg_db中的files集合

        self.ix_dir = INDEX_DIR                            #索引文件存放位置
        if not os.path.exists(self.ix_dir):
            os.mkdir(self.ix_dir)

        self.walkDir()                                 #遍历社工库文件夹中所有文件
        self.check_new_file()                          #检查所有待索引新文件
        self.init_index()
            
    #遍历待索引文件夹中所有文件
    def walkDir(self):
        for root,dirs,files in os.walk(SG_FILES,topdown = True):
            for name in files:
                #print name
                fname = os.path.join(root,name)
                ext = os.path.splitext(fname)[-1].strip('.').lower()
                if ext in ("txt","log","csv","xls","xlsx"):
                    self.all_files.add(fname)
                elif ext == "sql":
                    self.all_files.add(self.__sql2txt(fname))
                elif ext == "old":
                    pass
                else:
                    self.log.info(u"文件 "+fname+u" 格式无法处理")
    
    #sql格式转为txt
    def __sql2txt(self,fname):
        f = open("sql.sed","w")
        f.write("s/),(/)-\\n(/g\ns/);/)-\\n/g")
        f.close()
        self.log.info(u"文件 "+fname+u" 开始转化格式")
        fname_new = os.path.splitext(fname)[0]+"_sql.txt"
        fname_modify = os.path.splitext(fname)[0]+"_sql.old"
        os.system("cat "+fname+"| sed -f 'sql.sed' | grep ')-' | sed 's/)-/)/' > "+fname_new)
        os.rename(fname,fname_modify)   
        self.log.info(u"文件 "+fname+u" 转化格式完成")
        return fname_new

    #检查没有被索引的文件
    def check_new_file(self):
        old_files = set()
        for file_item in self.col.find():
            old_files.add(file_item["file_name"])
        new_files = self.all_files - old_files
        for f in new_files:
            print "[!]"+f 
            md5value = self.__md5_file(f)
            dup_file = self.col.find({"md5value":md5value})
            if dup_file.count():
                self.log.info(u"文件 "+f+u" 与文件 "+dup_file[0]["file_name"]+u" 系同一个文件")
                os.remove(f)
            else:
                charset = chardet.detect(open(f,"r").read(10000))["encoding"]
                self.col.insert_one({"file_name":f,"md5value":md5value,"charset":charset,"is_indexed":0,"line_pointer":0})
            
    #计算文件的md5值
    def __md5_file(self,file_name):
        m = hashlib.md5()   
        f = io.FileIO(file_name,'r')
        bytes = f.read(1024)
        while(bytes != b''):
            m.update(bytes)
            bytes = f.read(1024)
        f.close()
        md5value = m.hexdigest()
        return md5value
    
    #按文件类别找到索引入口
    def __indexFiles(self,fname,start = 0):
        ext = os.path.splitext(fname)[-1].strip('.')
        ext = ext.lower()
        if ext in ("txt","log","html","csv"):
            self.__index_txt(fname,start)
        elif ext in("xls","xlsx"):
            self.__index_excel(fname,start)
        else:
            self.log.info(u"文件"+fname+u"格式无法识别")    
    
    #针对txt,log,csv格式文件索引函数  
    def __index_txt(self,fname,start):
        f = open(fname,"r")
        index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
        index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
        print "开始"  
        if start > 0:                                           #断点索引
            line_num = start
            self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
            while start > 0:                                #移动文件指针到断点位置
                f.readline()
                #print start
                start = start -1
        else:
            self.log.info(u"文件"+fname+u"开始索引")          #正常索引
            line_num = 0
            
        datas = f.readlines(20000000)                           #一次读取20M加快速度
        try:
            while datas:
                lines = 0
                for data in datas:
                    doc = xapian.Document()
                    doc.set_data(data)
                    seg_list = jieba.cut_for_search(data)
                    seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                    for seg in seg_list_clean:
                        #print seg
                        doc.add_term(seg)
                    index_db.add_document(doc)
                    lines = lines + 1
                    #print lines
                    #index_db.flush()
                
                line_num = line_num + lines
                print line_num
                datas= f.readlines(20000000)
        
        except:
                line_num = line_num + lines
                f.close()
                print "[!]############" 
                info = sys.exc_info()
                print info[0],":",info[1]                           #把错误打印出来,便于调试
                print "[!]***********"
                index_db.flush()
                self.__index_exit(fname,line_num)                   #调用索引中断处理程序
        index_db.flush()
        self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
        self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
        f.close()
    
    #针对excel文件的处理函数     
    def __index_excel(self,fname,start):
        book = XLBook(fname)
        xls_data = dict(book.sheets())
        index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
        index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
        line_num = 0
        if start > 0: 
            self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
        else:
            self.log.info(u"文件"+fname+u"开始索引")          #正常索引
        try:
            for sheet in xls_data:
                for row in xls_data[sheet]:
                    line_num = line_num + 1
                    if line_num < start:
                        continue
                    data = ""
                    for col in row:
                        if type(col) == float:
                            col = str(int(col))
                        elif type(col) == int or type(col) == datetime.datetime or type(col) == datetime.date:
                            col = str(col)
                        else:
                            col = col.encode("utf-8")
                        data = data+col+ ","
                    data = data.strip(",")
                    doc = xapian.Document()
                    doc.set_data(data)
                    seg_list = jieba.cut_for_search(data)
                    seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
                    for seg in seg_list_clean:
                        doc.add_term(seg)
                    index_db.add_document(doc)
        except:
                info = sys.exc_info()
                print info[0],":",info[1]                           #把错误打印出来,便于调试
                index_db.flush()
                self.__index_exit(fname,line_num)                   #调用索引中断处理程序
        index_db.flush()
        self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
        self.log.info(u"文件"+fname+u"索引成功完成")                    #执行到这一步说明索引成功完成
        self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录")                     #执行到这一步说明索引成功完成
        
    #索引断点处理程序 (保存断点位置,写入日志)
    def __index_exit(self,fname,line_num):
        #记录索引断点的位置
        self.col.update_one({"file_name":fname},{"$set":{"line_pointer":line_num}})

        self.log.info(u"文件"+fname+u"索引中断发生在第"+str(line_num)+u"行")
        sys.exit()
        
    #索引入口,判断索引模式(断点索引还是正常索引)
    def init_index(self):
        #断点索引
        for doc in self.col.find({"is_indexed":0,"line_pointer":{"$gt":0}}):
            #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],doc["line_pointer"]))
            self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]    #数据标签
            self.__indexFiles(doc["file_name"],doc["line_pointer"])             
        #新文件索引      
        for doc in self.col.find({"is_indexed":0,"line_pointer":0}):
            #self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],))
            self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]
            self.__indexFiles(doc["file_name"])
        #self.pool.close()
        #self.pool.join()       

    #数据检索接口     
    def search(self,keyword):
        db = xapian.Database()
        for d in os.listdir(self.ix_dir):
            print self.ix_dir+d
            db_d = xapian.Database(self.ix_dir+d)
            db.add_database(db_d)
        enquire =xapian.Enquire(db)
        query = xapian.Query(keyword)
        print keyword
        enquire.set_query(query)
        matches = enquire.get_mset(0,50)
        print matches.size()
        for m in matches:
            print "%i %i%% docid=%i [%s]" % (m.rank+1,m.percent,m.docid,m.document.get_data())  
    #i析构函数
    def __del__(self):
        self.conn.close()      #关闭mongodb连接

if __name__ == "__main__":
    reload(sys)
    sys.setdefaultencoding("utf8")
    INDEX = filesIndex()
    INDEX.walkDir()                                 #遍历待索引文件夹中所有文件
    INDEX.check_new_file()                          #检查所有待索引新文件
    INDEX.init_index()
    #start_time = time.time()
    #INDEX.search("123456")                          #数据检索测试
    #finsh_time = time.time()
    #print "[+] 检索耗时 "+str(finsh_time-start_time)

【原创文章,转载文章和文中代码请注明出处】

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容