mirror of
https://github.com/Cccc-owo/CheckInApp.git
synced 2026-06-17 14:06:28 +00:00
d4d6f87730
BREAKING CHANGE: root backend/frontend directories and old run/manage entrypoints were removed. Use apps/backend, apps/frontend, and python main.py commands instead.
125 lines
3.7 KiB
Python
125 lines
3.7 KiB
Python
from datetime import datetime
|
||
from typing import Optional
|
||
import logging
|
||
import jwt as pyjwt
|
||
from fastapi import Depends, HTTPException, Header, status
|
||
from sqlalchemy.orm import Session
|
||
from backend.models import get_db, User
|
||
from backend.utils.jwt import JWTManager
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 response carrying the ``WWW-Authenticate: Bearer`` challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(
    authorization: Optional[str] = Header(None),
    db: Session = Depends(get_db)
) -> User:
    """
    Resolve the current user from the JWT in the ``Authorization`` header.

    Authentication notes:
    1. Website login uses a JWT token (stored by the frontend, expires after
       21 days).
    2. The check-in business uses a separate authorization token (stored in
       the database as ``User.authorization``).
    3. Once the JWT expires the user must log in again, but an expired
       check-in token does not affect use of the website.

    Args:
        authorization: Raw ``Authorization`` header value, with or without a
            leading ``"Bearer "`` scheme prefix.
        db: Database session used to load the user.

    Returns:
        The ``User`` whose id equals the token's ``user_id`` claim.

    Raises:
        HTTPException: Always 401 with a ``WWW-Authenticate: Bearer`` header,
            when the header is missing, the token has expired or is invalid,
            the payload has no ``user_id``, no matching user exists, or any
            other error occurs while verifying the token or loading the user.
    """
    if not authorization:
        raise _unauthorized("未提供认证信息")

    # Strip only the leading "Bearer " scheme; str.replace() would also remove
    # any later occurrence of that substring inside the credential itself.
    scheme_prefix = "Bearer "
    if authorization.startswith(scheme_prefix):
        token = authorization[len(scheme_prefix):]
    else:
        token = authorization

    try:
        # Verify the JWT and extract the user id claim.
        payload = JWTManager.verify_token(token)
        user_id = payload.get("user_id")

        if not user_id:
            raise _unauthorized("Token 格式错误")

        # Load the user from the database.
        user = db.query(User).filter(User.id == user_id).first()

        if not user:
            raise _unauthorized("用户不存在")

        return user

    except HTTPException:
        # HTTPException is an Exception subclass: without this clause the
        # specific 401s raised above would fall into the generic handler
        # below, be logged as errors and lose their detail message.
        raise
    except pyjwt.ExpiredSignatureError:
        raise _unauthorized("登录已过期,请重新登录")
    except pyjwt.InvalidTokenError:
        raise _unauthorized("无效的认证信息")
    except Exception as e:
        # Any other failure (e.g. during the database lookup) is logged and
        # reported to the client as a generic authentication failure.
        logger.error("认证失败: %s", e)
        raise _unauthorized("认证失败")
|
||
|
||
|
||
async def require_approved_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """
    Dependency that only lets approved accounts through.

    Args:
        current_user: The authenticated user supplied by ``get_current_user``.

    Returns:
        The same user object when its ``is_approved`` flag is truthy.

    Raises:
        HTTPException: 403 when the account has not been approved yet.
    """
    # Happy path first: approved accounts are passed straight through.
    if current_user.is_approved:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="您的账户正在等待管理员审批,请耐心等待(24小时内)"
    )
|
||
|
||
|
||
async def get_current_admin_user(
    current_user: User = Depends(require_approved_user)
) -> User:
    """
    Dependency that restricts an endpoint to administrators.

    Args:
        current_user: The approved user supplied by ``require_approved_user``.

    Returns:
        The same user object when its ``role`` is ``"admin"``.

    Raises:
        HTTPException: 403 when the user's role is anything other than
            ``"admin"``.
    """
    if current_user.role != "admin":
        forbidden = HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="权限不足,需要管理员权限"
        )
        raise forbidden

    return current_user
|
||
|
||
|
||
async def get_optional_user(
    authorization: Optional[str] = Header(None),
    db: Session = Depends(get_db)
) -> Optional[User]:
    """
    Optional authentication: resolve the user when possible, never reject.

    Args:
        authorization: Raw ``Authorization`` header value, if any.
        db: Database session forwarded to ``get_current_user``.

    Returns:
        The authenticated ``User`` when a header is supplied and
        ``get_current_user`` accepts it; ``None`` when the header is absent
        or ``get_current_user`` raises ``HTTPException``.
    """
    resolved_user: Optional[User] = None

    if authorization:
        try:
            resolved_user = await get_current_user(authorization, db)
        except HTTPException:
            # A rejected credential is treated the same as no credential.
            resolved_user = None

    return resolved_user
|