feat: optimize Token expiration monitor

This commit is contained in:
2026-01-03 17:02:28 +08:00
parent ed8019c14b
commit 523da50123
6 changed files with 335 additions and 8 deletions
+2
View File
@@ -16,6 +16,8 @@ class User(Base):
password_hash = Column(String(200), nullable=True, comment="密码哈希(bcrypt加密)")
authorization = Column(Text, nullable=True, comment="当前有效的 QQ Token")
jwt_exp = Column(String(20), default="0", comment="Token 过期时间戳")
token_expiring_notified = Column(Boolean, default=False, nullable=False, comment="Token 即将过期提醒是否已发送(过期前30分钟)")
token_expired_notified = Column(Boolean, default=False, nullable=False, comment="Token 已过期提醒是否已发送(过期后30分钟内)")
role = Column(String(20), default="user", index=True, comment="角色: user/admin")
is_approved = Column(Boolean, default=False, index=True, comment="是否已通过管理员审批")
created_at = Column(DateTime(timezone=True), server_default=func.now(), comment="创建时间")
@@ -0,0 +1,153 @@
"""
为 users 表添加 Token 过期提醒字段的迁移脚本
新增的列:
- token_expiring_notified (BOOLEAN) - Token 即将过期提醒是否已发送(过期前30分钟内)
- token_expired_notified (BOOLEAN) - Token 已过期提醒是否已发送(过期后30分钟内)
这两个字段用于实现两次 Token 过期提醒:
1. 过期前 30 分钟内:提醒用户 Token 即将过期
2. 过期后 30 分钟内:提醒用户 Token 已过期,需要刷新
运行方式:
python backend/scripts/migrate_add_token_expiry_notified.py
venv/Scripts/python.exe backend/scripts/migrate_add_token_expiry_notified.py
"""
import sys
import os
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text, inspect
from backend.models.database import engine
def migrate():
"""执行迁移:添加 token_expiring_notified 和 token_expired_notified 列"""
print("开始迁移:为 users 表添加 Token 过期提醒字段...")
print("=" * 60)
with engine.connect() as conn:
# 检查表结构
inspector = inspect(engine)
columns = [col['name'] for col in inspector.get_columns('users')]
print(f"\n当前表列: {', '.join(columns)}")
# 检查是否已存在新字段
has_expiring = 'token_expiring_notified' in columns
has_expired = 'token_expired_notified' in columns
if has_expiring and has_expired:
print("\n[OK] Token 过期提醒字段已存在,跳过迁移")
return
print(f"\n需要添加的列:")
print(f" - token_expiring_notified (BOOLEAN, DEFAULT False)")
print(f" - token_expired_notified (BOOLEAN, DEFAULT False)")
# SQLite 不支持直接 ALTER TABLE ADD COLUMN with constraints,需要重建表
# 步骤:
# 1. 创建新表(包含两个新字段)
# 2. 复制数据
# 3. 删除旧表
# 4. 重命名新表
print("\n正在重建表结构...")
# 1. 创建新表
conn.execute(text("""
CREATE TABLE users_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
jwt_sub VARCHAR(200) UNIQUE,
alias VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100),
password_hash VARCHAR(200),
authorization TEXT,
jwt_exp VARCHAR(20) DEFAULT '0',
token_expiring_notified BOOLEAN NOT NULL DEFAULT 0,
token_expired_notified BOOLEAN NOT NULL DEFAULT 0,
role VARCHAR(20) DEFAULT 'user',
is_approved BOOLEAN DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME
)
"""))
print(" [OK] 创建新表结构")
# 2. 复制数据(为旧数据设置两个字段都为 False)
conn.execute(text("""
INSERT INTO users_new
(id, jwt_sub, alias, email, password_hash, authorization, jwt_exp,
token_expiring_notified, token_expired_notified,
role, is_approved, created_at, updated_at)
SELECT
id, jwt_sub, alias, email, password_hash, authorization, jwt_exp,
0, 0,
role, is_approved, created_at, updated_at
FROM users
"""))
print(" [OK] 复制数据到新表(所有用户的提醒字段默认为 False)")
# 3. 删除旧表
conn.execute(text("DROP TABLE users"))
print(" [OK] 删除旧表")
# 4. 重命名新表
conn.execute(text("ALTER TABLE users_new RENAME TO users"))
print(" [OK] 重命名新表")
# 5. 重建索引
conn.execute(text("""
CREATE INDEX ix_users_jwt_sub ON users(jwt_sub)
"""))
conn.execute(text("""
CREATE INDEX ix_users_alias ON users(alias)
"""))
conn.execute(text("""
CREATE INDEX ix_users_role ON users(role)
"""))
conn.execute(text("""
CREATE INDEX ix_users_is_approved ON users(is_approved)
"""))
conn.execute(text("""
CREATE INDEX ix_users_id ON users(id)
"""))
conn.execute(text("""
CREATE INDEX ix_user_role_approved ON users(role, is_approved)
"""))
print(" [OK] 重建索引")
conn.commit()
print("\n[SUCCESS] 表结构迁移成功!")
print("\n新的表结构:")
inspector = inspect(engine)
new_columns = [col['name'] for col in inspector.get_columns('users')]
print(f" 列: {', '.join(new_columns)}")
if __name__ == "__main__":
try:
migrate()
print("\n" + "=" * 60)
print("[完成] 迁移成功完成!")
print("\n数据库已更新为新架构:")
print(" - 添加了 token_expiring_notified 列(Token 即将过期提醒,过期前30分钟)")
print(" - 添加了 token_expired_notified 列(Token 已过期提醒,过期后30分钟内)")
print(" - 所有现有用户的提醒字段默认为 False")
print("\n提醒逻辑:")
print(" 1. 过期前 30 分钟内:发送\"即将过期\"邮件,设置 token_expiring_notified = True")
print(" 2. 过期后 30 分钟内:发送\"已过期\"邮件,设置 token_expired_notified = True")
print(" 3. 用户刷新 Token 后:重置两个字段为 False")
print("=" * 60)
except Exception as e:
print(f"\n[ERROR] 迁移失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
+2
View File
@@ -200,6 +200,8 @@ class AuthService:
user.authorization = pure_token # 存储清理后的 token
user.jwt_exp = jwt_exp
user.token_expiring_notified = False # 重置"即将过期"提醒标志
user.token_expired_notified = False # 重置"已过期"提醒标志
user.updated_at = datetime.now()
db.commit()
db.refresh(user)
+112 -1
View File
@@ -511,7 +511,118 @@ class EmailService:
<p><strong>如何刷新凭证:</strong></p>
<ol style="margin: 10px 0; padding-left: 20px;">
<li>登录系统(扫码或密码登录)</li>
<li>在个人设置中点击"刷新凭证"</li>
<li>在个人设置旁的按钮中进行刷新 Token</li>
<li>使用手机 QQ 扫描二维码完成刷新</li>
</ol>
<p style="text-align: center;">
<a href="{settings.FRONTEND_URL}/login" class="btn">立即登录刷新</a>
</p>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿直接回复。</p>
<p>接龙自动打卡系统 © {datetime.now().year}</p>
</div>
</div>
</body>
</html>
"""
return EmailService.send_email([str(user_email)], subject, body_html)
@staticmethod
def notify_token_expired(user: User) -> bool:
"""
通知用户 Token 已过期
Args:
user: 用户对象
Returns:
是否发送成功
"""
user_email = user.email
if user_email is None:
logger.info(f"用户 {user.alias} 未设置邮箱,跳过 Token 已过期通知")
return False
# 构建邮件内容
subject = f"【接龙自动打卡系统】登录凭证已过期 - {user.alias}"
body_html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
}}
.container {{
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
.header {{
background-color: #dc3545;
color: white;
padding: 20px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
background-color: #f9f9f9;
padding: 20px;
border: 1px solid #ddd;
border-radius: 0 0 5px 5px;
}}
.error-box {{
background-color: #f8d7da;
border-left: 4px solid #dc3545;
padding: 15px;
margin: 15px 0;
}}
.footer {{
margin-top: 20px;
text-align: center;
color: #999;
font-size: 12px;
}}
.btn {{
display: inline-block;
padding: 12px 24px;
background-color: #667eea;
color: white;
text-decoration: none;
border-radius: 5px;
margin: 10px 0;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>❌ 登录凭证已过期</h2>
</div>
<div class="content">
<p>您好,{user.alias}</p>
<p>您的 QQ 登录凭证已过期,系统已无法自动执行打卡任务。</p>
<div class="error-box">
<strong>⚠️ 重要提示:</strong>
<ul style="margin: 10px 0; padding-left: 20px;">
<li>登录凭证已过期,所有自动打卡任务已暂停</li>
<li>请尽快登录系统刷新凭证以恢复服务</li>
<li>如果您已设置密码,可以使用密码登录后扫码刷新凭证</li>
</ul>
</div>
<p><strong>如何刷新 Token</strong></p>
<ol style="margin: 10px 0; padding-left: 20px;">
<li>登录系统(扫码或密码登录)</li>
<li>在个人设置旁的按钮中进行刷新 Token</li>
<li>使用手机 QQ 扫描二维码完成刷新</li>
</ol>
+39 -5
View File
@@ -174,18 +174,52 @@ def check_token_expiration():
try:
exp_timestamp = int(user.jwt_exp)
# 检查是否在 30 分钟内过期(0 < 剩余时间 < 1800秒)
# 检查 Token 状态并发送对应的提醒
time_until_expiry = exp_timestamp - current_timestamp
# 情况1:Token 即将过期(过期前 30 分钟内,且还未过期)
if 0 < time_until_expiry < 1800: # 30分钟 = 1800秒
# 使用用户账户的邮箱发送通知
if user.email:
if user.email and not user.token_expiring_notified:
logger.info(f"用户 {user.alias} 的 Token 即将过期,发送邮件提醒到 {user.email}...")
from backend.services.email_service import EmailService
jwt_exp_value = user.jwt_exp
jwt_exp_str = str(jwt_exp_value) if jwt_exp_value is not None else "0"
EmailService.notify_token_expiring(user, jwt_exp_str)
notified_count += 1
# 发送"即将过期"邮件
success = EmailService.notify_token_expiring(user, jwt_exp_str)
if success:
user.token_expiring_notified = True
db.commit()
notified_count += 1
logger.info(f"用户 {user.alias} 的 Token 即将过期邮件已发送并标记")
else:
logger.warning(f"用户 {user.alias} 的 Token 即将过期邮件发送失败")
# 情况2:Token 已过期(过期后 30 分钟内)
elif -1800 < time_until_expiry <= 0: # 过期后 30 分钟内
if user.email and not user.token_expired_notified:
logger.info(f"用户 {user.alias} 的 Token 已过期,发送邮件提醒到 {user.email}...")
from backend.services.email_service import EmailService
# 发送"已过期"邮件(可以使用不同的邮件模板或内容)
success = EmailService.notify_token_expired(user)
if success:
user.token_expired_notified = True
db.commit()
notified_count += 1
logger.info(f"用户 {user.alias} 的 Token 已过期邮件已发送并标记")
else:
logger.warning(f"用户 {user.alias} 的 Token 已过期邮件发送失败")
# 情况3Token 正常(剩余时间 > 30 分钟),重置提醒标志
elif time_until_expiry >= 1800:
if user.token_expiring_notified or user.token_expired_notified:
user.token_expiring_notified = False
user.token_expired_notified = False
db.commit()
logger.info(f"用户 {user.alias} 的 Token 已刷新,重置所有提醒标志")
except ValueError:
logger.warning(f"用户 {user.alias} 的 jwt_exp 格式不正确: {user.jwt_exp}")