mirror of
https://github.com/Cccc-owo/CheckInApp.git
synced 2026-06-17 05:56:29 +00:00
827c9198ae
- Updated Vite configuration to manually chunk Ant Design Vue for improved dependency management. - Added a comprehensive migration testing checklist for transitioning from Element Plus 2.13.0 to Ant Design Vue 4.x, covering various components and functionalities.
146 lines
4.9 KiB
Python
146 lines
4.9 KiB
Python
from datetime import datetime
|
||
from typing import Optional
|
||
import logging
|
||
from fastapi import Depends, HTTPException, Header, status
|
||
from sqlalchemy.orm import Session
|
||
from backend.models import get_db, User
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def get_current_user(
|
||
authorization: Optional[str] = Header(None),
|
||
db: Session = Depends(get_db)
|
||
) -> User:
|
||
"""
|
||
获取当前用户
|
||
支持两种认证方式:
|
||
1. Token 认证(QQ 扫码登录)
|
||
2. User ID 认证(密码登录,格式:user_id:xxx)
|
||
"""
|
||
if not authorization:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="未提供认证信息",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
# 移除 "Bearer " 前缀(如果存在)
|
||
token = authorization.replace("Bearer ", "") if authorization.startswith("Bearer ") else authorization
|
||
|
||
# 检查是否为 user_id 格式的认证(用于密码登录)
|
||
if token.startswith("user_id:"):
|
||
user_id_str = token.replace("user_id:", "")
|
||
try:
|
||
user_id = int(user_id_str)
|
||
user = db.query(User).filter(User.id == user_id).first()
|
||
|
||
if not user:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="用户不存在",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
# 用户ID认证成功,检查是否设置了密码
|
||
has_password = bool(user.password_hash)
|
||
if not has_password:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="该账户未设置密码,请使用扫码登录",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
# 密码登录的用户可以访问,无需检查 Token
|
||
return user
|
||
|
||
except ValueError:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="无效的用户ID格式",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
# Token 认证(原有逻辑)
|
||
# 从数据库查询用户
|
||
user = db.query(User).filter(User.authorization == token).first()
|
||
|
||
if not user:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="无效的认证信息",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
# 检查 Token 是否过期
|
||
if user.jwt_exp and user.jwt_exp != "0":
|
||
try:
|
||
exp_timestamp = int(user.jwt_exp)
|
||
current_timestamp = int(datetime.now().timestamp())
|
||
if current_timestamp > exp_timestamp:
|
||
# 如果用户设置了密码,允许继续使用(Token 过期但不强制退出)
|
||
has_password = bool(user.password_hash)
|
||
if has_password:
|
||
# Token 过期但有密码,允许访问,但在响应头中添加警告
|
||
# 注意:这里不抛出异常,让用户继续使用
|
||
pass
|
||
else:
|
||
# 没有密码的用户,Token 过期必须重新扫码登录
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Token 已过期,请重新扫码登录",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
except ValueError as e:
|
||
# jwt_exp 格式不正确,记录警告后跳过 Token 过期验证
|
||
logger.warning(f"用户 {user.id} ({user.alias}) 的 jwt_exp 格式不正确: {user.jwt_exp}, 错误: {e}")
|
||
|
||
return user
|
||
|
||
|
||
async def require_approved_user(
|
||
current_user: User = Depends(get_current_user)
|
||
) -> User:
|
||
"""
|
||
要求用户已通过审批
|
||
"""
|
||
if not current_user.is_approved:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_403_FORBIDDEN,
|
||
detail="您的账户正在等待管理员审批,请耐心等待(24小时内)"
|
||
)
|
||
|
||
return current_user
|
||
|
||
|
||
async def get_current_admin_user(
|
||
current_user: User = Depends(require_approved_user)
|
||
) -> User:
|
||
"""
|
||
获取当前管理员用户
|
||
验证用户是否具有管理员权限
|
||
"""
|
||
if current_user.role != "admin":
|
||
raise HTTPException(
|
||
status_code=status.HTTP_403_FORBIDDEN,
|
||
detail="权限不足,需要管理员权限"
|
||
)
|
||
return current_user
|
||
|
||
|
||
async def get_optional_user(
|
||
authorization: Optional[str] = Header(None),
|
||
db: Session = Depends(get_db)
|
||
) -> Optional[User]:
|
||
"""
|
||
可选的用户认证
|
||
如果提供了 Token 则返回用户,否则返回 None
|
||
"""
|
||
if not authorization:
|
||
return None
|
||
|
||
try:
|
||
return await get_current_user(authorization, db)
|
||
except HTTPException:
|
||
return None
|