refactor(backend): adopt typed SQLAlchemy models

This commit is contained in:
2026-05-04 22:05:33 +08:00
parent d811c20932
commit f939a50950
9 changed files with 129 additions and 71 deletions
+17 -14
View File
@@ -1,35 +1,38 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Index
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from sqlalchemy import DateTime, ForeignKey, Index, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from backend.models.database import Base
if TYPE_CHECKING:
from backend.models.check_in_task import CheckInTask
class CheckInRecord(Base):
"""打卡记录模型"""
__tablename__ = "check_in_records"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
task_id = Column(
Integer,
id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True)
task_id: Mapped[int] = mapped_column(
ForeignKey("check_in_tasks.id", ondelete="CASCADE"),
nullable=False,
index=True,
comment="任务 ID",
)
status = Column(
status: Mapped[str] = mapped_column(
String(20),
nullable=False,
index=True,
comment="状态: success/failure/out_of_time/unknown/pending",
)
response_text = Column(Text, default="", comment="响应文本")
error_message = Column(Text, default="", comment="错误信息")
location = Column(Text, default="{}", comment="位置信息 JSON")
trigger_type = Column(
response_text: Mapped[str] = mapped_column(Text, default="", comment="响应文本")
error_message: Mapped[str] = mapped_column(Text, default="", comment="错误信息")
location: Mapped[str] = mapped_column(Text, default="{}", comment="位置信息 JSON")
trigger_type: Mapped[str] = mapped_column(
String(50), default="scheduled", comment="触发类型: scheduled/manual/admin"
)
check_in_time = Column(
check_in_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
index=True,
@@ -37,7 +40,7 @@ class CheckInRecord(Base):
)
# 关联任务
task = relationship("CheckInTask", back_populates="check_in_records")
task: Mapped["CheckInTask"] = relationship(back_populates="check_in_records")
# 添加复合索引:加速常见查询
__table_args__ = (
+27 -15
View File
@@ -1,45 +1,57 @@
from sqlalchemy import Column, Integer, String, Boolean, Text, DateTime, ForeignKey, Index
from sqlalchemy.orm import relationship
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import Boolean, DateTime, ForeignKey, Index, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from backend.models.database import Base
if TYPE_CHECKING:
from backend.models.check_in_record import CheckInRecord
from backend.models.user import User
class CheckInTask(Base):
"""打卡任务模型"""
__tablename__ = "check_in_tasks"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
user_id = Column(
Integer,
id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True)
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True,
comment="用户 ID",
)
payload_config = Column(
payload_config: Mapped[str] = mapped_column(
Text,
default="{}",
nullable=False,
comment="完整的 payload 配置 JSON(从模板生成,包含 ThreadId 和所有字段)",
)
name = Column(String(100), default="", comment="任务名称(用户自定义)")
is_active = Column(Boolean, default=True, comment="是否启用自动打卡(不影响手动打卡)")
cron_expression = Column(
name: Mapped[str] = mapped_column(String(100), default="", comment="任务名称(用户自定义)")
is_active: Mapped[bool] = mapped_column(
Boolean, default=True, comment="是否启用自动打卡(不影响手动打卡)"
)
cron_expression: Mapped[str | None] = mapped_column(
String(100),
default="0 20 * * *",
nullable=True,
comment="Crontab 表达式(NULL 表示禁用自动打卡,否则按表达式执行)",
)
created_at = Column(DateTime(timezone=True), server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime(timezone=True), onupdate=func.now(), comment="更新时间")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), comment="创建时间"
)
updated_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), onupdate=func.now(), comment="更新时间"
)
# 关联用户
user = relationship("User", back_populates="tasks")
user: Mapped["User"] = relationship(back_populates="tasks")
# 关联打卡记录
check_in_records = relationship(
"CheckInRecord", back_populates="task", cascade="all, delete-orphan"
check_in_records: Mapped[list["CheckInRecord"]] = relationship(
back_populates="task", cascade="all, delete-orphan"
)
# 添加索引:加速查询
+7 -5
View File
@@ -1,7 +1,8 @@
from sqlalchemy import create_engine, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime, timezone
from sqlalchemy import create_engine, event
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from backend.config import settings
# 创建数据库引擎
@@ -14,8 +15,9 @@ engine = create_engine(
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建基类
Base = declarative_base()
class Base(DeclarativeBase):
pass
# SQLite timezone 修复:在加载对象后,将所有 naive datetime 转换为 UTC timezone-aware
+23 -12
View File
@@ -1,6 +1,11 @@
from sqlalchemy import Column, Integer, String, Boolean, Text, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from __future__ import annotations
from datetime import datetime
from sqlalchemy import Boolean, DateTime, ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from backend.models.database import Base
@@ -9,27 +14,33 @@ class TaskTemplate(Base):
__tablename__ = "task_templates"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
name = Column(String(100), nullable=False, comment="模板名称")
description = Column(Text, nullable=True, comment="模板描述")
id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100), nullable=False, comment="模板名称")
description: Mapped[str | None] = mapped_column(Text, nullable=True, comment="模板描述")
# 父模板 ID(用于继承)
parent_id = Column(
Integer,
parent_id: Mapped[int | None] = mapped_column(
ForeignKey("task_templates.id", ondelete="SET NULL"),
nullable=True,
comment="父模板 ID",
)
# 字段配置(JSON 格式)
field_config = Column(Text, nullable=False, comment="字段配置(JSON")
field_config: Mapped[str] = mapped_column(Text, nullable=False, comment="字段配置(JSON")
is_active = Column(Boolean, default=True, comment="是否启用")
created_at = Column(DateTime(timezone=True), server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime(timezone=True), onupdate=func.now(), comment="更新时间")
is_active: Mapped[bool] = mapped_column(Boolean, default=True, comment="是否启用")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), comment="创建时间"
)
updated_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), onupdate=func.now(), comment="更新时间"
)
# 自引用关系:父模板和子模板
parent = relationship("TaskTemplate", remote_side=[id], backref="children")
parent: Mapped[TaskTemplate | None] = relationship(
remote_side="TaskTemplate.id", back_populates="children"
)
children: Mapped[list[TaskTemplate]] = relationship(back_populates="parent")
def __repr__(self):
return f"<TaskTemplate(id={self.id}, name='{self.name}', is_active={self.is_active})>"
+46 -19
View File
@@ -1,56 +1,83 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, Index
from sqlalchemy.orm import relationship
from datetime import datetime
from typing import TYPE_CHECKING
from sqlalchemy import Boolean, DateTime, Index, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from backend.models.database import Base
if TYPE_CHECKING:
from backend.models.check_in_task import CheckInTask
class User(Base):
"""用户模型 - 账户信息"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
jwt_sub = Column(
id: Mapped[int] = mapped_column(primary_key=True, index=True, autoincrement=True)
jwt_sub: Mapped[str | None] = mapped_column(
String(200),
unique=True,
nullable=True,
index=True,
comment="QQ 扫码登录的唯一用户标识(注册时为空)",
)
alias = Column(
alias: Mapped[str] = mapped_column(
String(50), unique=True, nullable=False, index=True, comment="用户别名(用于登录)"
)
email = Column(String(100), nullable=True, comment="用户邮箱(用于接收通知)")
password_hash = Column(String(200), nullable=True, comment="密码哈希(bcrypt加密")
authorization = Column(Text, nullable=True, comment="当前有效的 QQ Token")
jwt_exp = Column(String(20), default="0", comment="Token 过期时间戳")
token_expiring_notified = Column(
email: Mapped[str | None] = mapped_column(
String(100), nullable=True, comment="用户邮箱(用于接收通知"
)
password_hash: Mapped[str | None] = mapped_column(
String(200), nullable=True, comment="密码哈希(bcrypt加密)"
)
authorization: Mapped[str | None] = mapped_column(
Text, nullable=True, comment="当前有效的 QQ Token"
)
jwt_exp: Mapped[str] = mapped_column(String(20), default="0", comment="Token 过期时间戳")
token_expiring_notified: Mapped[bool] = mapped_column(
Boolean,
default=False,
nullable=False,
comment="Token 即将过期提醒是否已发送(过期前30分钟)",
)
token_expired_notified = Column(
token_expired_notified: Mapped[bool] = mapped_column(
Boolean,
default=False,
nullable=False,
comment="Token 已过期提醒是否已发送(过期后30分钟内)",
)
role = Column(String(20), default="user", index=True, comment="角色: user/admin")
is_approved = Column(Boolean, default=False, index=True, comment="是否已通过管理员审批")
role: Mapped[str] = mapped_column(
String(20), default="user", index=True, comment="角色: user/admin"
)
is_approved: Mapped[bool] = mapped_column(
Boolean, default=False, index=True, comment="是否已通过管理员审批"
)
# 账户锁定相关字段
failed_login_attempts = Column(Integer, default=0, nullable=False, comment="连续登录失败次数")
locked_until = Column(DateTime(timezone=True), nullable=True, comment="账户锁定到期时间")
last_failed_login = Column(
failed_login_attempts: Mapped[int] = mapped_column(
default=0, nullable=False, comment="连续登录失败次数"
)
locked_until: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, comment="账户锁定到期时间"
)
last_failed_login: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True, comment="最后一次登录失败时间"
)
created_at = Column(DateTime(timezone=True), server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime(timezone=True), onupdate=func.now(), comment="更新时间")
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), comment="创建时间"
)
updated_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), onupdate=func.now(), comment="更新时间"
)
# 关联打卡任务
tasks = relationship("CheckInTask", back_populates="user", cascade="all, delete-orphan")
tasks: Mapped[list["CheckInTask"]] = relationship(
back_populates="user", cascade="all, delete-orphan"
)
# 添加复合索引:加速审批管理查询
__table_args__ = (