peewee是什么
peewee是Python一个ORM框架,相比于著名的SQLAlchemy,peewee更为轻量,且更加简单易上手。
peewee原生支持sqlite、mysql以及postgresql。
学习资源
数据建模
示例
from peewee import *
import datetime
from playhouse.sqlite_ext import SqliteExtDatabase
db = MySQLDatabase(host='localhost', user='playground', passwd='playground', database='playground')
# db = SqliteExtDatabase('my_database.db')
class BaseModel(Model):
class Meta:
database = db
# 用户表
class User(BaseModel):
username = CharField(unique=True)
# Twitter表
class Tweet(BaseModel):
user = ForeignKeyField(User, related_name='tweets')
message = TextField()
created_date = DateTimeField(default=datetime.datetime.now)
is_published = BooleanField(default=True)
支持的字段类型
- CharField varchar varchar varchar
- FixedCharField char char char
- TextField text text longtext
- DateTimeField datetime timestamp datetime
- IntegerField integer integer integer
- BooleanField integer boolean bool
- FloatField real real real
- DoubleField real double precision double precision
- BigIntegerField integer bigint bigint
- SmallIntegerField integer smallint smallint
- DecimalField decimal numeric numeric
- PrimaryKeyField integer serial integer
- ForeignKeyField integer integer integer
- DateField date date date
- TimeField time time time
- TimestampField integer integer integer
- BlobField blob bytea blob
- UUIDField text uuid varchar(40)
- BareField untyped not supported not supported
支持的参数
- null = False – boolean indicating whether null values are allowed to be stored
- index = False – boolean indicating whether to create an index on this column
- unique = False – boolean indicating whether to create a unique index on this column. See - also adding composite indexes.
- verbose_name = None – string representing the “user-friendly” name of this field
- help_text = None – string representing any helpful text for this field
- db_column = None – string representing the underlying column to use if different, useful - for legacy databases
- default = None – any value to use as a default for uninitialized models
- choices = None – an optional iterable containing 2-tuples of value, display
- primary_key = False – whether this field is the primary key for the table
- sequence = None – sequence to populate field (if backend supports it)
- constraints = None - a list of one or more constraints, e.g. [Check('price > 0')]
- schema = None – optional name of the schema to use, if your db supports this.
特殊的参数
- CharField max_length
- FixedCharField max_length
- DateTimeField formats
- DateField formats
- TimeField formats
- TimestampField resolution, utc
- DecimalField max_digits, decimal_places, auto_round, rounding
- ForeignKeyField rel_model, related_name, to_field, on_delete, on_update, extra
- BareField coerce
连接数据库并生成表
def create_table():
db.connect()
db.create_tables([User, Tweet])
常用操作
插入记录
charlie = User.create(username='charlie')
huey = User(username='huey')
huey.save()
# 不需要设置 `is_published` 或 `created_date`,保存时会自动使用模型中指定的默认值
Tweet.create(user=charlie, message='My first tweet')
查询单条记录
user = User.get(User.id == 1)
user = User.get(User.username == 'charlie')
查询多条记录
for user in User.select():
print(user.username)
多条件查询
Tweet.select().where(Tweet.user == user, Tweet.is_published == True)
in查询,使用"<<"来代入多个条件
usernames = ['charlie', 'huey', 'mickey']
users = User.select().where(User.username << usernames)
join查询
tweets = (Tweet
.select()
.join(User)
.where(User.username << usernames))
count
Tweet.select().where(Tweet.id > 50).count()
count今天发布了几条Twitter
tweets_today = (Tweet
.select()
.where(
(Tweet.created_date >= datetime.date.today()) &
(Tweet.is_published == True))
.count())
排序及分页
User.select().order_by(User.username).paginate(3, 20)
Tweet.select().join(User).order_by(User.username, Tweet.created_date.desc())
join和gruopby,将用户按Twitter数排序
tweet_ct = fn.Count(Tweet.id)
users = (User
.select(User, tweet_ct.alias('ct'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User)
.order_by(tweet_ct.desc()))
更新
User.update(username='foo').where(User.id == 2).execute()
user = User.get(User.id == 1)
user.username='bar'
user.save()
删除
Tweet.delete().where(id==1).execute()
user = User.get(User.id == 8)
user.delete_instance()