feat: implement JWT auth and optimize token validation

- Separate JWT login (21d) from check-in token
- Unify check-in token validation with verify_checkin_authorization()
- Update API docs for dual-token architecture
This commit is contained in:
2026-01-05 23:02:50 +08:00
parent b32b53853a
commit a9b141fc69
13 changed files with 464 additions and 336 deletions
+39 -60
View File
@@ -1,9 +1,11 @@
from datetime import datetime
from typing import Optional
import logging
import jwt as pyjwt
from fastapi import Depends, HTTPException, Header, status
from sqlalchemy.orm import Session
from backend.models import get_db, User
from backend.utils.jwt import JWTManager
logger = logging.getLogger(__name__)
@@ -13,10 +15,12 @@ async def get_current_user(
db: Session = Depends(get_db)
) -> User:
"""
获取当前用户
支持两种认证方式:
1. Token 认证(QQ 扫码登录)
2. User ID 认证(密码登录,格式:user_id:xxx
获取当前用户(使用 JWT 认证)
认证说明:
1. 网站登录使用 JWT token(存储在前端,21天过期
2. 打卡业务使用 authorization token(存储在数据库 User.authorization
3. JWT 过期后需要重新登录,但打卡 token 过期不影响网站使用
"""
if not authorization:
raise HTTPException(
@@ -28,74 +32,49 @@ async def get_current_user(
# 移除 "Bearer " 前缀(如果存在)
token = authorization.replace("Bearer ", "") if authorization.startswith("Bearer ") else authorization
# 检查是否为 user_id 格式的认证(用于密码登录)
if token.startswith("user_id:"):
user_id_str = token.replace("user_id:", "")
try:
user_id = int(user_id_str)
user = db.query(User).filter(User.id == user_id).first()
try:
# 验证 JWT token
payload = JWTManager.verify_token(token)
user_id = payload.get("user_id")
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在",
headers={"WWW-Authenticate": "Bearer"},
)
# 用户ID认证成功,检查是否设置了密码
has_password = bool(user.password_hash)
if not has_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="该账户未设置密码,请使用扫码登录",
headers={"WWW-Authenticate": "Bearer"},
)
# 密码登录的用户可以访问,无需检查 Token
return user
except ValueError:
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的用户ID格式",
detail="Token 格式错误",
headers={"WWW-Authenticate": "Bearer"},
)
# Token 认证(原有逻辑)
# 从数据库查询用户
user = db.query(User).filter(User.authorization == token).first()
# 从数据库获取用户
user = db.query(User).filter(User.id == user_id).first()
if not user:
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在",
headers={"WWW-Authenticate": "Bearer"},
)
return user
except pyjwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="登录已过期,请重新登录",
headers={"WWW-Authenticate": "Bearer"},
)
except pyjwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证信息",
headers={"WWW-Authenticate": "Bearer"},
)
# 检查 Token 是否过期
if user.jwt_exp and user.jwt_exp != "0":
try:
exp_timestamp = int(user.jwt_exp)
current_timestamp = int(datetime.now().timestamp())
if current_timestamp > exp_timestamp:
# 如果用户设置了密码,允许继续使用(Token 过期但不强制退出)
has_password = bool(user.password_hash)
if has_password:
# Token 过期但有密码,允许访问,但在响应头中添加警告
# 注意:这里不抛出异常,让用户继续使用
pass
else:
# 没有密码的用户,Token 过期必须重新扫码登录
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 已过期,请重新扫码登录",
headers={"WWW-Authenticate": "Bearer"},
)
except ValueError as e:
# jwt_exp 格式不正确,记录警告后跳过 Token 过期验证
logger.warning(f"用户 {user.id} ({user.alias}) 的 jwt_exp 格式不正确: {user.jwt_exp}, 错误: {e}")
return user
except Exception as e:
logger.error(f"认证失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="认证失败",
headers={"WWW-Authenticate": "Bearer"},
)
async def require_approved_user(