ORM项目
复习
字符串.join方法
print(','.join(['dddsd','sddsfdsf','sdfddsdsfdfsdf'])) # dddsd,sddsfdsf,sdfddsdsfdfsdf
print(','.join('sddsffffffffffff')) # s,d,d,s,f,f,f,f,f,f,f,f,f,f,f,f
%s 扩展用法
print('%s is %s'%('lyy','sb'))
print('%(name)s is %(feature)s'%({'name':'lyy','feature':'SB'}))
_getattr_ 和 _setattr_
# 关于__getattr__ 和 __setattr__
1.__getattr__ >>> 比如说user.name,user是由一个类实例化的对象
# 当一个对象.name的时候,就会往user对象里找name这个属性;
# 如果有name这个属性,则直接返回在name这个对象下返回的name的值
# 如果没有name这个属性,则就会调用__getattr__执行下面的代码块
2. __setattr__ >>> 比如说 user.name='lyysb' ,user是由一个类实例化的对象
# 当一个对象 user.name='lyysb'的时候, 会调用这个方法
# __init__ 下可以通过对象直接进行.语法赋值就是在底层调用了__setattr__方法
class Foo(dict):
def __init__(self,**kwargs):
print(kwargs)
super().__init__(**kwargs)
def __setattr__(self, key, value): # 当碰到Foo().a=1,就会触发这个方法
print('run')
self[key]=value
def __getattr__(self, item):
print(666)
try:
return self[item]
except TypeError:
raise ('没有该属性')
>>> 定义 __setattr__方法和 __getattr__方法的目的
name=Foo()
name.aaa='123' # 走的是__setattr__方法
print(name.aaa) # 走的时候__getattr__方法
################################################################################################
**kwargs的作用
class User(Foo):
pass
# 添加数据
a=User(name='kkk',age=34,password=2134)
print(a.name)
# 修改数据
a.name='sb'
print(a.name)
函数setattr和getattr
# hasattr(object, name)
判断一个对象里面是否有name属性或者name方法,返回BOOL值,有name特性返回True, 否则返回False。
需要注意的是name要用括号括起来
class test():
name='lyysb'
def run(self):
return 'lyy is a super SB'
t=test()
print(hasattr(t,'name')) # True
# getattr(object, name[,default])
获取对象object的属性或者方法,如果存在打印出来;
如果不存在,打印出默认值,默认值可选。
需要注意的是,如果是返回的对象的方法,返回的是方法的内存地址,如果需要运行这个方法,
可以在后面添加一对括号。
class test():
name='lyysb'
def run(self):
return 'lyy is a super SB'
t=test()
1.获取name属性,存在就打印出来。
print(getattr(t, "name")) # lyysb
2.获取run方法,存在就打印出方法的内存地址。
print(getattr(t,'run')) # <bound method test.run of <__main__.test object at 0x7fc729c5b7b8>>
3.获取run方法,后面加括号可以将这个方法运行。
print(getattr(t,'run')()) # lyy is a super SB
4.获取一个不存在的属性
# print(getattr(t,'sddsds')) # AttributeError: 'test' object has no attribute 'sddsds'
5.设置默认值 若属性不存在,返回一个默认值
print(getattr(t,'sdsdds','lyysb')) # 返回默认值lyysb
# setattr(object, name, values)
给对象的属性赋值,若属性不存在,先创建再赋值。
>>> 为属相赋值,并没有返回值
1. 类的名称空间
print(test.__dict__)
#{'__module__': '__main__', 'name': 'lyysb', 'run': <function test.run at 0x7f97378b1d90>, '__dict__': <attribute '__dict__' of 'test' objects>, '__weakref__': <attribute '__weakref__' of 'test' objects>, '__doc__': None}
2. 对象的名称空间
print(t.__dict__) #{}
3. 添加属性
print(setattr(t, "age", "18")) #None
4. 添加属性后对象的名称空间
{'age': '18'}
# 总结
对象刚创建,名称空间是空的,但是对象可以调用类的名称空间
当执行setattr 时候,属性会存入对象的名称空间,类的名称空间不变
打散机制
# 字典打散通过**,打散后的数据是key=value的形式,所以只能作为参数传给函数
attrs=[{'id': 1, 'name': 'lxx', 'password': '123'}]
res=attrs[0]
def f(id,name,password):
print(id)
print(name)
print(password)
f(id=1,name='lxx',password='123')
print(dict(**res))
元类
# 定义元类时候一定要返回 type.__new__(cls, name,bases,attrs)
class ModelsMetaclass(type):
def __new__(cls, name,bases,attrs): # 实例化成类对象下面所具备的属性
print(attrs)
print(attrs['TABLE_NAME'])
return type.__new__(cls, name,bases,attrs)
class User(metaclass=ModelsMetaclass):
TABLE_NAME='USER'
a=1
b=2
print(type(User)) # b<class '__main__.ModelsMetaclass'>
# 如果不返回 type.__new__(cls, name,bases,attrs),则生成的类对象就是nonetype
class ModelsMetaclass(type):
def __new__(cls, name,bases,attrs): # 实例化成类对象下面所具备的属性
pass
class User(metaclass=ModelsMetaclass):
TABLE_NAME='USER'
a=1
b=2
print(type(User)) # <class 'NoneType'>
元类的应用场景
取出由元类生成的类中的属性
class ModelsMetaclass(type):
def __new__(cls, name, bases, attrs): # 实例化成类对象下面所具备的属性
print(attrs) # {'__module__': '__main__', '__qualname__': 'User', 'TABLE_NAME': 'USER', 'a': 1, 'b': 2}
print(attrs['TABLE_NAME']) # USER
return type.__new__(cls, name, bases, attrs)
class User(metaclass=ModelsMetaclass):
TABLE_NAME = 'USER'
a = 1
b = 2
限定由元类产生的类对象的规则
class ModelsMetaclass(type):
def __new__(cls, name,bases,attrs): # 实例化成类对象下面所具备的属性
# print(name[0])
if name[0].islower():
raise Exception('类名称首字母应该大写')
print('造类成功')
# return type.__new__(cls, name,bases,attrs)
class ser(metaclass=ModelsMetaclass):
TABLE_NAME='USER'
a=1
b=2
常用报错
TypeError: NoneType takes no arguments
class User(None):
pass
# 任何类继承None 都会报错
# 如果不返回type.__new__(cls,name, bases, attrs),则就会造类失败
class ModelsMetaclass(type):
def __new__(cls, name, bases, attrs): # 实例化成类对象下面所具备的属性
print(attrs)
class Model(metaclass=ModelsMetaclass):# 类型是None
pass
class User(Model): # 这个类继承了None
TABLE_NAME = 'USER'
a = 1
b = 2
# 正确写法
class ModelsMetaclass(type):
def __new__(cls, name, bases, attrs): # 实例化成类对象下面所具备的属性
print(attrs)
return type.__new__(cls,name, bases, attrs)
class Model(metaclass=ModelsMetaclass):# 类型是None
pass
class User(Model): # 这个类继承了None
TABLE_NAME = 'USER'
a = 1
b = 2
复习pymysql
# 表结构
mysql> select * from user;
+----+------+----------+
| id | name | password |
+----+------+----------+
| 1 | lxx | 123 |
+----+------+----------+
1 row in set (0.00 sec)
# 当接收不到服务器的值的时候,最后返回空元祖
import pymysql
conn=pymysql.connect(
host='database',
port=3306,
user='java',
password='1234',
charset='utf8',
database='youku',
autocommit=True
)
cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute("select * from user where id=132")
print(cursor.fetchall()) # ()
# 防止SQL注入(不要做字符串拼接,在cursor提交命令时候再传入参数)
import pymysql
conn = pymysql.connect(
host='database',
port=3306,
user='java',
password='1234',
charset='utf8',
database='youku',
autocommit=True
)
sql='select * from user where id=%s'
cursor=conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(sql,args=1)
print(cursor.fetchall())
单例模式
# 每次只能生成一个对象
import pymysql
class Mysql:
__instense=None
def __init__(self):
self.conn=pymysql.connect(
host='database',
port=3306,
user='java',
password='1234',
charset='utf8',
database='youku',
autocommit=True
)
self.cursor=self.conn.cursor(cursor=pymysql.cursors.DictCursor)
def close_db(self):
self.cursor.close()
self.conn.close()
def select(self,sql,args):
self.cursor.execute(sql,args)
#(selct * from user where id=%s,1)
'''
sql=select * from user where name=%s and password=%s
cursor.execute(sql,(lxx,123))
'''
rs=self.cursor.fetchall()
print(rs,'sssssssssssssssssss')
return rs
def execute(self,sql,args):
try:
self.cursor.execute(sql,args)
affected=self.cursor.rowcount
except BaseException as e:
print(e)
return affected
@classmethod
def singleton(cls):
if not cls.__instense:
cls.__instense=cls() #对象不存在,生成一个对象
return cls.__instense
if __name__ == '__main__':
ms=Mysql()
# re=ms.select('select * from user where id =%s',1)
# print(re)
print(ms.select('select * from user where id=%s', 1))
数据库连接池
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='database',
port=3306,
user='java',
password='1234',
database='youku',
charset='utf8'
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
# 否则
# 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
print( '链接被拿走了', conn._con)
print( '池子里目前有', POOL._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
conn.close()
if __name__ == '__main__':
func()
对象关系映射
单例模式版本
- fuckorm.py
from ORM项目 import Mysql_singleton
# 父类
class Field:
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
# # 定义varchar类
class StringField(Field):
def __init__(self, name=None, column_type='varchar(200)', primary_key=False, default=None):
super().__init__(name, column_type, primary_key, default)
# 定义int类
class IntegerField(Field):
def __init__(self, name=None, column_type='int', primary_key=False, default=None):
super().__init__(name, column_type, primary_key, default)
# 定义元类
# 让一个表对应到一个类里面,类里面需要有表名 哪个字段是主键
class ModelsMetaclass(type):
def __new__(cls, name, bases, attrs): # 实例化成类对象下面所具备的属性
if name == 'Models':
return type.__new__(cls, name, bases, attrs)
print(name)
print(attrs)
table_name = attrs.get('table_name', None) # 将表名存入类的名称空间里
# print(attrs)
if not table_name:
table_name = name
primary_key = None
mappings = dict() # 定义空字典,存的是列对象
for k, v in attrs.items():
if isinstance(v, Field):
mappings[k] = v # 将需要的字段都放进mappings这个字典里
if v.primary_key:
# 找到主键
if primary_key:
raise TypeError('主键重复:%s' % k)
primary_key = v.name
for k in mappings.keys():
attrs.pop(k) # 将需要的字段从attr这个名称空间中删除
if not primary_key:
raise TypeError('没有主键')
attrs['table_name'] = table_name
attrs['primary_key'] = primary_key
attrs['mappings'] = mappings
return type.__new__(cls, name, bases, attrs)
# 使得对象有. 语法
class Models(dict, metaclass=ModelsMetaclass):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __setattr__(self, key, value): # 字典对象
self[key] = value
def __getattr__(self, item): # 如果.语法后面的名字不是类中的属性(不在kwargs这个名称空间里),则就会报错,所以需要异常处理
try:
return self[item]
except TypeError:
raise ('没有该属性')
@classmethod
def select_one(cls, **kwargs):
# 只查一条
print(kwargs) # {'id': 1}
key = list(kwargs.keys())[0] # ['id']
value = kwargs[key]
sql = 'select * from %s where %s=?' % (cls.table_name, key) # select * from user where id=?
print(sql, '=========')
sql = sql.replace('?', '%s') # select * from user where id=%s
print(sql, '+++++++++++++++++++')
ms = Mysql_singleton.Mysql()
re = ms.select(sql, value) # 如果收不到就是空的元组
print(re, '&&&&&&&&&&&&&&&&&&&&&&&&&&&')
if re: # [{'id': 1, 'name': 'lxx', 'password': '123'}]
return cls(**re[0]) # User(id=1,name='lxx',password='123')
else: # user=User.select_one(id=2)
return
@classmethod
def select_many(cls, **kwargs):
ms = Mysql_singleton.Mysql()
if kwargs:
key = list(kwargs.keys())[0]
value = kwargs[key]
sql = 'select * from %s where %s=?' % (cls.table_name, key)
sql = sql.replace('?', '%s')
re = ms.select(sql, value)
else:
sql='select * from %s'(cls.table_name)
re=ms.select(sql,None)
if re:
return cls(**re[0])
else:
return
def update(self):
ms=Mysql_singleton.Mysql()
# update user set name='lyysb' and password='ssb'
fields=[] # 存所有非主键字段
args=[] # 存传入pysql的参数
pr=None # 存主键
for k,v in self.mappings.items():
if v.primary_key:
pr=getattr(self,v.name,None)
else:
fields.append(v.name+'=?')
args.append(getattr(self,v.name,None))
sql="update %s set %s where %s =%s"%(self.table_name,','.join(fields),self.primary_key,pr)
sql=sql.replace('?','%s')
print(sql)
ms.execute(sql,args)
def save(self):
ms=Mysql_singleton.Mysql()
# insert into user (name,password) values (?,?)
field=[]
values=[]
args=[]
for k,v in self.mappings.items():
if not v.primary_key:
field.append(v.name)
values.append('?')
args.append(getattr(self,v.name,None))
sql='insert into %s(%s) values (%s)'%(self.table_name,','.join(field),','.join(values))
sql=sql.replace('?','%s')
ms.execute(sql,args)
class User(Models):
table_name = 'user'
id = IntegerField(name='id', primary_key=True, default=0)
password = StringField('password')
if __name__ == '__main__':
user = User.select_one(id=1)
print(user.name)
# a = User(name='we', password='s231')
-
Mysql_singleton
import pymysql class Mysql: __instense = None def __init__(self): self.conn = pymysql.connect( host='database', port=3306, user='java', password='1234', charset='utf8', database='youku', autocommit=True ) self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def close_db(self): self.cursor.close() self.conn.close() def select(self, sql, args): self.cursor.execute(sql, args) # (selct * from user where id=%s,1) ''' sql=select * from user where name=%s and password=%s cursor.execute(sql,(lxx,123)) ''' rs = self.cursor.fetchall() print(rs, 'sssssssssssssssssss') return rs def execute(self, sql, args): try: self.cursor.execute(sql, args) affected = self.cursor.rowcount except BaseException as e: print(e) return affected @classmethod def singleton(cls): if not cls.__instense: cls.__instense = cls() return cls.__instense if __name__ == '__main__': ms = Mysql() # re=ms.select('select * from user where id =%s',1) # print(re) print(ms.select('select * from user where id=%s', 1))
连接池版本
- fuckorm.py
from ORM项目.ORM_POOL import Mysql_p
# 父类
class Field:
def __init__(self, name, column_type, primary_key, default):
self.name = name
self.column_type = column_type
self.primary_key = primary_key
self.default = default
#
# # 定义varchar类
class StringField(Field):
def __init__(self, name=None, column_type='varchar(200)', primary_key=False, default=None):
super().__init__(name, column_type, primary_key, default)
# 定义int类
class IntegerField(Field):
def __init__(self, name=None, column_type='int', primary_key=False, default=None):
super().__init__(name, column_type, primary_key, default)
# 定义元类
# 让一个表对应到一个类里面,类里面需要有表名 哪个字段是主键
class ModelsMetaclass(type):
def __new__(cls, name, bases, attrs): # 实例化成类对象下面所具备的属性
if name == 'Models':
return type.__new__(cls, name, bases, attrs)
print(name)
print(attrs)
table_name = attrs.get('table_name', None) # 将表名存入类的名称空间里
# print(attrs)
if not table_name:
table_name = name
primary_key = None
mappings = dict() # 定义空字典,存的是列对象
for k, v in attrs.items():
if isinstance(v, Field):
mappings[k] = v # 将需要的字段都放进mappings这个字典里
if v.primary_key:
# 找到主键
if primary_key:
raise TypeError('主键重复:%s' % k)
primary_key = v.name
for k in mappings.keys():
attrs.pop(k) # 将需要的字段从attr这个名称空间中删除
if not primary_key:
raise TypeError('没有主键')
attrs['table_name'] = table_name
attrs['primary_key'] = primary_key
attrs['mappings'] = mappings
return type.__new__(cls, name, bases, attrs)
# 使得对象有. 语法
class Models(dict, metaclass=ModelsMetaclass):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __setattr__(self, key, value): # 字典对象
self[key] = value
def __getattr__(self, item): # 如果.语法后面的名字不是类中的属性(不在kwargs这个名称空间里),则就会报错,所以需要异常处理
try:
return self[item]
except TypeError:
raise ('没有该属性')
@classmethod
def select_one(cls, **kwargs):
# 只查一条
print(kwargs) # {'id': 1}
key = list(kwargs.keys())[0] # ['id']
value = kwargs[key]
sql = 'select * from %s where %s=?' % (cls.table_name, key) # select * from user where id=?
print(sql, '=========')
sql = sql.replace('?', '%s') # select * from user where id=%s
print(sql, '+++++++++++++++++++')
ms = Mysql_p.Mysql()
re = ms.select(sql, value) # 如果收不到就是空的元组
print(re, '&&&&&&&&&&&&&&&&&&&&&&&&&&&')
if re: # [{'id': 1, 'name': 'lxx', 'password': '123'}]
return cls(**re[0]) # User(id=1,name='lxx',password='123')
else: # user=User.select_one(id=2)
return
@classmethod
def select_many(cls, **kwargs):
ms = Mysql_p.Mysql()
if kwargs:
key = list(kwargs.keys())[0]
value = kwargs[key]
sql = 'select * from %s where %s=?' % (cls.table_name, key)
sql = sql.replace('?', '%s')
re = ms.select(sql, value)
else:
sql='select * from %s'%(cls.table_name)
re=ms.select(sql,None)
if re:
lis_obj = [cls(**r) for r in re]
return lis_obj
else:
return
def update(self):
ms= Mysql_p.Mysql()
# update user set name='lyysb' and password='ssb'
fields=[] # 存所有非主键字段
args=[] # 存传入pysql的参数
pr=None # 存主键
for k,v in self.mappings.items():
if v.primary_key:
pr=getattr(self,v.name,None)
else:
fields.append(v.name+'=?')
args.append(getattr(self,v.name,None))
sql="update %s set %s where %s =%s"%(self.table_name,','.join(fields),self.primary_key,pr)
sql=sql.replace('?','%s')
print(sql)
ms.execute(sql,args)
def save(self):
ms= Mysql_p.Mysql()
# insert into user (name,password) values (?,?)
field=[]
values=[]
args=[]
for k,v in self.mappings.items():
if not v.primary_key:
field.append(v.name)
values.append('?')
args.append(getattr(self,v.name,None))
sql='insert into %s(%s) values (%s)'%(self.table_name,','.join(field),','.join(values))
sql=sql.replace('?','%s')
ms.execute(sql,args)
class User(Models):
table_name = 'user'
id = IntegerField(name='id', primary_key=True, default=0)
password = StringField('password')
class Notice(Models):
table_name='notice'
id=IntegerField(name='id',primary_key=True)
name=StringField('name')
content=StringField('content')
user_id=IntegerField('user_id')
if __name__ == '__main__':
user = User.select_one(id=1)
print(user.name)
# a = User(name='we', password='s231')
# 测试(select)
# notice=Notice.select_one(id=1)
# print(notice) #{'id': 1, 'name': '测试1', 'content': '内容1', 'user_id': 1, 'create_time': datetime.datetime(2019, 6, 16, 21, 29, 55)}
# print(notice.content) # 内容1
# select_many
# notice_list=Notice.select_many()
# print(notice_list) #[{'id': 1, 'name': '测试1', 'content': '内容1', 'user_id': 1, 'create_time': datetime.datetime(2019, 6, 16, 21, 29, 55)}, {'id': 2, 'name': '测试2', 'content': '内容2', 'user_id': 1, 'create_time': datetime.datetime(2019, 6, 16, 21, 30, 4)}] sssssssssssssssssss
# update
# notice.name='我改了'
# notice.update()
# 插入数据
notice=Notice(name='123',content='.....',user_id=1)
notice.save()
-
Mysql_p
import pymysql from ORM项目.ORM_POOL import mysql_pool class Mysql: def __init__(self): self.conn= mysql_pool.POOL.connection() self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor) def close_db(self): self.cursor.close() self.conn.close() def select(self, sql, args): self.cursor.execute(sql, args) # (selct * from user where id=%s,1) ''' sql=select * from user where name=%s and password=%s cursor.execute(sql,(lxx,123)) ''' rs = self.cursor.fetchall() print(rs, 'sssssssssssssssssss') return rs def execute(self, sql, args): try: self.cursor.execute(sql, args) affected = self.cursor.rowcount except BaseException as e: print(e) return affected if __name__ == '__main__': ms = Mysql() # re=ms.select('select * from user where id =%s',1) # print(re) print(ms.select('select * from user where id=%s', 1))
- mysql_pool
import pymysql from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。 ping=0, # ping MySQL服务端,检查是否服务可用。 host='database', port=3306, user='java', password='1234', database='youku', charset='utf8', autocommit=True ) def func(): # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常 # 否则 # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。 # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。 conn = POOL.connection() print( '链接被拿走了', conn._con) print( '池子里目前有', POOL._idle_cache, '\r\n') cursor = conn.cursor() cursor.execute('select * from user') result = cursor.fetchall() print(result) conn.close() if __name__ == '__main__': func()
sqlalchemy 模块实现对象关系映射
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
engine = create_engine("mysql+pymysql://root:123@localhost/test", encoding='utf8')
Base = declarative_base()
class UserType(Base):
__tablename__ = 'Usertype'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), nullable=False, server_default='')
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), nullable=True, server_default='')
extra = Column(String(32), nullable=True, server_default='')
type_id = Column(Integer, ForeignKey(UserType.id))
Session = sessionmaker(engine)
session = Session()
def create_tabler():
Base.metadata.create_all(engine)
def drop_table():
Base.metadata.drop_all(engine)
def insert_user():
data = [
User(name='lyy', extra='SB', type_id=5),
User(name='lyy2', extra='SB2', type_id=3),
User(name='lyy3', extra='SB3', type_id=1),
User(name='lyy4', extra='SB4', type_id=3),
User(name='lyy5', extra='SB5', type_id=4)
]
session.add_all(data)
session.commit()
def insert_usertype_one():
'''
增加一条数据
:return:
'''
data = UserType(name='胡宏鹏SB')
session.add(data)
def insert_usertype_many():
'''
增加多条数据
:return:
'''
data = [
UserType(name='胡大鹏SB'),
UserType(name='胡二鹏SB'),
UserType(name='胡三鹏SB'),
UserType(name='胡四鹏SB'),
UserType(name='胡五鹏SB')
]
session.add_all(data)
session.commit()
def select_user_many():
res=session.query(User).all()
for row in res:
print(row.id,row.name,row.extra,row.type_id)
def select_all():
'''
查询所有记录
:return:
'''
res = session.query(UserType).all() # 列表中存放着对象,每一个对象指向表中的一条记录
for row in res:
print(row.id, row.name)
def select_first():
'''
查询第一条记录
:return:
'''
res = session.query(UserType).first()
print(res.id, res.name)
def select_choice():
'''
查询指定字段
:return:
'''
res = session.query(UserType, UserType.name, UserType.id).all()
'''
UserType 返回一个对象指向一条记录
UserType.name 取出name字段的值
UserType.id 取出id字段的值
最后每一条记录的这三个元素包含在一个元组里,所有的记录包含在一个大列表
[(<__main__.UserType object at 0x7fb41933ef28>, '胡宏鹏SB', 1),
(<__main__.UserType object at 0x7fb41933ef98>, '胡大鹏SB', 2),
(<__main__.UserType object at 0x7fb419361048>, '胡二鹏SB', 3),
(<__main__.UserType object at 0x7fb4193610b8>, '胡三鹏SB', 4),
(<__main__.UserType object at 0x7fb419361128>, '胡四鹏SB', 5),
(<__main__.UserType object at 0x7fb419361198>, '胡五鹏SB', 6)]
'''
for row in res:
print(row[2], row[1], row[0])
'''
1 胡宏鹏SB <__main__.UserType object at 0x7f7ecc640e80>
2 胡大鹏SB <__main__.UserType object at 0x7f7ecc640ef0>
3 胡二鹏SB <__main__.UserType object at 0x7f7ecc640f60>
4 胡三鹏SB <__main__.UserType object at 0x7f7ecc640fd0>
5 胡四鹏SB <__main__.UserType object at 0x7f7ecc661080>
6 胡五鹏SB <__main__.UserType object at 0x7f7ecc6610f0>
'''
def select_get():
'''
查询指定id的数据
:return:
'''
res = session.query(UserType).get(3)
print(res.id, res.name, res) # 3 胡二鹏SB <__main__.UserType object at 0x7fe336c34da0>
def select_where():
# res1=session.query(UserType).filter(UserType.name=='胡宏鹏SB').all() # [<__main__.UserType object at 0x7fa60fa32da0>]
# print(res1[0].id,res1[0].name,res1) # 1 胡宏鹏SB [<__main__.UserType object at 0x7f1a7c17ee10>]
#
# res2=session.query(UserType).filter(UserType.id>3).all()
# for row in res2:
# print(row.id,row.name,row)
'''
/usr/bin/python3.6 /home/java/Desktop/pycharm_project/ORM_POOL/SQLAlchemy的操作/第一遍/execute.py
4 胡三鹏SB <__main__.UserType object at 0x7f5ff32c0d30>
5 胡四鹏SB <__main__.UserType object at 0x7f5ff32c0e10>
6 胡五鹏SB <__main__.UserType object at 0x7f5ff32c0e80>
4 胡三鹏SB <__main__.UserType object at 0x7f5ff32c03c8>
5 胡四鹏SB <__main__.UserType object at 0x7f5ff32c0f60>
6 胡五鹏SB <__main__.UserType object at 0x7f5ff32c0fd0>
'''
test = session.query(UserType).filter(UserType.id > 1, UserType.name == '胡大鹏SB')
# SELECT `Usertype`.id AS `Usertype_id`, `Usertype`.name AS `Usertype_name` FROM `Usertype`
# WHERE `Usertype`.id > %(id_1)s AND `Usertype`.name = %(name_1)s
res3 = test.all()
print(res3[0].id, res3[0].name, res3) # 2 胡大鹏SB [<__main__.UserType object at 0x7fae5ca7bf60>]
def select_between():
'''
查询某一条件区间的记录,一般是id
:return:
'''
res = session.query(UserType).filter(UserType.id.between(1, 4)).all()
for row in res:
print(row.id, row.name, row)
'''
1 胡宏鹏SB <__main__.UserType object at 0x7ff2d4f82dd8>
2 胡大鹏SB <__main__.UserType object at 0x7ff2d4f82ef0>
3 胡二鹏SB <__main__.UserType object at 0x7ff2d4f82f60>
4 胡三鹏SB <__main__.UserType object at 0x7ff2d4f82fd0>
'''
def select_in():
res = session.query(UserType).filter(UserType.name.in_(['胡二鹏SB', '胡大鹏SB', '胡三鹏SB'])).all()
for row in res:
print(row.id, row.name, row)
'''
2 胡大鹏SB <__main__.UserType object at 0x7f292e0abf28>
3 胡二鹏SB <__main__.UserType object at 0x7f292e0d6048>
4 胡三鹏SB <__main__.UserType object at 0x7f292e0d60b8>
'''
def select_not_in():
res = session.query(UserType).filter(UserType.name.notin_(['胡二鹏SB', '胡大鹏SB', '胡三鹏SB'])).all()
for row in res:
print(row.id, row.name, row)
'''
1 胡宏鹏SB <__main__.UserType object at 0x7f0500aecf60>
5 胡四鹏SB <__main__.UserType object at 0x7f0500b16080>
6 胡五鹏SB <__main__.UserType object at 0x7f0500b160f0>
'''
def select_sym(): # 通配符查询
ret = session.query(UserType).filter(UserType.name.like('%二%')).all()
for row in ret:
print(row.name) # 胡二鹏SB
def select_limit():
ret = session.query(UserType)[0:2] # 切片操作
for row in ret:
print(row.name)
def select_sort():
# ret=session.query(User).order_by(User.name.desc()).all()
# for row in ret:
# print(row.name)
'''
lyy5
lyy4
lyy3
lyy2
lyy
'''
ret2=session.query(User).order_by(User.name.desc(),User.id.asc()).all()
for row in ret2:
print(row.name)
'''
lyy5
lyy4
lyy3
lyy2
lyy
'''
def select_group():
from sqlalchemy.sql import func
res=session.query(User.type_id,func.max(User.id),func.min(User.id)).group_by(User.type_id).all()
print(res) # [(1, 3, 3), (3, 4, 2), (4, 5, 5), (5, 1, 1)]
res2=session.query(User.name,func.max(User.id),func.min(User.id)).group_by(User.type_id).having(func.min(User.id>2)).all()
print(res2) # [('lyy3', 3, 3), ('lyy5', 5, 5)]
session.close()