mirror of
https://github.com/Cccc-owo/CheckInApp.git
synced 2026-06-17 05:56:29 +00:00
style(backend): apply ruff format
This commit is contained in:
@@ -3,18 +3,16 @@
|
||||
|
||||
提供统一的资源查询、权限验证等通用功能
|
||||
"""
|
||||
|
||||
from typing import TypeVar, Type, Optional, Any
|
||||
from sqlalchemy.orm import Session
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
T = TypeVar('T')
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def get_or_404(
|
||||
model: Type[T],
|
||||
model_id: int,
|
||||
db: Session,
|
||||
error_message: Optional[str] = None
|
||||
model: Type[T], model_id: int, db: Session, error_message: Optional[str] = None
|
||||
) -> T:
|
||||
"""
|
||||
查询资源,不存在则抛出 404
|
||||
@@ -35,18 +33,13 @@ def get_or_404(
|
||||
if not obj:
|
||||
default_message = f"{model.__name__}不存在"
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=error_message or default_message
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail=error_message or default_message
|
||||
)
|
||||
return obj
|
||||
|
||||
|
||||
def get_owned_or_403(
|
||||
model: Type[T],
|
||||
model_id: int,
|
||||
user_id: int,
|
||||
db: Session,
|
||||
error_message: Optional[str] = None
|
||||
model: Type[T], model_id: int, user_id: int, db: Session, error_message: Optional[str] = None
|
||||
) -> T:
|
||||
"""
|
||||
查询资源并验证归属,否则抛出 403
|
||||
@@ -64,24 +57,19 @@ def get_owned_or_403(
|
||||
Raises:
|
||||
HTTPException: 403 无权访问此资源
|
||||
"""
|
||||
obj = db.query(model).filter(
|
||||
model.id == model_id,
|
||||
model.user_id == user_id
|
||||
).first()
|
||||
obj = db.query(model).filter(model.id == model_id, model.user_id == user_id).first()
|
||||
|
||||
if not obj:
|
||||
# 先检查资源是否存在
|
||||
exists = db.query(model).filter(model.id == model_id).first()
|
||||
if not exists:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"{model.__name__}不存在"
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail=f"{model.__name__}不存在"
|
||||
)
|
||||
# 资源存在但不属于当前用户
|
||||
default_message = f"无权访问此{model.__name__}"
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=error_message or default_message
|
||||
status_code=status.HTTP_403_FORBIDDEN, detail=error_message or default_message
|
||||
)
|
||||
|
||||
return obj
|
||||
@@ -92,7 +80,7 @@ def get_by_field_or_404(
|
||||
field_name: str,
|
||||
field_value: Any,
|
||||
db: Session,
|
||||
error_message: Optional[str] = None
|
||||
error_message: Optional[str] = None,
|
||||
) -> T:
|
||||
"""
|
||||
根据字段查询资源,不存在则抛出 404
|
||||
@@ -110,14 +98,11 @@ def get_by_field_or_404(
|
||||
Raises:
|
||||
HTTPException: 404 资源不存在
|
||||
"""
|
||||
obj = db.query(model).filter(
|
||||
getattr(model, field_name) == field_value
|
||||
).first()
|
||||
obj = db.query(model).filter(getattr(model, field_name) == field_value).first()
|
||||
|
||||
if not obj:
|
||||
default_message = f"{model.__name__}不存在"
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=error_message or default_message
|
||||
status_code=status.HTTP_404_NOT_FOUND, detail=error_message or default_message
|
||||
)
|
||||
return obj
|
||||
|
||||
@@ -3,6 +3,7 @@ JSON 处理辅助函数
|
||||
|
||||
提供安全的 JSON 解析和数据提取功能
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional, Any, Dict
|
||||
@@ -10,11 +11,7 @@ from typing import Optional, Any, Dict
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def safe_parse_json(
|
||||
json_str: Optional[str],
|
||||
default: Any = None,
|
||||
log_error: bool = True
|
||||
) -> Any:
|
||||
def safe_parse_json(json_str: Optional[str], default: Any = None, log_error: bool = True) -> Any:
|
||||
"""
|
||||
安全解析 JSON 字符串,失败时返回默认值
|
||||
|
||||
@@ -37,10 +34,7 @@ def safe_parse_json(
|
||||
return default
|
||||
|
||||
|
||||
def safe_parse_payload(
|
||||
payload_config: Optional[str],
|
||||
default: Optional[Dict] = None
|
||||
) -> Dict:
|
||||
def safe_parse_payload(payload_config: Optional[str], default: Optional[Dict] = None) -> Dict:
|
||||
"""
|
||||
安全解析 payload_config,失败时返回默认字典
|
||||
|
||||
@@ -70,7 +64,7 @@ def extract_thread_id(payload_config: Optional[str]) -> Optional[str]:
|
||||
ThreadId 或 None
|
||||
"""
|
||||
payload = safe_parse_payload(payload_config)
|
||||
return payload.get('ThreadId')
|
||||
return payload.get("ThreadId")
|
||||
|
||||
|
||||
def extract_signature(payload_config: Optional[str]) -> Optional[str]:
|
||||
@@ -84,7 +78,7 @@ def extract_signature(payload_config: Optional[str]) -> Optional[str]:
|
||||
Signature 或 None
|
||||
"""
|
||||
payload = safe_parse_payload(payload_config)
|
||||
return payload.get('Signature')
|
||||
return payload.get("Signature")
|
||||
|
||||
|
||||
def build_task_info(task) -> Dict[str, str]:
|
||||
@@ -98,6 +92,6 @@ def build_task_info(task) -> Dict[str, str]:
|
||||
包含 thread_id 和 name 的字典
|
||||
"""
|
||||
return {
|
||||
'thread_id': extract_thread_id(getattr(task, 'payload_config', None)) or '未知',
|
||||
'name': getattr(task, 'name', None) or f'Task-{getattr(task, "id", "Unknown")}'
|
||||
"thread_id": extract_thread_id(getattr(task, "payload_config", None)) or "未知",
|
||||
"name": getattr(task, "name", None) or f"Task-{getattr(task, 'id', 'Unknown')}",
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ class JWTManager:
|
||||
"alias": user_alias,
|
||||
"iat": now, # Issued At - 签发时间
|
||||
"exp": exp, # Expiration Time - 过期时间
|
||||
"type": "access" # Token 类型
|
||||
"type": "access", # Token 类型
|
||||
}
|
||||
|
||||
token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
|
||||
@@ -97,10 +97,7 @@ class JWTManager:
|
||||
try:
|
||||
# decode 时设置 verify=False 跳过过期验证
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
JWT_SECRET_KEY,
|
||||
algorithms=[JWT_ALGORITHM],
|
||||
options={"verify_exp": False}
|
||||
token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM], options={"verify_exp": False}
|
||||
)
|
||||
return payload.get("user_id")
|
||||
except Exception as e:
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
提供统一的时间戳处理和格式化功能
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
@@ -85,7 +86,7 @@ def minutes_until_expiry(timestamp: int) -> int:
|
||||
return seconds // 60
|
||||
|
||||
|
||||
def format_timestamp(timestamp: int, format_str: str = '%Y-%m-%d %H:%M:%S') -> str:
|
||||
def format_timestamp(timestamp: int, format_str: str = "%Y-%m-%d %H:%M:%S") -> str:
|
||||
"""
|
||||
格式化时间戳为人类可读格式
|
||||
|
||||
|
||||
Reference in New Issue
Block a user