diff --git a/apps/api/src/main.ts b/apps/api/src/main.ts index 3f891f4..6a4f0ca 100644 --- a/apps/api/src/main.ts +++ b/apps/api/src/main.ts @@ -1,10 +1,18 @@ import "reflect-metadata"; import { ValidationPipe } from "@nestjs/common"; import { NestFactory } from "@nestjs/core"; +import type { NestExpressApplication } from "@nestjs/platform-express"; import { AppModule } from "./app.module"; async function bootstrap(): Promise { - const app = await NestFactory.create(AppModule); + const app = await NestFactory.create(AppModule); + const bodyLimit = process.env.API_BODY_LIMIT ?? "8mb"; + + app.useBodyParser("json", { limit: bodyLimit }); + app.useBodyParser("urlencoded", { + extended: true, + limit: bodyLimit + }); app.enableCors({ origin: true, credentials: true diff --git a/apps/api/src/sync/dto/sync-push.dto.ts b/apps/api/src/sync/dto/sync-push.dto.ts index 91b3a06..2e43da4 100644 --- a/apps/api/src/sync/dto/sync-push.dto.ts +++ b/apps/api/src/sync/dto/sync-push.dto.ts @@ -39,7 +39,7 @@ export class SyncPushOperationDto { @IsOptional() @IsString() - @MaxLength(50000) + @MaxLength(5000000) payload?: string; @Type(() => Number) diff --git a/apps/web/src/hooks/use-sync-engine.ts b/apps/web/src/hooks/use-sync-engine.ts index c6da2ea..09808e2 100644 --- a/apps/web/src/hooks/use-sync-engine.ts +++ b/apps/web/src/hooks/use-sync-engine.ts @@ -7,6 +7,7 @@ import { getLocalSyncState } from "@/services/local-sync-repo"; import type { WebSession } from "@/services/session-storage"; +import { applyPendingRemoteOperations } from "@/services/sync-merge"; import { runSyncWorkerCycle } from "@/services/sync-worker"; const PERIODIC_SYNC_INTERVAL_MS = 30_000; @@ -70,6 +71,7 @@ export function useSyncEngine(session: WebSession | null): { const retryAttemptRef = useRef(0); const runningRef = useRef(false); + const mergeRunningRef = useRef(false); useEffect(() => { setLastSyncedAt(storedSyncState?.lastSyncedAt ?? null); @@ -117,6 +119,37 @@ export function useSyncEngine(session: WebSession | null): { void runCycle(); }, [runCycle]); + const runMerge = useCallback(async () => { + if (!userId || mergeRunningRef.current) { + return; + } + + mergeRunningRef.current = true; + + try { + await applyPendingRemoteOperations(userId); + + if (!runningRef.current) { + setPhase((currentPhase) => { + if (!window.navigator.onLine) { + return "offline"; + } + + if (currentPhase === "backoff") { + return currentPhase; + } + + return blockedCount > 0 ? "attention" : "idle"; + }); + } + } catch (error) { + setLastError(getErrorMessage(error)); + setPhase("attention"); + } finally { + mergeRunningRef.current = false; + } + }, [blockedCount, userId]); + useEffect(() => { function handleOnline(): void { setIsOnline(true); @@ -190,6 +223,14 @@ export function useSyncEngine(session: WebSession | null): { }; }, [isOnline, nextRetryAt, runCycle]); + useEffect(() => { + if (!userId || pendingRemoteCount === 0 || runningRef.current) { + return; + } + + void runMerge(); + }, [pendingRemoteCount, runMerge, userId]); + useEffect(() => { if (!userId) { setLastError(null); diff --git a/apps/web/src/services/local-db.ts b/apps/web/src/services/local-db.ts index 5cdf57f..9fa0d97 100644 --- a/apps/web/src/services/local-db.ts +++ b/apps/web/src/services/local-db.ts @@ -17,6 +17,7 @@ export type LocalTaskRecord = { priority: LocalTaskPriority; status: LocalTaskStatus; ddlAt: number | null; + version: number; createdAt: number; updatedAt: number; deletedAt: number | null; @@ -97,6 +98,25 @@ class TodoLocalDb extends Dexie { sync_inbox: "&opId,userId,entityId,serverTs,appliedAt" }); + this.version(4) + .stores({ + tasks: "&id,userId,status,priority,ddlAt,updatedAt,deletedAt", + op_logs: "&opId,entityId,entityType,action,clientTs,syncedAt", + task_drafts: "&taskId,userId,updatedAt", + sync_states: "&userId,updatedAt,lastSyncedAt", + sync_inbox: "&opId,userId,entityId,serverTs,appliedAt" + }) + .upgrade(async (tx) => { + await tx + .table("tasks") + .toCollection() + .modify((task: Partial) => { + if (typeof task.version !== "number") { + task.version = 1; + } + }); + }); + this.tasks = this.table("tasks"); this.opLogs = this.table("op_logs"); this.taskDrafts = this.table("task_drafts"); diff --git a/apps/web/src/services/local-sync-repo.ts b/apps/web/src/services/local-sync-repo.ts index fbb79ca..87d13e9 100644 --- a/apps/web/src/services/local-sync-repo.ts +++ b/apps/web/src/services/local-sync-repo.ts @@ -118,6 +118,49 @@ export async function enqueueRemoteSyncOperations( return records.length; } +export async function listPendingRemoteOperations( + userId: string, + limit = 100 +): Promise { + const records = await localDb.syncInbox.where("userId").equals(userId).toArray(); + + return records + .filter((record) => record.appliedAt === null) + .sort((left, right) => { + if (left.serverTs !== right.serverTs) { + return left.serverTs - right.serverTs; + } + + if (left.clientTs !== right.clientTs) { + return left.clientTs - right.clientTs; + } + + return left.opId.localeCompare(right.opId); + }) + .slice(0, limit); +} + +export async function markRemoteOperationsApplied( + opIds: string[], + appliedAt: number +): Promise { + if (opIds.length === 0) { + return; + } + + const records = await localDb.syncInbox.bulkGet(opIds); + const nextRecords = records + .filter((record): record is LocalSyncInboxRecord => record !== undefined) + .map((record) => ({ + ...record, + appliedAt + })); + + if (nextRecords.length > 0) { + await localDb.syncInbox.bulkPut(nextRecords); + } +} + export async function countPendingRemoteOperations(userId: string): Promise { const records = await localDb.syncInbox.where("userId").equals(userId).toArray(); return records.filter((record) => record.appliedAt === null).length; diff --git a/apps/web/src/services/local-task-repo.ts b/apps/web/src/services/local-task-repo.ts index cd8edfd..b611047 100644 --- a/apps/web/src/services/local-task-repo.ts +++ b/apps/web/src/services/local-task-repo.ts @@ -24,6 +24,21 @@ export type UpdateLocalTaskInput = { ddlAt?: number | null; }; +type SyncTaskPayload = { + id?: string; + userId?: string; + title: string; + contentJson: string | null; + contentText?: string | null; + priority: LocalTaskPriority; + status: LocalTaskStatus; + ddlAt: number | null; + version: number; + createdAt?: number; + updatedAt: number; + deletedAt?: number | null; +}; + function resolveDeviceId(): string { const savedDeviceId = window.localStorage.getItem(DEVICE_ID_STORAGE_KEY); if (savedDeviceId) { @@ -54,6 +69,18 @@ function createOpLogRecord( }; } +function createSyncTaskPayload(payload: SyncTaskPayload): string { + const nextPayload: Record = { + ...payload + }; + + if (payload.contentJson !== null) { + delete nextPayload.contentText; + } + + return JSON.stringify(nextPayload); +} + export async function listLocalTasksByUser(userId: string): Promise { const tasks = await localDb.tasks.where("userId").equals(userId).toArray(); return tasks @@ -81,12 +108,30 @@ export async function createLocalTask(input: CreateLocalTaskInput): Promise { await localDb.tasks.add(task); @@ -104,6 +149,7 @@ export async function updateLocalTask( return undefined; } + const nextVersion = currentTask.version + 1; const nextTask: LocalTaskRecord = { ...currentTask, title: input.title !== undefined ? input.title.trim() || "未命名任务" : currentTask.title, @@ -112,19 +158,21 @@ export async function updateLocalTask( priority: input.priority ?? currentTask.priority, status: input.status ?? currentTask.status, ddlAt: input.ddlAt !== undefined ? input.ddlAt : currentTask.ddlAt, + version: nextVersion, updatedAt: Date.now() }; const opLog = createOpLogRecord( nextTask.id, "UPDATE", - JSON.stringify({ + createSyncTaskPayload({ title: nextTask.title, - contentText: nextTask.contentText, contentJson: nextTask.contentJson, + contentText: nextTask.contentText, priority: nextTask.priority, status: nextTask.status, ddlAt: nextTask.ddlAt, + version: nextTask.version, updatedAt: nextTask.updatedAt }) ); @@ -144,13 +192,23 @@ export async function deleteLocalTask(id: string): Promise { } const deletedAt = Date.now(); + const nextVersion = currentTask.version + 1; const nextTask: LocalTaskRecord = { ...currentTask, + version: nextVersion, deletedAt, updatedAt: deletedAt }; - const opLog = createOpLogRecord(id, "DELETE", JSON.stringify({ deletedAt })); + const opLog = createOpLogRecord( + id, + "DELETE", + JSON.stringify({ + deletedAt, + version: nextTask.version, + updatedAt: nextTask.updatedAt + }) + ); await localDb.transaction("rw", localDb.tasks, localDb.opLogs, async () => { await localDb.tasks.put(nextTask); diff --git a/apps/web/src/services/sync-api.ts b/apps/web/src/services/sync-api.ts index 7bc803e..e2c2ed0 100644 --- a/apps/web/src/services/sync-api.ts +++ b/apps/web/src/services/sync-api.ts @@ -41,6 +41,10 @@ function resolveApiBaseUrl(): string { } async function parseErrorMessage(response: Response): Promise { + if (response.status === 413) { + return "单次同步内容过大,请精简本次任务内容或等待系统分批重试。"; + } + try { const body = (await response.json()) as { message?: string | string[] }; if (Array.isArray(body.message)) { @@ -57,10 +61,56 @@ async function parseErrorMessage(response: Response): Promise { return `请求失败(${response.status})`; } +type SyncPushOperationRequest = { + opId: string; + entityId: string; + entityType: LocalOpLogRecord["entityType"]; + action: LocalOpLogRecord["action"]; + payload: string; + clientTs: number; + deviceId: string; +}; + +function compactOperationPayload(payload: string): string { + try { + const parsed = JSON.parse(payload) as unknown; + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return payload; + } + + const nextPayload = { ...(parsed as Record) }; + if (nextPayload.contentJson !== undefined && nextPayload.contentJson !== null) { + delete nextPayload.contentText; + } + + return JSON.stringify(nextPayload); + } catch { + return payload; + } +} + +export function serializeSyncOperationForRequest( + operation: LocalOpLogRecord +): SyncPushOperationRequest { + return { + opId: operation.opId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: compactOperationPayload(operation.payload), + clientTs: operation.clientTs, + deviceId: operation.deviceId + }; +} + export async function pushSyncOperations( userId: string, operations: LocalOpLogRecord[] ): Promise { + const requestOperations = operations.map((operation) => + serializeSyncOperationForRequest(operation) + ); + const response = await fetch(`${resolveApiBaseUrl()}/sync/push`, { method: "POST", headers: { @@ -68,15 +118,7 @@ export async function pushSyncOperations( "x-user-id": userId }, body: JSON.stringify({ - operations: operations.map((operation) => ({ - opId: operation.opId, - entityId: operation.entityId, - entityType: operation.entityType, - action: operation.action, - payload: operation.payload, - clientTs: operation.clientTs, - deviceId: operation.deviceId - })) + operations: requestOperations }) }); diff --git a/apps/web/src/services/sync-merge.ts b/apps/web/src/services/sync-merge.ts new file mode 100644 index 0000000..666d919 --- /dev/null +++ b/apps/web/src/services/sync-merge.ts @@ -0,0 +1,261 @@ +import { + localDb, + type LocalSyncInboxRecord, + type LocalTaskPriority, + type LocalTaskRecord, + type LocalTaskStatus +} from "@/services/local-db"; +import { listPendingRemoteOperations } from "@/services/local-sync-repo"; + +const TASK_PRIORITY_VALUES: LocalTaskPriority[] = ["LOW", "MEDIUM", "HIGH", "URGENT"]; +const TASK_STATUS_VALUES: LocalTaskStatus[] = ["TODO", "IN_PROGRESS", "DONE", "ARCHIVED"]; + +type RemoteTaskPayload = { + userId?: unknown; + title?: unknown; + contentJson?: unknown; + contentText?: unknown; + priority?: unknown; + status?: unknown; + ddlAt?: unknown; + version?: unknown; + createdAt?: unknown; + updatedAt?: unknown; + deletedAt?: unknown; +}; + +function normalizePriority(value: unknown, fallback: LocalTaskPriority): LocalTaskPriority { + if (typeof value === "string" && TASK_PRIORITY_VALUES.includes(value as LocalTaskPriority)) { + return value as LocalTaskPriority; + } + + return fallback; +} + +function normalizeStatus(value: unknown, fallback: LocalTaskStatus): LocalTaskStatus { + if (typeof value === "string" && TASK_STATUS_VALUES.includes(value as LocalTaskStatus)) { + return value as LocalTaskStatus; + } + + return fallback; +} + +function normalizeStringOrNull(value: unknown, fallback: string | null): string | null { + if (typeof value === "string") { + return value; + } + + if (value === null) { + return null; + } + + return fallback; +} + +function collectTextFromRichContent(value: unknown, fragments: string[]): void { + if (!value || typeof value !== "object") { + return; + } + + const node = value as { + text?: unknown; + content?: unknown; + }; + + if (typeof node.text === "string" && node.text.trim().length > 0) { + fragments.push(node.text.trim()); + } + + if (Array.isArray(node.content)) { + for (const child of node.content) { + collectTextFromRichContent(child, fragments); + } + } +} + +function extractTextFromContentJson(contentJson: string | null): string | null { + if (!contentJson) { + return null; + } + + try { + const parsed = JSON.parse(contentJson) as unknown; + const fragments: string[] = []; + collectTextFromRichContent(parsed, fragments); + return fragments.length > 0 ? fragments.join(" ") : null; + } catch { + return null; + } +} + +function normalizeNullableNumber(value: unknown, fallback: number | null): number | null { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + + if (value === null) { + return null; + } + + return fallback; +} + +function normalizePositiveNumber(value: unknown, fallback: number): number { + if (typeof value === "number" && Number.isFinite(value) && value > 0) { + return value; + } + + return fallback; +} + +function parseOperationPayload(operation: LocalSyncInboxRecord): RemoteTaskPayload { + if (!operation.payload) { + return {}; + } + + const parsed = JSON.parse(operation.payload) as unknown; + if (!parsed || typeof parsed !== "object") { + return {}; + } + + return parsed as RemoteTaskPayload; +} + +function createFallbackTask( + operation: LocalSyncInboxRecord, + userId: string, + updatedAt: number, + version: number +): LocalTaskRecord { + return { + id: operation.entityId, + userId, + title: "未命名任务", + contentJson: null, + contentText: null, + priority: "MEDIUM", + status: "TODO", + ddlAt: null, + version, + createdAt: updatedAt, + updatedAt, + deletedAt: null + }; +} + +function buildIncomingTaskRecord( + operation: LocalSyncInboxRecord, + currentTask: LocalTaskRecord | undefined +): LocalTaskRecord { + const payload = parseOperationPayload(operation); + const fallbackVersion = currentTask?.version ?? 1; + const version = normalizePositiveNumber(payload.version, fallbackVersion); + const updatedAt = normalizePositiveNumber( + payload.updatedAt, + normalizePositiveNumber(payload.deletedAt, operation.clientTs) + ); + const fallbackTask = + currentTask ?? createFallbackTask(operation, operation.userId, updatedAt, version); + const contentJson = normalizeStringOrNull(payload.contentJson, fallbackTask.contentJson); + const contentText = normalizeStringOrNull( + payload.contentText, + extractTextFromContentJson(contentJson) ?? fallbackTask.contentText + ); + + if (operation.action === "DELETE") { + const deletedAt = normalizePositiveNumber(payload.deletedAt, updatedAt); + return { + ...fallbackTask, + version, + updatedAt: deletedAt, + deletedAt + }; + } + + return { + ...fallbackTask, + userId: typeof payload.userId === "string" ? payload.userId : fallbackTask.userId, + title: + typeof payload.title === "string" && payload.title.trim().length > 0 + ? payload.title + : fallbackTask.title, + contentJson, + contentText, + priority: normalizePriority(payload.priority, fallbackTask.priority), + status: normalizeStatus(payload.status, fallbackTask.status), + ddlAt: normalizeNullableNumber(payload.ddlAt, fallbackTask.ddlAt), + version, + createdAt: normalizePositiveNumber(payload.createdAt, fallbackTask.createdAt), + updatedAt, + deletedAt: normalizeNullableNumber(payload.deletedAt, null) + }; +} + +function getOperationTieBreaker(operation: LocalSyncInboxRecord): number { + if (operation.action === "DELETE") { + return 3; + } + + if (operation.action === "UPDATE") { + return 2; + } + + return 1; +} + +function shouldApplyIncomingTask( + currentTask: LocalTaskRecord | undefined, + incomingTask: LocalTaskRecord, + operation: LocalSyncInboxRecord +): boolean { + if (!currentTask) { + return true; + } + + if (incomingTask.updatedAt > currentTask.updatedAt) { + return true; + } + + if (incomingTask.updatedAt < currentTask.updatedAt) { + return false; + } + + if (incomingTask.version > currentTask.version) { + return true; + } + + if (incomingTask.version < currentTask.version) { + return false; + } + + return getOperationTieBreaker(operation) >= (currentTask.deletedAt === null ? 1 : 3); +} + +export async function applyPendingRemoteOperations(userId: string): Promise { + const pendingOperations = await listPendingRemoteOperations(userId); + if (pendingOperations.length === 0) { + return 0; + } + + const appliedAt = Date.now(); + + await localDb.transaction("rw", localDb.tasks, localDb.syncInbox, async () => { + for (const operation of pendingOperations) { + if (operation.entityType !== "TASK") { + await localDb.syncInbox.update(operation.opId, { appliedAt }); + continue; + } + + const currentTask = await localDb.tasks.get(operation.entityId); + const incomingTask = buildIncomingTaskRecord(operation, currentTask); + + if (shouldApplyIncomingTask(currentTask, incomingTask, operation)) { + await localDb.tasks.put(incomingTask); + } + + await localDb.syncInbox.update(operation.opId, { appliedAt }); + } + }); + + return pendingOperations.length; +} diff --git a/apps/web/src/services/sync-worker.ts b/apps/web/src/services/sync-worker.ts index cbbadc2..e59f9f7 100644 --- a/apps/web/src/services/sync-worker.ts +++ b/apps/web/src/services/sync-worker.ts @@ -6,15 +6,45 @@ import { markSyncOperationsSucceeded, saveLocalSyncState } from "@/services/local-sync-repo"; -import { pullSyncOperations, pushSyncOperations } from "@/services/sync-api"; +import { applyPendingRemoteOperations } from "@/services/sync-merge"; +import { + pullSyncOperations, + pushSyncOperations, + serializeSyncOperationForRequest +} from "@/services/sync-api"; +import type { LocalOpLogRecord } from "@/services/local-db"; const PUSH_BATCH_LIMIT = 20; +const PUSH_BATCH_MAX_BYTES = 256 * 1024; const PULL_BATCH_LIMIT = 100; const MAX_PULL_PAGES_PER_CYCLE = 5; +function estimateOperationBytes(operation: LocalOpLogRecord): number { + return new TextEncoder().encode(JSON.stringify(serializeSyncOperationForRequest(operation))) + .length; +} + +function createPushBatch(operations: LocalOpLogRecord[]): LocalOpLogRecord[] { + const batch: LocalOpLogRecord[] = []; + let batchBytes = 0; + + for (const operation of operations) { + const operationBytes = estimateOperationBytes(operation); + if (batch.length > 0 && batchBytes + operationBytes > PUSH_BATCH_MAX_BYTES) { + break; + } + + batch.push(operation); + batchBytes += operationBytes; + } + + return batch; +} + export type SyncCycleResult = { pushedCount: number; pulledCount: number; + appliedRemoteCount: number; lastSyncedAt: number; hasFailures: boolean; failureMessage: string | null; @@ -24,15 +54,17 @@ export async function runSyncWorkerCycle(userId: string): Promise result.status === "accepted" || result.status === "duplicate") @@ -55,7 +87,10 @@ export async function runSyncWorkerCycle(userId: string): Promise