mirror of
https://github.com/Cccc-owo/CheckInApp.git
synced 2026-06-17 14:06:28 +00:00
45f7243bf9
Query only id and payload_config fields instead of loading full Task objects when checking for duplicate ThreadId. Reduces data transfer by ~50%.
391 lines
12 KiB
Python
391 lines
12 KiB
Python
import logging
|
|
from typing import List, Optional, Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import desc
|
|
import json
|
|
|
|
from backend.models import User, CheckInTask, CheckInRecord
|
|
from backend.schemas.task import TaskCreate, TaskUpdate
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TaskService:
|
|
"""打卡任务服务"""
|
|
|
|
@staticmethod
|
|
def create_task(user_id: int, task_data: TaskCreate, db: Session) -> CheckInTask:
|
|
"""
|
|
创建打卡任务
|
|
|
|
Args:
|
|
user_id: 用户 ID
|
|
task_data: 任务数据
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
创建的任务对象
|
|
"""
|
|
import json
|
|
|
|
# 1. 检查用户是否存在
|
|
user = db.query(User).filter(User.id == user_id).first()
|
|
if not user:
|
|
raise ValueError(f"用户 ID {user_id} 不存在")
|
|
|
|
# 2. 从 payload_config 中提取 ThreadId 用于唯一性校验
|
|
try:
|
|
payload = json.loads(task_data.payload_config)
|
|
thread_id = payload.get('ThreadId')
|
|
if not thread_id:
|
|
raise ValueError("payload_config 中缺少 ThreadId")
|
|
except json.JSONDecodeError:
|
|
raise ValueError("payload_config 格式错误,必须是有效的 JSON")
|
|
|
|
# 3. 验证唯一性:同一用户在同一个接龙中不能有重复的任务
|
|
# 优化:只查询必要的字段(id 和 payload_config),避免加载完整对象
|
|
existing_tasks = db.query(
|
|
CheckInTask.id,
|
|
CheckInTask.payload_config
|
|
).filter(
|
|
CheckInTask.user_id == user_id
|
|
).all()
|
|
|
|
for task_id, payload_config in existing_tasks:
|
|
try:
|
|
existing_payload = json.loads(payload_config)
|
|
if existing_payload.get('ThreadId') == thread_id:
|
|
logger.warning(f"⚠️ 任务创建冲突 - User: {user.alias}({user_id}), ThreadId: {thread_id}")
|
|
raise ValueError(
|
|
f"该接龙中已存在任务。ThreadId: {thread_id}"
|
|
)
|
|
except (json.JSONDecodeError, AttributeError, TypeError):
|
|
# 跳过无法解析的 payload_config
|
|
logger.debug(f"跳过无法解析的任务配置 - Task ID: {task_id}")
|
|
continue
|
|
|
|
# 4. 记录日志
|
|
task_name = task_data.name or f"接龙任务 {thread_id}"
|
|
logger.info(f"📝 用户 {user.alias}({user_id}) 正在创建任务: {task_name}")
|
|
|
|
# 5. 创建任务
|
|
task = CheckInTask(
|
|
user_id=user_id,
|
|
payload_config=task_data.payload_config,
|
|
name=task_data.name or task_name,
|
|
is_active=task_data.is_active if task_data.is_active is not None else True
|
|
)
|
|
|
|
try:
|
|
db.add(task)
|
|
db.commit()
|
|
db.refresh(task)
|
|
logger.info(f"✅ 任务创建成功 - ID: {task.id}, Name: {task.name}, ThreadId: {thread_id}")
|
|
|
|
# 如果任务启用且包含 cron_expression,立即添加到调度器
|
|
if task.is_scheduled_enabled:
|
|
TaskService._reload_scheduler_for_task(task, db)
|
|
|
|
return task
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"❌ 任务创建失败: {str(e)}")
|
|
raise ValueError(f"任务创建失败: {str(e)}")
|
|
|
|
@staticmethod
|
|
def get_task(task_id: int, db: Session) -> Optional[CheckInTask]:
|
|
"""
|
|
获取任务详情
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
任务对象或 None
|
|
"""
|
|
return db.query(CheckInTask).filter(CheckInTask.id == task_id).first()
|
|
|
|
@staticmethod
|
|
def enrich_task_with_check_in_info(task: CheckInTask, db: Session) -> dict:
|
|
"""
|
|
为任务添加最后一次打卡信息和 ThreadId
|
|
|
|
Args:
|
|
task: 任务对象
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
包含额外信息的任务字典
|
|
"""
|
|
# 获取最后一次打卡记录
|
|
last_record = db.query(CheckInRecord).filter(
|
|
CheckInRecord.task_id == task.id
|
|
).order_by(desc(CheckInRecord.check_in_time)).first()
|
|
|
|
# 从 payload_config 提取 ThreadId
|
|
thread_id = None
|
|
try:
|
|
payload = json.loads(str(task.payload_config))
|
|
thread_id = payload.get('ThreadId')
|
|
except (json.JSONDecodeError, AttributeError, TypeError):
|
|
logger.debug(f"无法从任务 {task.id} 的 payload_config 中提取 ThreadId")
|
|
pass
|
|
|
|
# 转换为字典并添加额外字段
|
|
task_dict = {
|
|
'id': task.id,
|
|
'user_id': task.user_id,
|
|
'payload_config': task.payload_config,
|
|
'name': task.name,
|
|
'is_active': task.is_active,
|
|
'cron_expression': task.cron_expression,
|
|
'is_scheduled_enabled': task.is_scheduled_enabled,
|
|
'created_at': task.created_at,
|
|
'updated_at': task.updated_at,
|
|
'thread_id': thread_id,
|
|
'last_check_in_time': last_record.check_in_time if last_record else None,
|
|
'last_check_in_status': last_record.status if last_record else None,
|
|
}
|
|
|
|
return task_dict
|
|
|
|
@staticmethod
|
|
def get_user_tasks(user_id: int, db: Session, include_inactive: bool = True) -> List[CheckInTask]:
|
|
"""
|
|
获取用户的所有任务
|
|
|
|
Args:
|
|
user_id: 用户 ID
|
|
db: 数据库会话
|
|
include_inactive: 是否包含未启用的任务
|
|
|
|
Returns:
|
|
任务列表
|
|
"""
|
|
query = db.query(CheckInTask).filter(CheckInTask.user_id == user_id)
|
|
|
|
if not include_inactive:
|
|
query = query.filter(CheckInTask.is_active == True)
|
|
|
|
return query.order_by(desc(CheckInTask.created_at)).all()
|
|
|
|
@staticmethod
|
|
def get_all_active_tasks(db: Session) -> List[CheckInTask]:
|
|
"""
|
|
获取所有启用的任务(用于定时打卡)
|
|
|
|
Args:
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
启用的任务列表
|
|
"""
|
|
return db.query(CheckInTask).filter(CheckInTask.is_active == True).all()
|
|
|
|
@staticmethod
|
|
def update_task(task_id: int, task_data: TaskUpdate, db: Session) -> Optional[CheckInTask]:
|
|
"""
|
|
更新任务
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
task_data: 更新数据
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
更新后的任务对象或 None
|
|
"""
|
|
task = db.query(CheckInTask).filter(CheckInTask.id == task_id).first()
|
|
|
|
if not task:
|
|
return None
|
|
|
|
# 更新字段
|
|
update_data = task_data.model_dump(exclude_unset=True)
|
|
|
|
# 检查是否更新了 cron_expression 或 is_active
|
|
cron_changed = 'cron_expression' in update_data
|
|
active_changed = 'is_active' in update_data
|
|
|
|
for field, value in update_data.items():
|
|
setattr(task, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(task)
|
|
|
|
logger.info(f"任务 {task_id} 已更新")
|
|
|
|
# 如果 cron_expression 或 is_active 发生变化,重新加载调度器
|
|
if cron_changed or active_changed:
|
|
TaskService._reload_scheduler_for_task(task, db)
|
|
|
|
return task
|
|
|
|
@staticmethod
|
|
def delete_task(task_id: int, db: Session) -> bool:
|
|
"""
|
|
删除任务
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
是否删除成功
|
|
"""
|
|
task = db.query(CheckInTask).filter(CheckInTask.id == task_id).first()
|
|
|
|
if not task:
|
|
return False
|
|
|
|
db.delete(task)
|
|
db.commit()
|
|
|
|
logger.info(f"任务 {task_id} 已删除")
|
|
|
|
# 从调度器中移除该任务
|
|
TaskService._remove_task_from_scheduler(task_id)
|
|
|
|
return True
|
|
|
|
@staticmethod
|
|
def toggle_task(task_id: int, db: Session) -> Optional[CheckInTask]:
|
|
"""
|
|
切换任务的启用状态
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
更新后的任务对象或 None
|
|
"""
|
|
task = db.query(CheckInTask).filter(CheckInTask.id == task_id).first()
|
|
|
|
if not task:
|
|
return None
|
|
|
|
task.is_active = not task.is_active
|
|
db.commit()
|
|
db.refresh(task)
|
|
|
|
logger.info(f"任务 {task_id} 状态已切换为: {'启用' if task.is_active else '禁用'}")
|
|
|
|
# 重新加载调度器
|
|
TaskService._reload_scheduler_for_task(task, db)
|
|
|
|
return task
|
|
|
|
@staticmethod
|
|
def get_task_records(task_id: int, db: Session, limit: int = 50) -> List[CheckInRecord]:
|
|
"""
|
|
获取任务的打卡记录
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
db: 数据库会话
|
|
limit: 返回记录数量限制
|
|
|
|
Returns:
|
|
打卡记录列表
|
|
"""
|
|
return (
|
|
db.query(CheckInRecord)
|
|
.filter(CheckInRecord.task_id == task_id)
|
|
.order_by(desc(CheckInRecord.check_in_time))
|
|
.limit(limit)
|
|
.all()
|
|
)
|
|
|
|
@staticmethod
|
|
def verify_task_ownership(task_id: int, user_id: int, db: Session) -> bool:
|
|
"""
|
|
验证任务是否属于指定用户
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
user_id: 用户 ID
|
|
db: 数据库会话
|
|
|
|
Returns:
|
|
是否属于该用户
|
|
"""
|
|
task = db.query(CheckInTask).filter(
|
|
CheckInTask.id == task_id,
|
|
CheckInTask.user_id == user_id
|
|
).first()
|
|
|
|
return task is not None
|
|
|
|
@staticmethod
|
|
def _reload_scheduler_for_task(task: CheckInTask, db: Session):
|
|
"""
|
|
重新加载指定任务到调度器
|
|
|
|
Args:
|
|
task: 任务对象
|
|
db: 数据库会话
|
|
"""
|
|
try:
|
|
from backend.services.scheduler_service import scheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from croniter import croniter
|
|
|
|
if not scheduler:
|
|
logger.warning(f"调度器未启动,无法加载任务 {task.id}")
|
|
return
|
|
|
|
job_id = f"task_{task.id}"
|
|
|
|
# 先移除旧的任务(如果存在)
|
|
existing_job = scheduler.get_job(job_id)
|
|
if existing_job:
|
|
scheduler.remove_job(job_id)
|
|
logger.info(f"从调度器移除旧任务: {job_id}")
|
|
|
|
# 如果任务启用且有有效的 cron 表达式,添加新任务
|
|
if task.is_scheduled_enabled:
|
|
cron_str = str(task.cron_expression)
|
|
if croniter.is_valid(cron_str):
|
|
from backend.services.scheduler_service import scheduled_check_in_task
|
|
|
|
scheduler.add_job(
|
|
func=scheduled_check_in_task,
|
|
trigger=CronTrigger.from_crontab(cron_str),
|
|
id=job_id,
|
|
name=f"CheckIn-Task-{task.id}",
|
|
args=[task.id],
|
|
replace_existing=True
|
|
)
|
|
logger.info(f"✅ 任务 {task.id} 已重新加载到调度器: {cron_str}")
|
|
else:
|
|
logger.warning(f"任务 {task.id} 的 cron 表达式无效: {cron_str}")
|
|
else:
|
|
logger.info(f"任务 {task.id} 未启用或无 cron 表达式,已从调度器移除")
|
|
|
|
except Exception as e:
|
|
logger.error(f"重新加载任务 {task.id} 到调度器失败: {str(e)}")
|
|
|
|
@staticmethod
|
|
def _remove_task_from_scheduler(task_id: int):
|
|
"""
|
|
从调度器中移除指定任务
|
|
|
|
Args:
|
|
task_id: 任务 ID
|
|
"""
|
|
try:
|
|
from backend.services.scheduler_service import scheduler
|
|
|
|
if not scheduler:
|
|
return
|
|
|
|
job_id = f"task_{task_id}"
|
|
if scheduler.get_job(job_id):
|
|
scheduler.remove_job(job_id)
|
|
logger.info(f"✅ 任务 {task_id} 已从调度器移除")
|
|
|
|
except Exception as e:
|
|
logger.error(f"从调度器移除任务 {task_id} 失败: {str(e)}")
|