feat: migrate from Element Plus to Ant Design Vue and update Vite configuration for better dependency management

- Updated Vite configuration to manually chunk Ant Design Vue for improved dependency management.
- Added a comprehensive migration testing checklist for transitioning from Element Plus 2.13.0 to Ant Design Vue 4.x, covering various components and functionalities.
This commit is contained in:
2026-01-03 01:38:38 +08:00
parent 42a1046750
commit 827c9198ae
57 changed files with 5517 additions and 2982 deletions
-1
View File
@@ -121,7 +121,6 @@ backend/
### 管理员 API (`/api/admin`)
- `POST /api/admin/batch_toggle_active` - 批量启用/禁用用户
- `POST /api/admin/batch_check_in` - 批量触发打卡
- `GET /api/admin/logs` - 获取系统日志
- `GET /api/admin/stats` - 获取系统统计
+21
View File
@@ -100,6 +100,27 @@ async def get_qrcode_status(
)
@router.delete("/qrcode_session/{session_id}", response_model=dict, summary="取消二维码登录会话")
async def cancel_qrcode_session(
session_id: str
):
"""
取消二维码登录会话
- **session_id**: 会话 ID
用于用户关闭二维码对话框时,终止后台的 Selenium 进程
"""
try:
result = AuthService.cancel_qrcode_session(session_id)
return result
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"取消会话失败: {str(e)}"
)
@router.post("/verify_token", response_model=dict, summary="验证 Token 有效性")
async def verify_token(
request: TokenVerifyRequest,
+8 -2
View File
@@ -2,6 +2,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
from backend.models import get_db, User
from backend.schemas.task import TaskCreate, TaskUpdate, TaskResponse
@@ -11,6 +12,11 @@ from backend.dependencies import get_current_user
router = APIRouter()
class CronValidateRequest(BaseModel):
"""Cron 表达式验证请求"""
cron_expression: str = Field(..., min_length=9, description="Crontab 表达式")
@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED, summary="创建打卡任务")
async def create_task(
task_data: TaskCreate,
@@ -181,7 +187,7 @@ async def toggle_task(
@router.post("/validate-cron", summary="验证 Crontab 表达式")
async def validate_cron_expression(request: dict):
async def validate_cron_expression(request: CronValidateRequest):
"""
验证 Crontab 表达式并预览下一个执行时间
@@ -199,7 +205,7 @@ async def validate_cron_expression(request: dict):
"description": "每天 20:00"
}
"""
cron_expr = request.get('cron_expression', '').strip()
cron_expr = request.cron_expression.strip()
if not cron_expr:
raise HTTPException(
+35 -2
View File
@@ -138,8 +138,11 @@ async def get_current_user_token_status(
minutes_until_expiry = (exp_timestamp - current_timestamp) // 60
expiring_soon = minutes_until_expiry <= 30
except ValueError:
pass
except ValueError as e:
# jwt_exp 格式不正确,记录警告
import logging
logger = logging.getLogger(__name__)
logger.warning(f"用户 {current_user.id} ({current_user.alias}) 的 jwt_exp 格式不正确: {current_user.jwt_exp}, 错误: {e}")
return {
"is_valid": is_valid,
@@ -256,7 +259,37 @@ async def update_user(
)
try:
# 获取更新前的用户状态
old_user = UserService.get_user_by_id(user_id, db)
if not old_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"用户 ID {user_id} 不存在"
)
# 保存更新前的审批状态 (先读取后转换为 Python bool)
old_approved_value = old_user.is_approved
was_approved_before = True if old_approved_value else False
# 更新用户信息
user = UserService.update_user(user_id, user_data, db)
# 检查是否需要发送审批通过邮件
new_approved_value = user.is_approved
is_approved_now = True if new_approved_value else False
is_admin = (current_user.role == "admin")
needs_notification = (is_admin and (not was_approved_before) and is_approved_now)
if needs_notification:
try:
from backend.services.email_service import EmailService
EmailService.notify_user_approved(user)
except Exception as e:
# 邮件发送失败不影响审批操作
import logging
logging.getLogger(__name__).error(f"发送审批通过邮件失败: {e}")
return user
except ValueError as e:
raise HTTPException(
+3
View File
@@ -51,6 +51,9 @@ class Settings(BaseSettings):
SMTP_SENDER_PASSWORD: str = ""
SMTP_USE_SSL: bool = True
# 前端 URL 配置(用于邮件中的链接)
FRONTEND_URL: str = "http://localhost:3000"
# 定时任务配置(可通过环境变量配置)
TOKEN_CHECK_INTERVAL_MINUTES: int = 30 # Token 检查间隔(分钟)
SESSION_CLEANUP_INTERVAL_HOURS: int = 24 # 会话清理间隔(小时)
+56 -8
View File
@@ -1,9 +1,12 @@
from datetime import datetime
from typing import Optional
import logging
from fastapi import Depends, HTTPException, Header, status
from sqlalchemy.orm import Session
from backend.models import get_db, User
logger = logging.getLogger(__name__)
async def get_current_user(
authorization: Optional[str] = Header(None),
@@ -11,7 +14,9 @@ async def get_current_user(
) -> User:
"""
获取当前用户
从 Authorization header 中验证 Token 并返回用户
支持两种认证方式:
1. Token 认证(QQ 扫码登录)
2. User ID 认证(密码登录,格式:user_id:xxx
"""
if not authorization:
raise HTTPException(
@@ -23,6 +28,40 @@ async def get_current_user(
# 移除 "Bearer " 前缀(如果存在)
token = authorization.replace("Bearer ", "") if authorization.startswith("Bearer ") else authorization
# 检查是否为 user_id 格式的认证(用于密码登录)
if token.startswith("user_id:"):
user_id_str = token.replace("user_id:", "")
try:
user_id = int(user_id_str)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户不存在",
headers={"WWW-Authenticate": "Bearer"},
)
# 用户ID认证成功,检查是否设置了密码
has_password = bool(user.password_hash)
if not has_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="该账户未设置密码,请使用扫码登录",
headers={"WWW-Authenticate": "Bearer"},
)
# 密码登录的用户可以访问,无需检查 Token
return user
except ValueError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的用户ID格式",
headers={"WWW-Authenticate": "Bearer"},
)
# Token 认证(原有逻辑)
# 从数据库查询用户
user = db.query(User).filter(User.authorization == token).first()
@@ -39,13 +78,22 @@ async def get_current_user(
exp_timestamp = int(user.jwt_exp)
current_timestamp = int(datetime.now().timestamp())
if current_timestamp > exp_timestamp:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 已过期,请重新登录",
headers={"WWW-Authenticate": "Bearer"},
)
except ValueError:
pass # jwt_exp 格式不正确,跳过验证
# 如果用户设置了密码,允许继续使用(Token 过期但不强制退出)
has_password = bool(user.password_hash)
if has_password:
# Token 过期但有密码,允许访问,但在响应头中添加警告
# 注意:这里不抛出异常,让用户继续使用
pass
else:
# 没有密码的用户,Token 过期必须重新扫码登录
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 已过期,请重新扫码登录",
headers={"WWW-Authenticate": "Bearer"},
)
except ValueError as e:
# jwt_exp 格式不正确,记录警告后跳过 Token 过期验证
logger.warning(f"用户 {user.id} ({user.alias}) 的 jwt_exp 格式不正确: {user.jwt_exp}, 错误: {e}")
return user
-1
View File
@@ -18,7 +18,6 @@ class User(Base):
jwt_exp = Column(String(20), default="0", comment="Token 过期时间戳")
role = Column(String(20), default="user", index=True, comment="角色: user/admin")
is_approved = Column(Boolean, default=False, index=True, comment="是否已通过管理员审批")
registered_ip = Column(String(50), nullable=True, comment="注册时的 IP 地址")
created_at = Column(DateTime(timezone=True), server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime(timezone=True), onupdate=func.now(), comment="更新时间")
+3 -2
View File
@@ -12,6 +12,7 @@ class UserCreate(UserBase):
"""创建用户 Schema(管理员手动创建,只需要别名)"""
role: Optional[str] = Field("user", description="角色: user/admin")
email: Optional[str] = Field(None, description="邮箱地址")
is_approved: Optional[bool] = Field(True, description="是否已审批(默认已审批)")
class UserUpdate(BaseModel):
@@ -36,7 +37,7 @@ class UserResponse(BaseModel):
"""用户响应 Schema"""
id: int
alias: str
jwt_sub: str
jwt_sub: Optional[str] = None
role: str
is_approved: bool
jwt_exp: str
@@ -58,7 +59,7 @@ class TokenStatus(BaseModel):
"""Token 状态 Schema"""
is_valid: bool
jwt_exp: str
jwt_sub: str
jwt_sub: Optional[str] = None
expires_at: Optional[int] = None # Unix 时间戳(秒)
days_until_expiry: Optional[int] = None
expiring_soon: bool = False # 是否即将过期(30分钟内)
@@ -1,72 +0,0 @@
"""
数据库迁移脚本:为 task_templates 表添加 parent_id 字段
运行方法:
python backend/scripts/migrate_add_parent_id_to_templates.py
"""
import sys
import os
from pathlib import Path
# 设置 UTF-8 编码输出(Windows 兼容)
if sys.platform == "win32":
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# 添加项目根目录到 Python 路径
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text
from backend.models.database import engine, SessionLocal
def migrate():
"""为 task_templates 表添加 parent_id 字段"""
print("=" * 60)
print("开始数据库迁移:添加 parent_id 字段到 task_templates 表")
print("=" * 60)
db = SessionLocal()
try:
# 检查字段是否已存在
result = db.execute(text(
"SELECT COUNT(*) FROM pragma_table_info('task_templates') WHERE name='parent_id'"
))
field_exists = result.fetchone()[0] > 0
if field_exists:
print("⚠️ parent_id 字段已存在,跳过迁移")
return
# 添加 parent_id 字段
print("📝 正在添加 parent_id 字段...")
db.execute(text(
"ALTER TABLE task_templates ADD COLUMN parent_id INTEGER"
))
db.commit()
print("✅ parent_id 字段添加成功")
# 创建外键约束(SQLite 不支持直接添加外键,需要重建表)
print("\n📝 注意:SQLite 不支持直接添加外键约束")
print(" 如需外键约束,请重建表或在下次完整迁移时处理")
print("\n" + "=" * 60)
print("✅ 数据库迁移完成!")
print("=" * 60)
except Exception as e:
print(f"\n❌ 迁移失败: {str(e)}")
db.rollback()
import traceback
traceback.print_exc()
sys.exit(1)
finally:
db.close()
if __name__ == "__main__":
migrate()
@@ -1,57 +0,0 @@
"""
添加 payload_config 字段到 check_in_tasks 表的迁移脚本
运行方式:
python backend/scripts/migrate_add_payload_config.py
.venv/Scripts/python.exe backend/scripts/migrate_add_payload_config.py
"""
import sys
import os
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text
from backend.models.database import engine
def migrate():
"""执行迁移"""
print("开始迁移:添加 payload_config 字段...")
with engine.connect() as conn:
# 检查字段是否已存在
result = conn.execute(text("PRAGMA table_info(check_in_tasks)"))
columns = [row[1] for row in result]
if 'payload_config' in columns:
print("[OK] payload_config 字段已存在,跳过迁移")
return
# 添加 payload_config 字段(JSON 文本,存储完整的 payload 配置)
print("添加 payload_config 字段...")
conn.execute(text("""
ALTER TABLE check_in_tasks
ADD COLUMN payload_config TEXT DEFAULT '{}' NOT NULL
"""))
conn.commit()
print("[OK] payload_config 字段添加成功")
print("\n注意:现有任务的 payload_config 默认为空 JSON {}")
print(" Worker 将使用默认的固定字段值。")
print(" 新创建的任务将从模板继承完整的 payload 配置。")
if __name__ == "__main__":
try:
migrate()
print("\n[SUCCESS] 迁移完成!")
except Exception as e:
print(f"\n[ERROR] 迁移失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
@@ -1,132 +0,0 @@
"""
删除 check_in_tasks 表中不再需要的旧列的迁移脚本
删除的列:
- signature (VARCHAR) - 已在 payload_config 中
- texts (VARCHAR) - 已在 payload_config 中
- values (TEXT) - 已在 payload_config 中
- thread_id (VARCHAR) - 已在 payload_config 的 ThreadId 中
- email (VARCHAR) - 从 user 表的 email 字段获取
新架构只保留:
- id, user_id, payload_config, name, is_active, created_at, updated_at
运行方式:
python backend/scripts/migrate_remove_old_columns.py
venv/Scripts/python.exe backend/scripts/migrate_remove_old_columns.py
"""
import sys
import os
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text, inspect
from backend.models.database import engine
def migrate():
"""执行迁移:删除旧列"""
print("开始迁移:删除 check_in_tasks 表中的旧列...")
print("将删除的列: signature, texts, values, thread_id, email")
print("=" * 60)
with engine.connect() as conn:
# 检查表结构
inspector = inspect(engine)
columns = [col['name'] for col in inspector.get_columns('check_in_tasks')]
print(f"\n当前表列: {', '.join(columns)}")
old_columns = ['signature', 'texts', 'values', 'thread_id', 'email']
columns_to_remove = [col for col in old_columns if col in columns]
if not columns_to_remove:
print("\n[OK] 旧列已被删除,跳过迁移")
return
print(f"\n需要删除的列: {', '.join(columns_to_remove)}")
# SQLite 不支持直接 DROP COLUMN,需要重建表
# 步骤:
# 1. 创建新表(只包含需要的列)
# 2. 复制数据
# 3. 删除旧表
# 4. 重命名新表
print("\n正在重建表结构...")
# 1. 创建新表
conn.execute(text("""
CREATE TABLE check_in_tasks_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
payload_config TEXT NOT NULL DEFAULT '{}',
name VARCHAR(100) DEFAULT '',
is_active BOOLEAN DEFAULT 1,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
"""))
print(" [OK] 创建新表结构")
# 2. 复制数据(只复制保留的列)
conn.execute(text("""
INSERT INTO check_in_tasks_new
(id, user_id, payload_config, name, is_active, created_at, updated_at)
SELECT
id, user_id, payload_config, name, is_active, created_at, updated_at
FROM check_in_tasks
"""))
print(" [OK] 复制数据到新表")
# 3. 删除旧表
conn.execute(text("DROP TABLE check_in_tasks"))
print(" [OK] 删除旧表")
# 4. 重命名新表
conn.execute(text("ALTER TABLE check_in_tasks_new RENAME TO check_in_tasks"))
print(" [OK] 重命名新表")
# 5. 重建索引
conn.execute(text("""
CREATE INDEX ix_check_in_tasks_user_id ON check_in_tasks(user_id)
"""))
conn.execute(text("""
CREATE INDEX ix_check_in_tasks_id ON check_in_tasks(id)
"""))
conn.execute(text("""
CREATE INDEX ix_task_user_active ON check_in_tasks(user_id, is_active)
"""))
print(" [OK] 重建索引")
conn.commit()
print("\n[SUCCESS] 表结构迁移成功!")
print("\n新的表结构:")
inspector = inspect(engine)
new_columns = [col['name'] for col in inspector.get_columns('check_in_tasks')]
print(f" 列: {', '.join(new_columns)}")
if __name__ == "__main__":
try:
migrate()
print("\n" + "=" * 60)
print("[完成] 迁移成功完成!")
print("\n数据库已更新为新架构:")
print(" - 删除了 signature, texts, values, thread_id, email 列")
print(" - 保留了 payload_config 列(存储完整的 JSON payload")
print(" - ThreadId 现在存储在 payload_config 中")
print(" - Email 现在从 user 表获取")
print("=" * 60)
except Exception as e:
print(f"\n[ERROR] 迁移失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
@@ -0,0 +1,135 @@
"""
删除 users 表中 registered_ip 列的迁移脚本
删除的列:
- registered_ip (VARCHAR) - 注册IP地址,不再需要
新架构中移除该字段以保护用户隐私。
运行方式:
python backend/scripts/migrate_remove_registered_ip.py
venv/Scripts/python.exe backend/scripts/migrate_remove_registered_ip.py
"""
import sys
import os
from pathlib import Path
# 添加项目根目录到 Python 路径
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import text, inspect
from backend.models.database import engine
def migrate():
"""执行迁移:删除 registered_ip 列"""
print("开始迁移:删除 users 表中的 registered_ip 列...")
print("=" * 60)
with engine.connect() as conn:
# 检查表结构
inspector = inspect(engine)
columns = [col['name'] for col in inspector.get_columns('users')]
print(f"\n当前表列: {', '.join(columns)}")
if 'registered_ip' not in columns:
print("\n[OK] registered_ip 列已被删除,跳过迁移")
return
print(f"\n需要删除的列: registered_ip")
# SQLite 不支持直接 DROP COLUMN,需要重建表
# 步骤:
# 1. 创建新表(不包含 registered_ip
# 2. 复制数据
# 3. 删除旧表
# 4. 重命名新表
print("\n正在重建表结构...")
# 1. 创建新表
conn.execute(text("""
CREATE TABLE users_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
jwt_sub VARCHAR(200) UNIQUE,
alias VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100),
password_hash VARCHAR(200),
authorization TEXT,
jwt_exp VARCHAR(20) DEFAULT '0',
role VARCHAR(20) DEFAULT 'user',
is_approved BOOLEAN DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME
)
"""))
print(" [OK] 创建新表结构")
# 2. 复制数据(不包含 registered_ip
conn.execute(text("""
INSERT INTO users_new
(id, jwt_sub, alias, email, password_hash, authorization, jwt_exp,
role, is_approved, created_at, updated_at)
SELECT
id, jwt_sub, alias, email, password_hash, authorization, jwt_exp,
role, is_approved, created_at, updated_at
FROM users
"""))
print(" [OK] 复制数据到新表")
# 3. 删除旧表
conn.execute(text("DROP TABLE users"))
print(" [OK] 删除旧表")
# 4. 重命名新表
conn.execute(text("ALTER TABLE users_new RENAME TO users"))
print(" [OK] 重命名新表")
# 5. 重建索引
conn.execute(text("""
CREATE INDEX ix_users_jwt_sub ON users(jwt_sub)
"""))
conn.execute(text("""
CREATE INDEX ix_users_alias ON users(alias)
"""))
conn.execute(text("""
CREATE INDEX ix_users_role ON users(role)
"""))
conn.execute(text("""
CREATE INDEX ix_users_is_approved ON users(is_approved)
"""))
conn.execute(text("""
CREATE INDEX ix_users_id ON users(id)
"""))
conn.execute(text("""
CREATE INDEX ix_user_role_approved ON users(role, is_approved)
"""))
print(" [OK] 重建索引")
conn.commit()
print("\n[SUCCESS] 表结构迁移成功!")
print("\n新的表结构:")
inspector = inspect(engine)
new_columns = [col['name'] for col in inspector.get_columns('users')]
print(f" 列: {', '.join(new_columns)}")
if __name__ == "__main__":
try:
migrate()
print("\n" + "=" * 60)
print("[完成] 迁移成功完成!")
print("\n数据库已更新为新架构:")
print(" - 删除了 registered_ip 列(保护用户隐私)")
print(" - 保留了所有其他用户数据")
print("=" * 60)
except Exception as e:
print(f"\n[ERROR] 迁移失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
+33 -7
View File
@@ -25,7 +25,7 @@ class AuthService:
Args:
alias: 用户别名
client_ip: 客户端 IP 地址
client_ip: 客户端 IP 地址(用于会话标识)
db: 数据库会话
Returns:
@@ -42,11 +42,11 @@ class AuthService:
if existing_user:
# 检查是否为空 jwt_sub(测试账号)
if not existing_user.jwt_sub or existing_user.jwt_sub == "":
logger.warning(f"用户 {alias} 是测试账号(空 jwt_sub),禁止登录")
if not existing_user.jwt_sub:
logger.warning(f"用户 {alias} 是测试账号(未绑定 QQ),禁止扫码登录")
return {
"status": "error",
"message": "此账户为测试账号,暂未绑定 QQ,无法登录"
"message": "此账户为测试账号,暂未绑定 QQ,无法扫码登录"
}
# 老用户:刷新 Token
@@ -243,7 +243,6 @@ class AuthService:
}
# 创建新用户(待审批状态)
client_ip = session_data.get("client_ip", "")
new_user = User(
jwt_sub=jwt_sub,
alias=alias,
@@ -251,7 +250,6 @@ class AuthService:
jwt_exp=jwt_exp,
role="user",
is_approved=False, # 待审批
registered_ip=client_ip
)
db.add(new_user)
@@ -427,7 +425,9 @@ class AuthService:
"message": "登录成功",
"user_id": user.id,
"authorization": user.authorization,
"alias": user.alias
"alias": user.alias,
"role": user.role,
"is_approved": user.is_approved
}
# 如果 Token 有问题,添加警告信息
@@ -475,3 +475,29 @@ class AuthService:
except Exception as e:
logger.error(f"密码验证异常:{e}")
return False
@staticmethod
def cancel_qrcode_session(session_id: str) -> Dict[str, Any]:
"""
取消二维码登录会话
Args:
session_id: 会话 ID
Returns:
包含取消结果的字典
"""
from backend.workers.token_refresher import cancel_session
success = cancel_session(session_id)
if success:
return {
"success": True,
"message": "会话已取消"
}
else:
return {
"success": False,
"message": "取消失败或会话不存在"
}
+27 -13
View File
@@ -173,8 +173,9 @@ class CheckInService:
"status": "failure",
"message": f"{error_msg},请重新扫码登录"
}
except ValueError:
pass
except ValueError as e:
# jwt_exp 格式不正确,记录警告后跳过 Token 过期验证
logger.warning(f"任务 {task.id} 的用户 jwt_exp 格式不正确: {user.jwt_exp}, 错误: {e}")
# 创建待处理记录
record_id = CheckInService.create_pending_check_in_record(task, trigger_type, db)
@@ -264,8 +265,9 @@ class CheckInService:
"message": f"{error_msg},请重新扫码登录",
"record_id": record.id
}
except ValueError:
pass
except ValueError as e:
# jwt_exp 格式不正确,记录警告后跳过 Token 过期验证
logger.warning(f"任务 {task.id} 的用户 jwt_exp 格式不正确: {user.jwt_exp}, 错误: {e}")
# 执行打卡(传递 task 对象和用户 token)
logger.info(f"🤖 调用 Selenium Worker 执行打卡...")
@@ -409,8 +411,9 @@ class CheckInService:
logger.warning(f"任务 ID: {task.id} 的用户 Token 已过期,跳过")
results["skipped"] += 1
continue
except ValueError:
pass
except ValueError as e:
# jwt_exp 格式不正确,记录警告后继续执行打卡
logger.warning(f"任务 {task.id} 的用户 jwt_exp 格式不正确: {task.user.jwt_exp}, 错误: {e}")
# 执行打卡
result = CheckInService.perform_task_check_in(task, "scheduled", db)
@@ -514,7 +517,7 @@ class CheckInService:
status: Optional[str] = None
) -> List[CheckInRecord]:
"""
获取所有打卡记录(管理员)
获取所有打卡记录(管理员)- 使用联表查询优化性能
Args:
db: 数据库会话
@@ -526,7 +529,12 @@ class CheckInService:
Returns:
打卡记录列表
"""
query = db.query(CheckInRecord)
from sqlalchemy.orm import joinedload
# 使用 joinedload 预加载关联的 task 和 user,避免 N+1 查询
query = db.query(CheckInRecord).options(
joinedload(CheckInRecord.task).joinedload(CheckInTask.user)
)
if task_id:
query = query.filter(CheckInRecord.task_id == task_id)
@@ -543,15 +551,18 @@ class CheckInService:
"""
为打卡记录添加用户和任务信息
注意:如果使用了 joinedloadtask 和 user 已经预加载,不会产生额外查询
Args:
record: 打卡记录对象
db: 数据库会话
db: 数据库会话(可选,仅在未使用 joinedload 时使用)
Returns:
包含额外信息的记录字典
"""
# 获取任务信息
task = db.query(CheckInTask).filter(CheckInTask.id == record.task_id).first()
# 尝试使用已加载的关联对象,如果没有则查询
task = record.task if hasattr(record, 'task') and record.task else \
db.query(CheckInTask).filter(CheckInTask.id == record.task_id).first()
# 获取用户信息
user = None
@@ -559,14 +570,17 @@ class CheckInService:
thread_id = None
if task:
user = db.query(User).filter(User.id == task.user_id).first()
# 尝试使用已加载的 user,否则查询
user = task.user if hasattr(task, 'user') and task.user else \
db.query(User).filter(User.id == task.user_id).first()
task_name = task.name
# 从 payload_config 提取 ThreadId
try:
payload = json.loads(str(task.payload_config))
thread_id = payload.get('ThreadId')
except:
except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
logger.debug(f"解析任务 {task.id} 的 payload_config 失败: {e}")
pass
# 转换为字典并添加额外字段
+392 -49
View File
@@ -1,24 +1,33 @@
import smtplib
"""
邮件业务服务 (高级)
职能:提供业务相关的邮件操作
- 新用户注册通知
- 用户审批通知
- 打卡结果通知
- Token 到期提醒
- 调用底层 EmailNotifier 发送邮件
"""
import logging
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List
from datetime import datetime
from typing import List
from sqlalchemy.orm import Session
from backend.config import settings
from backend.models import User
from backend.workers.email_notifier import EmailNotifier
from backend.config import settings
logger = logging.getLogger(__name__)
class EmailService:
"""邮件通知服务"""
"""邮件业务服务(高级服务)"""
@staticmethod
def send_email(to_emails: List[str], subject: str, body_html: str) -> bool:
"""
发送邮件
发送邮件(业务层方法,调用底层 EmailNotifier
Args:
to_emails: 收件人邮箱列表
@@ -28,39 +37,7 @@ class EmailService:
Returns:
是否发送成功
"""
# 检查邮件配置
if not all([settings.SMTP_SERVER, settings.SMTP_SENDER_EMAIL, settings.SMTP_SENDER_PASSWORD]):
logger.warning("邮件配置不完整,跳过发送邮件")
return False
try:
# 创建邮件
msg = MIMEMultipart('alternative')
msg['From'] = settings.SMTP_SENDER_EMAIL
msg['To'] = ', '.join(to_emails)
msg['Subject'] = subject
# 添加 HTML 正文
html_part = MIMEText(body_html, 'html', 'utf-8')
msg.attach(html_part)
# 连接 SMTP 服务器并发送
if settings.SMTP_USE_SSL:
server = smtplib.SMTP_SSL(settings.SMTP_SERVER, settings.SMTP_PORT)
else:
server = smtplib.SMTP(settings.SMTP_SERVER, settings.SMTP_PORT)
server.starttls()
server.login(settings.SMTP_SENDER_EMAIL, settings.SMTP_SENDER_PASSWORD)
server.sendmail(settings.SMTP_SENDER_EMAIL, to_emails, msg.as_string())
server.quit()
logger.info(f"邮件发送成功: {subject} -> {', '.join(to_emails)}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
return False
return EmailNotifier.send_email(to_emails, subject, body_html)
@staticmethod
def notify_new_user_registration(user: User, db: Session) -> bool:
@@ -76,7 +53,12 @@ class EmailService:
"""
# 查询所有管理员邮箱
admins = db.query(User).filter(User.role == "admin", User.email.isnot(None)).all()
admin_emails = [admin.email for admin in admins if admin.email]
# 使用 str() 转换避免类型检查问题,并过滤空值
admin_emails: List[str] = []
for admin in admins:
email_value = admin.email
if email_value is not None: # 使用 is not None 避免布尔转换
admin_emails.append(str(email_value))
if not admin_emails:
logger.warning("没有找到管理员邮箱,无法发送通知")
@@ -85,6 +67,10 @@ class EmailService:
# 构建邮件内容
subject = f"【接龙自动打卡系统】新用户注册通知 - {user.alias}"
# 安全获取创建时间
created_at_value = user.created_at
created_time = created_at_value.strftime('%Y-%m-%d %H:%M:%S') if created_at_value is not None else '未知'
body_html = f"""
<!DOCTYPE html>
<html>
@@ -161,11 +147,7 @@ class EmailService:
</tr>
<tr>
<td>注册时间</td>
<td>{user.created_at.strftime('%Y-%m-%d %H:%M:%S') if user.created_at else '未知'}</td>
</tr>
<tr>
<td>注册 IP</td>
<td>{user.registered_ip or '未记录'}</td>
<td>{created_time}</td>
</tr>
</table>
@@ -175,7 +157,7 @@ class EmailService:
<p>请登录管理后台进行审批操作。</p>
</div>
<p>登录地址:<a href="http://localhost:5173/admin/users">http://localhost:5173/admin/users</a></p>
<p>登录地址:<a href="{settings.FRONTEND_URL}/admin/users">{settings.FRONTEND_URL}/admin/users</a></p>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿直接回复。</p>
@@ -188,6 +170,366 @@ class EmailService:
return EmailService.send_email(admin_emails, subject, body_html)
@staticmethod
def notify_user_approved(user: User) -> bool:
"""
通知用户审批已通过
Args:
user: 已通过审批的用户
Returns:
是否发送成功
"""
user_email = user.email
if user_email is None:
logger.info(f"用户 {user.alias} 未设置邮箱,跳过审批通知")
return False
# 构建邮件内容
subject = f"【接龙自动打卡系统】账户审批通过 - {user.alias}"
# 安全获取创建时间
user_created_at = user.created_at
created_time = user_created_at.strftime('%Y-%m-%d %H:%M:%S') if user_created_at is not None else '未知'
body_html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
}}
.container {{
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
.header {{
background-color: #28a745;
color: white;
padding: 20px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
background-color: #f9f9f9;
padding: 20px;
border: 1px solid #ddd;
border-radius: 0 0 5px 5px;
}}
.info-table {{
width: 100%;
border-collapse: collapse;
margin: 15px 0;
}}
.info-table td {{
padding: 10px;
border-bottom: 1px solid #ddd;
}}
.info-table td:first-child {{
font-weight: bold;
width: 120px;
}}
.footer {{
margin-top: 20px;
text-align: center;
color: #999;
font-size: 12px;
}}
.success-box {{
background-color: #d4edda;
border-left: 4px solid #28a745;
padding: 15px;
margin: 15px 0;
}}
.btn {{
display: inline-block;
padding: 12px 24px;
background-color: #667eea;
color: white;
text-decoration: none;
border-radius: 5px;
margin: 10px 0;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>🎉 恭喜!账户审批通过</h2>
</div>
<div class="content">
<p>您好,{user.alias}</p>
<p>恭喜您的账户已通过管理员审批,现在可以使用所有功能了。</p>
<div class="success-box">
<strong>✅ 审批结果:</strong> 已通过
<br>
<strong>审批时间:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
</div>
<table class="info-table">
<tr>
<td>用户名</td>
<td>{user.alias}</td>
</tr>
<tr>
<td>账户角色</td>
<td>{user.role}</td>
</tr>
<tr>
<td>注册时间</td>
<td>{created_time}</td>
</tr>
</table>
<p><strong>接下来您可以:</strong></p>
<ul>
<li>登录系统创建自动打卡任务</li>
<li>配置打卡时间和内容</li>
<li>查看打卡记录和统计</li>
</ul>
<p style="text-align: center;">
<a href="{settings.FRONTEND_URL}/login" class="btn">立即登录</a>
</p>
<p style="color: #666; font-size: 14px;">
💡 <strong>温馨提示:</strong>如果您还没有设置密码,建议在个人设置中设置密码,方便后续登录。
</p>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿直接回复。</p>
<p>接龙自动打卡系统 © {datetime.now().year}</p>
</div>
</div>
</body>
</html>
"""
return EmailService.send_email([str(user_email)], subject, body_html)
@staticmethod
def notify_user_rejected(user: User, reason: str = "") -> bool:
"""
通知用户审批被拒绝
Args:
user: 被拒绝的用户
reason: 拒绝原因(可选)
Returns:
是否发送成功
"""
user_email = user.email
if user_email is None:
logger.info(f"用户 {user.alias} 未设置邮箱,跳过拒绝通知")
return False
# 构建邮件内容
subject = f"【接龙自动打卡系统】账户审批结果 - {user.alias}"
reason_html = f"<p><strong>拒绝原因:</strong>{reason}</p>" if reason else ""
body_html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
}}
.container {{
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
.header {{
background-color: #dc3545;
color: white;
padding: 20px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
background-color: #f9f9f9;
padding: 20px;
border: 1px solid #ddd;
border-radius: 0 0 5px 5px;
}}
.footer {{
margin-top: 20px;
text-align: center;
color: #999;
font-size: 12px;
}}
.error-box {{
background-color: #f8d7da;
border-left: 4px solid #dc3545;
padding: 15px;
margin: 15px 0;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>账户审批结果通知</h2>
</div>
<div class="content">
<p>您好,{user.alias}</p>
<p>很遗憾,您的账户注册申请未能通过审批。</p>
<div class="error-box">
<strong>❌ 审批结果:</strong> 未通过
<br>
<strong>处理时间:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
</div>
{reason_html}
<p>如有疑问,请联系系统管理员。</p>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿直接回复。</p>
<p>接龙自动打卡系统 © {datetime.now().year}</p>
</div>
</div>
</body>
</html>
"""
return EmailService.send_email([str(user_email)], subject, body_html)
@staticmethod
def notify_token_expiring(user: User, jwt_exp: str) -> bool:
"""
通知用户 Token 即将过期
Args:
user: 用户对象
jwt_exp: Token 过期时间戳
Returns:
是否发送成功
"""
user_email = user.email
if user_email is None:
logger.info(f"用户 {user.alias} 未设置邮箱,跳过 Token 过期通知")
return False
# 计算剩余时间
try:
exp_timestamp = int(jwt_exp)
current_timestamp = int(datetime.now().timestamp())
minutes_left = (exp_timestamp - current_timestamp) // 60
except ValueError:
minutes_left = 0
# 构建邮件内容
subject = f"【接龙自动打卡系统】登录凭证即将过期 - {user.alias}"
body_html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{
font-family: Arial, sans-serif;
line-height: 1.6;
color: #333;
}}
.container {{
max-width: 600px;
margin: 0 auto;
padding: 20px;
}}
.header {{
background-color: #ff9800;
color: white;
padding: 20px;
text-align: center;
border-radius: 5px 5px 0 0;
}}
.content {{
background-color: #f9f9f9;
padding: 20px;
border: 1px solid #ddd;
border-radius: 0 0 5px 5px;
}}
.warning-box {{
background-color: #fff3cd;
border-left: 4px solid #ff9800;
padding: 15px;
margin: 15px 0;
}}
.footer {{
margin-top: 20px;
text-align: center;
color: #999;
font-size: 12px;
}}
.btn {{
display: inline-block;
padding: 12px 24px;
background-color: #667eea;
color: white;
text-decoration: none;
border-radius: 5px;
margin: 10px 0;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h2>⚠️ 登录凭证即将过期</h2>
</div>
<div class="content">
<p>您好,{user.alias}</p>
<p>您的 QQ 登录凭证即将在 <strong>{minutes_left} 分钟</strong>后过期。</p>
<div class="warning-box">
<strong>⚠️ 重要提示:</strong>
<ul style="margin: 10px 0; padding-left: 20px;">
<li>登录凭证过期后,系统将无法自动执行您的打卡任务</li>
<li>建议尽快登录系统刷新凭证</li>
<li>如果您已设置密码,可以使用密码登录后扫码刷新凭证</li>
</ul>
</div>
<p><strong>如何刷新凭证:</strong></p>
<ol style="margin: 10px 0; padding-left: 20px;">
<li>登录系统(扫码或密码登录)</li>
<li>在个人设置中点击"刷新凭证"</li>
<li>使用手机 QQ 扫描二维码完成刷新</li>
</ol>
<p style="text-align: center;">
<a href="{settings.FRONTEND_URL}/login" class="btn">立即登录刷新</a>
</p>
</div>
<div class="footer">
<p>此邮件由系统自动发送,请勿直接回复。</p>
<p>接龙自动打卡系统 © {datetime.now().year}</p>
</div>
</div>
</body>
</html>
"""
return EmailService.send_email([str(user_email)], subject, body_html)
@staticmethod
def notify_check_in_result(user: User, task_info: dict, success: bool, message: str = "") -> bool:
"""
@@ -202,7 +544,8 @@ class EmailService:
Returns:
是否发送成功
"""
if not user.email:
user_email = user.email
if user_email is None:
logger.info(f"用户 {user.alias} 未设置邮箱,跳过打卡通知")
return False
@@ -298,4 +641,4 @@ class EmailService:
</html>
"""
return EmailService.send_email([user.email], subject, body_html)
return EmailService.send_email([str(user_email)], subject, body_html)
+4 -2
View File
@@ -13,7 +13,6 @@ from backend.config import settings
from backend.models import get_db, User, CheckInTask
from backend.services.check_in_service import CheckInService
from backend.services.admin_service import AdminService
from backend.workers.email_notifier import send_expiration_notification
logger = logging.getLogger(__name__)
@@ -182,7 +181,10 @@ def check_token_expiration():
# 使用用户账户的邮箱发送通知
if user.email:
logger.info(f"用户 {user.alias} 的 Token 即将过期,发送邮件提醒到 {user.email}...")
send_expiration_notification(user.email, user.jwt_exp)
from backend.services.email_service import EmailService
jwt_exp_value = user.jwt_exp
jwt_exp_str = str(jwt_exp_value) if jwt_exp_value is not None else "0"
EmailService.notify_token_expiring(user, jwt_exp_str)
notified_count += 1
except ValueError:
+7 -8
View File
@@ -32,10 +32,10 @@ class UserService:
# 创建用户(管理员创建的用户没有 jwt_sub,需要后续扫码绑定)
user = User(
jwt_sub="", # 空字符串表示未绑定 QQ
jwt_sub=None, # NULL 表示未绑定 QQ
alias=user_data.alias,
role=user_data.role or "user",
is_approved=True, # 管理员创建的用户默认已审批
is_approved=user_data.is_approved if user_data.is_approved is not None else True, # 使用请求中的值,默认已审批
jwt_exp="0",
authorization=None,
)
@@ -114,12 +114,11 @@ class UserService:
# 搜索过滤
if search:
query = query.filter(
or_(
User.alias.ilike(f"%{search}%"),
User.jwt_sub.ilike(f"%{search}%")
)
)
# 注意:jwt_sub 可能为 NULL,需要处理
search_conditions = [User.alias.ilike(f"%{search}%")]
# 只有当 jwt_sub 不为空时才搜索
search_conditions.append(User.jwt_sub.ilike(f"%{search}%"))
query = query.filter(or_(*search_conditions))
# 角色过滤
if role:
+38 -12
View File
@@ -9,7 +9,6 @@ from selenium.webdriver.chrome.options import Options
from typing import Dict, Any
from backend.config import settings
from backend.workers.email_notifier import send_success_notification, send_failure_notification
logger = logging.getLogger(__name__)
@@ -99,11 +98,20 @@ def get_live_x_api_payload(auth_token: str) -> str:
except Exception as e:
logger.error(f"获取现场 x-api-request-payload 时失败: {e}")
debug_screenshot = os.path.join(settings.BASE_DIR, 'payload_debug.png')
driver.save_screenshot(debug_screenshot)
try:
debug_screenshot = os.path.join(settings.BASE_DIR, 'payload_debug.png')
driver.save_screenshot(debug_screenshot)
except Exception as screenshot_error:
logger.warning(f"保存调试截图失败: {screenshot_error}")
finally:
driver.quit()
# 优雅关闭 WebDriver,避免 Windows asyncio ConnectionResetError
try:
driver.quit()
except Exception as e:
# 忽略 WebDriver 关闭时的连接错误(Windows 平台常见问题)
if "WinError 10054" not in str(e) and "ConnectionResetError" not in str(e):
logger.warning(f"关闭 WebDriver 时出现警告: {e}")
return payload_signature
@@ -127,7 +135,8 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
try:
payload_dict = json.loads(task.payload_config) if task.payload_config else {}
signature = payload_dict.get('Signature', 'Unknown')
except:
except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e:
logger.debug(f"解析任务 {task.id} 的 payload_config 失败: {e}")
signature = 'Unknown'
logger.info(f"Selenium打卡: 正在为任务 ID: {task.id} (Signature: {signature}) 执行打卡...")
@@ -196,14 +205,21 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
logger.info(f"✉️ 任务 ID: {task.id} (Signature: {signature}) 打卡请求完成!响应: {response_text}")
# 判断响应内容(参考 V1 实现逻辑)
# 使用用户账户的邮箱,而不是任务的邮箱
email = task.user.email if task.user else None
# 情况1: 明确包含"打卡成功" → 成功
if "打卡成功" in response_text:
logger.info(f"✅ 检测到成功关键字 '打卡成功',打卡成功")
if email:
send_success_notification(email)
# 发送成功邮件通知
if task.user and task.user.email:
try:
from backend.services.email_service import EmailService
task_info = {
'thread_id': payload.get('ThreadId', '未知'),
'name': getattr(task, 'name', '打卡任务')
}
EmailService.notify_check_in_result(task.user, task_info, True, "打卡成功")
except Exception as e:
logger.error(f"发送打卡成功邮件失败: {e}")
return {
"success": True,
"status": "success",
@@ -238,8 +254,18 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
# 情况4: Token 失效的特征标识 → 失败
elif ("登录" in response_text):
logger.warning(f"⚠️ 检测到登录失败关键字,Token 可能已失效")
if email:
send_failure_notification(email)
# 发送失败邮件通知
if task.user and task.user.email:
try:
from backend.services.email_service import EmailService
task_info = {
'thread_id': payload.get('ThreadId', '未知'),
'name': getattr(task, 'name', '打卡任务')
}
EmailService.notify_check_in_result(task.user, task_info, False, "Token 已失效,需要重新授权")
except Exception as e:
logger.error(f"发送打卡失败邮件失败: {e}")
return {
"success": False,
"status": "failure",
+92 -485
View File
@@ -1,512 +1,119 @@
"""
邮件发送引擎 (底层)
职能:提供基础的 SMTP 邮件发送功能
- SMTP 服务器连接
- 邮件发送
- 配置管理
- 不包含业务逻辑
"""
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import time
import logging
from typing import List, Optional
from backend.config import settings
logger = logging.getLogger(__name__)
# --- 邮件模板 ---
EXPIRATION_HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Token 到期提醒</title>
<style>
body {{
margin: 0;
padding: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'PingFang SC', 'Hiragino Sans GB', 'Microsoft YaHei', sans-serif;
background-color: #f5f5f5;
line-height: 1.6;
}}
.container {{
max-width: 600px;
margin: 40px auto;
background-color: #ffffff;
border-radius: 12px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
overflow: hidden;
}}
.header {{
background: linear-gradient(135deg, #f44336 0%, #e91e63 100%);
color: white;
padding: 30px 20px;
text-align: center;
}}
.header h1 {{
margin: 0;
font-size: 24px;
font-weight: 600;
}}
.content {{
padding: 30px 40px;
}}
.alert-box {{
background-color: #fff3e0;
border-left: 4px solid #ff9800;
padding: 16px;
margin: 20px 0;
border-radius: 4px;
}}
.info-item {{
margin: 16px 0;
padding: 12px;
background-color: #f9f9f9;
border-radius: 6px;
}}
.info-item strong {{
color: #333;
display: inline-block;
min-width: 100px;
}}
.highlight {{
color: #f44336;
font-weight: 600;
}}
.action-button {{
display: inline-block;
margin: 20px 0;
padding: 12px 32px;
background: linear-gradient(135deg, #f44336 0%, #e91e63 100%);
color: white;
text-decoration: none;
border-radius: 6px;
font-weight: 500;
}}
.footer {{
background-color: #fafafa;
padding: 20px;
text-align: center;
color: #999;
font-size: 12px;
border-top: 1px solid #eeeeee;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>⚠️ Token 即将到期提醒</h1>
</div>
<div class="content">
<p>您好,</p>
<div class="alert-box">
<p>您的接龙打卡系统 <span class="highlight">Token 即将到期</span>,为避免影响自动打卡功能,请尽快刷新您的 Token。</p>
</div>
<div class="info-item">
<strong>到期时间:</strong><span class="highlight">{exp_time}</span>
</div>
<div class="info-item">
<strong>通知时间:</strong>{send_time}
</div>
<p style="margin-top: 20px; color: #666;">
请登录系统,前往 <strong>用户设置</strong> 页面刷新您的 Token,以确保自动打卡功能正常运行。
</p>
</div>
<div class="footer">
<p>此邮件由接龙自动打卡系统自动发送,请勿直接回复</p>
<p>CheckIn App V2 © 2026</p>
</div>
</div>
</body>
</html>
"""
class EmailNotifier:
"""邮件发送引擎(底层服务)"""
SUCCESS_HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>打卡成功通知</title>
<style>
body {{
margin: 0;
padding: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'PingFang SC', 'Hiragino Sans GB', 'Microsoft YaHei', sans-serif;
background-color: #f5f5f5;
line-height: 1.6;
}}
.container {{
max-width: 600px;
margin: 40px auto;
background-color: #ffffff;
border-radius: 12px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
overflow: hidden;
}}
.header {{
background: linear-gradient(135deg, #4caf50 0%, #66bb6a 100%);
color: white;
padding: 30px 20px;
text-align: center;
}}
.header h1 {{
margin: 0;
font-size: 24px;
font-weight: 600;
}}
.content {{
padding: 30px 40px;
}}
.success-icon {{
text-align: center;
font-size: 64px;
margin: 20px 0;
}}
.success-box {{
background-color: #e8f5e9;
border-left: 4px solid #4caf50;
padding: 16px;
margin: 20px 0;
border-radius: 4px;
}}
.info-item {{
margin: 16px 0;
padding: 12px;
background-color: #f9f9f9;
border-radius: 6px;
}}
.info-item strong {{
color: #333;
display: inline-block;
min-width: 100px;
}}
.highlight {{
color: #4caf50;
font-weight: 600;
}}
.footer {{
background-color: #fafafa;
padding: 20px;
text-align: center;
color: #999;
font-size: 12px;
border-top: 1px solid #eeeeee;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>✅ 打卡成功</h1>
</div>
<div class="content">
<div class="success-icon">🎉</div>
<p>您好,</p>
<div class="success-box">
<p><strong>自动打卡已成功完成!</strong></p>
</div>
<div class="info-item">
<strong>打卡时间:</strong><span class="highlight">{send_time}</span>
</div>
<div class="info-item">
<strong>打卡状态:</strong><span class="highlight">成功 ✓</span>
</div>
<p style="margin-top: 20px; color: #666;">
您无需进行任何操作,系统已自动为您完成打卡。此邮件仅作通知。
</p>
</div>
<div class="footer">
<p>此邮件由接龙自动打卡系统自动发送,请勿直接回复</p>
<p>CheckIn App V2 © 2026</p>
</div>
</div>
</body>
</html>
"""
@staticmethod
def get_email_config() -> Optional[dict]:
"""
从环境变量读取邮件配置
FAILURE_HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>打卡失败通知</title>
<style>
body {{
margin: 0;
padding: 0;
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'PingFang SC', 'Hiragino Sans GB', 'Microsoft YaHei', sans-serif;
background-color: #f5f5f5;
line-height: 1.6;
}}
.container {{
max-width: 600px;
margin: 40px auto;
background-color: #ffffff;
border-radius: 12px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
overflow: hidden;
}}
.header {{
background: linear-gradient(135deg, #f44336 0%, #e91e63 100%);
color: white;
padding: 30px 20px;
text-align: center;
}}
.header h1 {{
margin: 0;
font-size: 24px;
font-weight: 600;
}}
.content {{
padding: 30px 40px;
}}
.error-icon {{
text-align: center;
font-size: 64px;
margin: 20px 0;
}}
.error-box {{
background-color: #ffebee;
border-left: 4px solid #f44336;
padding: 16px;
margin: 20px 0;
border-radius: 4px;
}}
.info-item {{
margin: 16px 0;
padding: 12px;
background-color: #f9f9f9;
border-radius: 6px;
}}
.info-item strong {{
color: #333;
display: inline-block;
min-width: 100px;
}}
.highlight {{
color: #f44336;
font-weight: 600;
}}
.action-box {{
background-color: #fff3e0;
padding: 16px;
margin: 20px 0;
border-radius: 6px;
border: 1px solid #ffb74d;
}}
.action-box h3 {{
margin: 0 0 12px 0;
color: #ff6f00;
font-size: 16px;
}}
.action-box ul {{
margin: 8px 0;
padding-left: 20px;
}}
.action-box li {{
margin: 6px 0;
color: #666;
}}
.footer {{
background-color: #fafafa;
padding: 20px;
text-align: center;
color: #999;
font-size: 12px;
border-top: 1px solid #eeeeee;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>❌ 打卡失败通知</h1>
</div>
<div class="content">
<div class="error-icon">⚠️</div>
<p>您好,</p>
<div class="error-box">
<p><strong>自动打卡失败,需要您的关注!</strong></p>
</div>
<div class="info-item">
<strong>失败时间:</strong>{send_time}
</div>
<div class="info-item">
<strong>失败原因:</strong><span class="highlight">Token 已失效(需要登录)</span>
</div>
<div class="action-box">
<h3>📋 需要您执行以下操作:</h3>
<ul>
<li>登录接龙自动打卡系统</li>
<li>刷新您的 Authorization Token</li>
<li>确认 Token 更新成功</li>
</ul>
</div>
<p style="margin-top: 20px; color: #666;">
Token 失效是正常现象,通常在一段时间后会自动过期。刷新 Token 后,系统将恢复自动打卡功能。
</p>
</div>
<div class="footer">
<p>此邮件由接龙自动打卡系统自动发送,请勿直接回复</p>
<p>CheckIn App V2 © 2026</p>
</div>
</div>
</body>
</html>
"""
Returns:
dict: 邮件配置,如果配置不完整则返回 None
"""
# 检查必要的邮件配置是否存在
if not settings.SMTP_SERVER or not settings.SMTP_SENDER_EMAIL:
logger.debug("邮件配置未完成(SMTP_SERVER 或 SMTP_SENDER_EMAIL 为空),邮件发送功能已禁用")
return None
if not settings.SMTP_PORT:
logger.debug("邮件配置未完成(SMTP_PORT 为空),邮件发送功能已禁用")
return None
def get_email_settings():
"""
从环境变量读取邮件配置
# 返回配置字典
return {
'smtp_server': settings.SMTP_SERVER,
'smtp_port': settings.SMTP_PORT,
'sender_email': settings.SMTP_SENDER_EMAIL,
'sender_password': settings.SMTP_SENDER_PASSWORD,
'use_ssl': settings.SMTP_USE_SSL
}
如果 SMTP_SERVER、SMTP_PORT 或 SMTP_SENDER_EMAIL 有任一为空,则禁用邮件功能
@staticmethod
def send_email(
to_emails: List[str],
subject: str,
html_content: str,
from_email: Optional[str] = None
) -> bool:
"""
发送邮件(底层方法)
Returns:
dict: 邮件配置,如果配置不完整则返回 None
"""
# 检查必要的邮件配置是否存在
if not settings.SMTP_SERVER or not settings.SMTP_SENDER_EMAIL:
logger.debug("邮件配置未完成(SMTP_SERVER 或 SMTP_SENDER_EMAIL 为空),邮件发送功能已禁用")
return None
Args:
to_emails: 收件人邮箱列表
subject: 邮件主题
html_content: HTML 邮件内容
from_email: 发件人邮箱(可选,默认使用配置中的发件人)
if not settings.SMTP_PORT:
logger.debug("邮件配置未完成(SMTP_PORT 为空),邮件发送功能已禁用")
return None
Returns:
是否发送成功
"""
email_config = EmailNotifier.get_email_config()
if not email_config:
logger.warning("邮件配置不完整,跳过发送邮件")
return False
# 返回配置字典
return {
'smtpserver': settings.SMTP_SERVER,
'smtpport': settings.SMTP_PORT,
'senderemail': settings.SMTP_SENDER_EMAIL,
'senderpassword': settings.SMTP_SENDER_PASSWORD,
'use_ssl': settings.SMTP_USE_SSL
}
try:
# 创建邮件
msg = MIMEMultipart('alternative')
msg['From'] = from_email or email_config['sender_email']
msg['To'] = ', '.join(to_emails)
msg['Subject'] = subject
# 添加 HTML 正文
html_part = MIMEText(html_content, 'html', 'utf-8')
msg.attach(html_part)
def _send_email(to_email: str, subject: str, html_content: str, email_settings: dict) -> bool:
"""
发送邮件
Args:
to_email: 收件人邮箱
subject: 邮件主题
html_content: HTML 邮件内容
email_settings: 邮件配置
Returns:
是否发送成功
"""
try:
msg = MIMEMultipart()
msg["From"] = email_settings['senderemail']
msg["To"] = to_email
msg["Subject"] = subject
msg.attach(MIMEText(html_content, 'html', 'utf-8'))
# 根据配置选择使用 SSL 或普通 SMTP
if email_settings.get('use_ssl', True):
with smtplib.SMTP_SSL(email_settings['smtpserver'], int(email_settings['smtpport'])) as server:
server.login(email_settings['senderemail'], email_settings['senderpassword'])
server.sendmail(msg["From"], msg["To"], msg.as_string())
else:
with smtplib.SMTP(email_settings['smtpserver'], int(email_settings['smtpport'])) as server:
# 连接 SMTP 服务器并发送
if email_config.get('use_ssl', True):
server = smtplib.SMTP_SSL(
email_config['smtp_server'],
int(email_config['smtp_port'])
)
else:
server = smtplib.SMTP(
email_config['smtp_server'],
int(email_config['smtp_port'])
)
server.starttls()
server.login(email_settings['senderemail'], email_settings['senderpassword'])
server.sendmail(msg["From"], msg["To"], msg.as_string())
logger.info(f"已成功向 {to_email} 发送邮件,主题: {subject}")
return True
server.login(email_config['sender_email'], email_config['sender_password'])
server.sendmail(msg['From'], to_emails, msg.as_string())
server.quit()
except Exception as e:
logger.error(f"{to_email} 发送邮件时失败: {e}")
return False
logger.info(f"邮件发送成功: {subject} -> {', '.join(to_emails)}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
return False
def send_expiration_notification(email: str, jwt_exp: str) -> bool:
"""
发送 Token 到期提醒邮件
@staticmethod
def is_email_enabled() -> bool:
"""
检查邮件功能是否启用
Args:
email: 收件人邮箱
jwt_exp: Token 过期时间戳
Returns:
邮件功能是否可用
"""
return EmailNotifier.get_email_config() is not None
Returns:
是否发送成功
"""
email_settings = get_email_settings()
if not email_settings:
return False
try:
exp_time = time.strftime("%Y年%m月%d%H:%M:%S", time.localtime(float(jwt_exp)))
send_time = time.strftime("%Y年%m月%d%H:%M:%S", time.localtime())
html = EXPIRATION_HTML_TEMPLATE.format(
name=email,
exp_time=exp_time,
send_time=send_time
)
return _send_email(email, "接龙管家Token到期通知", html, email_settings)
except Exception as e:
logger.error(f"发送过期通知邮件失败: {e}")
return False
def send_success_notification(email: str) -> bool:
"""
发送打卡成功通知邮件
Args:
email: 收件人邮箱
Returns:
是否发送成功
"""
email_settings = get_email_settings()
if not email_settings:
return False
try:
send_time = time.strftime("%Y年%m月%d%H:%M:%S", time.localtime())
html = SUCCESS_HTML_TEMPLATE.format(
name=email,
send_time=send_time
)
return _send_email(email, "自动打卡成功通知", html, email_settings)
except Exception as e:
logger.error(f"发送成功通知邮件失败: {e}")
return False
def send_failure_notification(email: str) -> bool:
"""
发送打卡失败通知邮件
Args:
email: 收件人邮箱
Returns:
是否发送成功
"""
email_settings = get_email_settings()
if not email_settings:
return False
try:
send_time = time.strftime("%Y年%m月%d%H:%M:%S", time.localtime())
html = FAILURE_HTML_TEMPLATE.format(
name=email,
send_time=send_time
)
return _send_email(email, "打卡失败 - 需要刷新Token", html, email_settings)
except Exception as e:
logger.error(f"发送失败通知邮件失败: {e}")
return False
+67 -1
View File
@@ -86,6 +86,53 @@ def get_session_data(session_id: str) -> dict:
return None
def cancel_session(session_id: str) -> bool:
"""
取消登录会话
Args:
session_id: 会话 ID
Returns:
是否成功取消
"""
filepath = settings.SESSION_DIR / f"{session_id}.json"
lock_path = settings.SESSION_DIR / f"{session_id}.json.lock"
if not filepath.exists():
logger.warning(f"尝试取消不存在的会话: {session_id}")
return False
try:
with FileLock(lock_path, timeout=5):
# 读取当前会话数据
with open(filepath, 'r', encoding='utf-8') as f:
content = f.read()
if not content:
return False
data = json.loads(content)
# 如果已经成功,不允许取消
if data.get('status') == 'success':
logger.info(f"会话 {session_id} 已成功,无法取消")
return False
# 标记为已取消
data['status'] = 'cancelled'
data['message'] = '用户取消登录'
# 写回文件
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"✅ 会话 {session_id} 已取消")
return True
except Exception as e:
logger.error(f"取消会话 {session_id} 失败: {e}")
return False
def get_token_headless(session_id: str, jwt_sub: str = None, alias: str = None, client_ip: str = "") -> None:
"""
使用 Selenium 获取 QQ 扫码登录的 Token
@@ -193,7 +240,26 @@ def get_token_headless(session_id: str, jwt_sub: str = None, alias: str = None,
current_step = "等待用户扫描登录 (Cookie 'token' 出现)"
cookie_name_to_find = "token"
logger.info(f"Selenium ({session_id}): {current_step}...")
WebDriverWait(driver, 120, 1).until(lambda d: d.get_cookie(cookie_name_to_find) is not None) # 改为 120 秒(2分钟)
# 自定义等待逻辑:每秒检查cookie和session状态
max_wait_seconds = 120
import time
for i in range(max_wait_seconds):
# 检查session是否被取消
status = get_session_status(session_id)
if status == 'cancelled':
logger.info(f"Selenium ({session_id}): 用户取消了登录,终止会话")
raise Exception("用户取消登录")
# 检查cookie是否出现
cookie = driver.get_cookie(cookie_name_to_find)
if cookie:
break
time.sleep(1)
else:
# 超时未获取到cookie
raise TimeoutException("等待扫码超时")
cookie = driver.get_cookie(cookie_name_to_find)
if cookie: