一、安装三方库
pip install sqlalchemy
pip install pymysql
二、创建表
为了更好的实现项目代码管理,可以将数据库操作放到其他文件中进行,而不是全部代码都堆积到
main.py
中
1、根据截图创建对应文件
-
database.py
:用于创建可调用的数据库引擎使用 -
models.py
:用于定义数据表模型 -
__init__.py
: 空文件,表示mysql
是一个自定义的库包,可以在其他文件中直接使用import
引用到
database.py
内容如下:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 定义数据库连接的url
# 格式: 数据库类型+数据库驱动://用户名:密码@Ip:端口/数据库名
# 注意:数据库需要先前置创建好,不会自动创建数据库
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@127.0.0.1:3306/fastapi_data"
# 使用url创建一个数据库引擎
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# 创建数据库库会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 返回一个base基类,用于继承
Base = declarative_base()
models.py
内容如下:
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
# 数据库模型
class User(Base):
# 创建一个名为users的表
__tablename__ = "users"
# 字段
id = Column(Integer, primary_key=True, index=True)
name = Column(String(20))
main.py
内容如下:
from fastapi import FastAPI
from mysql import models, database
app = FastAPI()
@app.get("/")
def root():
# 创建数据表
models.Base.metadata.create_all(bind=database.engine)
return {"create table success"}
然后启动服务,浏览器访问root
节点
2、验证数据库是否创建成功
然后使用下述命令查表是否创建成功
# 连接数据库,会车后需要输入root用户密码再回车才能连上数据库
mysql -uroot -p
# 显示出当前有哪些库
show databases;
# 使用 fastApiData 库
use fastapi_data;
# 查看数据库下有哪些表
show tables;
# 查看users表下有哪些字段
show Columns from users;
当然,也可以使用一些可视化的数据库查看工具去查看,省去命令记忆
三、增删改查
在之前的基础上对main.py
进行如下修改:
from fastapi import FastAPI, Depends
from mysql import models, database
app = FastAPI()
# 创建数据库表
@app.get("/")
def root():
# 创建数据表
models.Base.metadata.create_all(bind=database.engine)
return {"create table success"}
# 新增数据, 这里为了演示用的get请求,一般用post
@app.get('/add/{userName}')
def add_user(userName):
userInfo = models.User(name=userName) # 创建新增实例
db = database.SessionLocal() # 获取数据库会话
db.add(userInfo) # 新增
db.commit() # 提交
db.refresh(userInfo) # 刷新数据
return f"数据库新增数据成功, 用户名: {userName}"
# 查询数据
@app.get('/search/{searchParm}')
def search_data(searchParm):
db = database.SessionLocal() # 获取数据库会话
if searchParm == 'all':
users = db.query(models.User).all() # 查看所有用户
return users
else:
user = db.query(models.User).filter(models.User.id == int(searchParm)).first() # 使用过滤条件查看单个用户
return user
# 修改数据,这里为了演示用了get请求,一般用put
@app.get('/modify/{oldName}&{newName}')
def modify_data(oldName, newName):
db = database.SessionLocal() # 获取数据库会话
existing_user = db.query(models.User).filter(models.User.name == oldName).first()
existing_user.name = newName
db.commit()
db.refresh(existing_user)
return existing_user
# 删除数据,这边为了演示用了get请求,一般用delete
@app.get('/delete/{userName}')
def delete_data(userName):
db = database.SessionLocal() # 获取数据库会话
db.query(models.User).filter(models.User.name == userName).delete()
db.commit()
return f"名为: {userName} 的用户已经被删除"