- 对于大量的非格式化文本数据(如txt、xls、xlsx、csv、sql、html等),某些情况下我们需要针对文件的某一行进行检索。这些文件往往难以格式化后存入结构化数据库中,特别是单个文件体量较大的情况,难以使用常规方式进行检索,即使使用一些特殊的工具,在没有进行索引的情况下,检索速速往往难以忍受。
- 笔者利用开源的全文检索引擎xapian编写了一个完整的文件行索引程序,能自动对txt、csv、sql、html、xls,xlsx格式的文本文件按行建立索引,程序采用中文分词工具“结巴分词”工具对文本行内容进行分词。
- 笔者对50余个总量约200G的文件按行进行索引,完成索引后,检索响应时间在300ms以内。
#coding:utf8
#######################################################################################################
# 自动索引系统 by DaDaLuLa #
#######################################################################################################
# 程序功能:自动对txt,xls,xlsx,csv,sql,html格式的文本文件按行建立索引 #
# 程序要求:1.需索引的文件要放到一个固定的文件夹内,注意文件扩展名要符合要求 #
# 2.文件编码方式须为utf8 #
# 3.文件名不能有中文 #
# 执行流程:实例化filesIndex对象-->从上次断点处继续索引-->遍历文件目录检查未索引的文件-->逐个建立索引 #
# 其他: 1.默认的日志文件为/var/log/auto_index.log(自动建立) #
# 2.每个待索引文件的名字,md5值,断点位置等信息保存在mongodb数据库中 #
# 3.索引文件所在目录默认为/var/lib/xapian/ #
######################################################################################################
#!/bin/python
import os
import sys
import time
import datetime
import hashlib
import string
import re
import io
import logging
import pymongo
import xapian
import jieba
import chardet
#from multiprocessing import Pool
import types
from pyexcel_xls import XLBook
reload(sys)
sys.setdefaultencoding("utf-8")
SG_FILES = "/var/lib/files/" #存放待索引文件的目录
INDEX_DIR = "/var/lib/xapian/" #存放索引文件的目录
LOG_FILE = "/var/log/auto_index.log" #日志文件
POOL_SIZE = 4 #进程池大小
class filesIndex(object):
def __init__(self):
self.all_files = set() #存放遍历出来的所有文件
self.need_indexed_files = set()
self.bad_chars = [' ',',',u',','"',"'","(",")",u"(",u")","/","~","^","-",u".",u"\r",u"\n",u"\t","NULL","null",u'[',u']',u'{',u'}'] #结巴分词需要过滤的坏字符
logging.basicConfig(
level = logging.INFO,
format ='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt = '%a, %d %b %Y %H:%M:%S',
filename = LOG_FILE,
filemode = 'a'
)
self.log = logging #日志模块
#self.pool = Pool(processes = POOL_SIZE) #多进程
self.conn = pymongo.MongoClient("localhost",27017) #连接mongodb
self.col = self.conn.sg_db.files #打开sg_db中的files集合
self.ix_dir = INDEX_DIR #索引文件存放位置
if not os.path.exists(self.ix_dir):
os.mkdir(self.ix_dir)
self.walkDir() #遍历社工库文件夹中所有文件
self.check_new_file() #检查所有待索引新文件
self.init_index()
#遍历待索引文件夹中所有文件
def walkDir(self):
for root,dirs,files in os.walk(SG_FILES,topdown = True):
for name in files:
#print name
fname = os.path.join(root,name)
ext = os.path.splitext(fname)[-1].strip('.').lower()
if ext in ("txt","log","csv","xls","xlsx"):
self.all_files.add(fname)
elif ext == "sql":
self.all_files.add(self.__sql2txt(fname))
elif ext == "old":
pass
else:
self.log.info(u"文件 "+fname+u" 格式无法处理")
#sql格式转为txt
def __sql2txt(self,fname):
f = open("sql.sed","w")
f.write("s/),(/)-\\n(/g\ns/);/)-\\n/g")
f.close()
self.log.info(u"文件 "+fname+u" 开始转化格式")
fname_new = os.path.splitext(fname)[0]+"_sql.txt"
fname_modify = os.path.splitext(fname)[0]+"_sql.old"
os.system("cat "+fname+"| sed -f 'sql.sed' | grep ')-' | sed 's/)-/)/' > "+fname_new)
os.rename(fname,fname_modify)
self.log.info(u"文件 "+fname+u" 转化格式完成")
return fname_new
#检查没有被索引的文件
def check_new_file(self):
old_files = set()
for file_item in self.col.find():
old_files.add(file_item["file_name"])
new_files = self.all_files - old_files
for f in new_files:
print "[!]"+f
md5value = self.__md5_file(f)
dup_file = self.col.find({"md5value":md5value})
if dup_file.count():
self.log.info(u"文件 "+f+u" 与文件 "+dup_file[0]["file_name"]+u" 系同一个文件")
os.remove(f)
else:
charset = chardet.detect(open(f,"r").read(10000))["encoding"]
self.col.insert_one({"file_name":f,"md5value":md5value,"charset":charset,"is_indexed":0,"line_pointer":0})
#计算文件的md5值
def __md5_file(self,file_name):
m = hashlib.md5()
f = io.FileIO(file_name,'r')
bytes = f.read(1024)
while(bytes != b''):
m.update(bytes)
bytes = f.read(1024)
f.close()
md5value = m.hexdigest()
return md5value
#按文件类别找到索引入口
def __indexFiles(self,fname,start = 0):
ext = os.path.splitext(fname)[-1].strip('.')
ext = ext.lower()
if ext in ("txt","log","html","csv"):
self.__index_txt(fname,start)
elif ext in("xls","xlsx"):
self.__index_excel(fname,start)
else:
self.log.info(u"文件"+fname+u"格式无法识别")
#针对txt,log,csv格式文件索引函数
def __index_txt(self,fname,start):
f = open(fname,"r")
index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
print "开始"
if start > 0: #断点索引
line_num = start
self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
while start > 0: #移动文件指针到断点位置
f.readline()
#print start
start = start -1
else:
self.log.info(u"文件"+fname+u"开始索引") #正常索引
line_num = 0
datas = f.readlines(20000000) #一次读取20M加快速度
try:
while datas:
lines = 0
for data in datas:
doc = xapian.Document()
doc.set_data(data)
seg_list = jieba.cut_for_search(data)
seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
for seg in seg_list_clean:
#print seg
doc.add_term(seg)
index_db.add_document(doc)
lines = lines + 1
#print lines
#index_db.flush()
line_num = line_num + lines
print line_num
datas= f.readlines(20000000)
except:
line_num = line_num + lines
f.close()
print "[!]############"
info = sys.exc_info()
print info[0],":",info[1] #把错误打印出来,便于调试
print "[!]***********"
index_db.flush()
self.__index_exit(fname,line_num) #调用索引中断处理程序
index_db.flush()
self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录") #执行到这一步说明索引成功完成
f.close()
#针对excel文件的处理函数
def __index_excel(self,fname,start):
book = XLBook(fname)
xls_data = dict(book.sheets())
index_dir = self.ix_dir+os.path.split(fname)[1].replace(".","_")
index_db = xapian.WritableDatabase(index_dir,xapian.DB_CREATE_OR_OPEN)
line_num = 0
if start > 0:
self.log.info(u"文件"+fname+ u"从第"+str(start)+u"行处开始索引")
else:
self.log.info(u"文件"+fname+u"开始索引") #正常索引
try:
for sheet in xls_data:
for row in xls_data[sheet]:
line_num = line_num + 1
if line_num < start:
continue
data = ""
for col in row:
if type(col) == float:
col = str(int(col))
elif type(col) == int or type(col) == datetime.datetime or type(col) == datetime.date:
col = str(col)
else:
col = col.encode("utf-8")
data = data+col+ ","
data = data.strip(",")
doc = xapian.Document()
doc.set_data(data)
seg_list = jieba.cut_for_search(data)
seg_list_clean = list(set(seg_list).difference(set(self.bad_chars)))
for seg in seg_list_clean:
doc.add_term(seg)
index_db.add_document(doc)
except:
info = sys.exc_info()
print info[0],":",info[1] #把错误打印出来,便于调试
index_db.flush()
self.__index_exit(fname,line_num) #调用索引中断处理程序
index_db.flush()
self.col.update_one({"file_name":fname},{"$set":{"is_indexed":1,"line_pointer":line_num}})
self.log.info(u"文件"+fname+u"索引成功完成") #执行到这一步说明索引成功完成
self.log.info(u"文件"+fname+u"索引成功完成,共"+str(line_num)+u"条记录") #执行到这一步说明索引成功完成
#索引断点处理程序 (保存断点位置,写入日志)
def __index_exit(self,fname,line_num):
#记录索引断点的位置
self.col.update_one({"file_name":fname},{"$set":{"line_pointer":line_num}})
self.log.info(u"文件"+fname+u"索引中断发生在第"+str(line_num)+u"行")
sys.exit()
#索引入口,判断索引模式(断点索引还是正常索引)
def init_index(self):
#断点索引
for doc in self.col.find({"is_indexed":0,"line_pointer":{"$gt":0}}):
#self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],doc["line_pointer"]))
self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0] #数据标签
self.__indexFiles(doc["file_name"],doc["line_pointer"])
#新文件索引
for doc in self.col.find({"is_indexed":0,"line_pointer":0}):
#self.pool.apply_async(self.__indexFiles,args=(doc["file_name"],))
self.data_tag = os.path.split(doc["file_name"])[1].split(".")[0]
self.__indexFiles(doc["file_name"])
#self.pool.close()
#self.pool.join()
#数据检索接口
def search(self,keyword):
db = xapian.Database()
for d in os.listdir(self.ix_dir):
print self.ix_dir+d
db_d = xapian.Database(self.ix_dir+d)
db.add_database(db_d)
enquire =xapian.Enquire(db)
query = xapian.Query(keyword)
print keyword
enquire.set_query(query)
matches = enquire.get_mset(0,50)
print matches.size()
for m in matches:
print "%i %i%% docid=%i [%s]" % (m.rank+1,m.percent,m.docid,m.document.get_data())
#i析构函数
def __del__(self):
self.conn.close() #关闭mongodb连接
if __name__ == "__main__":
reload(sys)
sys.setdefaultencoding("utf8")
INDEX = filesIndex()
INDEX.walkDir() #遍历待索引文件夹中所有文件
INDEX.check_new_file() #检查所有待索引新文件
INDEX.init_index()
#start_time = time.time()
#INDEX.search("123456") #数据检索测试
#finsh_time = time.time()
#print "[+] 检索耗时 "+str(finsh_time-start_time)
【原创文章,转载文章和文中代码请注明出处】