1、支持分批导入(可配置)
2、批量导入降低数据库操作
3、批量数据错误会自动重新生成导入数据最终只剩下错误数据,保证数据正确率和打入率最大化
CODE:
#!/usr/bin/python3
import xlrd
import pymysql
import os
import json
dataHost = ""
dataUser = ""
dataPwd = ""
dataName = "test"
#错误数据文件存储位置
errorFile = "./errorFile.json"
#导入数据源
excelFile = "./jb.xlsx"
#一次允许灌入数据库数据的条数
onceInsertNum = 5
#一次允许灌入数据库的原始数据
dataList = []
#一次允许最多读入EXCEL数据条数
onceReadExcelNum = 10
# 打开数据库连接
print("连接数据库.......")
db = pymysql.connect(dataHost,dataUser,dataPwd,dataName)
print("数据库连接成功!")
'''
将数据灌入数据库
'''
def insertDbData(params=[], onceInsertNums=20):
lens = len(params)
if lens < 1:
print("无插入数据")
return
if int(lens / onceInsertNums) * onceInsertNums == lens:
tempLens = int(lens / onceInsertNums)
else:
tempLens = int((lens / onceInsertNums) + 1)
for i in range(tempLens):
dataList.clear()
# SQL 插入语句
jnums = onceInsertNums
if i == tempLens - 1:
if tempLens * onceInsertNums != lens:
jnums = lens % onceInsertNums
for j in range(i*onceInsertNums, i*onceInsertNums+jnums):
dataList.append(params[j])
sqlLists = getSqlLists(dataList)
insertData(sqlLists)
def getSqlLists(params=[]):
sql = "INSERT INTO test(bh,ptdjr,gcrxm,khjl,jsyxm,bjscjc,cph,dgjc,bz) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) "
lists = []
# for i in params:
# sql += str(tuple(i)) + ","
# sql = sql.strip(',')
# sql += ";"
for i in params:
lists.append(tuple(i))
return [[sql, lists]]
def insertData(sqlLists):
# 使用cursor()方法获取操作游标
cursor = db.cursor()
try:
print("开始操作数据库.....")
for sql in sqlLists:
print(sql[0])
print(sql[1])
# 执行sql语句
cursor.executemany(sql[0], sql[1])
db.commit()
print("提交数据成功!")
# results = cursor.fetchall()
# print(results)
except:
# 发生错误时回滚
db.rollback()
print("插入失败=>数据回滚中")
writeFiles(errorFile ,dataList)
#获取EXCEL数据
def redDataFormExcel(fileName):
dontReadRows = [0,1]
dontReadCols = []
# 打开execl
workbook = xlrd.open_workbook(fileName)
# 根据sheet索引或者名称获取sheet内容
Data_sheet = workbook.sheets()[0] # 通过索引获取
# Data_sheet = workbook.sheet_by_index(0) # 通过索引获取
# Data_sheet = workbook.sheet_by_name(u'Sheet2') # 通过名称获取
rowNum = Data_sheet.nrows # sheet行数
colNum = Data_sheet.ncols # sheet列数
print("行数:" + str(rowNum))
print("列数:" + str(colNum))
if int(rowNum / onceReadExcelNum) * onceReadExcelNum == rowNum:
tempRowNum = int(rowNum / onceReadExcelNum)
else:
tempRowNum = int(rowNum / onceReadExcelNum) + 1
iCnt = 0
lists = []
rowlist = []
while iCnt < tempRowNum:
print("-----------------------------开始读取数据---------------------------")
getRowNum = onceReadExcelNum
if iCnt == tempRowNum - 1:
if tempRowNum * onceReadExcelNum != rowNum:
getRowNum = rowNum % onceReadExcelNum
lists.clear()
rowListIndex = list(range(iCnt * onceReadExcelNum, iCnt * onceReadExcelNum + getRowNum))
colListIndex = list(range(colNum))
for i in dontReadCols:
if i in colListIndex:
colListIndex.remove(i)
for i in dontReadRows:
if i in rowListIndex:
rowListIndex.remove(i)
for i in rowListIndex:
rowlist = []
for j in colListIndex:
rowlist.append(Data_sheet.cell_value(i, j))
lists.append(rowlist)
print("--------------------------------读取完成-----------------------------")
insertDbData(lists, onceInsertNum)
iCnt += 1
# 输出所有单元格的内容
# print(list)
# os._exit(0)
# for i in lists:
# for j in i:
# print(j, '\t\t', end="")
# print()
# 获取整行和整列的值(列表)
# rows = Data_sheet.row_values(0) # 获取第一行内容
# cols = Data_sheet.col_values(1) # 获取第二列内容
# print (rows)
# print (cols)
# os._exit(0)
# 获取单元格内容
# cell_A1 = Data_sheet.cell(0, 0).value
# cell_B1 = Data_sheet.row(0)[1].value # 使用行索引
# cell_C1 = Data_sheet.cell(0, 2).value
# cell_D2 = Data_sheet.col(3)[1].value # 使用列索引
# print(cell_A1, cell_B1, cell_C1, cell_D2)
# os._exit(0);
# 获取单元格内容的数据类型
# ctype:0 empty,1 string, 2 number, 3 date, 4 boolean, 5 error
# print('cell(0,0)数据类型:', Data_sheet.cell(0, 2).ctype)
# print('cell(1,0)数据类型:', Data_sheet.cell(1, 0).ctype)
# print('cell(5,1)数据类型:', Data_sheet.cell(5, 1).ctype)
# print('cell(1,2)数据类型:', Data_sheet.cell(1, 2).ctype)
# os._exit(0)
# 获取单元格内容为日期的数据
# date_value = xlrd.xldate_as_tuple(Data_sheet.cell_value(1,0),workbook.datemode)
# print(type(date_value), date_value)
# print('%d:%d:%d' % (date_value[0:3]))
#写入文件
def writeFiles(fileName, data):
file = open(fileName, "a+", encoding="utf8")
for i in data:
temp = i
if isinstance(i, list):
temp = json.dumps(i,ensure_ascii=False)
file.write(temp+"\n")
file.close()
print("失败数据写入文件成功")
def readErrorFile():
if not os.path.exists(errorFile):
return
booles = False
tempIcnt = onceInsertNum
while True:
if not booles:
file = open(errorFile, 'r+', encoding='utf8')
lines = file.readlines()
file.seek(0)
file.truncate()
file.close()
tempDataList = []
for i in lines:
tempDataList.append(json.loads(i.strip()))
iCnt = int(tempIcnt / 2)
tempIcnt = iCnt
if iCnt == 0:
iCnt = 1
if iCnt == 1:
booles = True
insertDbData(tempDataList, iCnt)
else:
break
if __name__ == '__main__':
redDataFormExcel(excelFile)
readErrorFile()
#关闭数据库
db.close()