中间件
创建中间件 使用装饰器 @app.middleware('http')
中间件接收两个参数
- request:Request
- call_next -> call_next接收request参数,此函数会传递request给相应的路径操作,然后返回response
最后返回 response
@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers['X-Process-Time'] = str(process_time)
return response
添加现成中间件,可以使用 app.add_middleware(HTTPSRedirectMiddleware)
HTTPSRedirectMiddleware 是强制所有传入请求必须为https或wss 的中间件
CORSMiddleware 跨域资源共享配置的中间件
CORS(跨域资源共享)
from fastapi.middleware.cors import CORSMiddleware
origins = [
"http://localhost.tiangolo.com",
"https://localhost.tiangolo.com",
"http://localhost",
"http://localhost:8080",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_methods=['*'],
allow_headers=['*'],
)
CORSMiddleware 默认参数是限制性的,因此需要显式的启用特定的来源,方法,或者标头
支持以下参数:
- allow_origins 允许进行跨域请求的来源列表 ['*'] 允许任何来源
- allow_origins_regex 正则匹配允许跨域请求的来源url
- allow_methods 跨域请求允许的http方法列表 默认为['GET']
- allow_headers 跨域请求允许的http标头列表 默认为[] ['*']允许所有标头
- allow_credentials 跨域请求是否支持cookie 默认False
- expose_handers 浏览器可以访问的任何响应头 默认为[]
- max_age 设置浏览器缓存cors响应的最长时间(秒为单位) 默认600
SQL数据库
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
MYSQL_USER = ''
MYSQL_PASS = ''
MYSQL_HOST = ''
MYSQL_PORT = '3306'
MYSQL_DB = ''
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{MYSQL_USER}:{MYSQL_PASS}@{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DB}'
#SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db" # 配置数据库url
# SQLALCHEMY_DATABASE_URL = "postgresql://user:password@postgresserver/db"
engine = create_engine(SQLALCHEMY_DATABASE_URI) # 初始化数据库连接
#engine = create_engine(
# SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} # connect_args={"check_same_thread": False} 仅用于sqlite
#)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) # 会话工厂
Base = declarative_base() # 模型的基类
# 编写model models.py
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
# 编写序列化器 schemas.py
class User(BaseModel):
id: Optional[int] = None
username: str
email: str
class Config:
orm_mode = True # 配置orm_mode除了可以让pydantic读取字典类型的数据,还支持读取属性数据,比如sqlalchemy模型的数据
# 这样pydantic数据模型就可以兼容sqlalchemy数据模型,可以直接在路径操作函数中直接返回sqlalchemy数据模型
# orm操作
add 添加对象到数据库会话
commit 提交对数据库的更改
refresh 刷新实例(以便包含数据库中的任何新数据,例如生成的id)
def create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password
db_user = models.User(email=user.email hashed_password=fake_hashed_password)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 获取数据库会话 使用完成自动关闭会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
create_engine 创建一个带连接池的引擎
创建一个session,连接池会分配一个connection.当session使用后需要显式的调用session.close(),commit()或者session.rollback()来把连接
归还到QueuePool连接池管理并复用连接
在使用create_engine创建引擎时,如果默认不指定连接池设置的话,一般情况下,SQLAlchemy 会使用一个 QueuePool 绑定在新创建的引擎上。并附上合适的连接池参数
create_engine()函数和连接池相关的参数有:
- pool_recycle, 默认-1, 推荐设置7200,即如果connection空闲来7200秒,自动重新获取,防止connection被db server关闭
- pool_size 连接数大小,默认为5,正式环境该数值太小,需根据实际情况调整
- max_overflow 超出pool_size后可允许的最大连接数,默认10, 这 10 个连接在使用过后,不放在 pool 中,而是被真正关闭。
- pool_timeout 获取连接的超时阈值,默认30秒
不使用连接池: 添加参数 poolclass=NullPool 在create_engine中添加此参数
sessionmaker创建了一个工厂类,在创建这个工厂类时我们配置了参数绑定了引擎,将其赋值给SessionLocal,
每次实例化SessionLocal都会创建一个绑定了引擎的SessionLocal,这样这个SessionLocal在访问数据库时都会通过这个绑定好的引擎来获取连接资源
pydantic模型可以传入额外参数
Item(**item.dict(), owner_id=user_id)
创建数据库表
Base.metadata.create_all(bind=engine)
创建依赖 -> 一个能自动释放的session方法, sessionLocal在单个请求中使用,请求完成后被关闭
python3.6版本需要安装
pip install async-exit-stack async-generator
def get_db():
db = sessionLocal()
try:
yield db
finally:
db.close()
# 使用它:
def read_items(db: Session = Depends(get_db)):
...
# 如果使用不了yield也装不了插件
可以使用中间件替代
@app.middleware('http')
async def db_session_middleware(request: Request, call_next):
response = Response("Internal server error", status_code=500)
try:
request.state.db = SessionLocal()
response = await call_next(request)
finally:
request.state.db.close()
request response
def get_db(request: Request):
return request.state.db
最好使用yield
db 参数实际上是SessionLocal类型,其实也是代理sqlalchemy的session, 声明成session让编译器能提供更好的支持 类型声明不影响实际对象
路由
APIRouter
from fastapi import APIRouter
router = APIRouter()
@router.get('item')
async def read_users():
xxxxx
如果有相同的路径需要集合在一个文件夹 如: /items/, /items/{item_id}/, 这类的相同的/items/路径
可以使用
router = APIRouter(
prefix='/items', # 统一的路径
tags=['items'] # 标签 在文档中有作用
dependencies=[Depends(get_token_header等等))] # 注入一些无返回值的依赖
response={404: {'description': 'not found'}}, # 特定的response返回
)
之后的路径操作:
@router.get('/') # /items/
async def xxx():
xxx
主文件的配置:
app.include_router(users.router)
app.include_router(
admin.router,
prefix="/admin",
tags=["admin"],
dependencies=[Depends(get_token_header)],
responses={418: {"description": "I'm a teapot"}},
)
可以嵌套include_router