Files
CheckInApp/apps/backend/services/email_settings_service.py
T

237 lines
8.6 KiB
Python

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from backend.config import settings
from backend.models import EmailNotificationSettings
from backend.models.database import SessionLocal
from backend.schemas.email_settings import (
    EmailNotificationSettingsResponse,
    EmailNotificationSettingsUpdate,
)
# Primary key of the one settings row this module reads, creates and updates.
SETTINGS_ROW_ID = 1
@dataclass(frozen=True)
class EmailSettingsSnapshot:
id: int
smtp_server: str
smtp_port: int
smtp_sender_email: str
smtp_sender_password: str
smtp_use_ssl: bool
notify_token_expiring: bool
notify_check_in_success: bool
require_admin_approval_for_registration: bool
warn_unverified_email_before_approval: bool
has_smtp_sender_password: bool
created_at: datetime | None = None
updated_at: datetime | None = None
class EmailSettingsService:
    """Read and update the system email configuration.

    The configuration lives in a single ``EmailNotificationSettings`` row
    (id ``SETTINGS_ROW_ID``) that is created on first access from the
    application config. Methods taking a ``db`` argument work on the caller's
    session; the argument-less readers open and close their own session and
    fall back to the config defaults when the database raises.
    """

    @staticmethod
    def _defaults() -> EmailNotificationSettings:
        """Build an unsaved settings row seeded from the application config."""
        return EmailNotificationSettings(
            id=SETTINGS_ROW_ID,
            smtp_server=settings.SMTP_SERVER,
            smtp_port=settings.SMTP_PORT,
            smtp_sender_email=settings.SMTP_SENDER_EMAIL,
            smtp_sender_password=settings.SMTP_SENDER_PASSWORD,
            smtp_use_ssl=settings.SMTP_USE_SSL,
            notify_token_expiring=True,
            notify_check_in_success=True,
            require_admin_approval_for_registration=True,
            warn_unverified_email_before_approval=True,
        )

    @staticmethod
    def _default_snapshot() -> EmailSettingsSnapshot:
        """Build a snapshot from the application config without touching the database."""
        password = str(settings.SMTP_SENDER_PASSWORD or "")
        return EmailSettingsSnapshot(
            id=SETTINGS_ROW_ID,
            smtp_server=settings.SMTP_SERVER,
            smtp_port=settings.SMTP_PORT,
            smtp_sender_email=settings.SMTP_SENDER_EMAIL,
            smtp_sender_password=password,
            smtp_use_ssl=settings.SMTP_USE_SSL,
            notify_token_expiring=True,
            notify_check_in_success=True,
            require_admin_approval_for_registration=True,
            warn_unverified_email_before_approval=True,
            has_smtp_sender_password=bool(password),
            created_at=None,
            updated_at=None,
        )

    @staticmethod
    def _commit_or_rollback(db: Session) -> None:
        """Commit ``db``; on a database error roll back and re-raise.

        Rolling back returns the caller-owned session to a usable state
        instead of leaving it stuck in a failed transaction.

        Raises:
            SQLAlchemyError: The original error, after the rollback.
        """
        try:
            db.commit()
        except SQLAlchemyError:
            db.rollback()
            raise

    @staticmethod
    def _get_or_create(db: Session) -> EmailNotificationSettings:
        """Return the settings row, inserting one built from defaults if absent.

        Raises:
            SQLAlchemyError: If the query or the insert fails; a failed insert
                is rolled back before the error propagates.
        """
        row = (
            db.query(EmailNotificationSettings)
            .filter(EmailNotificationSettings.id == SETTINGS_ROW_ID)
            .first()
        )
        if row is not None:
            return row

        row = EmailSettingsService._defaults()
        db.add(row)
        EmailSettingsService._commit_or_rollback(db)
        db.refresh(row)
        return row

    @staticmethod
    def _to_snapshot(row: EmailNotificationSettings) -> EmailSettingsSnapshot:
        """Copy a settings row into an immutable snapshot, normalising empty values."""
        password = str(row.smtp_sender_password or "")
        return EmailSettingsSnapshot(
            id=row.id,
            smtp_server=str(row.smtp_server or ""),
            smtp_port=int(row.smtp_port or 0),
            smtp_sender_email=str(row.smtp_sender_email or ""),
            smtp_sender_password=password,
            smtp_use_ssl=bool(row.smtp_use_ssl),
            notify_token_expiring=bool(row.notify_token_expiring),
            notify_check_in_success=bool(row.notify_check_in_success),
            require_admin_approval_for_registration=bool(
                row.require_admin_approval_for_registration
            ),
            warn_unverified_email_before_approval=bool(
                row.warn_unverified_email_before_approval
            ),
            has_smtp_sender_password=bool(password),
            created_at=row.created_at,
            updated_at=row.updated_at,
        )

    @staticmethod
    def _to_response(snapshot: EmailSettingsSnapshot) -> EmailNotificationSettingsResponse:
        """Convert a snapshot into the response schema.

        The password itself is not passed on; only ``has_smtp_sender_password``
        is forwarded.
        """
        return EmailNotificationSettingsResponse(
            id=snapshot.id,
            smtp_server=snapshot.smtp_server,
            smtp_port=snapshot.smtp_port,
            smtp_sender_email=snapshot.smtp_sender_email,
            smtp_use_ssl=snapshot.smtp_use_ssl,
            notify_token_expiring=snapshot.notify_token_expiring,
            notify_check_in_success=snapshot.notify_check_in_success,
            require_admin_approval_for_registration=(
                snapshot.require_admin_approval_for_registration
            ),
            warn_unverified_email_before_approval=(
                snapshot.warn_unverified_email_before_approval
            ),
            has_smtp_sender_password=snapshot.has_smtp_sender_password,
            created_at=snapshot.created_at,
            updated_at=snapshot.updated_at,
        )

    @staticmethod
    def get_snapshot(db: Session) -> EmailSettingsSnapshot:
        """Return the current settings as a snapshot, creating the row if needed."""
        return EmailSettingsService._to_snapshot(EmailSettingsService._get_or_create(db))

    @staticmethod
    def get_response(db: Session) -> EmailNotificationSettingsResponse:
        """Return the current settings in response-schema form."""
        return EmailSettingsService._to_response(EmailSettingsService.get_snapshot(db))

    @staticmethod
    def update_settings(
        db: Session, payload: EmailNotificationSettingsUpdate
    ) -> EmailSettingsSnapshot:
        """Apply ``payload`` to the settings row and return the stored result.

        Password handling: ``clear_smtp_sender_password`` wins and stores an
        empty string; otherwise a non-``None`` ``smtp_sender_password``
        replaces the stored one; otherwise the stored password is left as is.

        Raises:
            SQLAlchemyError: If loading or committing fails; a failed commit
                is rolled back before the error propagates.
        """
        row = EmailSettingsService._get_or_create(db)

        row.smtp_server = payload.smtp_server
        row.smtp_port = payload.smtp_port
        row.smtp_sender_email = str(payload.smtp_sender_email)
        row.smtp_use_ssl = payload.smtp_use_ssl
        row.notify_token_expiring = payload.notify_token_expiring
        row.notify_check_in_success = payload.notify_check_in_success
        row.require_admin_approval_for_registration = (
            payload.require_admin_approval_for_registration
        )
        row.warn_unverified_email_before_approval = (
            payload.warn_unverified_email_before_approval
        )

        if payload.clear_smtp_sender_password:
            row.smtp_sender_password = ""
        elif payload.smtp_sender_password is not None:
            row.smtp_sender_password = payload.smtp_sender_password

        row.updated_at = datetime.now(timezone.utc)
        EmailSettingsService._commit_or_rollback(db)
        db.refresh(row)
        return EmailSettingsService._to_snapshot(row)

    @staticmethod
    def update_response(
        db: Session, payload: EmailNotificationSettingsUpdate
    ) -> EmailNotificationSettingsResponse:
        """Apply ``payload`` and return the stored settings in response-schema form."""
        return EmailSettingsService._to_response(
            EmailSettingsService.update_settings(db, payload)
        )

    @staticmethod
    def _load_snapshot_or_default() -> EmailSettingsSnapshot:
        """Load the snapshot through a short-lived session, with a config fallback.

        Shared by the argument-less readers below. On a database error the
        failure is logged and the snapshot built from the application config
        is returned instead; the session is closed in every case.
        """
        db = SessionLocal()
        try:
            return EmailSettingsService.get_snapshot(db)
        except SQLAlchemyError:
            # Best-effort fallback is deliberate, but the failure must be visible.
            logging.getLogger(__name__).warning(
                "Failed to load email settings from the database; "
                "falling back to configured defaults",
                exc_info=True,
            )
            return EmailSettingsService._default_snapshot()
        finally:
            db.close()

    @staticmethod
    def get_smtp_config() -> dict[str, object] | None:
        """Return the SMTP connection parameters, or ``None`` if incomplete.

        The configuration counts as incomplete when the server, the sender
        address or the port is empty/zero.
        """
        snapshot = EmailSettingsService._load_snapshot_or_default()
        if not (snapshot.smtp_server and snapshot.smtp_sender_email and snapshot.smtp_port):
            return None
        return {
            "smtp_server": snapshot.smtp_server,
            "smtp_port": snapshot.smtp_port,
            "sender_email": snapshot.smtp_sender_email,
            "sender_password": snapshot.smtp_sender_password,
            "use_ssl": snapshot.smtp_use_ssl,
        }

    @staticmethod
    def is_token_expiring_notification_enabled() -> bool:
        """Return the ``notify_token_expiring`` setting."""
        return EmailSettingsService._load_snapshot_or_default().notify_token_expiring

    @staticmethod
    def is_check_in_success_notification_enabled() -> bool:
        """Return the ``notify_check_in_success`` setting."""
        return EmailSettingsService._load_snapshot_or_default().notify_check_in_success

    @staticmethod
    def is_registration_approval_required() -> bool:
        """Return the ``require_admin_approval_for_registration`` setting."""
        snapshot = EmailSettingsService._load_snapshot_or_default()
        return snapshot.require_admin_approval_for_registration

    @staticmethod
    def should_warn_unverified_email_before_approval() -> bool:
        """Return the ``warn_unverified_email_before_approval`` setting."""
        snapshot = EmailSettingsService._load_snapshot_or_default()
        return snapshot.warn_unverified_email_before_approval