使用psycopg2进行批量更新数据库及批量插入

1处理数据

    def update():
        # 获取太阳光照
        local_data = pd.read_csv('./data.csv')
        # 调用数据库工具类生成对象
        db = DbHandle()
        engine = create_engine("postgresql+psycopg2://username:password@host:port/database")
        data_range = [datetime.datetime.strptime('2019-04-25 08:00:00', '%Y-%m-%d %H:%M:%S'), datetime.datetime.strptime('2019-09-27 00:00:00', '%Y-%m-%d %H:%M:%S')]
        data_day_delta = (data_range[1]-data_range[0]).days
        for delta_day in range(data_day_delta + 1):
            data_add_delta_day = data_range[0] + datetime.timedelta(days=delta_day)
            for hour in range(24):
                data_add_delta_day_hour = data_add_delta_day + datetime.timedelta(hours=hour)
                data_add_delta_day_hour = str(data_add_delta_day_hour)
                print(f'更新第{data_add_delta_day_hour}天数据')
                sql = f"select id, grid_id, data, to_char(published_at, 'YYYY-MM-DD HH24:MI:SS') as published_at from grid_weather where published_at='{data_add_delta_day_hour}'"
                # 获取空气质量
                db_data = pd.read_sql(sql, engine)
                # 如果dataframe不为空
                if not db_data.empty:
                    db_data = pd.merge(db_data, local_data, left_on=['grid_id', 'published_at'], right_on=['id', 'date'])
                    if not db_data.empty:
                        # 取特定列
                        db_data = db_data[['grid_id', 'data', 'published_at', 'ssra', 'id_x']]
                        print(db_data)
                        # 将dataframe转换为numpy格式(结果为列表套列表)
                        db_data = db_data.to_numpy()
                        insert_data = []
                        for data in db_data:
                            # 如果data中的第二个数据中的字典里有solar_radiation字段,则不作任何处理,否则将特定值赋值给该字典中的solar_radiation字段
                            if 'solar_radiation' in data[1]:
                                pass
                            else:
                                data[1]['solar_radiation'] = data[3]
                               # data = (json.dumps(data[1]), data[0], data[2])
                                data = (json.dumps(data[1]), data[4])
                                # 将数据经过处理后加入一个元组,一个元组就是一条更新信息,然后加入一个列表(此处列表中元组个数为5000个)
                                insert_data.append(data)
                        print(insert_data)
                        if insert_data:
                            # 对5000个元组组成的列表调用对象方法进行批量更新
                            db.update_db(insert_data)

2批量更新对象方法

import psycopg2
import psycopg2.extras

class DbHandle:
    def __init__(self):
        self.link_pgsql = {
               'database': 'test',
               'user': 'username',
               'password': 'password',
               'host': '127.0.0.1',
               'port': 5432
          }

        self.link = psycopg2.connect(**self.link_pgsql)
        self.corsur = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    def update_db(self, data, name):
            # sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
        # 批量更新sql,将传入的列表当作一个临时表new_data,字段为data, id,要和元组的顺序对应,注意传入的若有字符串形式的字典要在字段后接::json将其转换为json格式
        columns = data.columns
        sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
        try:
            print(data)
            print(len(data))
            data = data.to_numpy()
            # 批量更新用execute_values进行,传入游标,sql,数据,和最大数据长度
            psycopg2.extras.execute_values(self.corsur, sql, data, page_size=5000)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False

批量更新要点:
1.先组装数据数组;
2.写批量更新sql(将传入的数组当作临时表,数组中的元素当作临时表中的字段,进行条件判断更新),数组中的元素一般为元组,元组中的字典元素需转换为json格式(py中其形式为字符串),然后在更新时要在字符串形式的json字段后面接::json将其转换为数据库中的json格式,否则会报错,正确形式如下图。
sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'此处条件判断时最好用唯一键进行判断,慎用时间相关的字段进行判断,否则有可能会因为两个时间格式不同而无法更新。

3批量插入的三种方法

本方法都是将pandas的dataframe的值批量插入数据库

import psycopg2
import psycopg2.extras

class DbHandle:
    def __init__(self):
        self.link_pgsql = {
                    'database': 'test',
                    'user': 'spider',
                    'password': '123456',
                    'host': '127.0.0.1',
                    'port': 5432
                }
        self.link = psycopg2.connect(**self.link_pgsql)
        self.corsur = self.link.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    def update_db(self, data, name):
        """批量更新"""
        # sql = 'update grid_weather set data=new_data.data::json from (values %s) as new_data (data, grid_id, published_at_time) where grid_weather.grid_id=new_data.grid_id and published_at=published_at_time;'
        columns = data.columns
        sql = f"""update {name} set data=new_data.data::json from (values %s) as new_data ({','.join(columns)}) where {name}.id=new_data.id;"""
        try:
            print(data)
            print(len(data))
            data = data.to_numpy()
            psycopg2.extras.execute_values(self.corsur, sql, data, page_size=4900)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False

    def insert_lots_of_by_many(self, df, name):
        """简单实用,属于游标的对象方法"""
        # sql = f'insert into {name}(grid_id, data, published_at) values (%s, %s, %s);'
        columns = data.columns
        sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s'] * len(columns))});"""
        print(sql)
        data = df.to_numpy()
        print(data)
        self.corsur.executemany(sql, data)
        self.link.commit()

    def insert_lots_of_by_values(self, data, name):
        """官方推荐,要批量操作的字段的值必须相同"""
        columns = data.columns
        sql = f'insert into {name}({",".join(columns)}) values %s;'
        print(sql)
        try:
            data = data.to_numpy()
            print(data)
            print(len(data))
            psycopg2.extras.execute_values(self.corsur, sql, data, page_size=4900)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False

    def insert_lots_of_by_batch(self, data, name):
        """性能好,速度快,属于类方法"""
        # sql = f"""insert into {name}(grid_id, data, published_at) values (%s, %s, %s);"""
        columns = data.columns
        sql = f"""insert into {name}({','.join(columns)}) values ({','.join(['%s']*len(columns))});"""
        print(sql)
        try:
            data = data.to_numpy()
            psycopg2.extras.execute_batch(self.corsur, sql, data, page_size=4900)
            self.link.commit()
            print('更新成功')
            return True
        except Exception as e:
            print(e)
            print('更新失败')
            return False
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,849评论 6 13
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,082评论 1 32
  • 批量任务 账务处理子系统作为核心银行系统的系统内核,这使得账务处理子系统的设计和实现成为商业银行核心业务系统构建过...
    JC1265阅读 11,363评论 0 17
  • 1.埋点是做什么的 2.如何进行埋点 3.埋点方案的设计 近期常被问到这个问题,我担心我的答案会将一些天真烂漫的孩...
    lxg阅读 2,011评论 0 1
  • http://www.itpub.net/thread-1393352-1-1.htmlhi,看到你关于数据库存储...
    yahzon阅读 579评论 1 0