全文约2000字,阅读大约需要10分钟
在日常工作中,我们经常会遇到需要处理大量文件并将数据存储至数据库或整合到一个文件的需求。这个任务对于人力和时间来说都是一大挑战。幸运的是,我们有Python这个神奇的工具,可以帮助我们自动化这个任务,省时又省力!现在,我将向你展示如何使用Python处理Excel文件并将数据存储到PostgreSQL数据库中。
先打个底:以理解为主,不够严谨,如果看完还是不会,那一定是我讲的不够好,千万别影响你们探索Python的兴趣。
在我们的奇妙冒险中,如果你想将多个excel文件整合到一个表中,需要满足一个前置条件——每个excel文件的格式和列对应的含义顺序必须一致。但是,如果表头不一样也没关系,我们可以用程序来解决这个问题。本文将带你进入Python的魔法世界,教你如何处理Excel文件并将数据存储到PostgreSQL数据库中。在开始之前,我们需要安装一些神奇的库:
- pandas:用于处理Excel文件中的数据
- sqlalchemy:用于连接和操作PostgreSQL数据库
安装方法这里就不再重点讲了了,直接搜网上的教程安装即可。
1.日志记录
开局先送送你一串Python日志记录的代码,可在任何场景下复用,它能够实时监测程序的运行状态,轻松解决测试和问题排查的难题。
注意:log_home
需要改为自己本地路径
# 定义日志记录器
log_home = '/home/xusl/log/excel' # 请将此路径改为你自己的本地路径
log_level = logging.INFO
log_to_console = True
log_config = {
'version': 1,
'formatters': {
'generic': {
'format': '%(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
},
'simple': {
'format': '%(asctime)s %(levelname)-5.5s %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'generic',
},
'file': {
'class': 'logging.FileHandler',
'filename': os.path.join(log_home, 'excel_to_data.log'),
'encoding': 'utf-8',
'formatter': 'generic',
},
},
'root': {
'level': log_level,
'handlers': ['console', 'file', ] if log_to_console else ['file', ],
}
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(__name__)
2.数据库连接
接下来,我们需要配置自己的数据库信息。
# 建立与PostgreSQL数据库的连接 此处需要更改为自己的数据库配置
db_user = 'dps'
db_password = 'DPS888'
db_host = '10.12.8.88'
db_port = '5432'
db_name = 'dpstest'
def get_conn():
conn_url = 'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'
engine = create_engine(conn_url.format(database=db_name, user=db_user, password=db_password, host=db_host, port=db_port),
pool_size=20,
pool_recycle=7200,
connect_args={'connect_timeout': 30})
try:
with engine.connect():
logger.info('成功连接到数据库')
except Exception as e:
logger.error('无法连接到数据库:', str(e))
return engine
3.设计及创建表结构
根据文件内容来设计和创建表结构,当然你也可以用中文
# 创建存储数据的表
table_name = 'public.excel_data'
ddl = """
DROP TABLE IF EXISTS public.excel_data;
CREATE TABLE IF NOT EXISTS public.excel_data (
file_nm VARCHAR(255),
cust_nm VARCHAR(255),
cert_no VARCHAR(255),
prod_nm VARCHAR(255),
amt numeric(20,2),
crt_dtm timestamp NOT NULL DEFAULT now() -- 创建时间
);
"""
4.处理数据
思路如下:
提取文件名
读取Excel文件数据并提取前4列
列名重命名
根据条件过滤末尾的空行
将数据存储到PostgreSQL表中
处理成功后将Excel文件移动到end目录
重点讲下to_sql()
函数:
name:SQL 表名
con:与数据库链接的⽅式,推荐使⽤sqlalchemy的engine类型
schema:相应数据库的引擎,不设置则使⽤数据库的默认引擎,如mysql中的innodb引擎
if_exists:当数据库中已经存在数据表时对数据表的操作,有replace替换、append追加,fail则当表存在时提⽰
index:对DataFrame的index索引的处理,为True时索引也将作为数据写⼊数据表
index_label:当上⼀个参数index为True时,设置写⼊数据表时index的列名称
chunsize:设置整数,如20000,⼀次写⼊数据时的数据⾏数量,当数据量很⼤时,需要设置,否则会链接超时写⼊失败。
dtype:列名到 SQL 类型的字典,默认无;可选地指定列的数据类型
完整代码如下:
import os
import pandas as pd
import logging.config
import shutil
import datetime
from sqlalchemy import create_engine
_tb_nm = 'excel_to_data'
_tb_nm_cn = "excel数据入库"
_service_code = _tb_nm
# 日志目录
log_home = '/home/xusl/log/excel'
# 日志level
log_level = logging.INFO
# 日志打印到控制台
log_to_console = True
# 配置日志记录器
log_config = {
'version': 1,
'formatters': {
'generic': {
'format': '%(asctime)s %(levelname)-5.5s [%(name)s:%(lineno)s][%(threadName)s] %(message)s',
},
'simple': {
'format': '%(asctime)s %(levelname)-5.5s %(message)s',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'generic',
},
'file': {
'class': 'logging.FileHandler',
'filename': os.path.join(log_home, _tb_nm + '.log'),
'encoding': 'utf-8',
'formatter': 'generic',
},
},
'root': {
'level': log_level,
'handlers': ['console', 'file', ] if log_to_console else ['file', ],
}
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(_tb_nm)
# 建立与PostgreSQL数据库的连接 39数据库
db_user = 'dps'
db_password = 'DPS888'
db_host = '10.12.8.88'
db_port = '5432'
db_name = 'dpstest'
def get_conn():
conn_url = 'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}'
engine = create_engine(conn_url.format(database=db_name, user=db_user, password=db_password, host=db_host, port=db_port),
pool_size=20,
pool_recycle=7200,
connect_args={'connect_timeout': 30})
try:
with engine.connect():
print('成功连接到数据库')
except Exception as e:
print('无法连接到数据库:', str(e))
return engine
# engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')
# 创建存储数据的表
table_name = 'public.excel_data'
ddl = """
DROP TABLE IF EXISTS public.excel_data;
CREATE TABLE IF NOT EXISTS public.excel_data (
file_nm VARCHAR(255),
cust_nm VARCHAR(255),
cert_no VARCHAR(255),
prod_nm VARCHAR(255),
amt numeric(20,2),
crt_dtm timestamp NOT NULL DEFAULT now() -- 创建时间
);
"""
# 遍历指定目录下的所有Excel文件
excel_dir = '/home/xusl/data'
src_excel = '/home/xusl/data/src'
end_excel = '/home/xusl/data/end'
src_dir = 'src'
end_dir = 'end'
def deal(conn):
for filename in os.listdir(src_excel):
if not filename.endswith('.xlsx'):
logging.info('没有excel文件!')
continue
else:
logging.info('')
logging.info('')
excel_file = os.path.join(src_excel, filename)
# 提取文件名
file_nm = os.path.basename(excel_file)
func_name = file_nm
logging.info('start %s' % func_name)
logging.info(f'Reading data from {excel_file}')
d0 = datetime.datetime.now()
# 读取Excel文件数据并提取前4列
try:
df = pd.read_excel(excel_file, usecols=[0, 1, 2, 3])
logging.info('df读取内容:%s ' % df)
except Exception as e:
logging.error(f'Error reading file {excel_file}: {str(e)}')
continue
# 修改列名
df.columns = ['cust_nm', 'cert_no', 'prod_nm', 'amt']
logging.info('df修改后内容:%s ' % df)
# 根据条件过滤末尾的空行
if not df.empty and df.iloc[-1].isnull().all():
df = df[:-1]
logging.debug('df删减末尾后:%s ' % df)
# 将数据存储到PostgreSQL表中
df['file_nm'] = file_nm
df = df[['file_nm', 'cust_nm', 'cert_no', 'prod_nm', 'amt']]
try:
# 将整个DF导入数据库中
df.to_sql(name='excel_data', schema='public', con=conn, if_exists="append", index=False)
d1 = datetime.datetime.now()
s = (d1 - d0).total_seconds()
logging.info('... end %s, 耗时: %s 秒. ' % (func_name, s))
except Exception as e:
logging.error(f'Error inserting data from file {excel_file}: {str(e)}')
continue
# 处理成功后将Excel文件移动到end目录
src_file = os.path.join(src_excel, filename)
end_file = os.path.join(end_excel, filename)
try:
shutil.move(src_file, end_file)
except Exception as e:
logging.error(f'Error moving file {src_file} to {end_file}: {str(e)}')
# 关闭数据库连接
# engine.dispose()
if __name__ == '__main__':
engine = get_conn()
deal(engine)