feat(email): add admin notification settings

This commit is contained in:
2026-05-05 13:38:34 +08:00
parent a780c1bf52
commit 73d476bcea
21 changed files with 929 additions and 17 deletions
@@ -0,0 +1,194 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from backend.config import settings
from backend.models import EmailNotificationSettings
from backend.models.database import SessionLocal
from backend.schemas.email_settings import (
EmailNotificationSettingsResponse,
EmailNotificationSettingsUpdate,
)
SETTINGS_ROW_ID = 1
@dataclass(frozen=True)
class EmailSettingsSnapshot:
id: int
smtp_server: str
smtp_port: int
smtp_sender_email: str
smtp_sender_password: str
smtp_use_ssl: bool
notify_token_expiring: bool
notify_check_in_success: bool
has_smtp_sender_password: bool
created_at: datetime | None = None
updated_at: datetime | None = None
class EmailSettingsService:
"""读取和更新系统邮件配置。"""
@staticmethod
def _defaults() -> EmailNotificationSettings:
return EmailNotificationSettings(
id=SETTINGS_ROW_ID,
smtp_server=settings.SMTP_SERVER,
smtp_port=settings.SMTP_PORT,
smtp_sender_email=settings.SMTP_SENDER_EMAIL,
smtp_sender_password=settings.SMTP_SENDER_PASSWORD,
smtp_use_ssl=settings.SMTP_USE_SSL,
notify_token_expiring=True,
notify_check_in_success=True,
)
@staticmethod
def _default_snapshot() -> EmailSettingsSnapshot:
password = str(settings.SMTP_SENDER_PASSWORD or "")
return EmailSettingsSnapshot(
id=SETTINGS_ROW_ID,
smtp_server=settings.SMTP_SERVER,
smtp_port=settings.SMTP_PORT,
smtp_sender_email=settings.SMTP_SENDER_EMAIL,
smtp_sender_password=password,
smtp_use_ssl=settings.SMTP_USE_SSL,
notify_token_expiring=True,
notify_check_in_success=True,
has_smtp_sender_password=bool(password),
created_at=None,
updated_at=None,
)
@staticmethod
def _get_or_create(db: Session) -> EmailNotificationSettings:
row = (
db.query(EmailNotificationSettings)
.filter(EmailNotificationSettings.id == SETTINGS_ROW_ID)
.first()
)
if row:
return row
row = EmailSettingsService._defaults()
db.add(row)
db.commit()
db.refresh(row)
return row
@staticmethod
def _to_snapshot(row: EmailNotificationSettings) -> EmailSettingsSnapshot:
password = str(row.smtp_sender_password or "")
return EmailSettingsSnapshot(
id=row.id,
smtp_server=str(row.smtp_server or ""),
smtp_port=int(row.smtp_port or 0),
smtp_sender_email=str(row.smtp_sender_email or ""),
smtp_sender_password=password,
smtp_use_ssl=bool(row.smtp_use_ssl),
notify_token_expiring=bool(row.notify_token_expiring),
notify_check_in_success=bool(row.notify_check_in_success),
has_smtp_sender_password=bool(password),
created_at=row.created_at,
updated_at=row.updated_at,
)
@staticmethod
def _to_response(snapshot: EmailSettingsSnapshot) -> EmailNotificationSettingsResponse:
return EmailNotificationSettingsResponse(
id=snapshot.id,
smtp_server=snapshot.smtp_server,
smtp_port=snapshot.smtp_port,
smtp_sender_email=snapshot.smtp_sender_email,
smtp_use_ssl=snapshot.smtp_use_ssl,
notify_token_expiring=snapshot.notify_token_expiring,
notify_check_in_success=snapshot.notify_check_in_success,
has_smtp_sender_password=snapshot.has_smtp_sender_password,
created_at=snapshot.created_at,
updated_at=snapshot.updated_at,
)
@staticmethod
def get_snapshot(db: Session) -> EmailSettingsSnapshot:
return EmailSettingsService._to_snapshot(EmailSettingsService._get_or_create(db))
@staticmethod
def get_response(db: Session) -> EmailNotificationSettingsResponse:
return EmailSettingsService._to_response(EmailSettingsService.get_snapshot(db))
@staticmethod
def update_settings(
db: Session, payload: EmailNotificationSettingsUpdate
) -> EmailSettingsSnapshot:
row = EmailSettingsService._get_or_create(db)
row.smtp_server = payload.smtp_server
row.smtp_port = payload.smtp_port
row.smtp_sender_email = str(payload.smtp_sender_email)
row.smtp_use_ssl = payload.smtp_use_ssl
row.notify_token_expiring = payload.notify_token_expiring
row.notify_check_in_success = payload.notify_check_in_success
if payload.clear_smtp_sender_password:
row.smtp_sender_password = ""
elif payload.smtp_sender_password is not None:
row.smtp_sender_password = payload.smtp_sender_password
row.updated_at = datetime.now(timezone.utc)
db.commit()
db.refresh(row)
return EmailSettingsService._to_snapshot(row)
@staticmethod
def update_response(
db: Session, payload: EmailNotificationSettingsUpdate
) -> EmailNotificationSettingsResponse:
return EmailSettingsService._to_response(EmailSettingsService.update_settings(db, payload))
@staticmethod
def get_smtp_config() -> dict[str, object] | None:
db = SessionLocal()
try:
try:
snapshot = EmailSettingsService.get_snapshot(db)
except SQLAlchemyError:
snapshot = EmailSettingsService._default_snapshot()
finally:
db.close()
if not snapshot.smtp_server or not snapshot.smtp_sender_email or not snapshot.smtp_port:
return None
return {
"smtp_server": snapshot.smtp_server,
"smtp_port": snapshot.smtp_port,
"sender_email": snapshot.smtp_sender_email,
"sender_password": snapshot.smtp_sender_password,
"use_ssl": snapshot.smtp_use_ssl,
}
@staticmethod
def is_token_expiring_notification_enabled() -> bool:
db = SessionLocal()
try:
try:
return EmailSettingsService.get_snapshot(db).notify_token_expiring
except SQLAlchemyError:
return EmailSettingsService._default_snapshot().notify_token_expiring
finally:
db.close()
@staticmethod
def is_check_in_success_notification_enabled() -> bool:
db = SessionLocal()
try:
try:
return EmailSettingsService.get_snapshot(db).notify_check_in_success
except SQLAlchemyError:
return EmailSettingsService._default_snapshot().notify_check_in_success
finally:
db.close()