From c48e16a97709c09b45b0fc4e98e9413c8dbf2842 Mon Sep 17 00:00:00 2001 From: Yaosanqi137 Date: Mon, 6 Apr 2026 01:15:50 +0800 Subject: [PATCH] feat(web-sync): add background sync worker and retry strategy --- apps/web/src/hooks/use-sync-engine.ts | 255 ++++++++++++ apps/web/src/pages/todo-shell-page.tsx | 482 +++++++++++++++-------- apps/web/src/services/local-db.ts | 33 ++ apps/web/src/services/local-sync-repo.ts | 124 ++++++ apps/web/src/services/sync-api.ts | 117 ++++++ apps/web/src/services/sync-worker.ts | 104 +++++ 6 files changed, 953 insertions(+), 162 deletions(-) create mode 100644 apps/web/src/hooks/use-sync-engine.ts create mode 100644 apps/web/src/services/local-sync-repo.ts create mode 100644 apps/web/src/services/sync-api.ts create mode 100644 apps/web/src/services/sync-worker.ts diff --git a/apps/web/src/hooks/use-sync-engine.ts b/apps/web/src/hooks/use-sync-engine.ts new file mode 100644 index 0000000..c6da2ea --- /dev/null +++ b/apps/web/src/hooks/use-sync-engine.ts @@ -0,0 +1,255 @@ +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useLiveQuery } from "dexie-react-hooks"; +import { + countBlockedSyncOperations, + countPendingRemoteOperations, + countPendingSyncOperations, + getLocalSyncState +} from "@/services/local-sync-repo"; +import type { WebSession } from "@/services/session-storage"; +import { runSyncWorkerCycle } from "@/services/sync-worker"; + +const PERIODIC_SYNC_INTERVAL_MS = 30_000; +const MAX_RETRY_DELAY_MS = 60_000; +const BASE_RETRY_DELAY_MS = 2_000; + +export type SyncEngineStatus = { + isOnline: boolean; + phase: "idle" | "syncing" | "offline" | "backoff" | "attention"; + pendingCount: number; + blockedCount: number; + pendingRemoteCount: number; + lastSyncedAt: number | null; + nextRetryAt: number | null; + lastError: string | null; +}; + +function getErrorMessage(error: unknown): string { + if (error instanceof Error && error.message.trim()) { + return error.message; + } + + return "同步失败,请稍后重试"; +} + +function calculateRetryDelay(attempt: number): number { + return Math.min(BASE_RETRY_DELAY_MS * 2 ** Math.max(attempt - 1, 0), MAX_RETRY_DELAY_MS); +} + +export function useSyncEngine(session: WebSession | null): { + status: SyncEngineStatus; + triggerSync: () => void; +} { + const userId = session?.user.id ?? ""; + const pendingCount = useLiveQuery(async () => countPendingSyncOperations(), [userId]) ?? 0; + const blockedCount = useLiveQuery(async () => countBlockedSyncOperations(), [userId]) ?? 0; + const pendingRemoteCount = + useLiveQuery(async () => { + if (!userId) { + return 0; + } + + return countPendingRemoteOperations(userId); + }, [userId]) ?? 0; + const storedSyncState = + useLiveQuery(async () => { + if (!userId) { + return null; + } + + return getLocalSyncState(userId); + }, [userId]) ?? null; + + const [isOnline, setIsOnline] = useState(() => window.navigator.onLine); + const [phase, setPhase] = useState( + window.navigator.onLine ? "idle" : "offline" + ); + const [lastError, setLastError] = useState(null); + const [nextRetryAt, setNextRetryAt] = useState(null); + const [lastSyncedAt, setLastSyncedAt] = useState(null); + + const retryAttemptRef = useRef(0); + const runningRef = useRef(false); + + useEffect(() => { + setLastSyncedAt(storedSyncState?.lastSyncedAt ?? null); + }, [storedSyncState]); + + const runCycle = useCallback(async () => { + if (!userId || runningRef.current || !window.navigator.onLine) { + return; + } + + runningRef.current = true; + setPhase("syncing"); + setLastError(null); + setNextRetryAt(null); + + try { + const result = await runSyncWorkerCycle(userId); + retryAttemptRef.current = 0; + setLastSyncedAt(result.lastSyncedAt); + + if (result.hasFailures) { + const nextAttempt = retryAttemptRef.current + 1; + retryAttemptRef.current = nextAttempt; + const delay = calculateRetryDelay(nextAttempt); + setLastError(result.failureMessage ?? "同步失败"); + setNextRetryAt(Date.now() + delay); + setPhase("backoff"); + return; + } + + setPhase(blockedCount > 0 ? "attention" : "idle"); + } catch (error) { + const nextAttempt = retryAttemptRef.current + 1; + retryAttemptRef.current = nextAttempt; + const delay = calculateRetryDelay(nextAttempt); + setLastError(getErrorMessage(error)); + setNextRetryAt(Date.now() + delay); + setPhase("backoff"); + } finally { + runningRef.current = false; + } + }, [blockedCount, userId]); + + const triggerSync = useCallback(() => { + void runCycle(); + }, [runCycle]); + + useEffect(() => { + function handleOnline(): void { + setIsOnline(true); + setPhase(blockedCount > 0 ? "attention" : "idle"); + void runCycle(); + } + + function handleOffline(): void { + setIsOnline(false); + setNextRetryAt(null); + setPhase("offline"); + } + + function handleVisibilityChange(): void { + if (document.visibilityState === "visible" && window.navigator.onLine) { + void runCycle(); + } + } + + window.addEventListener("online", handleOnline); + window.addEventListener("offline", handleOffline); + document.addEventListener("visibilitychange", handleVisibilityChange); + + return () => { + window.removeEventListener("online", handleOnline); + window.removeEventListener("offline", handleOffline); + document.removeEventListener("visibilitychange", handleVisibilityChange); + }; + }, [blockedCount, runCycle]); + + useEffect(() => { + if (!userId || !isOnline) { + return; + } + + if (pendingCount === 0 && pendingRemoteCount === 0) { + return; + } + + void runCycle(); + }, [isOnline, pendingCount, pendingRemoteCount, runCycle, userId]); + + useEffect(() => { + if (!userId || !isOnline) { + return; + } + + const intervalId = window.setInterval(() => { + void runCycle(); + }, PERIODIC_SYNC_INTERVAL_MS); + + return () => { + window.clearInterval(intervalId); + }; + }, [isOnline, runCycle, userId]); + + useEffect(() => { + if (!nextRetryAt || !isOnline) { + return; + } + + const timeoutId = window.setTimeout( + () => { + void runCycle(); + }, + Math.max(nextRetryAt - Date.now(), 0) + ); + + return () => { + window.clearTimeout(timeoutId); + }; + }, [isOnline, nextRetryAt, runCycle]); + + useEffect(() => { + if (!userId) { + setLastError(null); + setLastSyncedAt(null); + setNextRetryAt(null); + setPhase(window.navigator.onLine ? "idle" : "offline"); + retryAttemptRef.current = 0; + } + }, [userId]); + + const status = useMemo(() => { + if (!isOnline) { + return { + isOnline, + phase: "offline", + pendingCount, + blockedCount, + pendingRemoteCount, + lastSyncedAt, + nextRetryAt: null, + lastError + }; + } + + if (blockedCount > 0 && phase !== "syncing") { + return { + isOnline, + phase: "attention", + pendingCount, + blockedCount, + pendingRemoteCount, + lastSyncedAt, + nextRetryAt, + lastError + }; + } + + return { + isOnline, + phase, + pendingCount, + blockedCount, + pendingRemoteCount, + lastSyncedAt, + nextRetryAt, + lastError + }; + }, [ + blockedCount, + isOnline, + lastError, + lastSyncedAt, + nextRetryAt, + pendingCount, + pendingRemoteCount, + phase + ]); + + return { + status, + triggerSync + }; +} diff --git a/apps/web/src/pages/todo-shell-page.tsx b/apps/web/src/pages/todo-shell-page.tsx index 9af63ad..8904f43 100644 --- a/apps/web/src/pages/todo-shell-page.tsx +++ b/apps/web/src/pages/todo-shell-page.tsx @@ -1,6 +1,14 @@ import { useCallback, useEffect, useRef, useState } from "react"; import { useLiveQuery } from "dexie-react-hooks"; -import { CheckCircle2, CircleAlert } from "lucide-react"; +import { + CheckCircle2, + CircleAlert, + CloudOff, + LoaderCircle, + RefreshCw, + ServerCrash +} from "lucide-react"; +import { useSyncEngine, type SyncEngineStatus } from "@/hooks/use-sync-engine"; import { TaskRichEditor } from "@/components/task-rich-editor"; import { Button } from "@/components/ui/button"; import { cn } from "@/lib/utils"; @@ -138,6 +146,105 @@ function serializeFormState(formState: TaskFormState): string { return JSON.stringify(formState); } +function formatSyncTimestamp(timestamp: number | null): string { + if (timestamp === null) { + return "尚未完成同步"; + } + + return new Date(timestamp).toLocaleString("zh-CN", { + month: "2-digit", + day: "2-digit", + hour: "2-digit", + minute: "2-digit" + }); +} + +function formatRetryTime(timestamp: number | null): string { + if (timestamp === null) { + return "稍后"; + } + + return new Date(timestamp).toLocaleTimeString("zh-CN", { + hour: "2-digit", + minute: "2-digit", + second: "2-digit" + }); +} + +function getSyncSummary(status: SyncEngineStatus): { + title: string; + description: string; + accentClassName: string; + icon: typeof RefreshCw; + iconClassName: string; +} { + if (status.phase === "offline") { + return { + title: "离线工作中", + description: + status.pendingCount > 0 + ? `当前离线,已保留 ${status.pendingCount} 条待上传改动。` + : "当前离线,本地仍可继续编辑,联网后会自动同步。", + accentClassName: "border-amber-200/80 bg-amber-50/80 text-amber-950", + icon: CloudOff, + iconClassName: "text-amber-600" + }; + } + + if (status.phase === "syncing") { + return { + title: "正在同步", + description: "正在上传本地改动并拉取最新云端增量。", + accentClassName: "border-primary/20 bg-primary/10 text-foreground", + icon: LoaderCircle, + iconClassName: "animate-spin text-primary" + }; + } + + if (status.phase === "backoff") { + return { + title: "同步稍后重试", + description: `${status.lastError ?? "同步失败"},系统将在 ${formatRetryTime( + status.nextRetryAt + )} 再试一次。`, + accentClassName: "border-destructive/20 bg-destructive/8 text-foreground", + icon: ServerCrash, + iconClassName: "text-destructive" + }; + } + + if (status.phase === "attention") { + return { + title: "需要人工关注", + description: `有 ${status.blockedCount} 条同步记录已达到重试上限,请检查接口配置或网络环境。`, + accentClassName: "border-destructive/20 bg-destructive/8 text-foreground", + icon: CircleAlert, + iconClassName: "text-destructive" + }; + } + + if (status.pendingRemoteCount > 0) { + return { + title: "云端变更已接收", + description: `已收到 ${status.pendingRemoteCount} 条云端变更,后续会进入本地合并流程。`, + accentClassName: "border-sky-200/80 bg-sky-50/80 text-sky-950", + icon: RefreshCw, + iconClassName: "text-sky-600" + }; + } + + return { + title: "同步状态正常", + description: + status.pendingCount > 0 + ? `还有 ${status.pendingCount} 条本地改动待处理。` + : "本地改动与云端增量传输均处于正常状态。", + accentClassName: "border-emerald-200/80 bg-emerald-50/80 text-emerald-950", + icon: CheckCircle2, + iconClassName: "text-emerald-600" + }; +} + export function TodoShellPage({ session }: TodoShellPageProps) { const [selectedTaskId, setSelectedTaskId] = useState(null); const [formState, setFormState] = useState(DEFAULT_FORM_STATE); @@ -148,6 +255,7 @@ export function TodoShellPage({ session }: TodoShellPageProps) { const [feedbackVisible, setFeedbackVisible] = useState(false); const [draftReadyTaskId, setDraftReadyTaskId] = useState(null); const savedTaskSnapshotRef = useRef(serializeFormState(DEFAULT_FORM_STATE)); + const { status: syncStatus, triggerSync } = useSyncEngine(session); const userId = session?.user.id ?? ""; @@ -417,191 +525,241 @@ export function TodoShellPage({ session }: TodoShellPageProps) { } const taskList = tasks ?? []; + const syncSummary = getSyncSummary(syncStatus); + const SyncSummaryIcon = syncSummary.icon; return ( <> {renderFeedbackBanner()} -
-
-
-

任务列表

- -
- - {quotaSnapshot ? ( -

= 85 ? "text-destructive" : "text-muted-foreground" - )} - > - 空间占用(估算):{formatStorageSize(quotaSnapshot.usedBytes)} /{" "} - {formatStorageSize(quotaSnapshot.quotaBytes)}({quotaSnapshot.usedPercent.toFixed(1)} - %) -

- ) : null} - - {taskList.length === 0 ? ( -

- 还没有任务,点击右上角“新建任务”。 -

- ) : ( -
- {taskList.map((task) => { - const isActive = task.id === selectedTaskId; - return ( - - ); - })} -
+
+
+
+
+
+ +
+
+

{syncSummary.title}

+

{syncSummary.description}

+
+
+ +
+ + 待上传 {syncStatus.pendingCount} + + + 云端待合并 {syncStatus.pendingRemoteCount} + + {syncStatus.blockedCount > 0 ? ( + + 阻塞 {syncStatus.blockedCount} + + ) : null} + + 上次成功 {formatSyncTimestamp(syncStatus.lastSyncedAt)} + + +
+
-
-
-

任务详情

-
+
+
+
+

任务列表

-
-
- {!selectedTaskId || !selectedTask ? ( -

- 请选择一个任务进行编辑。 -

- ) : ( -
- + {quotaSnapshot ? ( +

= 85 ? "text-destructive" : "text-muted-foreground" + )} + > + 空间占用(估算):{formatStorageSize(quotaSnapshot.usedBytes)} /{" "} + {formatStorageSize(quotaSnapshot.quotaBytes)}( + {quotaSnapshot.usedPercent.toFixed(1)} + %) +

+ ) : null} -
- - - + {taskList.length === 0 ? ( +

+ 还没有任务,点击右上角“新建任务”。 +

+ ) : ( +
+ {taskList.map((task) => { + const isActive = task.id === selectedTaskId; + return ( + + ); + })}
+ )} +
- +
+
+

任务详情

+
+ + +
+
-
-

任务内容

-
- + {!selectedTaskId || !selectedTask ? ( +

+ 请选择一个任务进行编辑。 +

+ ) : ( +
+ + +
+ + + +
+ + + +
+

任务内容

+
+ + setFormState((previous) => ({ + ...previous, + contentJson: payload.json, + contentText: payload.text + })) + } + /> +
-
- )} -
+ )} +
+
); diff --git a/apps/web/src/services/local-db.ts b/apps/web/src/services/local-db.ts index 9e934c4..5cdf57f 100644 --- a/apps/web/src/services/local-db.ts +++ b/apps/web/src/services/local-db.ts @@ -47,10 +47,33 @@ export type LocalTaskDraftRecord = { updatedAt: number; }; +export type LocalSyncStateRecord = { + userId: string; + cursor: string | null; + lastSyncedAt: number | null; + updatedAt: number; +}; + +export type LocalSyncInboxRecord = { + opId: string; + userId: string; + entityId: string; + entityType: SyncEntityType; + action: SyncActionType; + payload: string | null; + clientTs: number; + deviceId: string; + serverTs: number; + receivedAt: number; + appliedAt: number | null; +}; + class TodoLocalDb extends Dexie { declare tasks: Table; declare opLogs: Table; declare taskDrafts: Table; + declare syncStates: Table; + declare syncInbox: Table; constructor() { super("todolist-web-db"); @@ -66,9 +89,19 @@ class TodoLocalDb extends Dexie { task_drafts: "&taskId,userId,updatedAt" }); + this.version(3).stores({ + tasks: "&id,userId,status,priority,ddlAt,updatedAt,deletedAt", + op_logs: "&opId,entityId,entityType,action,clientTs,syncedAt", + task_drafts: "&taskId,userId,updatedAt", + sync_states: "&userId,updatedAt,lastSyncedAt", + sync_inbox: "&opId,userId,entityId,serverTs,appliedAt" + }); + this.tasks = this.table("tasks"); this.opLogs = this.table("op_logs"); this.taskDrafts = this.table("task_drafts"); + this.syncStates = this.table("sync_states"); + this.syncInbox = this.table("sync_inbox"); } } diff --git a/apps/web/src/services/local-sync-repo.ts b/apps/web/src/services/local-sync-repo.ts new file mode 100644 index 0000000..fbb79ca --- /dev/null +++ b/apps/web/src/services/local-sync-repo.ts @@ -0,0 +1,124 @@ +import { + localDb, + type LocalOpLogRecord, + type LocalSyncInboxRecord, + type LocalSyncStateRecord +} from "@/services/local-db"; +import type { SyncPullItem } from "@/services/sync-api"; + +export const MAX_SYNC_RETRY_COUNT = 5; + +export async function listPendingSyncOperations(limit = 20): Promise { + const records = await localDb.opLogs.orderBy("clientTs").toArray(); + + return records + .filter((record) => record.syncedAt === null && record.retryCount < MAX_SYNC_RETRY_COUNT) + .slice(0, limit); +} + +export async function countPendingSyncOperations(): Promise { + const records = await localDb.opLogs.toArray(); + return records.filter( + (record) => record.syncedAt === null && record.retryCount < MAX_SYNC_RETRY_COUNT + ).length; +} + +export async function countBlockedSyncOperations(): Promise { + const records = await localDb.opLogs.toArray(); + return records.filter( + (record) => record.syncedAt === null && record.retryCount >= MAX_SYNC_RETRY_COUNT + ).length; +} + +export async function markSyncOperationsSucceeded( + opIds: string[], + syncedAt: number +): Promise { + if (opIds.length === 0) { + return; + } + + const records = await localDb.opLogs.bulkGet(opIds); + const nextRecords = records + .filter((record): record is LocalOpLogRecord => record !== undefined) + .map((record) => ({ + ...record, + syncedAt, + errorMessage: null + })); + + if (nextRecords.length > 0) { + await localDb.opLogs.bulkPut(nextRecords); + } +} + +export async function markSyncOperationsFailed( + failures: Array<{ opId: string; errorMessage: string }> +): Promise { + if (failures.length === 0) { + return; + } + + const failureMap = new Map(failures.map((failure) => [failure.opId, failure.errorMessage])); + const records = await localDb.opLogs.bulkGet(failures.map((failure) => failure.opId)); + const nextRecords = records + .filter((record): record is LocalOpLogRecord => record !== undefined) + .map((record) => ({ + ...record, + retryCount: record.retryCount + 1, + errorMessage: failureMap.get(record.opId) ?? "同步失败" + })); + + if (nextRecords.length > 0) { + await localDb.opLogs.bulkPut(nextRecords); + } +} + +export async function getLocalSyncState(userId: string): Promise { + return localDb.syncStates.get(userId); +} + +export async function saveLocalSyncState(input: { + userId: string; + cursor: string | null; + lastSyncedAt: number | null; +}): Promise { + await localDb.syncStates.put({ + userId: input.userId, + cursor: input.cursor, + lastSyncedAt: input.lastSyncedAt, + updatedAt: Date.now() + }); +} + +export async function enqueueRemoteSyncOperations( + userId: string, + operations: SyncPullItem[] +): Promise { + if (operations.length === 0) { + return 0; + } + + const receivedAt = Date.now(); + const records: LocalSyncInboxRecord[] = operations.map((operation) => ({ + opId: operation.opId, + userId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: operation.payload, + clientTs: operation.clientTs, + deviceId: operation.deviceId, + serverTs: new Date(operation.serverTs).getTime(), + receivedAt, + appliedAt: null + })); + + await localDb.syncInbox.bulkPut(records); + return records.length; +} + +export async function countPendingRemoteOperations(userId: string): Promise { + const records = await localDb.syncInbox.where("userId").equals(userId).toArray(); + return records.filter((record) => record.appliedAt === null).length; +} diff --git a/apps/web/src/services/sync-api.ts b/apps/web/src/services/sync-api.ts new file mode 100644 index 0000000..7bc803e --- /dev/null +++ b/apps/web/src/services/sync-api.ts @@ -0,0 +1,117 @@ +import type { LocalOpLogRecord } from "@/services/local-db"; + +export type SyncPushResult = { + acceptedCount: number; + duplicateCount: number; + failedCount: number; + results: Array<{ + opId: string; + status: "accepted" | "duplicate" | "failed"; + serverTs: string | null; + reason: string | null; + }>; +}; + +export type SyncPullItem = { + opId: string; + entityId: string; + entityType: "TASK"; + action: "CREATE" | "UPDATE" | "DELETE"; + payload: string | null; + clientTs: number; + deviceId: string; + serverTs: string; +}; + +export type SyncPullResult = { + items: SyncPullItem[]; + nextCursor: string | null; + hasMore: boolean; +}; + +const DEFAULT_API_BASE_URL = "http://localhost:3000"; + +function resolveApiBaseUrl(): string { + const envBaseUrl = import.meta.env.VITE_API_BASE_URL as string | undefined; + if (!envBaseUrl) { + return DEFAULT_API_BASE_URL; + } + + return envBaseUrl.replace(/\/+$/, ""); +} + +async function parseErrorMessage(response: Response): Promise { + try { + const body = (await response.json()) as { message?: string | string[] }; + if (Array.isArray(body.message)) { + return body.message.join(","); + } + + if (typeof body.message === "string" && body.message.trim()) { + return body.message; + } + } catch { + return `请求失败(${response.status})`; + } + + return `请求失败(${response.status})`; +} + +export async function pushSyncOperations( + userId: string, + operations: LocalOpLogRecord[] +): Promise { + const response = await fetch(`${resolveApiBaseUrl()}/sync/push`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-user-id": userId + }, + body: JSON.stringify({ + operations: operations.map((operation) => ({ + opId: operation.opId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: operation.payload, + clientTs: operation.clientTs, + deviceId: operation.deviceId + })) + }) + }); + + if (!response.ok) { + throw new Error(await parseErrorMessage(response)); + } + + return (await response.json()) as SyncPushResult; +} + +export async function pullSyncOperations(input: { + userId: string; + cursor: string | null; + limit?: number; +}): Promise { + const requestUrl = new URL(`${resolveApiBaseUrl()}/sync/pull`); + + if (input.cursor) { + requestUrl.searchParams.set("cursor", input.cursor); + } + + if (input.limit !== undefined) { + requestUrl.searchParams.set("limit", String(input.limit)); + } + + const response = await fetch(requestUrl, { + method: "GET", + headers: { + "x-user-id": input.userId + } + }); + + if (!response.ok) { + throw new Error(await parseErrorMessage(response)); + } + + return (await response.json()) as SyncPullResult; +} diff --git a/apps/web/src/services/sync-worker.ts b/apps/web/src/services/sync-worker.ts new file mode 100644 index 0000000..cbbadc2 --- /dev/null +++ b/apps/web/src/services/sync-worker.ts @@ -0,0 +1,104 @@ +import { + enqueueRemoteSyncOperations, + getLocalSyncState, + listPendingSyncOperations, + markSyncOperationsFailed, + markSyncOperationsSucceeded, + saveLocalSyncState +} from "@/services/local-sync-repo"; +import { pullSyncOperations, pushSyncOperations } from "@/services/sync-api"; + +const PUSH_BATCH_LIMIT = 20; +const PULL_BATCH_LIMIT = 100; +const MAX_PULL_PAGES_PER_CYCLE = 5; + +export type SyncCycleResult = { + pushedCount: number; + pulledCount: number; + lastSyncedAt: number; + hasFailures: boolean; + failureMessage: string | null; +}; + +export async function runSyncWorkerCycle(userId: string): Promise { + const lastSyncedAt = Date.now(); + let pushedCount = 0; + let pulledCount = 0; + let hasFailures = false; + let failureMessage: string | null = null; + + for (;;) { + const pendingOperations = await listPendingSyncOperations(PUSH_BATCH_LIMIT); + if (pendingOperations.length === 0) { + break; + } + + const pushResult = await pushSyncOperations(userId, pendingOperations); + const syncedOperationIds = pushResult.results + .filter((result) => result.status === "accepted" || result.status === "duplicate") + .map((result) => result.opId); + const failedOperations = pushResult.results + .filter((result) => result.status === "failed") + .map((result) => ({ + opId: result.opId, + errorMessage: result.reason ?? "同步失败" + })); + + await markSyncOperationsSucceeded(syncedOperationIds, lastSyncedAt); + await markSyncOperationsFailed(failedOperations); + + pushedCount += syncedOperationIds.length; + + if (failedOperations.length > 0) { + hasFailures = true; + failureMessage = failedOperations[0]?.errorMessage ?? "同步失败"; + break; + } + + if (pendingOperations.length < PUSH_BATCH_LIMIT) { + break; + } + } + + const currentState = await getLocalSyncState(userId); + let nextCursor = currentState?.cursor ?? null; + + for (let page = 0; page < MAX_PULL_PAGES_PER_CYCLE; page += 1) { + const pullResult = await pullSyncOperations({ + userId, + cursor: nextCursor, + limit: PULL_BATCH_LIMIT + }); + + if (pullResult.items.length > 0) { + pulledCount += await enqueueRemoteSyncOperations(userId, pullResult.items); + } + + nextCursor = pullResult.nextCursor; + await saveLocalSyncState({ + userId, + cursor: nextCursor, + lastSyncedAt + }); + + if (!pullResult.hasMore) { + break; + } + } + + if (currentState === undefined && nextCursor === null) { + await saveLocalSyncState({ + userId, + cursor: null, + lastSyncedAt + }); + } + + return { + pushedCount, + pulledCount, + lastSyncedAt, + hasFailures, + failureMessage + }; +}