安全认证
- 登录
- 选择验证方式
在fastapi.security里 有OAuth2方法 OAuth2PasswordBearer方法和OAuth2AuthorizationCodeBearer方法
OAuth2 在请求头的格式:{Authorization: token}
OAuth2PasswordBearer 格式:{Authorization: bearer token}
OAuth2AuthorizationCodeBearer 格式一样,逻辑上有些不同,具体差异在了解中,,
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='token') # 获取请求头里的Authorization值 并且是bearer开头的token
- 登录接口
OAuth2PasswordRequestForm -> 指定了登录字段
username 用户名
password 密码
还有一些其他参数(可不填) scope 作用域,grant_type,client_id,client_secret 这些字段遵循OAuth2规范
@app.post('/token')
async login(form_data: OAuth2PasswordRequestForm = Depends())
# from_data 里包含了username,password等参数
# usernem = from_data.username
# password = from_data.password
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail='Incorrect username or password')
user = UserInDB(**user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail='Incorrect password')
return {'access_token': user.username, 'token_type': 'bearer'} # 必须返回json对象
- 添加验证
@app.get('/items')
async def read_items(current_user: User = Depends(get_current_user)): # 验证程序
# 能直接拿到用户对象,拿不到的情况会在处理token和获取user时被返回异常
return current_user
一些需要的库
pip install python-jose pip install PassLib
生成安全的随机密钥 在终端中运行
openssl rand -hex 32
# 完整的我认为需要的方法
# 验证密码
def verify_password(plain_password: str, hashed_password: str):
return pwd_content.verify(plain_password, hashed_password)
# 获取hash后的密码
def get_password_hash(password: str):
return pwd_content.hash(password)
# 验证用户 通过账号密码
def authenticate_user(db, username, password):
user = get_user(db, username)
if not user:
return None
if not verify_password(password, user.password):
return None
return user
# 获取用户 通过token
async def get_current_user(token=Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# 获取活动用户
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled
raise HTTPException(status_code=400, detail='Inactive user')
return current_user