from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from typing import List from datetime import datetime, timedelta from pydantic import BaseModel, Field from backend.exceptions import BaseAPIException from backend.models import get_db, User from backend.schemas.task import TaskUpdate, TaskResponse from backend.services.task_service import TaskService from backend.dependencies import get_current_user router = APIRouter() class CronValidateRequest(BaseModel): """Cron 表达式验证请求""" cron_expression: str = Field(..., min_length=9, description="Crontab 表达式") # create_task_from_template: 已在 templates.py 中定义 @router.get("/", response_model=List[TaskResponse], summary="获取当前用户的任务列表") async def get_tasks( include_inactive: bool = True, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 获取当前用户的所有打卡任务 - **include_inactive**: 是否包含未启用的任务(默认 true) """ try: tasks = TaskService.get_user_tasks(current_user.id, db, include_inactive) # 为每个任务添加额外信息 enriched_tasks = [TaskService.enrich_task_with_check_in_info(task, db) for task in tasks] return enriched_tasks except (BaseAPIException, HTTPException): raise except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"获取任务列表失败: {str(e)}" ) @router.get("/{task_id}", response_model=TaskResponse, summary="获取任务详情") async def get_task( task_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ 获取指定任务的详情 需要验证任务属于当前用户 """ from backend.models import CheckInTask from backend.utils.db_helpers import get_owned_or_403 # 获取任务并验证归属 task = get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore return task @router.put("/{task_id}", response_model=TaskResponse, summary="更新任务") async def update_task( task_id: int, task_data: TaskUpdate, current_user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 更新指定任务的信息 需要验证任务属于当前用户 """ from backend.models import CheckInTask from backend.utils.db_helpers import get_owned_or_403 # 验证任务归属并获取任务 get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore task = TaskService.update_task(task_id, task_data, db) if not task: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="任务不存在") return task @router.delete("/{task_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除任务") async def delete_task( task_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ 删除指定任务 需要验证任务属于当前用户,删除后会同时删除所有关联的打卡记录 """ from backend.models import CheckInTask from backend.utils.db_helpers import get_owned_or_403 # 验证任务归属 get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore success = TaskService.delete_task(task_id, db) if not success: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="任务不存在") @router.post("/{task_id}/toggle", response_model=TaskResponse, summary="切换任务启用状态") async def toggle_task( task_id: int, current_user: User = Depends(get_current_user), db: Session = Depends(get_db) ): """ 切换任务的启用/禁用状态 需要验证任务属于当前用户 """ from backend.models import CheckInTask from backend.utils.db_helpers import get_owned_or_403 # 验证任务归属 get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore task = TaskService.toggle_task(task_id, db) if not task: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="任务不存在") return task @router.post("/validate-cron", summary="验证 Crontab 表达式") async def validate_cron_expression(request: CronValidateRequest): """ 验证 Crontab 表达式并预览下一个执行时间 请求体: {"cron_expression": "0 20 * * *"} 返回: { "valid": true, "message": "有效的 Crontab 表达式", "next_times": [ "2024-01-02 20:00:00", "2024-01-03 20:00:00", ... ], "description": "每天 20:00" } """ cron_expr = request.cron_expression.strip() if not cron_expr: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="cron_expression 是必需的" ) try: from croniter import croniter if not croniter.is_valid(cron_expr): raise ValueError("无效的格式") # 生成接下来的 5 个执行时间 cron = croniter(cron_expr, datetime.now()) next_times = [cron.get_next(datetime).strftime("%Y-%m-%d %H:%M:%S") for _ in range(5)] return { "valid": True, "message": "有效的 Crontab 表达式", "next_times": next_times, "description": generate_cron_description(cron_expr), } except HTTPException: raise except Exception as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"无效的 Crontab 表达式: {str(e)}" ) def generate_cron_description(cron_expr: str) -> str: """生成 Crontab 表达式的人类可读描述""" parts = cron_expr.split() if len(parts) != 5: return cron_expr minute, hour, day, month, dow = parts descriptions = [] if hour == "*" and minute == "*": descriptions.append("每分钟") elif hour == "*": descriptions.append(f"每小时的第 {minute} 分钟") elif day == "*" and month == "*" and dow == "*": descriptions.append(f"每天 {hour}:{minute:0>2}") else: descriptions.append(f"复杂的时间表: {cron_expr}") return ", ".join(descriptions)