feat(backend): replace Selenium with Playwright

BREAKING CHANGE: backend now requires Python 3.12 or newer.
This commit is contained in:
2026-05-04 21:20:30 +08:00
parent fa07b340e7
commit d811c20932
15 changed files with 451 additions and 1570 deletions
+1 -1
View File
@@ -108,7 +108,7 @@ async def cancel_qrcode_session(session_id: str):
- **session_id**: 会话 ID
用于用户关闭二维码对话框时,终止后台的 Selenium 进程
用于用户关闭二维码对话框时,终止后台的 Playwright 进程
"""
try:
result = AuthService.cancel_qrcode_session(session_id)
+2 -3
View File
@@ -60,9 +60,8 @@ class Settings(BaseSettings):
TOKEN_CHECK_INTERVAL_MINUTES: int = 30 # Token 检查间隔(分钟)
SESSION_CLEANUP_INTERVAL_HOURS: int = 24 # 会话清理间隔(小时)
# Selenium / Chrome 配置(从 .env 读取)
CHROME_BINARY_PATH: str = ""
CHROMEDRIVER_PATH: str = ""
# Playwright / browser 配置(从 .env 读取)
BROWSER_EXECUTABLE_PATH: str = ""
settings = Settings()
+2 -2
View File
@@ -50,7 +50,7 @@ class AuthService:
# 老用户:刷新 Token
logger.info(f"老用户 {alias} 请求刷新 Token,会话: {session_id}")
# 在后台线程启动 Selenium,传入 jwt_sub
# 在后台线程启动 Playwright,传入 jwt_sub
thread = threading.Thread(
target=get_token_headless,
args=(session_id, existing_user.jwt_sub, alias, client_ip),
@@ -69,7 +69,7 @@ class AuthService:
logger.info(f"新用户 {alias} 请求注册,会话: {session_id},已预占用户名")
# 在后台线程启动 Selenium,不传入 jwt_sub(新用户)
# 在后台线程启动 Playwright,不传入 jwt_sub(新用户)
thread = threading.Thread(
target=get_token_headless, args=(session_id, None, alias, client_ip), daemon=True
)
+1 -1
View File
@@ -287,7 +287,7 @@ class CheckInService:
}
# 执行打卡(传递 task 对象和用户 token)
logger.info(f"🤖 调用 Selenium Worker 执行打卡...")
logger.info(f"🤖 调用 Playwright Worker 执行打卡...")
result = perform_check_in(task, user.authorization)
# 如果是 Token 过期导致的失败,处理 Token 过期情况
@@ -0,0 +1,70 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Mapping
DEFAULT_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
)
def _mapping_get(item: Any, key: str, default: Any = None) -> Any:
if isinstance(item, Mapping):
return item.get(key, default)
return getattr(item, key, default)
@dataclass(frozen=True)
class PlaywrightLaunchConfig:
executable_path: str = ""
headless: bool = True
args: tuple[str, ...] = field(
default_factory=lambda: ("--no-sandbox", "--disable-dev-shm-usage")
)
user_agent: str = DEFAULT_USER_AGENT
viewport_width: int = 1920
viewport_height: int = 1080
ignore_https_errors: bool = True
def to_launch_kwargs(self) -> dict[str, Any]:
kwargs: dict[str, Any] = {
"headless": self.headless,
"args": list(self.args),
}
if self.executable_path:
kwargs["executable_path"] = self.executable_path
return kwargs
def to_context_kwargs(self) -> dict[str, Any]:
return {
"user_agent": self.user_agent,
"viewport": {"width": self.viewport_width, "height": self.viewport_height},
"ignore_https_errors": self.ignore_https_errors,
}
def extract_cookie_value(cookies: Iterable[Any], cookie_name: str) -> str | None:
for cookie in cookies:
if _mapping_get(cookie, "name") == cookie_name:
value = _mapping_get(cookie, "value")
if isinstance(value, str) and value.strip():
return value
return None
def extract_payload_header(headers: Mapping[str, Any]) -> str | None:
for key, value in headers.items():
if key.lower() == "x-api-request-payload" and isinstance(value, str):
stripped = value.strip()
if stripped:
return stripped
return None
def save_page_debug_artifacts(page: Any, screenshot_path: Path, html_path: Path) -> None:
screenshot_path.parent.mkdir(parents=True, exist_ok=True)
page.screenshot(path=str(screenshot_path))
html_path.write_text(page.content(), encoding="utf-8")
+84 -92
View File
@@ -1,23 +1,33 @@
import requests
from __future__ import annotations
import json
import time
import os
import logging
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
import requests
import time
from typing import Dict, Any
from playwright.sync_api import sync_playwright
from backend.config import settings
from backend.workers.browser_automation import (
PlaywrightLaunchConfig,
extract_payload_header,
save_page_debug_artifacts,
)
logger = logging.getLogger(__name__)
# Chrome 配置路径 - 从设置中读取
CHROME_BINARY_PATH = settings.CHROME_BINARY_PATH
CHROMEDRIVER_PATH = settings.CHROMEDRIVER_PATH
BASE_DIR = settings.BASE_DIR
DEBUG_SCREENSHOT_PATH = BASE_DIR / "payload_debug.png"
DEBUG_PAGE_SOURCE_PATH = BASE_DIR / "payload_debug_page_source.html"
def get_live_x_api_payload(auth_token: str) -> str:
def get_browser_config() -> PlaywrightLaunchConfig:
"""获取 Playwright 浏览器配置(从 settings 读取)"""
return PlaywrightLaunchConfig(executable_path=settings.BROWSER_EXECUTABLE_PATH)
def get_live_x_api_payload(auth_token: str) -> str | None:
"""
启动一个临时的无头浏览器会话,获取新鲜的 x-api-request-payload
@@ -27,89 +37,89 @@ def get_live_x_api_payload(auth_token: str) -> str:
Returns:
x-api-request-payload 值,失败返回 None
"""
logger.info("正在启动临时浏览器会话以监听网络日志...")
# 根据配置创建 Service
if CHROMEDRIVER_PATH:
service = Service(executable_path=CHROMEDRIVER_PATH)
else:
service = Service() # 使用 Selenium Manager 自动管理
chrome_options = Options()
# 如果配置了 Chrome 路径,则使用配置的路径
if CHROME_BINARY_PATH:
chrome_options.binary_location = CHROME_BINARY_PATH
# 开启性能日志记录功能
logging_prefs = {"performance": "ALL"}
chrome_options.set_capability("goog:loggingPrefs", logging_prefs)
# Headless 模式配置
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
chrome_options.add_argument(f"user-agent={user_agent}")
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--ignore-certificate-errors")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
driver = webdriver.Chrome(service=service, options=chrome_options)
logger.info("正在启动临时 Playwright 会话以监听网络请求...")
browser = None
context = None
page = None
payload_signature = None
playwright = None
try:
# 导航到同源空白页,用于设置 Cookie
driver.get("https://i.jielong.com/my-class")
browser_config = get_browser_config()
# 注入长期 Token
driver.add_cookie({"name": "token", "value": auth_token, "domain": ".jielong.com"})
playwright = sync_playwright().start()
browser = playwright.chromium.launch(**browser_config.to_launch_kwargs())
context = browser.new_context(**browser_config.to_context_kwargs())
page = context.new_page()
# 导航到触发 API 的页面
driver.get("https://i.jielong.com/my-form")
def on_request(request) -> None:
nonlocal payload_signature
if payload_signature:
return
# 等待并捕获 x-api-request-payload
max_wait_time = 20 # 最多等待20秒
payload = extract_payload_header(request.headers)
if payload:
payload_signature = payload
logger.info("成功通过 Playwright 捕获到现场的 x-api-request-payload")
page.on("request", on_request)
page.goto("https://i.jielong.com/my-class", wait_until="domcontentloaded", timeout=60000)
context.add_cookies(
[
{
"name": "token",
"value": auth_token,
"domain": ".jielong.com",
"path": "/",
}
]
)
page.goto("https://i.jielong.com/my-form", wait_until="domcontentloaded", timeout=60000)
max_wait_time = 20
start_time = time.time()
found = False
while time.time() - start_time < max_wait_time:
logs = driver.get_log("performance")
for entry in logs:
log = json.loads(entry["message"])["message"]
if log["method"] == "Network.requestWillBeSent":
headers = log.get("params", {}).get("request", {}).get("headers", {})
headers_lower = {k.lower(): v for k, v in headers.items()}
if "x-api-request-payload" in headers_lower:
payload_signature = headers_lower["x-api-request-payload"]
logger.info("成功通过网络日志捕获到现场的 x-api-request-payload")
found = True
break
if found:
if payload_signature:
break
time.sleep(1)
page.wait_for_timeout(500)
if not payload_signature:
raise Exception(
f"{max_wait_time} 秒内未能通过网络日志捕获到 x-api-request-payload。"
f"{max_wait_time} 秒内未能通过网络请求捕获到 x-api-request-payload。"
)
except Exception as e:
logger.error(f"获取现场 x-api-request-payload 时失败: {e}")
try:
debug_screenshot = os.path.join(settings.BASE_DIR, "payload_debug.png")
driver.save_screenshot(debug_screenshot)
if page:
save_page_debug_artifacts(page, DEBUG_SCREENSHOT_PATH, DEBUG_PAGE_SOURCE_PATH)
except Exception as screenshot_error:
logger.warning(f"保存调试截图失败: {screenshot_error}")
finally:
# 优雅关闭 WebDriver,避免 Windows asyncio ConnectionResetError
try:
driver.quit()
except Exception as e:
# 忽略 WebDriver 关闭时的连接错误(Windows 平台常见问题)
if "WinError 10054" not in str(e) and "ConnectionResetError" not in str(e):
logger.warning(f"关闭 WebDriver 时出现警告: {e}")
if page:
try:
page.close()
except Exception:
pass
if context:
try:
context.close()
except Exception:
pass
if browser:
try:
browser.close()
except Exception as e:
logger.warning(f"关闭 Playwright 浏览器时出现警告: {e}")
if playwright:
try:
playwright.stop()
except Exception as e:
logger.warning(f"关闭 Playwright runtime 时出现警告: {e}")
return payload_signature
@@ -129,13 +139,12 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
- response_text: 响应文本
- error_message: 错误信息
"""
# 从 payload_config 中提取 Signature 用于日志
from backend.utils.json_helpers import safe_parse_payload, extract_signature
payload_dict = safe_parse_payload(task.payload_config)
signature = extract_signature(task.payload_config) or "Unknown"
logger.info(f"Selenium打卡: 正在为任务 ID: {task.id} (Signature: {signature}) 执行打卡...")
logger.info(f"Playwright打卡: 正在为任务 ID: {task.id} (Signature: {signature}) 执行打卡...")
if not user_token:
error_msg = f"任务 ID: {task.id} (Signature: {signature}) 的 Token 为空,跳过。"
@@ -147,7 +156,6 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
"error_message": error_msg,
}
# 获取 x-api-request-payload
payload_signature = get_live_x_api_payload(user_token)
if not payload_signature:
error_msg = f"任务 ID: {task.id} (Signature: {signature}) 未能获取到现场签名,打卡中止。"
@@ -160,7 +168,6 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
}
try:
# 使用任务的 payload_config(从模板生成的完整配置,包含 ThreadId)
from backend.utils.json_helpers import safe_parse_payload, extract_thread_id
payload = safe_parse_payload(task.payload_config)
@@ -190,7 +197,6 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
url = "https://api.jielong.com/api/CheckIn/EditRecord"
# 打印请求详情用于调试
payload_json = json.dumps(payload, ensure_ascii=False)
logger.info(f"📤 打卡请求详情 - 任务 ID: {task.id} (Signature: {signature})")
logger.info(f"📍 URL: {url}")
@@ -205,11 +211,8 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
f"✉️ 任务 ID: {task.id} (Signature: {signature}) 打卡请求完成!响应: {response_text}"
)
# 判断响应内容(参考 V1 实现逻辑)
# 情况1: 明确包含"打卡成功" → 成功
if "打卡成功" in response_text:
logger.info(f"✅ 检测到成功关键字 '打卡成功',打卡成功")
# 发送成功邮件通知
if task.user and task.user.email:
try:
from backend.services.email_service import EmailService
@@ -229,8 +232,6 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
"error_message": "",
}
# 情况2: 已经提交过了(重复提交)→ 视为成功,但不发送邮件
# 匹配 "已被提交" 或 "已经打卡"
elif (
"已被提交" in response_text
or "已经打卡" in response_text
@@ -244,8 +245,6 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
"error_message": "",
}
# 情况3: 不在打卡时间范围 → 标记为时间范围外
# 匹配 Data 或 Description 中的内容
elif "不在打卡时间范围" in response_text or "不在打卡时间" in response_text:
logger.warning(f"⏰ 检测到'不在打卡时间范围',打卡时间不符")
return {
@@ -255,8 +254,6 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
"error_message": "不在打卡时间范围内",
}
# 情况4: Token 失效的特征标识 → 失败
# 扩展检测条件:检测多种 Token 失效的响应特征
elif (
"登录" in response_text
or "授权" in response_text
@@ -266,16 +263,13 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
or response.status_code == 401
):
logger.warning(f"⚠️ 检测到Token失效特征,Token 可能已失效")
# 发送打卡失败邮件通知(邮件内容已包含Token失效提醒和刷新指引)
if task.user and task.user.email:
try:
from backend.services.email_service import EmailService
from backend.utils.json_helpers import build_task_info
# 使用辅助函数构建 task_info(从 task 对象提取信息)
task_info = build_task_info(task)
# 只发送打卡失败通知(内容已说明Token失效)
EmailService.notify_check_in_result(
task.user, task_info, False, "Token 已失效,需要重新授权"
)
@@ -284,15 +278,13 @@ def perform_check_in(task, user_token: str) -> Dict[str, Any]:
return {
"success": False,
"status": "token_expired", # 特殊状态,用于标识 Token 过期
"status": "token_expired",
"response_text": response_text,
"error_message": "Token 已失效,需要重新授权",
}
# 情况5: 其他响应 → 需要人工确认(标记为异常)
else:
logger.warning(f"⚠️ 未识别的响应内容,请检查: {response_text[:200]}...")
# 标记为未知状态,记录完整响应供后续分析
return {
"success": False,
"status": "unknown",
+124 -166
View File
@@ -1,34 +1,32 @@
import os
import logging
from __future__ import annotations
import base64
import json
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import logging
import time
from filelock import FileLock
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
from backend.config import settings
from backend.services.registration_manager import registration_manager
from backend.workers.browser_automation import (
PlaywrightLaunchConfig,
extract_cookie_value,
save_page_debug_artifacts,
)
logger = logging.getLogger(__name__)
# Chrome 配置路径
BASE_DIR = settings.BASE_DIR
# 调试文件路径
DEBUG_SCREENSHOT_PATH = os.path.join(BASE_DIR, "debug_screenshot.png")
DEBUG_PAGE_SOURCE_PATH = os.path.join(BASE_DIR, "debug_page_source.html")
DEBUG_SCREENSHOT_PATH = BASE_DIR / "debug_screenshot.png"
DEBUG_PAGE_SOURCE_PATH = BASE_DIR / "debug_page_source.html"
def get_chrome_config():
"""获取 Chrome 配置(从 settings 读取)"""
return {
"chrome_binary": settings.CHROME_BINARY_PATH,
"chromedriver": settings.CHROMEDRIVER_PATH,
}
def get_browser_config() -> PlaywrightLaunchConfig:
"""获取 Playwright 浏览器配置(从 settings 读取)"""
return PlaywrightLaunchConfig(executable_path=settings.BROWSER_EXECUTABLE_PATH)
def update_session_file(session_id: str, data: dict) -> None:
@@ -44,7 +42,7 @@ def update_session_file(session_id: str, data: dict) -> None:
logger.error(f"写入会话文件 {filepath} 失败: {e}")
def get_session_status(session_id: str) -> str:
def get_session_status(session_id: str) -> str | None:
"""安全地读取会话文件的状态"""
filepath = settings.SESSION_DIR / f"{session_id}.json"
lock_path = settings.SESSION_DIR / f"{session_id}.json.lock"
@@ -67,7 +65,7 @@ def get_session_status(session_id: str) -> str:
return None
def get_session_data(session_id: str) -> dict:
def get_session_data(session_id: str) -> dict | None:
"""读取完整的会话数据"""
filepath = settings.SESSION_DIR / f"{session_id}.json"
lock_path = settings.SESSION_DIR / f"{session_id}.json.lock"
@@ -108,7 +106,6 @@ def cancel_session(session_id: str) -> bool:
try:
with FileLock(lock_path, timeout=5):
# 读取当前会话数据
from backend.utils.json_helpers import safe_parse_json
with open(filepath, "r", encoding="utf-8") as f:
@@ -117,16 +114,13 @@ def cancel_session(session_id: str) -> bool:
return False
data = safe_parse_json(content, {})
# 如果已经成功,不允许取消
if data.get("status") == "success":
logger.info(f"会话 {session_id} 已成功,无法取消")
return False
# 标记为已取消
data["status"] = "cancelled"
data["message"] = "用户取消登录"
# 写回文件
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@@ -138,11 +132,29 @@ def cancel_session(session_id: str) -> bool:
return False
def _close_quietly(resource, label: str) -> None:
if not resource:
return
try:
resource.close()
except Exception as e:
logger.warning(f"关闭 {label} 时出现警告: {e}")
def _release_alias_if_needed(alias: str | None, session_id: str) -> None:
if not alias:
return
registration_manager.release_alias(alias, session_id)
logger.info(f"释放用户名预占: {alias}")
def get_token_headless(
session_id: str, jwt_sub: str = None, alias: str = None, client_ip: str = ""
) -> None:
"""
使用 Selenium 获取 QQ 扫码登录的 Token
使用 Playwright 获取 QQ 扫码登录的 Token
Args:
session_id: 会话 ID
@@ -150,177 +162,119 @@ def get_token_headless(
alias: 用户别名(用于新用户注册)
client_ip: 客户端 IP 地址
"""
driver = None
current_step = "初始化"
browser = None
context = None
page = None
playwright = None
try:
# 获取 Chrome 配置
chrome_config = get_chrome_config()
chrome_binary_path = chrome_config["chrome_binary"]
chromedriver_path = chrome_config["chromedriver"]
browser_config = get_browser_config()
logger.info(f"Playwright ({session_id}): {current_step}...")
# 配置 Chrome 选项
current_step = "配置 ChromeDriver"
logger.info(f"Selenium ({session_id}): {current_step}...")
playwright = sync_playwright().start()
current_step = "启动浏览器"
logger.info(f"Playwright ({session_id}): {current_step}...")
browser = playwright.chromium.launch(**browser_config.to_launch_kwargs())
logger.info(f"Playwright ({session_id}): 浏览器启动成功")
chrome_options = Options()
current_step = "创建上下文"
context = browser.new_context(**browser_config.to_context_kwargs())
page = context.new_page()
# 如果指定了自定义 Chrome 路径,则使用
if chrome_binary_path:
chrome_options.binary_location = chrome_binary_path
logger.info(f"Selenium ({session_id}): 使用自定义 Chrome 路径: {chrome_binary_path}")
else:
logger.info(f"Selenium ({session_id}): 使用系统默认 Chrome")
# Headless 模式配置
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36"
chrome_options.add_argument(f"user-agent={user_agent}")
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--ignore-certificate-errors")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
# 启动浏览器
current_step = "启动 Chrome 浏览器"
logger.info(f"Selenium ({session_id}): {current_step}...")
# 如果指定了 ChromeDriver 路径,则使用 Service;否则让 Selenium 自动管理
if chromedriver_path:
service = Service(executable_path=chromedriver_path)
driver = webdriver.Chrome(service=service, options=chrome_options)
logger.info(f"Selenium ({session_id}): 使用自定义 ChromeDriver: {chromedriver_path}")
else:
driver = webdriver.Chrome(options=chrome_options)
logger.info(f"Selenium ({session_id}): 使用 Selenium Manager 自动管理 ChromeDriver")
logger.info(f"Selenium ({session_id}): Chrome 浏览器启动成功")
current_step = "导航到登录页面"
logger.info(f"Selenium ({session_id}): {current_step}...")
driver.get("https://i.jielong.com/login?redirectTo=https%3A%2F%2Fi.jielong.com%2F")
logger.info(f"Playwright ({session_id}): {current_step}...")
page.goto(
"https://i.jielong.com/login?redirectTo=https%3A%2F%2Fi.jielong.com%2F",
wait_until="domcontentloaded",
timeout=60000,
)
wait = WebDriverWait(driver, 60)
# --- 步骤 1: 点击切换到 QQ 登录 ---
current_step = "查找并点击切换按钮"
toggle_button_selector = "div.login-wrap .toggle"
logger.info(f"Selenium ({session_id}): {current_step} ({toggle_button_selector})...")
toggle_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, toggle_button_selector))
)
toggle_button.click()
toggle_button = page.locator("div.login-wrap .toggle")
logger.info(f"Playwright ({session_id}): {current_step}...")
toggle_button.click(timeout=60000)
# --- 步骤 2: 勾选同意服务协议 ---
current_step = "勾选同意服务协议"
checkbox_selector = "input.ant-checkbox-input[type='checkbox']"
logger.info(f"Selenium ({session_id}): {current_step} ({checkbox_selector})...")
checkbox = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, checkbox_selector)))
if not checkbox.is_selected():
checkbox.click()
logger.info(f"Selenium ({session_id}): 已勾选服务协议")
checkbox = page.locator("input.ant-checkbox-input[type='checkbox']")
logger.info(f"Playwright ({session_id}): {current_step}...")
if not checkbox.is_checked():
checkbox.click(timeout=60000)
logger.info(f"Playwright ({session_id}): 已勾选服务协议")
# --- 步骤 3: 点击"立即登录"按钮 ---
current_step = "点击立即登录按钮"
login_button_selector = "button.css-1wli0ry.ant-btn.ant-btn-default.login-btn"
logger.info(f"Selenium ({session_id}): {current_step} ({login_button_selector})...")
login_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, login_button_selector))
)
login_button.click()
login_button = page.locator("button.css-1wli0ry.ant-btn.ant-btn-default.login-btn")
logger.info(f"Playwright ({session_id}): {current_step}...")
login_button.click(timeout=60000)
# --- 步骤 4: 等待二维码加载 ---
import time
time.sleep(3) # 等待几秒让二维码刷新出来
current_step = "等待二维码刷新"
page.wait_for_timeout(3000)
current_step = "等待QQ二维码图片加载"
qq_qr_image_selector = "#login_container img"
logger.info(f"Selenium ({session_id}): {current_step} ({qq_qr_image_selector})...")
qr_element = wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, qq_qr_image_selector))
)
qr_locator = page.locator("#login_container img").first
logger.info(f"Playwright ({session_id}): {current_step}...")
qr_locator.wait_for(state="visible", timeout=60000)
logger.info(f"Selenium ({session_id}): 成功找到QQ二维码元素,正在截图...")
qr_base64 = qr_element.screenshot_as_base64
logger.info(f"Playwright ({session_id}): 成功找到QQ二维码元素,正在截图...")
qr_base64 = base64.b64encode(qr_locator.screenshot(timeout=60000)).decode("ascii")
update_session_file(
session_id,
{
"status": "waiting_scan",
"qr_image_data": qr_base64,
"jwt_sub": jwt_sub,
"alias": alias, # 新增:保存 alias
"client_ip": client_ip, # 新增:保存 IP
"alias": alias,
"client_ip": client_ip,
},
)
current_step = "等待用户扫描登录 (Cookie 'token' 出现)"
cookie_name_to_find = "token"
logger.info(f"Selenium ({session_id}): {current_step}...")
logger.info(f"Playwright ({session_id}): {current_step}...")
# 自定义等待逻辑:每秒检查cookie和session状态
max_wait_seconds = 120
import time
for i in range(max_wait_seconds):
# 检查session是否被取消
for _ in range(max_wait_seconds):
status = get_session_status(session_id)
if status == "cancelled":
logger.info(f"Selenium ({session_id}): 用户取消了登录,终止会话")
raise Exception("用户取消登录")
logger.info(f"Playwright ({session_id}): 用户取消了登录,终止会话")
_release_alias_if_needed(alias, session_id)
return
# 检查cookie是否出现
cookie = driver.get_cookie(cookie_name_to_find)
if cookie:
break
token = extract_cookie_value(context.cookies(), "token")
if token:
logger.info(f"Playwright ({session_id}): 成功在Cookie中捕获到Token")
update_session_file(
session_id,
{
"status": "success",
"token": token,
"alias": alias,
"client_ip": client_ip,
},
)
return
time.sleep(1)
else:
# 超时未获取到cookie
raise TimeoutException("等待扫码超时")
cookie = driver.get_cookie(cookie_name_to_find)
if cookie:
logger.info(f"Selenium ({session_id}): 成功在Cookie中捕获到Token")
update_session_file(
session_id,
{
"status": "success",
"token": cookie["value"],
"alias": alias, # 保存 alias
"client_ip": client_ip, # 保存 IP
},
)
else:
raise Exception("等待Cookie成功但获取失败")
raise PlaywrightTimeoutError("等待扫码超时")
except TimeoutException:
except PlaywrightTimeoutError:
if get_session_status(session_id) == "success":
logger.warning(
f"Selenium ({session_id}): 一个并发线程超时,但会话已成功,将忽略此超时。"
f"Playwright ({session_id}): 一个并发线程超时,但会话已成功,将忽略此超时。"
)
else:
# 释放预占的用户名
if alias:
from backend.services.registration_manager import registration_manager
_release_alias_if_needed(alias, session_id)
error_message = f"操作超时!卡在了步骤: '{current_step}'。请检查页面选择器或网络。"
logger.error(f"Playwright ({session_id}): {error_message}")
registration_manager.release_alias(alias, session_id)
logger.info(f"超时释放用户名预占: {alias}")
error_message = f"操作超时!卡在了步骤: '{current_step}'。请检查CSS选择器或网络。"
logger.error(f"Selenium ({session_id}): {error_message}")
# 保存调试信息(仅当 driver 已创建时)
if driver:
if page:
try:
driver.save_screenshot(DEBUG_SCREENSHOT_PATH)
with open(DEBUG_PAGE_SOURCE_PATH, "w", encoding="utf-8") as f:
f.write(driver.page_source)
save_page_debug_artifacts(page, DEBUG_SCREENSHOT_PATH, DEBUG_PAGE_SOURCE_PATH)
logger.error(
f"Selenium ({session_id}): 调试截图和源码已保存。当前URL: {driver.current_url}"
f"Playwright ({session_id}): 调试截图和源码已保存。当前URL: {page.url}"
)
except Exception as debug_error:
logger.error(f"Selenium ({session_id}): 保存调试信息失败: {debug_error}")
logger.error(f"Playwright ({session_id}): 保存调试信息失败: {debug_error}")
update_session_file(
session_id, {"status": "error", "message": error_message, "jwt_sub": jwt_sub}
@@ -329,25 +283,29 @@ def get_token_headless(
except Exception as e:
if get_session_status(session_id) == "success":
logger.warning(
f"Selenium ({session_id}): 一个并发线程出错 ({e}),但会话已成功,将忽略此错误。"
f"Playwright ({session_id}): 一个并发线程出错 ({e}),但会话已成功,将忽略此错误。"
)
else:
# 释放预占的用户名
if alias:
from backend.services.registration_manager import registration_manager
_release_alias_if_needed(alias, session_id)
logger.error(f"Playwright ({session_id}): 发生未知错误: {e}", exc_info=True)
registration_manager.release_alias(alias, session_id)
logger.info(f"异常释放用户名预占: {alias}")
if page:
try:
save_page_debug_artifacts(page, DEBUG_SCREENSHOT_PATH, DEBUG_PAGE_SOURCE_PATH)
except Exception as debug_error:
logger.error(f"Playwright ({session_id}): 保存调试信息失败: {debug_error}")
logger.error(f"Selenium ({session_id}): 发生未知错误: {e}", exc_info=True)
update_session_file(
session_id, {"status": "error", "message": str(e), "jwt_sub": jwt_sub}
)
finally:
if driver:
_close_quietly(page, "页面")
_close_quietly(context, "浏览器上下文")
_close_quietly(browser, "浏览器")
if playwright:
try:
driver.quit()
logger.info(f"Selenium ({session_id}): 浏览器已关闭")
except Exception as quit_error:
logger.error(f"Selenium ({session_id}): 关闭浏览器失败: {quit_error}")
playwright.stop()
except Exception as e:
logger.warning(f"关闭 Playwright runtime 时出现警告: {e}")
logger.info(f"Playwright ({session_id}): 浏览器已关闭")