mirror of
https://github.com/Cccc-owo/CheckInApp.git
synced 2026-06-17 05:56:29 +00:00
377 lines
13 KiB
Python
377 lines
13 KiB
Python
import logging
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
from filelock import FileLock
|
||
from sqlalchemy.orm import Session
|
||
from croniter import croniter
|
||
|
||
from backend.config import settings
|
||
from backend.models import get_db, User, CheckInTask
|
||
from backend.services.check_in_service import CheckInService
|
||
from backend.services.admin_service import AdminService
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 全局调度器实例
|
||
scheduler = None
|
||
scheduler_lock = None
|
||
|
||
|
||
def load_scheduled_tasks(db: Session, scheduler_instance):
|
||
"""
|
||
从数据库加载所有启用的定时任务并添加到 APScheduler
|
||
|
||
只加载满足以下条件的任务:
|
||
- is_active = True
|
||
- cron_expression IS NOT NULL
|
||
|
||
Args:
|
||
db: 数据库会话
|
||
scheduler_instance: APScheduler BackgroundScheduler 实例
|
||
|
||
Returns:
|
||
包含统计信息的字典
|
||
"""
|
||
logger.info("正在从数据库加载定时任务...")
|
||
|
||
# 移除所有现有的动态任务(保留系统任务)
|
||
for job in scheduler_instance.get_jobs():
|
||
if job.id.startswith('task_'):
|
||
scheduler_instance.remove_job(job.id)
|
||
|
||
# 查询所有启用且有 cron 表达式的任务
|
||
tasks = db.query(CheckInTask).filter(
|
||
CheckInTask.is_active == True,
|
||
CheckInTask.cron_expression.isnot(None)
|
||
).all()
|
||
|
||
loaded_count = 0
|
||
skipped_count = 0
|
||
error_count = 0
|
||
|
||
for task in tasks:
|
||
try:
|
||
# 验证 cron 表达式
|
||
cron_str = str(task.cron_expression) if task.cron_expression else None
|
||
if not cron_str or not croniter.is_valid(cron_str):
|
||
logger.warning(f"跳过任务 {task.id}: 无效的 cron 表达式 '{task.cron_expression}'")
|
||
skipped_count += 1
|
||
continue
|
||
|
||
# 创建任务 ID
|
||
job_id = f"task_{task.id}"
|
||
|
||
# 检查任务是否已存在
|
||
if scheduler_instance.get_job(job_id):
|
||
logger.debug(f"任务 {task.id} 已存在,跳过")
|
||
continue
|
||
|
||
# 添加任务到调度器
|
||
scheduler_instance.add_job(
|
||
func=scheduled_check_in_task,
|
||
trigger=CronTrigger.from_crontab(cron_str),
|
||
id=job_id,
|
||
name=f"CheckIn-Task-{task.id}",
|
||
args=[task.id],
|
||
replace_existing=True
|
||
)
|
||
|
||
logger.info(f"✅ 加载任务 {task.id}: {task.name} (Cron: {task.cron_expression})")
|
||
loaded_count += 1
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 加载任务 {task.id} 时出错: {str(e)}")
|
||
error_count += 1
|
||
|
||
result = {
|
||
"loaded": loaded_count,
|
||
"skipped": skipped_count,
|
||
"errors": error_count,
|
||
"total": len(tasks)
|
||
}
|
||
|
||
logger.info(f"任务加载完成: {result}")
|
||
return result
|
||
|
||
|
||
def scheduled_check_in_task(task_id: int):
|
||
"""
|
||
执行指定任务的定时打卡
|
||
|
||
这是由 APScheduler 在 cron 触发器触发时调用的函数
|
||
使用与批量打卡相同的逻辑
|
||
"""
|
||
from backend.models.database import SessionLocal
|
||
|
||
db = SessionLocal()
|
||
try:
|
||
task = db.query(CheckInTask).filter(CheckInTask.id == task_id).first()
|
||
if not task:
|
||
logger.error(f"任务 {task_id} 不存在")
|
||
return
|
||
|
||
if not task.is_scheduled_enabled:
|
||
logger.info(f"任务 {task_id} 未启用定时打卡 (is_active={task.is_active}, cron={task.cron_expression})")
|
||
return
|
||
|
||
logger.info(f"🤖 执行定时打卡任务 {task_id}")
|
||
|
||
# 开始异步打卡
|
||
CheckInService.start_async_check_in(task, "scheduled", db)
|
||
|
||
except Exception as e:
|
||
logger.error(f"执行定时打卡任务 {task_id} 时出错: {str(e)}", exc_info=True)
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def cleanup_expired_pending_users():
|
||
"""定时清理过期未审批用户(24小时未审批)"""
|
||
logger.info("Scheduler: 正在清理过期未审批用户...")
|
||
|
||
try:
|
||
# 创建数据库会话
|
||
db = next(get_db())
|
||
|
||
try:
|
||
count = AdminService.delete_expired_pending_users(db)
|
||
logger.info(f"Scheduler: 已删除 {count} 个过期未审批用户")
|
||
finally:
|
||
db.close()
|
||
|
||
except Exception as e:
|
||
logger.error(f"Scheduler: 清理过期用户任务发生错误: {e}", exc_info=True)
|
||
|
||
|
||
def check_token_expiration():
|
||
"""
|
||
检查打卡 Token 是否即将过期,并发送邮件提醒
|
||
|
||
检查所有用户的打卡 authorization token,如果在 30 分钟内过期,发送提醒邮件
|
||
注意:检查的是打卡业务 token,不是网站登录 JWT token
|
||
"""
|
||
logger.info("Scheduler: 正在执行打卡 Token 过期检查...")
|
||
|
||
try:
|
||
# 创建数据库会话
|
||
db = next(get_db())
|
||
|
||
try:
|
||
# 获取所有用户
|
||
from backend.services.auth_service import AuthService
|
||
|
||
users = db.query(User).all()
|
||
current_timestamp = int(datetime.now().timestamp())
|
||
|
||
notified_count = 0
|
||
|
||
for user in users:
|
||
# 使用统一的验证方法
|
||
result = AuthService.verify_checkin_authorization(user)
|
||
|
||
# 获取过期时间戳和剩余时间
|
||
exp_timestamp = result.get("expires_at")
|
||
if not exp_timestamp:
|
||
continue
|
||
|
||
time_until_expiry = exp_timestamp - current_timestamp
|
||
|
||
# 情况1:Token 即将过期(过期前 30 分钟内,且还未过期)
|
||
if 0 < time_until_expiry < 1800: # 30分钟 = 1800秒
|
||
if user.email and not user.token_expiring_notified:
|
||
logger.info(f"用户 {user.alias} 的打卡 Token 即将过期,发送邮件提醒到 {user.email}...")
|
||
from backend.services.email_service import EmailService
|
||
jwt_exp_value = user.jwt_exp
|
||
jwt_exp_str = str(jwt_exp_value) if jwt_exp_value is not None else "0"
|
||
|
||
# 发送"即将过期"邮件
|
||
success = EmailService.notify_token_expiring(user, jwt_exp_str)
|
||
|
||
if success:
|
||
user.token_expiring_notified = True
|
||
db.commit()
|
||
notified_count += 1
|
||
logger.info(f"用户 {user.alias} 的打卡 Token 即将过期邮件已发送并标记")
|
||
else:
|
||
logger.warning(f"用户 {user.alias} 的打卡 Token 即将过期邮件发送失败")
|
||
|
||
# 情况2:Token 已过期
|
||
# 修改逻辑:只要过期就发送提醒(不限制在30分钟内)
|
||
# 但为了避免频繁发送,使用 token_expired_notified 标志
|
||
elif time_until_expiry <= 0: # Token 已过期
|
||
if user.email and not user.token_expired_notified:
|
||
logger.info(f"用户 {user.alias} 的打卡 Token 已过期,发送邮件提醒到 {user.email}...")
|
||
from backend.services.email_service import EmailService
|
||
|
||
# 发送"已过期"邮件
|
||
success = EmailService.notify_token_expired(user)
|
||
|
||
if success:
|
||
user.token_expired_notified = True
|
||
db.commit()
|
||
notified_count += 1
|
||
logger.info(f"用户 {user.alias} 的打卡 Token 已过期邮件已发送并标记")
|
||
else:
|
||
logger.warning(f"用户 {user.alias} 的打卡 Token 已过期邮件发送失败")
|
||
|
||
# 情况3:Token 正常(剩余时间 > 30 分钟),重置提醒标志
|
||
elif time_until_expiry >= 1800:
|
||
if user.token_expiring_notified or user.token_expired_notified:
|
||
user.token_expiring_notified = False
|
||
user.token_expired_notified = False
|
||
db.commit()
|
||
logger.info(f"用户 {user.alias} 的打卡 Token 已刷新,重置所有提醒标志")
|
||
|
||
logger.info(f"Scheduler: 打卡 Token 过期检查完成,共发送 {notified_count} 封提醒邮件")
|
||
|
||
finally:
|
||
db.close()
|
||
|
||
except Exception as e:
|
||
logger.error(f"Scheduler: Token 过期检查任务发生错误: {e}", exc_info=True)
|
||
|
||
|
||
def cleanup_old_sessions():
|
||
"""
|
||
清理旧的会话文件
|
||
|
||
删除超过指定时间的会话文件
|
||
"""
|
||
logger.info("Scheduler: 开始清理旧会话文件...")
|
||
|
||
try:
|
||
session_dir = settings.SESSION_DIR
|
||
|
||
if not session_dir.exists():
|
||
logger.info("Scheduler: 会话目录不存在,跳过清理")
|
||
return
|
||
|
||
current_time = time.time()
|
||
cleanup_threshold = settings.SESSION_CLEANUP_HOURS * 3600 # 转换为秒
|
||
|
||
deleted_count = 0
|
||
|
||
for file_path in session_dir.glob("*.json"):
|
||
try:
|
||
# 获取文件修改时间
|
||
file_mtime = file_path.stat().st_mtime
|
||
file_age = current_time - file_mtime
|
||
|
||
# 如果文件超过阈值,删除它
|
||
if file_age > cleanup_threshold:
|
||
# 同时删除对应的锁文件
|
||
lock_file = session_dir / f"{file_path.stem}.json.lock"
|
||
|
||
file_path.unlink()
|
||
if lock_file.exists():
|
||
lock_file.unlink()
|
||
|
||
deleted_count += 1
|
||
logger.debug(f"删除旧会话文件: {file_path.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"删除会话文件 {file_path.name} 时出错: {e}")
|
||
|
||
logger.info(f"Scheduler: 会话文件清理完成,共删除 {deleted_count} 个文件")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Scheduler: 清理会话文件任务发生错误: {e}", exc_info=True)
|
||
|
||
|
||
def start_scheduler():
|
||
"""
|
||
启动调度器
|
||
|
||
使用文件锁确保在多进程部署时只有一个调度器运行
|
||
"""
|
||
global scheduler, scheduler_lock
|
||
|
||
# 创建调度器锁文件
|
||
lock_file = settings.BASE_DIR / "scheduler.lock"
|
||
scheduler_lock = FileLock(lock_file, timeout=1)
|
||
|
||
try:
|
||
# 尝试获取锁
|
||
scheduler_lock.acquire(blocking=False)
|
||
|
||
logger.info("成功获取调度器锁,启动调度器...")
|
||
|
||
# 创建后台调度器
|
||
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
|
||
|
||
# 添加 Token 过期检查任务(每隔指定分钟)
|
||
scheduler.add_job(
|
||
check_token_expiration,
|
||
trigger="interval",
|
||
minutes=settings.TOKEN_CHECK_INTERVAL_MINUTES,
|
||
id="check_token_expiration",
|
||
name="Token 过期检查任务",
|
||
replace_existing=True
|
||
)
|
||
logger.info(
|
||
f"已添加 Token 过期检查任务: 每 {settings.TOKEN_CHECK_INTERVAL_MINUTES} 分钟"
|
||
)
|
||
|
||
# 添加会话文件清理任务(每隔指定小时)
|
||
scheduler.add_job(
|
||
cleanup_old_sessions,
|
||
trigger="interval",
|
||
hours=settings.SESSION_CLEANUP_INTERVAL_HOURS,
|
||
id="cleanup_old_sessions",
|
||
name="清理旧会话文件任务",
|
||
replace_existing=True
|
||
)
|
||
logger.info(
|
||
f"已添加会话清理任务: 每 {settings.SESSION_CLEANUP_INTERVAL_HOURS} 小时"
|
||
)
|
||
|
||
# 添加清理过期未审批用户任务(每小时执行一次)
|
||
scheduler.add_job(
|
||
cleanup_expired_pending_users,
|
||
trigger="interval",
|
||
hours=1,
|
||
id="cleanup_expired_pending_users",
|
||
name="清理过期未审批用户任务",
|
||
replace_existing=True
|
||
)
|
||
logger.info("已添加清理过期未审批用户任务: 每 1 小时")
|
||
|
||
# 新增:从数据库加载动态任务
|
||
db = next(get_db())
|
||
try:
|
||
load_scheduled_tasks(db, scheduler)
|
||
finally:
|
||
db.close()
|
||
|
||
# 启动调度器
|
||
scheduler.start()
|
||
logger.info("调度器已启动")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"无法获取调度器锁或启动失败: {e}")
|
||
logger.info("可能其他进程已经在运行调度器,跳过启动")
|
||
scheduler_lock = None
|
||
|
||
|
||
def stop_scheduler():
|
||
"""
|
||
停止调度器并释放锁
|
||
"""
|
||
global scheduler, scheduler_lock
|
||
|
||
if scheduler:
|
||
logger.info("正在停止调度器...")
|
||
scheduler.shutdown()
|
||
logger.info("调度器已停止")
|
||
|
||
if scheduler_lock:
|
||
try:
|
||
scheduler_lock.release()
|
||
logger.info("已释放调度器锁")
|
||
except Exception as e:
|
||
logger.warning(f"释放调度器锁时出错: {e}")
|