The database is PostgreSQL, and the table contains a jsonb column.
Method 1: run the INSERT statements directly with execute(); relatively slow
- The JSON-format field in the INSERT statement has to be converted (the dict is wrapped with psycopg2.extras.Json)
- The database values NULL, true and false have to be written as None, True and False in Python
# -*- coding: UTF-8 -*-
import psycopg2
from psycopg2.extras import Json

connection = psycopg2.connect(
    host="xxx",
    port="xxx",
    database="xxx",
    user="xxx",
    password="xxx"
)
cursor = connection.cursor()
for i in range(10000, 100000):
    id = 2066645753478356990 + i
    order = '0521' + str(i)
    # "order" is a reserved word in SQL, so the column name is quoted;
    # the dict for the jsonb column is wrapped with Json()
    cursor.execute(
        'INSERT INTO public.mo_mfg_order (id, "order", deleted, name, extra) VALUES (%s,%s,%s,%s,%s)',
        (id, order, False, None, Json({"line": [], "workshop": []}))
    )
connection.commit()
cursor.close()
connection.close()
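As a side note, if many dict values have to be inserted this way, psycopg2 also allows registering Json as the adapter for dict once, so individual values no longer need to be wrapped by hand. A minimal sketch:

# -*- coding: UTF-8 -*-
# Optional: make psycopg2 serialize plain dicts to JSON automatically,
# so Json(...) does not have to be written around every value.
from psycopg2.extensions import register_adapter
from psycopg2.extras import Json

register_adapter(dict, Json)
# After this, a dict such as {"line": [], "workshop": []} can be passed
# directly as a query parameter for a json/jsonb column.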
Method 2: copy the data into the database straight from a CSV file with copy_from; roughly 30x faster
- The JSON field in the CSV contains commas, so Python splits that field on the commas when it parses the file; the workarounds found online were not much use, so the system's field-separator setting was changed instead and `|` is used as the delimiter
- Empty values in the CSV (NULL in the database) have to be declared in copy_from (null='')

Changing the system configuration: after the separator has been changed, the CSV file has to be opened and re-saved with Excel so that it uses the new separator (WPS does not work!). The pipe-delimited file can also be produced directly from Python, as sketched below.
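If changing the system separator is inconvenient, the pipe-delimited file can also be written with Python's csv module, so Excel is not needed at all. A minimal sketch, with illustrative placeholder rows for the (id, name, age, extra) columns; note that copy_from does not understand CSV quoting, so the delimiter must not occur inside the data:

# -*- coding: UTF-8 -*-
import csv
import json

# Illustrative rows matching the (id, name, age, extra) columns used below;
# the values here are placeholders, not the real data.
rows = [
    (2066645753478356990, 'demo', '1', json.dumps({"line": [], "workshop": []})),
]
with open('data1.csv', mode='w', newline='') as f:
    writer = csv.writer(f, delimiter='|')
    for row in rows:
        writer.writerow(row)

The copy itself then looks like this: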
# -*- coding: UTF-8 -*-
import psycopg2
import csv
from io import StringIO

connection = psycopg2.connect(
    host="xxx",
    port="xxx",
    database="xxx",
    user="xxx",
    password="xxx"
)
cursor = connection.cursor()

with open('data1.csv', mode='r') as f:
    reader = csv.reader(f, delimiter='|')
    # In-memory file object that holds the preprocessed CSV data
    output = StringIO()
    # Re-join every row with '|' so copy_from can read it as-is
    for row in reader:
        output.write('|'.join(row) + '\n')
    # Rewind to the beginning of the in-memory file
    output.seek(0)
    # print(output.getvalue())
    # Load the data into PostgreSQL with copy_from
    cursor.copy_from(output, 'mo_mfg_order', null='', sep='|', columns=('id', 'name', 'age', 'extra'))

connection.commit()
cursor.close()
connection.close()
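Another option that avoids touching the delimiter at all is copy_expert, which runs a COPY ... FROM STDIN WITH (FORMAT csv) command and lets PostgreSQL itself parse commas inside quoted JSON values. A minimal sketch, assuming data.csv is an ordinary comma-separated file with the JSON column quoted (add HEADER true to the COPY options if the file has a header row):

# -*- coding: UTF-8 -*-
import psycopg2

connection = psycopg2.connect(
    host="xxx",
    port="xxx",
    database="xxx",
    user="xxx",
    password="xxx"
)
cursor = connection.cursor()
with open('data.csv', mode='r') as f:
    # PostgreSQL parses the CSV itself, so quoted commas in the jsonb column are fine
    cursor.copy_expert(
        "COPY mo_mfg_order (id, name, age, extra) FROM STDIN WITH (FORMAT csv, NULL '')",
        f
    )
connection.commit()
cursor.close()
connection.close()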
Method 3: add multithreading on top of Method 2
# -*- coding: UTF-8 -*-
import psycopg2
import csv
from io import StringIO
from concurrent.futures import ThreadPoolExecutor

# Number of rows handled per batch
BATCH_SIZE = 2000  # tunable

def process_batch(batch):
    # Each worker thread opens its own database connection
    connection = psycopg2.connect(
        host="xxx",
        port="xxx",
        database="xxx",
        user="xxx",
        password="xxx"
    )
    cursor = connection.cursor()
    output = StringIO()
    for row1 in batch:
        output.write('|'.join(row1) + '\n')
    output.seek(0)
    cursor.copy_from(output, 'mo_mfg_order', null='', sep='|', columns=('id', 'name', 'age', 'extra'))
    # Commit the transaction and close the connection
    connection.commit()
    cursor.close()
    connection.close()

# Open the CSV file and process it in batches
with open('data1.csv', mode='r') as f:
    reader = csv.reader(f, delimiter='|')
    batch = []
    with ThreadPoolExecutor(max_workers=4) as executor:  # tunable
        for row in reader:
            batch.append(row)
            if len(batch) == BATCH_SIZE:
                executor.submit(process_batch, batch)
                batch = []
        # Submit whatever is left over before the executor shuts down
        if batch:
            executor.submit(process_batch, batch)
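Opening a new connection for every batch adds noticeable overhead. psycopg2 also provides a thread-safe connection pool (psycopg2.pool.ThreadedConnectionPool) that lets the worker threads reuse a fixed set of connections. A minimal sketch of a process_batch variant that borrows from such a pool (the pool size of 4 mirroring max_workers is an assumption):

# -*- coding: UTF-8 -*-
# Sketch: reuse connections across batches with a thread-safe pool
# instead of connecting once per batch.
from io import StringIO
from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(
    minconn=1, maxconn=4,  # maxconn matches max_workers (assumption)
    host="xxx", port="xxx", database="xxx", user="xxx", password="xxx"
)

def process_batch(batch):
    connection = pool.getconn()   # borrow a connection from the pool
    try:
        cursor = connection.cursor()
        output = StringIO()
        for row in batch:
            output.write('|'.join(row) + '\n')
        output.seek(0)
        cursor.copy_from(output, 'mo_mfg_order', null='', sep='|',
                         columns=('id', 'name', 'age', 'extra'))
        connection.commit()
        cursor.close()
    finally:
        pool.putconn(connection)  # return the connection instead of closing it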