feat(web-sync): add background sync worker and retry strategy

This commit is contained in:
2026-04-06 01:15:50 +08:00
parent 661788ae75
commit c48e16a977
6 changed files with 953 additions and 162 deletions
+255
View File
@@ -0,0 +1,255 @@
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useLiveQuery } from "dexie-react-hooks";
import {
countBlockedSyncOperations,
countPendingRemoteOperations,
countPendingSyncOperations,
getLocalSyncState
} from "@/services/local-sync-repo";
import type { WebSession } from "@/services/session-storage";
import { runSyncWorkerCycle } from "@/services/sync-worker";
const PERIODIC_SYNC_INTERVAL_MS = 30_000;
const MAX_RETRY_DELAY_MS = 60_000;
const BASE_RETRY_DELAY_MS = 2_000;
export type SyncEngineStatus = {
isOnline: boolean;
phase: "idle" | "syncing" | "offline" | "backoff" | "attention";
pendingCount: number;
blockedCount: number;
pendingRemoteCount: number;
lastSyncedAt: number | null;
nextRetryAt: number | null;
lastError: string | null;
};
function getErrorMessage(error: unknown): string {
if (error instanceof Error && error.message.trim()) {
return error.message;
}
return "同步失败,请稍后重试";
}
function calculateRetryDelay(attempt: number): number {
return Math.min(BASE_RETRY_DELAY_MS * 2 ** Math.max(attempt - 1, 0), MAX_RETRY_DELAY_MS);
}
export function useSyncEngine(session: WebSession | null): {
status: SyncEngineStatus;
triggerSync: () => void;
} {
const userId = session?.user.id ?? "";
const pendingCount = useLiveQuery(async () => countPendingSyncOperations(), [userId]) ?? 0;
const blockedCount = useLiveQuery(async () => countBlockedSyncOperations(), [userId]) ?? 0;
const pendingRemoteCount =
useLiveQuery(async () => {
if (!userId) {
return 0;
}
return countPendingRemoteOperations(userId);
}, [userId]) ?? 0;
const storedSyncState =
useLiveQuery(async () => {
if (!userId) {
return null;
}
return getLocalSyncState(userId);
}, [userId]) ?? null;
const [isOnline, setIsOnline] = useState(() => window.navigator.onLine);
const [phase, setPhase] = useState<SyncEngineStatus["phase"]>(
window.navigator.onLine ? "idle" : "offline"
);
const [lastError, setLastError] = useState<string | null>(null);
const [nextRetryAt, setNextRetryAt] = useState<number | null>(null);
const [lastSyncedAt, setLastSyncedAt] = useState<number | null>(null);
const retryAttemptRef = useRef(0);
const runningRef = useRef(false);
useEffect(() => {
setLastSyncedAt(storedSyncState?.lastSyncedAt ?? null);
}, [storedSyncState]);
const runCycle = useCallback(async () => {
if (!userId || runningRef.current || !window.navigator.onLine) {
return;
}
runningRef.current = true;
setPhase("syncing");
setLastError(null);
setNextRetryAt(null);
try {
const result = await runSyncWorkerCycle(userId);
retryAttemptRef.current = 0;
setLastSyncedAt(result.lastSyncedAt);
if (result.hasFailures) {
const nextAttempt = retryAttemptRef.current + 1;
retryAttemptRef.current = nextAttempt;
const delay = calculateRetryDelay(nextAttempt);
setLastError(result.failureMessage ?? "同步失败");
setNextRetryAt(Date.now() + delay);
setPhase("backoff");
return;
}
setPhase(blockedCount > 0 ? "attention" : "idle");
} catch (error) {
const nextAttempt = retryAttemptRef.current + 1;
retryAttemptRef.current = nextAttempt;
const delay = calculateRetryDelay(nextAttempt);
setLastError(getErrorMessage(error));
setNextRetryAt(Date.now() + delay);
setPhase("backoff");
} finally {
runningRef.current = false;
}
}, [blockedCount, userId]);
const triggerSync = useCallback(() => {
void runCycle();
}, [runCycle]);
useEffect(() => {
function handleOnline(): void {
setIsOnline(true);
setPhase(blockedCount > 0 ? "attention" : "idle");
void runCycle();
}
function handleOffline(): void {
setIsOnline(false);
setNextRetryAt(null);
setPhase("offline");
}
function handleVisibilityChange(): void {
if (document.visibilityState === "visible" && window.navigator.onLine) {
void runCycle();
}
}
window.addEventListener("online", handleOnline);
window.addEventListener("offline", handleOffline);
document.addEventListener("visibilitychange", handleVisibilityChange);
return () => {
window.removeEventListener("online", handleOnline);
window.removeEventListener("offline", handleOffline);
document.removeEventListener("visibilitychange", handleVisibilityChange);
};
}, [blockedCount, runCycle]);
useEffect(() => {
if (!userId || !isOnline) {
return;
}
if (pendingCount === 0 && pendingRemoteCount === 0) {
return;
}
void runCycle();
}, [isOnline, pendingCount, pendingRemoteCount, runCycle, userId]);
useEffect(() => {
if (!userId || !isOnline) {
return;
}
const intervalId = window.setInterval(() => {
void runCycle();
}, PERIODIC_SYNC_INTERVAL_MS);
return () => {
window.clearInterval(intervalId);
};
}, [isOnline, runCycle, userId]);
useEffect(() => {
if (!nextRetryAt || !isOnline) {
return;
}
const timeoutId = window.setTimeout(
() => {
void runCycle();
},
Math.max(nextRetryAt - Date.now(), 0)
);
return () => {
window.clearTimeout(timeoutId);
};
}, [isOnline, nextRetryAt, runCycle]);
useEffect(() => {
if (!userId) {
setLastError(null);
setLastSyncedAt(null);
setNextRetryAt(null);
setPhase(window.navigator.onLine ? "idle" : "offline");
retryAttemptRef.current = 0;
}
}, [userId]);
const status = useMemo<SyncEngineStatus>(() => {
if (!isOnline) {
return {
isOnline,
phase: "offline",
pendingCount,
blockedCount,
pendingRemoteCount,
lastSyncedAt,
nextRetryAt: null,
lastError
};
}
if (blockedCount > 0 && phase !== "syncing") {
return {
isOnline,
phase: "attention",
pendingCount,
blockedCount,
pendingRemoteCount,
lastSyncedAt,
nextRetryAt,
lastError
};
}
return {
isOnline,
phase,
pendingCount,
blockedCount,
pendingRemoteCount,
lastSyncedAt,
nextRetryAt,
lastError
};
}, [
blockedCount,
isOnline,
lastError,
lastSyncedAt,
nextRetryAt,
pendingCount,
pendingRemoteCount,
phase
]);
return {
status,
triggerSync
};
}
+162 -4
View File
@@ -1,6 +1,14 @@
import { useCallback, useEffect, useRef, useState } from "react"; import { useCallback, useEffect, useRef, useState } from "react";
import { useLiveQuery } from "dexie-react-hooks"; import { useLiveQuery } from "dexie-react-hooks";
import { CheckCircle2, CircleAlert } from "lucide-react"; import {
CheckCircle2,
CircleAlert,
CloudOff,
LoaderCircle,
RefreshCw,
ServerCrash
} from "lucide-react";
import { useSyncEngine, type SyncEngineStatus } from "@/hooks/use-sync-engine";
import { TaskRichEditor } from "@/components/task-rich-editor"; import { TaskRichEditor } from "@/components/task-rich-editor";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
import { cn } from "@/lib/utils"; import { cn } from "@/lib/utils";
@@ -138,6 +146,105 @@ function serializeFormState(formState: TaskFormState): string {
return JSON.stringify(formState); return JSON.stringify(formState);
} }
function formatSyncTimestamp(timestamp: number | null): string {
if (timestamp === null) {
return "尚未完成同步";
}
return new Date(timestamp).toLocaleString("zh-CN", {
month: "2-digit",
day: "2-digit",
hour: "2-digit",
minute: "2-digit"
});
}
function formatRetryTime(timestamp: number | null): string {
if (timestamp === null) {
return "稍后";
}
return new Date(timestamp).toLocaleTimeString("zh-CN", {
hour: "2-digit",
minute: "2-digit",
second: "2-digit"
});
}
function getSyncSummary(status: SyncEngineStatus): {
title: string;
description: string;
accentClassName: string;
icon: typeof RefreshCw;
iconClassName: string;
} {
if (status.phase === "offline") {
return {
title: "离线工作中",
description:
status.pendingCount > 0
? `当前离线,已保留 ${status.pendingCount} 条待上传改动。`
: "当前离线,本地仍可继续编辑,联网后会自动同步。",
accentClassName: "border-amber-200/80 bg-amber-50/80 text-amber-950",
icon: CloudOff,
iconClassName: "text-amber-600"
};
}
if (status.phase === "syncing") {
return {
title: "正在同步",
description: "正在上传本地改动并拉取最新云端增量。",
accentClassName: "border-primary/20 bg-primary/10 text-foreground",
icon: LoaderCircle,
iconClassName: "animate-spin text-primary"
};
}
if (status.phase === "backoff") {
return {
title: "同步稍后重试",
description: `${status.lastError ?? "同步失败"},系统将在 ${formatRetryTime(
status.nextRetryAt
)} 再试一次。`,
accentClassName: "border-destructive/20 bg-destructive/8 text-foreground",
icon: ServerCrash,
iconClassName: "text-destructive"
};
}
if (status.phase === "attention") {
return {
title: "需要人工关注",
description: `${status.blockedCount} 条同步记录已达到重试上限,请检查接口配置或网络环境。`,
accentClassName: "border-destructive/20 bg-destructive/8 text-foreground",
icon: CircleAlert,
iconClassName: "text-destructive"
};
}
if (status.pendingRemoteCount > 0) {
return {
title: "云端变更已接收",
description: `已收到 ${status.pendingRemoteCount} 条云端变更,后续会进入本地合并流程。`,
accentClassName: "border-sky-200/80 bg-sky-50/80 text-sky-950",
icon: RefreshCw,
iconClassName: "text-sky-600"
};
}
return {
title: "同步状态正常",
description:
status.pendingCount > 0
? `还有 ${status.pendingCount} 条本地改动待处理。`
: "本地改动与云端增量传输均处于正常状态。",
accentClassName: "border-emerald-200/80 bg-emerald-50/80 text-emerald-950",
icon: CheckCircle2,
iconClassName: "text-emerald-600"
};
}
export function TodoShellPage({ session }: TodoShellPageProps) { export function TodoShellPage({ session }: TodoShellPageProps) {
const [selectedTaskId, setSelectedTaskId] = useState<string | null>(null); const [selectedTaskId, setSelectedTaskId] = useState<string | null>(null);
const [formState, setFormState] = useState<TaskFormState>(DEFAULT_FORM_STATE); const [formState, setFormState] = useState<TaskFormState>(DEFAULT_FORM_STATE);
@@ -148,6 +255,7 @@ export function TodoShellPage({ session }: TodoShellPageProps) {
const [feedbackVisible, setFeedbackVisible] = useState(false); const [feedbackVisible, setFeedbackVisible] = useState(false);
const [draftReadyTaskId, setDraftReadyTaskId] = useState<string | null>(null); const [draftReadyTaskId, setDraftReadyTaskId] = useState<string | null>(null);
const savedTaskSnapshotRef = useRef(serializeFormState(DEFAULT_FORM_STATE)); const savedTaskSnapshotRef = useRef(serializeFormState(DEFAULT_FORM_STATE));
const { status: syncStatus, triggerSync } = useSyncEngine(session);
const userId = session?.user.id ?? ""; const userId = session?.user.id ?? "";
@@ -417,10 +525,58 @@ export function TodoShellPage({ session }: TodoShellPageProps) {
} }
const taskList = tasks ?? []; const taskList = tasks ?? [];
const syncSummary = getSyncSummary(syncStatus);
const SyncSummaryIcon = syncSummary.icon;
return ( return (
<> <>
{renderFeedbackBanner()} {renderFeedbackBanner()}
<div className="space-y-4">
<section
className={cn(
"rounded-[1.75rem] border px-4 py-4 shadow-[0_24px_70px_-42px_hsl(var(--primary)/0.38)] backdrop-blur md:px-5",
syncSummary.accentClassName
)}
>
<div className="flex flex-col gap-4 md:flex-row md:items-center md:justify-between">
<div className="flex items-start gap-3">
<div className="rounded-2xl bg-white/70 p-2.5 shadow-sm ring-1 ring-black/5">
<SyncSummaryIcon className={cn("h-5 w-5", syncSummary.iconClassName)} />
</div>
<div className="space-y-1">
<p className="text-sm font-semibold">{syncSummary.title}</p>
<p className="text-sm leading-6 text-current/80">{syncSummary.description}</p>
</div>
</div>
<div className="flex flex-wrap items-center gap-2">
<span className="rounded-full border border-current/10 bg-white/70 px-3 py-1 text-xs text-current/80">
{syncStatus.pendingCount}
</span>
<span className="rounded-full border border-current/10 bg-white/70 px-3 py-1 text-xs text-current/80">
{syncStatus.pendingRemoteCount}
</span>
{syncStatus.blockedCount > 0 ? (
<span className="rounded-full border border-destructive/20 bg-white/70 px-3 py-1 text-xs text-destructive">
{syncStatus.blockedCount}
</span>
) : null}
<span className="rounded-full border border-current/10 bg-white/70 px-3 py-1 text-xs text-current/80">
{formatSyncTimestamp(syncStatus.lastSyncedAt)}
</span>
<Button
type="button"
variant="outline"
className="border-current/15 bg-white/70 text-current hover:bg-white"
onClick={triggerSync}
disabled={!syncStatus.isOnline || syncStatus.phase === "syncing"}
>
{syncStatus.phase === "syncing" ? "同步中..." : "立即同步"}
</Button>
</div>
</div>
</section>
<div className="grid gap-4 lg:grid-cols-[320px_minmax(0,1fr)]"> <div className="grid gap-4 lg:grid-cols-[320px_minmax(0,1fr)]">
<section className="rounded-2xl border border-border bg-card/90 p-4 shadow-[0_24px_70px_-42px_hsl(var(--primary)/0.6)] backdrop-blur"> <section className="rounded-2xl border border-border bg-card/90 p-4 shadow-[0_24px_70px_-42px_hsl(var(--primary)/0.6)] backdrop-blur">
<div className="mb-3 flex items-center justify-between gap-2"> <div className="mb-3 flex items-center justify-between gap-2">
@@ -444,7 +600,8 @@ export function TodoShellPage({ session }: TodoShellPageProps) {
)} )}
> >
{formatStorageSize(quotaSnapshot.usedBytes)} /{" "} {formatStorageSize(quotaSnapshot.usedBytes)} /{" "}
{formatStorageSize(quotaSnapshot.quotaBytes)}{quotaSnapshot.usedPercent.toFixed(1)} {formatStorageSize(quotaSnapshot.quotaBytes)}
{quotaSnapshot.usedPercent.toFixed(1)}
% %
</p> </p>
) : null} ) : null}
@@ -471,8 +628,8 @@ export function TodoShellPage({ session }: TodoShellPageProps) {
> >
<p className="truncate text-sm font-medium text-foreground">{task.title}</p> <p className="truncate text-sm font-medium text-foreground">{task.title}</p>
<p className="mt-1 text-xs text-muted-foreground"> <p className="mt-1 text-xs text-muted-foreground">
{STATUS_LABEL_MAP[task.status]} · {PRIORITY_LABEL_MAP[task.priority]} · {" "} {STATUS_LABEL_MAP[task.status]} · {PRIORITY_LABEL_MAP[task.priority]} ·
{formatUpdatedAt(task.updatedAt)} {formatUpdatedAt(task.updatedAt)}
</p> </p>
</button> </button>
); );
@@ -603,6 +760,7 @@ export function TodoShellPage({ session }: TodoShellPageProps) {
)} )}
</section> </section>
</div> </div>
</div>
</> </>
); );
} }
+33
View File
@@ -47,10 +47,33 @@ export type LocalTaskDraftRecord = {
updatedAt: number; updatedAt: number;
}; };
export type LocalSyncStateRecord = {
userId: string;
cursor: string | null;
lastSyncedAt: number | null;
updatedAt: number;
};
export type LocalSyncInboxRecord = {
opId: string;
userId: string;
entityId: string;
entityType: SyncEntityType;
action: SyncActionType;
payload: string | null;
clientTs: number;
deviceId: string;
serverTs: number;
receivedAt: number;
appliedAt: number | null;
};
class TodoLocalDb extends Dexie { class TodoLocalDb extends Dexie {
declare tasks: Table<LocalTaskRecord, string>; declare tasks: Table<LocalTaskRecord, string>;
declare opLogs: Table<LocalOpLogRecord, string>; declare opLogs: Table<LocalOpLogRecord, string>;
declare taskDrafts: Table<LocalTaskDraftRecord, string>; declare taskDrafts: Table<LocalTaskDraftRecord, string>;
declare syncStates: Table<LocalSyncStateRecord, string>;
declare syncInbox: Table<LocalSyncInboxRecord, string>;
constructor() { constructor() {
super("todolist-web-db"); super("todolist-web-db");
@@ -66,9 +89,19 @@ class TodoLocalDb extends Dexie {
task_drafts: "&taskId,userId,updatedAt" task_drafts: "&taskId,userId,updatedAt"
}); });
this.version(3).stores({
tasks: "&id,userId,status,priority,ddlAt,updatedAt,deletedAt",
op_logs: "&opId,entityId,entityType,action,clientTs,syncedAt",
task_drafts: "&taskId,userId,updatedAt",
sync_states: "&userId,updatedAt,lastSyncedAt",
sync_inbox: "&opId,userId,entityId,serverTs,appliedAt"
});
this.tasks = this.table("tasks"); this.tasks = this.table("tasks");
this.opLogs = this.table("op_logs"); this.opLogs = this.table("op_logs");
this.taskDrafts = this.table("task_drafts"); this.taskDrafts = this.table("task_drafts");
this.syncStates = this.table("sync_states");
this.syncInbox = this.table("sync_inbox");
} }
} }
+124
View File
@@ -0,0 +1,124 @@
import {
localDb,
type LocalOpLogRecord,
type LocalSyncInboxRecord,
type LocalSyncStateRecord
} from "@/services/local-db";
import type { SyncPullItem } from "@/services/sync-api";
export const MAX_SYNC_RETRY_COUNT = 5;
export async function listPendingSyncOperations(limit = 20): Promise<LocalOpLogRecord[]> {
const records = await localDb.opLogs.orderBy("clientTs").toArray();
return records
.filter((record) => record.syncedAt === null && record.retryCount < MAX_SYNC_RETRY_COUNT)
.slice(0, limit);
}
export async function countPendingSyncOperations(): Promise<number> {
const records = await localDb.opLogs.toArray();
return records.filter(
(record) => record.syncedAt === null && record.retryCount < MAX_SYNC_RETRY_COUNT
).length;
}
export async function countBlockedSyncOperations(): Promise<number> {
const records = await localDb.opLogs.toArray();
return records.filter(
(record) => record.syncedAt === null && record.retryCount >= MAX_SYNC_RETRY_COUNT
).length;
}
export async function markSyncOperationsSucceeded(
opIds: string[],
syncedAt: number
): Promise<void> {
if (opIds.length === 0) {
return;
}
const records = await localDb.opLogs.bulkGet(opIds);
const nextRecords = records
.filter((record): record is LocalOpLogRecord => record !== undefined)
.map((record) => ({
...record,
syncedAt,
errorMessage: null
}));
if (nextRecords.length > 0) {
await localDb.opLogs.bulkPut(nextRecords);
}
}
export async function markSyncOperationsFailed(
failures: Array<{ opId: string; errorMessage: string }>
): Promise<void> {
if (failures.length === 0) {
return;
}
const failureMap = new Map(failures.map((failure) => [failure.opId, failure.errorMessage]));
const records = await localDb.opLogs.bulkGet(failures.map((failure) => failure.opId));
const nextRecords = records
.filter((record): record is LocalOpLogRecord => record !== undefined)
.map((record) => ({
...record,
retryCount: record.retryCount + 1,
errorMessage: failureMap.get(record.opId) ?? "同步失败"
}));
if (nextRecords.length > 0) {
await localDb.opLogs.bulkPut(nextRecords);
}
}
export async function getLocalSyncState(userId: string): Promise<LocalSyncStateRecord | undefined> {
return localDb.syncStates.get(userId);
}
export async function saveLocalSyncState(input: {
userId: string;
cursor: string | null;
lastSyncedAt: number | null;
}): Promise<void> {
await localDb.syncStates.put({
userId: input.userId,
cursor: input.cursor,
lastSyncedAt: input.lastSyncedAt,
updatedAt: Date.now()
});
}
export async function enqueueRemoteSyncOperations(
userId: string,
operations: SyncPullItem[]
): Promise<number> {
if (operations.length === 0) {
return 0;
}
const receivedAt = Date.now();
const records: LocalSyncInboxRecord[] = operations.map((operation) => ({
opId: operation.opId,
userId,
entityId: operation.entityId,
entityType: operation.entityType,
action: operation.action,
payload: operation.payload,
clientTs: operation.clientTs,
deviceId: operation.deviceId,
serverTs: new Date(operation.serverTs).getTime(),
receivedAt,
appliedAt: null
}));
await localDb.syncInbox.bulkPut(records);
return records.length;
}
export async function countPendingRemoteOperations(userId: string): Promise<number> {
const records = await localDb.syncInbox.where("userId").equals(userId).toArray();
return records.filter((record) => record.appliedAt === null).length;
}
+117
View File
@@ -0,0 +1,117 @@
import type { LocalOpLogRecord } from "@/services/local-db";
export type SyncPushResult = {
acceptedCount: number;
duplicateCount: number;
failedCount: number;
results: Array<{
opId: string;
status: "accepted" | "duplicate" | "failed";
serverTs: string | null;
reason: string | null;
}>;
};
export type SyncPullItem = {
opId: string;
entityId: string;
entityType: "TASK";
action: "CREATE" | "UPDATE" | "DELETE";
payload: string | null;
clientTs: number;
deviceId: string;
serverTs: string;
};
export type SyncPullResult = {
items: SyncPullItem[];
nextCursor: string | null;
hasMore: boolean;
};
const DEFAULT_API_BASE_URL = "http://localhost:3000";
function resolveApiBaseUrl(): string {
const envBaseUrl = import.meta.env.VITE_API_BASE_URL as string | undefined;
if (!envBaseUrl) {
return DEFAULT_API_BASE_URL;
}
return envBaseUrl.replace(/\/+$/, "");
}
async function parseErrorMessage(response: Response): Promise<string> {
try {
const body = (await response.json()) as { message?: string | string[] };
if (Array.isArray(body.message)) {
return body.message.join("");
}
if (typeof body.message === "string" && body.message.trim()) {
return body.message;
}
} catch {
return `请求失败(${response.status}`;
}
return `请求失败(${response.status}`;
}
export async function pushSyncOperations(
userId: string,
operations: LocalOpLogRecord[]
): Promise<SyncPushResult> {
const response = await fetch(`${resolveApiBaseUrl()}/sync/push`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-user-id": userId
},
body: JSON.stringify({
operations: operations.map((operation) => ({
opId: operation.opId,
entityId: operation.entityId,
entityType: operation.entityType,
action: operation.action,
payload: operation.payload,
clientTs: operation.clientTs,
deviceId: operation.deviceId
}))
})
});
if (!response.ok) {
throw new Error(await parseErrorMessage(response));
}
return (await response.json()) as SyncPushResult;
}
export async function pullSyncOperations(input: {
userId: string;
cursor: string | null;
limit?: number;
}): Promise<SyncPullResult> {
const requestUrl = new URL(`${resolveApiBaseUrl()}/sync/pull`);
if (input.cursor) {
requestUrl.searchParams.set("cursor", input.cursor);
}
if (input.limit !== undefined) {
requestUrl.searchParams.set("limit", String(input.limit));
}
const response = await fetch(requestUrl, {
method: "GET",
headers: {
"x-user-id": input.userId
}
});
if (!response.ok) {
throw new Error(await parseErrorMessage(response));
}
return (await response.json()) as SyncPullResult;
}
+104
View File
@@ -0,0 +1,104 @@
import {
enqueueRemoteSyncOperations,
getLocalSyncState,
listPendingSyncOperations,
markSyncOperationsFailed,
markSyncOperationsSucceeded,
saveLocalSyncState
} from "@/services/local-sync-repo";
import { pullSyncOperations, pushSyncOperations } from "@/services/sync-api";
const PUSH_BATCH_LIMIT = 20;
const PULL_BATCH_LIMIT = 100;
const MAX_PULL_PAGES_PER_CYCLE = 5;
export type SyncCycleResult = {
pushedCount: number;
pulledCount: number;
lastSyncedAt: number;
hasFailures: boolean;
failureMessage: string | null;
};
export async function runSyncWorkerCycle(userId: string): Promise<SyncCycleResult> {
const lastSyncedAt = Date.now();
let pushedCount = 0;
let pulledCount = 0;
let hasFailures = false;
let failureMessage: string | null = null;
for (;;) {
const pendingOperations = await listPendingSyncOperations(PUSH_BATCH_LIMIT);
if (pendingOperations.length === 0) {
break;
}
const pushResult = await pushSyncOperations(userId, pendingOperations);
const syncedOperationIds = pushResult.results
.filter((result) => result.status === "accepted" || result.status === "duplicate")
.map((result) => result.opId);
const failedOperations = pushResult.results
.filter((result) => result.status === "failed")
.map((result) => ({
opId: result.opId,
errorMessage: result.reason ?? "同步失败"
}));
await markSyncOperationsSucceeded(syncedOperationIds, lastSyncedAt);
await markSyncOperationsFailed(failedOperations);
pushedCount += syncedOperationIds.length;
if (failedOperations.length > 0) {
hasFailures = true;
failureMessage = failedOperations[0]?.errorMessage ?? "同步失败";
break;
}
if (pendingOperations.length < PUSH_BATCH_LIMIT) {
break;
}
}
const currentState = await getLocalSyncState(userId);
let nextCursor = currentState?.cursor ?? null;
for (let page = 0; page < MAX_PULL_PAGES_PER_CYCLE; page += 1) {
const pullResult = await pullSyncOperations({
userId,
cursor: nextCursor,
limit: PULL_BATCH_LIMIT
});
if (pullResult.items.length > 0) {
pulledCount += await enqueueRemoteSyncOperations(userId, pullResult.items);
}
nextCursor = pullResult.nextCursor;
await saveLocalSyncState({
userId,
cursor: nextCursor,
lastSyncedAt
});
if (!pullResult.hasMore) {
break;
}
}
if (currentState === undefined && nextCursor === null) {
await saveLocalSyncState({
userId,
cursor: null,
lastSyncedAt
});
}
return {
pushedCount,
pulledCount,
lastSyncedAt,
hasFailures,
failureMessage
};
}