From e842a9bc1dd8295ef11c4b27f796d2af2705ba4d Mon Sep 17 00:00:00 2001 From: Cccc_ Date: Wed, 14 Jan 2026 21:42:32 +0800 Subject: [PATCH] refactor: split some logic into util func --- backend/api/admin.py | 4 +- backend/api/check_in.py | 9 +- backend/api/tasks.py | 46 +++----- backend/schemas/task.py | 33 +++--- backend/services/auth_service.py | 83 +++++++-------- backend/services/check_in_service.py | 25 +---- backend/services/email_service.py | 10 +- backend/services/scheduler_service.py | 7 +- backend/services/task_service.py | 44 +++----- backend/utils/db_helpers.py | 123 +++++++++++++++++++++ backend/utils/json_helpers.py | 103 ++++++++++++++++++ backend/utils/time_helpers.py | 148 ++++++++++++++++++++++++++ backend/workers/check_in_worker.py | 26 ++--- backend/workers/token_refresher.py | 14 ++- 14 files changed, 500 insertions(+), 175 deletions(-) create mode 100644 backend/utils/db_helpers.py create mode 100644 backend/utils/json_helpers.py create mode 100644 backend/utils/time_helpers.py diff --git a/backend/api/admin.py b/backend/api/admin.py index 1565d85..2dabd77 100644 --- a/backend/api/admin.py +++ b/backend/api/admin.py @@ -189,7 +189,9 @@ async def get_system_stats( # Token 即将过期的用户数(7天内) # 使用 SQL 直接查询,避免 N+1 问题 - current_timestamp = int(datetime.now().timestamp()) + from backend.utils.time_helpers import now_timestamp + + current_timestamp = now_timestamp() expiring_soon_timestamp = current_timestamp + (7 * 24 * 60 * 60) # 7天后 # 直接在数据库层面筛选即将过期的用户 diff --git a/backend/api/check_in.py b/backend/api/check_in.py index fac50f7..a3ec232 100644 --- a/backend/api/check_in.py +++ b/backend/api/check_in.py @@ -66,13 +66,10 @@ async def get_check_in_record_status( 返回状态:pending(进行中)、success(成功)、failure(失败) """ + from backend.utils.db_helpers import get_or_404 + # 获取打卡记录 - record = db.query(CheckInRecord).filter(CheckInRecord.id == record_id).first() - if not record: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="打卡记录不存在" - ) + record = get_or_404(CheckInRecord, record_id, db, "打卡记录不存在") # 验证记录归属(通过任务归属) if not TaskService.verify_task_ownership(record.task_id, current_user.id, db): diff --git a/backend/api/tasks.py b/backend/api/tasks.py index 975de47..1d94004 100644 --- a/backend/api/tasks.py +++ b/backend/api/tasks.py @@ -52,20 +52,11 @@ async def get_task( 需要验证任务属于当前用户 """ - # 验证任务归属 - if not TaskService.verify_task_ownership(task_id, current_user.id, db): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="无权访问此任务" - ) + from backend.models import CheckInTask + from backend.utils.db_helpers import get_owned_or_403 - task = TaskService.get_task(task_id, db) - - if not task: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="任务不存在" - ) + # 获取任务并验证归属 + task = get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore return task @@ -82,12 +73,11 @@ async def update_task( 需要验证任务属于当前用户 """ - # 验证任务归属 - if not TaskService.verify_task_ownership(task_id, current_user.id, db): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="无权访问此任务" - ) + from backend.models import CheckInTask + from backend.utils.db_helpers import get_owned_or_403 + + # 验证任务归属并获取任务 + get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore task = TaskService.update_task(task_id, task_data, db) @@ -111,12 +101,11 @@ async def delete_task( 需要验证任务属于当前用户,删除后会同时删除所有关联的打卡记录 """ + from backend.models import CheckInTask + from backend.utils.db_helpers import get_owned_or_403 + # 验证任务归属 - if not TaskService.verify_task_ownership(task_id, current_user.id, db): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="无权访问此任务" - ) + get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore success = TaskService.delete_task(task_id, db) @@ -138,12 +127,11 @@ async def toggle_task( 需要验证任务属于当前用户 """ + from backend.models import CheckInTask + from backend.utils.db_helpers import get_owned_or_403 + # 验证任务归属 - if not TaskService.verify_task_ownership(task_id, current_user.id, db): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="无权访问此任务" - ) + get_owned_or_403(CheckInTask, task_id, current_user.id, db) # type: ignore task = TaskService.toggle_task(task_id, db) diff --git a/backend/schemas/task.py b/backend/schemas/task.py index 2cd36b3..f105827 100644 --- a/backend/schemas/task.py +++ b/backend/schemas/task.py @@ -1,6 +1,5 @@ from datetime import datetime from typing import Optional -import json from pydantic import BaseModel, Field, field_validator @@ -16,25 +15,23 @@ class TaskBase(BaseModel): """ 验证 payload_config 是否为有效的 JSON,并且包含必需的 ThreadId 字段 """ + from backend.utils.json_helpers import safe_parse_json, extract_thread_id + if not v or not v.strip(): raise ValueError("payload_config 不能为空") - try: - payload = json.loads(v) - except json.JSONDecodeError as e: - raise ValueError(f"payload_config 必须是有效的 JSON 格式: {str(e)}") + payload = safe_parse_json(v) + if payload is None: + raise ValueError("payload_config 必须是有效的 JSON 格式") # 检查是否为字典类型 if not isinstance(payload, dict): raise ValueError("payload_config 必须是 JSON 对象(字典)") # 检查必需字段 ThreadId - if 'ThreadId' not in payload: - raise ValueError("payload_config 必须包含 ThreadId 字段") - - thread_id = payload.get('ThreadId') + thread_id = extract_thread_id(v) if not thread_id or not str(thread_id).strip(): - raise ValueError("ThreadId 不能为空") + raise ValueError("payload_config 必须包含有效的 ThreadId 字段") return v @@ -84,28 +81,26 @@ class TaskUpdate(BaseModel): """ 验证 payload_config 是否为有效的 JSON(如果提供的话) """ + from backend.utils.json_helpers import safe_parse_json, extract_thread_id + if v is None: return v if not v.strip(): raise ValueError("payload_config 不能为空字符串") - try: - payload = json.loads(v) - except json.JSONDecodeError as e: - raise ValueError(f"payload_config 必须是有效的 JSON 格式: {str(e)}") + payload = safe_parse_json(v) + if payload is None: + raise ValueError("payload_config 必须是有效的 JSON 格式") # 检查是否为字典类型 if not isinstance(payload, dict): raise ValueError("payload_config 必须是 JSON 对象(字典)") # 检查必需字段 ThreadId - if 'ThreadId' not in payload: - raise ValueError("payload_config 必须包含 ThreadId 字段") - - thread_id = payload.get('ThreadId') + thread_id = extract_thread_id(v) if not thread_id or not str(thread_id).strip(): - raise ValueError("ThreadId 不能为空") + raise ValueError("payload_config 必须包含有效的 ThreadId 字段") return v diff --git a/backend/services/auth_service.py b/backend/services/auth_service.py index 65d488b..d513e26 100644 --- a/backend/services/auth_service.py +++ b/backend/services/auth_service.py @@ -381,6 +381,14 @@ class AuthService: Returns: 包含打卡 token 验证结果的字典 """ + from backend.utils.time_helpers import ( + parse_jwt_exp, + is_timestamp_expired, + days_until_expiry, + minutes_until_expiry, + seconds_until_expiry + ) + # 检查是否有 authorization token if not user.authorization or user.authorization == "": return { @@ -389,52 +397,42 @@ class AuthService: "reason": "no_token" } - # 检查 Token 是否过期 - if not user.jwt_exp or user.jwt_exp == "0": + # 解析 jwt_exp + exp_timestamp = parse_jwt_exp(user.jwt_exp) + if not exp_timestamp: return { "is_valid": False, "message": "打卡凭证无效", "reason": "invalid_expiry" } - try: - exp_timestamp = int(user.jwt_exp) - current_timestamp = int(datetime.now().timestamp()) - - if current_timestamp > exp_timestamp: - days_expired = (current_timestamp - exp_timestamp) // 86400 - return { - "is_valid": False, - "message": f"打卡凭证已过期 {days_expired} 天", - "reason": "expired", - "days_expired": days_expired - } - - # 计算剩余时间 - seconds_remaining = exp_timestamp - current_timestamp - days_remaining = seconds_remaining // 86400 - minutes_remaining = seconds_remaining // 60 - - # 判断是否即将过期(30分钟内) - expiring_soon = minutes_remaining <= 30 - - return { - "is_valid": True, - "message": "打卡凭证有效", - "days_remaining": days_remaining, - "minutes_remaining": minutes_remaining, - "expiring_soon": expiring_soon, - "expires_at": exp_timestamp - } - - except ValueError: - logger.error(f"用户 {user.id} 的 jwt_exp 格式不正确: {user.jwt_exp}") + # 检查是否过期 + if is_timestamp_expired(exp_timestamp): + days_expired = abs(days_until_expiry(exp_timestamp)) return { "is_valid": False, - "message": "打卡凭证格式错误", - "reason": "invalid_format" + "message": f"打卡凭证已过期 {days_expired} 天", + "reason": "expired", + "days_expired": days_expired } + # Token 有效,计算剩余时间 + seconds_remaining = seconds_until_expiry(exp_timestamp) + days_remaining = days_until_expiry(exp_timestamp) + minutes_remaining = minutes_until_expiry(exp_timestamp) + + # 判断是否即将过期(30分钟内) + expiring_soon = minutes_remaining <= 30 + + return { + "is_valid": True, + "message": "打卡凭证有效", + "days_remaining": days_remaining, + "minutes_remaining": minutes_remaining, + "expiring_soon": expiring_soon, + "expires_at": exp_timestamp + } + @staticmethod def alias_login(alias: str, password: str, db: Session) -> Dict[str, Any]: """ @@ -532,15 +530,12 @@ class AuthService: token_warning = "token_invalid" else: # 检查 Token 是否过期 - try: - exp_timestamp = int(user.jwt_exp) - current_timestamp = int(datetime.now().timestamp()) + from backend.utils.time_helpers import parse_jwt_exp, is_timestamp_expired - if current_timestamp > exp_timestamp: - logger.info(f"用户 {alias} Token 已过期,允许密码登录但需提示用户更新") - token_warning = "token_expired" - except ValueError: - logger.error(f"用户 {user.id} 的 jwt_exp 格式不正确: {user.jwt_exp}") + exp_timestamp = parse_jwt_exp(user.jwt_exp) + if exp_timestamp and is_timestamp_expired(exp_timestamp): + logger.info(f"用户 {alias} Token 已过期,允许密码登录但需提示用户更新") + token_warning = "token_expired" # 登录成功 logger.info(f"✅ 用户 {alias} (ID: {user.id}) 别名登录成功") diff --git a/backend/services/check_in_service.py b/backend/services/check_in_service.py index 0442106..b09b75d 100644 --- a/backend/services/check_in_service.py +++ b/backend/services/check_in_service.py @@ -2,7 +2,6 @@ import logging from typing import List, Dict, Any, Optional from datetime import datetime from sqlalchemy.orm import Session -import json import threading from backend.models import User, CheckInTask, CheckInRecord @@ -34,20 +33,10 @@ class CheckInService: try: from backend.services.email_service import EmailService + from backend.utils.json_helpers import build_task_info - # 构建 task_info - task_info = { - 'thread_id': '未知', - 'name': task.name or f'Task-{task.id}' - } - - # 尝试从 payload_config 中获取 ThreadId - if task.payload_config: - try: - payload = json.loads(task.payload_config) - task_info['thread_id'] = payload.get('ThreadId', '未知') - except (json.JSONDecodeError, KeyError): - pass + # 使用辅助函数构建 task_info + task_info = build_task_info(task) # 发送打卡失败通知(内容包含 Token 失效说明和刷新指引) EmailService.notify_check_in_result(user, task_info, False, "Token 已失效,需要重新授权") @@ -553,12 +542,8 @@ class CheckInService: task_name = task.name # 从 payload_config 提取 ThreadId - try: - payload = json.loads(str(task.payload_config)) - thread_id = payload.get('ThreadId') - except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e: - logger.debug(f"解析任务 {task.id} 的 payload_config 失败: {e}") - pass + from backend.utils.json_helpers import extract_thread_id + thread_id = extract_thread_id(task.payload_config) # type: ignore # 转换为字典并添加额外字段 record_dict = { diff --git a/backend/services/email_service.py b/backend/services/email_service.py index 2085629..fe6c6d6 100644 --- a/backend/services/email_service.py +++ b/backend/services/email_service.py @@ -428,12 +428,10 @@ class EmailService: return False # 计算剩余时间 - try: - exp_timestamp = int(jwt_exp) - current_timestamp = int(datetime.now().timestamp()) - minutes_left = (exp_timestamp - current_timestamp) // 60 - except ValueError: - minutes_left = 0 + from backend.utils.time_helpers import parse_jwt_exp, minutes_until_expiry + + exp_timestamp = parse_jwt_exp(jwt_exp) + minutes_left = minutes_until_expiry(exp_timestamp) if exp_timestamp else 0 # 构建邮件内容 subject = f"【接龙自动打卡系统】登录凭证即将过期 - {user.alias}" diff --git a/backend/services/scheduler_service.py b/backend/services/scheduler_service.py index 5861738..a13b1dc 100644 --- a/backend/services/scheduler_service.py +++ b/backend/services/scheduler_service.py @@ -1,7 +1,6 @@ import logging import os import time -from datetime import datetime from pathlib import Path from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger @@ -154,6 +153,8 @@ def check_token_expiration(): 检查所有用户的打卡 authorization token,如果在 30 分钟内过期,发送提醒邮件 注意:检查的是打卡业务 token,不是网站登录 JWT token """ + from backend.utils.time_helpers import seconds_until_expiry + logger.info("Scheduler: 正在执行打卡 Token 过期检查...") try: @@ -165,8 +166,6 @@ def check_token_expiration(): from backend.services.auth_service import AuthService users = db.query(User).all() - current_timestamp = int(datetime.now().timestamp()) - notified_count = 0 for user in users: @@ -178,7 +177,7 @@ def check_token_expiration(): if not exp_timestamp: continue - time_until_expiry = exp_timestamp - current_timestamp + time_until_expiry = seconds_until_expiry(exp_timestamp) # 情况1:Token 即将过期(过期前 30 分钟内,且还未过期) if 0 < time_until_expiry < 1800: # 30分钟 = 1800秒 diff --git a/backend/services/task_service.py b/backend/services/task_service.py index fbae9ad..63f6696 100644 --- a/backend/services/task_service.py +++ b/backend/services/task_service.py @@ -2,7 +2,6 @@ import logging from typing import List, Optional, Dict, Any from sqlalchemy.orm import Session from sqlalchemy import desc -import json from backend.models import User, CheckInTask, CheckInRecord from backend.schemas.task import TaskCreate, TaskUpdate @@ -34,35 +33,26 @@ class TaskService: raise ValueError(f"用户 ID {user_id} 不存在") # 2. 从 payload_config 中提取 ThreadId 用于唯一性校验 - try: - payload = json.loads(task_data.payload_config) - thread_id = payload.get('ThreadId') - if not thread_id: - raise ValueError("payload_config 中缺少 ThreadId") - except json.JSONDecodeError: - raise ValueError("payload_config 格式错误,必须是有效的 JSON") + from backend.utils.json_helpers import safe_parse_payload, extract_thread_id + + payload = safe_parse_payload(task_data.payload_config) + thread_id = payload.get('ThreadId') + if not thread_id: + raise ValueError("payload_config 中缺少 ThreadId") # 3. 验证唯一性:同一用户在同一个接龙中不能有重复的任务 - # 优化:只查询必要的字段(id 和 payload_config),避免加载完整对象 existing_tasks = db.query( - CheckInTask.id, CheckInTask.payload_config ).filter( CheckInTask.user_id == user_id ).all() - for task_id, payload_config in existing_tasks: - try: - existing_payload = json.loads(payload_config) - if existing_payload.get('ThreadId') == thread_id: - logger.warning(f"⚠️ 任务创建冲突 - User: {user.alias}({user_id}), ThreadId: {thread_id}") - raise ValueError( - f"该接龙中已存在任务。ThreadId: {thread_id}" - ) - except (json.JSONDecodeError, AttributeError, TypeError): - # 跳过无法解析的 payload_config - logger.debug(f"跳过无法解析的任务配置 - Task ID: {task_id}") - continue + for (payload_config,) in existing_tasks: + existing_thread_id = extract_thread_id(payload_config) + # extract_thread_id 已处理异常,失败时返回 None + if existing_thread_id and existing_thread_id == thread_id: + logger.warning(f"⚠️ 任务创建冲突 - User: {user.alias}({user_id}), ThreadId: {thread_id}") + raise ValueError(f"该接龙中已存在任务。ThreadId: {thread_id}") # 4. 记录日志 task_name = task_data.name or f"接龙任务 {thread_id}" @@ -118,19 +108,15 @@ class TaskService: Returns: 包含额外信息的任务字典 """ + from backend.utils.json_helpers import extract_thread_id + # 获取最后一次打卡记录 last_record = db.query(CheckInRecord).filter( CheckInRecord.task_id == task.id ).order_by(desc(CheckInRecord.check_in_time)).first() # 从 payload_config 提取 ThreadId - thread_id = None - try: - payload = json.loads(str(task.payload_config)) - thread_id = payload.get('ThreadId') - except (json.JSONDecodeError, AttributeError, TypeError): - logger.debug(f"无法从任务 {task.id} 的 payload_config 中提取 ThreadId") - pass + thread_id = extract_thread_id(task.payload_config) # type: ignore # 转换为字典并添加额外字段 task_dict = { diff --git a/backend/utils/db_helpers.py b/backend/utils/db_helpers.py new file mode 100644 index 0000000..383aefc --- /dev/null +++ b/backend/utils/db_helpers.py @@ -0,0 +1,123 @@ +""" +数据库操作辅助函数 + +提供统一的资源查询、权限验证等通用功能 +""" +from typing import TypeVar, Type, Optional, Any +from sqlalchemy.orm import Session +from fastapi import HTTPException, status + +T = TypeVar('T') + + +def get_or_404( + model: Type[T], + model_id: int, + db: Session, + error_message: Optional[str] = None +) -> T: + """ + 查询资源,不存在则抛出 404 + + Args: + model: SQLAlchemy 模型类 + model_id: 资源 ID + db: 数据库会话 + error_message: 自定义错误消息 + + Returns: + 查询到的资源对象 + + Raises: + HTTPException: 404 资源不存在 + """ + obj = db.query(model).filter(model.id == model_id).first() + if not obj: + default_message = f"{model.__name__}不存在" + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=error_message or default_message + ) + return obj + + +def get_owned_or_403( + model: Type[T], + model_id: int, + user_id: int, + db: Session, + error_message: Optional[str] = None +) -> T: + """ + 查询资源并验证归属,否则抛出 403 + + Args: + model: SQLAlchemy 模型类(必须有 user_id 字段) + model_id: 资源 ID + user_id: 当前用户 ID + db: 数据库会话 + error_message: 自定义错误消息 + + Returns: + 查询到的资源对象 + + Raises: + HTTPException: 403 无权访问此资源 + """ + obj = db.query(model).filter( + model.id == model_id, + model.user_id == user_id + ).first() + + if not obj: + # 先检查资源是否存在 + exists = db.query(model).filter(model.id == model_id).first() + if not exists: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"{model.__name__}不存在" + ) + # 资源存在但不属于当前用户 + default_message = f"无权访问此{model.__name__}" + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=error_message or default_message + ) + + return obj + + +def get_by_field_or_404( + model: Type[T], + field_name: str, + field_value: Any, + db: Session, + error_message: Optional[str] = None +) -> T: + """ + 根据字段查询资源,不存在则抛出 404 + + Args: + model: SQLAlchemy 模型类 + field_name: 字段名 + field_value: 字段值 + db: 数据库会话 + error_message: 自定义错误消息 + + Returns: + 查询到的资源对象 + + Raises: + HTTPException: 404 资源不存在 + """ + obj = db.query(model).filter( + getattr(model, field_name) == field_value + ).first() + + if not obj: + default_message = f"{model.__name__}不存在" + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=error_message or default_message + ) + return obj diff --git a/backend/utils/json_helpers.py b/backend/utils/json_helpers.py new file mode 100644 index 0000000..f8efe8b --- /dev/null +++ b/backend/utils/json_helpers.py @@ -0,0 +1,103 @@ +""" +JSON 处理辅助函数 + +提供安全的 JSON 解析和数据提取功能 +""" +import json +import logging +from typing import Optional, Any, Dict + +logger = logging.getLogger(__name__) + + +def safe_parse_json( + json_str: Optional[str], + default: Any = None, + log_error: bool = True +) -> Any: + """ + 安全解析 JSON 字符串,失败时返回默认值 + + Args: + json_str: JSON 字符串 + default: 解析失败时的默认值 + log_error: 是否记录解析错误日志 + + Returns: + 解析后的对象,失败时返回 default + """ + if not json_str: + return default + + try: + return json.loads(str(json_str)) + except (json.JSONDecodeError, AttributeError, TypeError) as e: + if log_error: + logger.debug(f"JSON 解析失败: {str(e)}, 原始数据: {json_str[:100]}...") + return default + + +def safe_parse_payload( + payload_config: Optional[str], + default: Optional[Dict] = None +) -> Dict: + """ + 安全解析 payload_config,失败时返回默认字典 + + Args: + payload_config: payload 配置字符串 + default: 解析失败时的默认值 + + Returns: + 解析后的字典 + """ + result = safe_parse_json(payload_config, default or {}) + # 确保返回值是字典类型 + if not isinstance(result, dict): + logger.warning(f"payload_config 不是字典类型: {type(result)}") + return default or {} + return result + + +def extract_thread_id(payload_config: Optional[str]) -> Optional[str]: + """ + 从 payload_config 中提取 ThreadId + + Args: + payload_config: payload 配置字符串 + + Returns: + ThreadId 或 None + """ + payload = safe_parse_payload(payload_config) + return payload.get('ThreadId') + + +def extract_signature(payload_config: Optional[str]) -> Optional[str]: + """ + 从 payload_config 中提取 Signature + + Args: + payload_config: payload 配置字符串 + + Returns: + Signature 或 None + """ + payload = safe_parse_payload(payload_config) + return payload.get('Signature') + + +def build_task_info(task) -> Dict[str, str]: + """ + 从 task 对象构建 task_info 字典(用于邮件通知等场景) + + Args: + task: CheckInTask 对象 + + Returns: + 包含 thread_id 和 name 的字典 + """ + return { + 'thread_id': extract_thread_id(getattr(task, 'payload_config', None)) or '未知', + 'name': getattr(task, 'name', None) or f'Task-{getattr(task, "id", "Unknown")}' + } diff --git a/backend/utils/time_helpers.py b/backend/utils/time_helpers.py new file mode 100644 index 0000000..ef1823a --- /dev/null +++ b/backend/utils/time_helpers.py @@ -0,0 +1,148 @@ +""" +时间处理辅助函数 + +提供统一的时间戳处理和格式化功能 +""" +from datetime import datetime, timedelta +from typing import Optional + + +def now_timestamp() -> int: + """ + 获取当前时间戳(秒) + + Returns: + 当前时间戳 + """ + return int(datetime.now().timestamp()) + + +def is_timestamp_expired(timestamp: int) -> bool: + """ + 检查时间戳是否已过期 + + Args: + timestamp: 时间戳(秒) + + Returns: + 是否已过期 + """ + return now_timestamp() > timestamp + + +def seconds_until_expiry(timestamp: int) -> int: + """ + 计算距离过期的秒数(负数表示已过期) + + Args: + timestamp: 时间戳(秒) + + Returns: + 距离过期的秒数 + """ + return timestamp - now_timestamp() + + +def days_until_expiry(timestamp: int) -> int: + """ + 计算距离过期的天数(负数表示已过期) + + Args: + timestamp: 时间戳(秒) + + Returns: + 距离过期的天数 + """ + seconds = seconds_until_expiry(timestamp) + return seconds // 86400 + + +def hours_until_expiry(timestamp: int) -> int: + """ + 计算距离过期的小时数(负数表示已过期) + + Args: + timestamp: 时间戳(秒) + + Returns: + 距离过期的小时数 + """ + seconds = seconds_until_expiry(timestamp) + return seconds // 3600 + + +def minutes_until_expiry(timestamp: int) -> int: + """ + 计算距离过期的分钟数(负数表示已过期) + + Args: + timestamp: 时间戳(秒) + + Returns: + 距离过期的分钟数 + """ + seconds = seconds_until_expiry(timestamp) + return seconds // 60 + + +def format_timestamp(timestamp: int, format_str: str = '%Y-%m-%d %H:%M:%S') -> str: + """ + 格式化时间戳为人类可读格式 + + Args: + timestamp: 时间戳(秒) + format_str: 时间格式字符串 + + Returns: + 格式化后的时间字符串 + """ + dt = datetime.fromtimestamp(timestamp) + return dt.strftime(format_str) + + +def format_expiry_time(timestamp: int) -> str: + """ + 格式化过期时间为人类可读格式(带中文说明) + + Args: + timestamp: 时间戳(秒) + + Returns: + 格式化后的时间字符串,如 "2024-01-01 12:00:00 (已过期 2 天)" + """ + formatted_time = format_timestamp(timestamp) + days = days_until_expiry(timestamp) + + if days > 0: + return f"{formatted_time} (还剩 {days} 天)" + elif days == 0: + hours = hours_until_expiry(timestamp) + if hours > 0: + return f"{formatted_time} (还剩 {hours} 小时)" + else: + minutes = minutes_until_expiry(timestamp) + if minutes > 0: + return f"{formatted_time} (还剩 {minutes} 分钟)" + else: + return f"{formatted_time} (即将过期)" + else: + return f"{formatted_time} (已过期 {abs(days)} 天)" + + +def parse_jwt_exp(jwt_exp: Optional[str]) -> Optional[int]: + """ + 解析 jwt_exp 字段为时间戳 + + Args: + jwt_exp: jwt_exp 字符串(可能是 "0" 或数字字符串) + + Returns: + 时间戳,无效时返回 None + """ + if not jwt_exp or jwt_exp == "0": + return None + + try: + return int(jwt_exp) + except (ValueError, TypeError): + return None diff --git a/backend/workers/check_in_worker.py b/backend/workers/check_in_worker.py index 67cdd09..a8d78ae 100644 --- a/backend/workers/check_in_worker.py +++ b/backend/workers/check_in_worker.py @@ -132,12 +132,10 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]: - error_message: 错误信息 """ # 从 payload_config 中提取 Signature 用于日志 - try: - payload_dict = json.loads(task.payload_config) if task.payload_config else {} - signature = payload_dict.get('Signature', 'Unknown') - except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e: - logger.debug(f"解析任务 {task.id} 的 payload_config 失败: {e}") - signature = 'Unknown' + from backend.utils.json_helpers import safe_parse_payload, extract_signature + + payload_dict = safe_parse_payload(task.payload_config) + signature = extract_signature(task.payload_config) or 'Unknown' logger.info(f"Selenium打卡: 正在为任务 ID: {task.id} (Signature: {signature}) 执行打卡...") @@ -165,9 +163,12 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]: try: # 使用任务的 payload_config(从模板生成的完整配置,包含 ThreadId) - payload = json.loads(task.payload_config) if task.payload_config else {} + from backend.utils.json_helpers import safe_parse_payload, extract_thread_id - if not payload.get('ThreadId'): + payload = safe_parse_payload(task.payload_config) + thread_id = extract_thread_id(task.payload_config) + + if not thread_id: error_msg = f"任务 ID: {task.id} 的 payload_config 缺少 ThreadId" logger.error(error_msg) return { @@ -261,10 +262,11 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]: if task.user and task.user.email: try: from backend.services.email_service import EmailService - task_info = { - 'thread_id': payload.get('ThreadId', '未知'), - 'name': getattr(task, 'name', '打卡任务') - } + from backend.utils.json_helpers import build_task_info + + # 使用辅助函数构建 task_info(从 task 对象提取信息) + task_info = build_task_info(task) + # 只发送打卡失败通知(内容已说明Token失效) EmailService.notify_check_in_result(task.user, task_info, False, "Token 已失效,需要重新授权") except Exception as e: diff --git a/backend/workers/token_refresher.py b/backend/workers/token_refresher.py index a3d4e6e..b9e8187 100644 --- a/backend/workers/token_refresher.py +++ b/backend/workers/token_refresher.py @@ -59,9 +59,10 @@ def get_session_status(session_id: str) -> str: content = f.read() if not content: return None - data = json.loads(content) + from backend.utils.json_helpers import safe_parse_json + data = safe_parse_json(content, {}) return data.get('status') - except (IOError, json.JSONDecodeError) as e: + except IOError as e: logger.error(f"读取会话文件 {filepath} 失败: {e}") return None @@ -80,8 +81,9 @@ def get_session_data(session_id: str) -> dict: content = f.read() if not content: return None - return json.loads(content) - except (IOError, json.JSONDecodeError) as e: + from backend.utils.json_helpers import safe_parse_json + return safe_parse_json(content, {}) + except IOError as e: logger.error(f"读取会话文件 {filepath} 失败: {e}") return None @@ -106,11 +108,13 @@ def cancel_session(session_id: str) -> bool: try: with FileLock(lock_path, timeout=5): # 读取当前会话数据 + from backend.utils.json_helpers import safe_parse_json + with open(filepath, 'r', encoding='utf-8') as f: content = f.read() if not content: return False - data = json.loads(content) + data = safe_parse_json(content, {}) # 如果已经成功,不允许取消 if data.get('status') == 'success':