feat(sync): implement lww conflict and tombstone handling

This commit is contained in:
2026-04-06 01:33:57 +08:00
parent c48e16a977
commit c98adb3051
9 changed files with 530 additions and 19 deletions
+20
View File
@@ -17,6 +17,7 @@ export type LocalTaskRecord = {
priority: LocalTaskPriority;
status: LocalTaskStatus;
ddlAt: number | null;
version: number;
createdAt: number;
updatedAt: number;
deletedAt: number | null;
@@ -97,6 +98,25 @@ class TodoLocalDb extends Dexie {
sync_inbox: "&opId,userId,entityId,serverTs,appliedAt"
});
this.version(4)
.stores({
tasks: "&id,userId,status,priority,ddlAt,updatedAt,deletedAt",
op_logs: "&opId,entityId,entityType,action,clientTs,syncedAt",
task_drafts: "&taskId,userId,updatedAt",
sync_states: "&userId,updatedAt,lastSyncedAt",
sync_inbox: "&opId,userId,entityId,serverTs,appliedAt"
})
.upgrade(async (tx) => {
await tx
.table("tasks")
.toCollection()
.modify((task: Partial<LocalTaskRecord>) => {
if (typeof task.version !== "number") {
task.version = 1;
}
});
});
this.tasks = this.table("tasks");
this.opLogs = this.table("op_logs");
this.taskDrafts = this.table("task_drafts");
+43
View File
@@ -118,6 +118,49 @@ export async function enqueueRemoteSyncOperations(
return records.length;
}
export async function listPendingRemoteOperations(
userId: string,
limit = 100
): Promise<LocalSyncInboxRecord[]> {
const records = await localDb.syncInbox.where("userId").equals(userId).toArray();
return records
.filter((record) => record.appliedAt === null)
.sort((left, right) => {
if (left.serverTs !== right.serverTs) {
return left.serverTs - right.serverTs;
}
if (left.clientTs !== right.clientTs) {
return left.clientTs - right.clientTs;
}
return left.opId.localeCompare(right.opId);
})
.slice(0, limit);
}
export async function markRemoteOperationsApplied(
opIds: string[],
appliedAt: number
): Promise<void> {
if (opIds.length === 0) {
return;
}
const records = await localDb.syncInbox.bulkGet(opIds);
const nextRecords = records
.filter((record): record is LocalSyncInboxRecord => record !== undefined)
.map((record) => ({
...record,
appliedAt
}));
if (nextRecords.length > 0) {
await localDb.syncInbox.bulkPut(nextRecords);
}
}
export async function countPendingRemoteOperations(userId: string): Promise<number> {
const records = await localDb.syncInbox.where("userId").equals(userId).toArray();
return records.filter((record) => record.appliedAt === null).length;
+62 -4
View File
@@ -24,6 +24,21 @@ export type UpdateLocalTaskInput = {
ddlAt?: number | null;
};
type SyncTaskPayload = {
id?: string;
userId?: string;
title: string;
contentJson: string | null;
contentText?: string | null;
priority: LocalTaskPriority;
status: LocalTaskStatus;
ddlAt: number | null;
version: number;
createdAt?: number;
updatedAt: number;
deletedAt?: number | null;
};
function resolveDeviceId(): string {
const savedDeviceId = window.localStorage.getItem(DEVICE_ID_STORAGE_KEY);
if (savedDeviceId) {
@@ -54,6 +69,18 @@ function createOpLogRecord(
};
}
function createSyncTaskPayload(payload: SyncTaskPayload): string {
const nextPayload: Record<string, unknown> = {
...payload
};
if (payload.contentJson !== null) {
delete nextPayload.contentText;
}
return JSON.stringify(nextPayload);
}
export async function listLocalTasksByUser(userId: string): Promise<LocalTaskRecord[]> {
const tasks = await localDb.tasks.where("userId").equals(userId).toArray();
return tasks
@@ -81,12 +108,30 @@ export async function createLocalTask(input: CreateLocalTaskInput): Promise<Loca
priority: "MEDIUM",
status: "TODO",
ddlAt: null,
version: 1,
createdAt: now,
updatedAt: now,
deletedAt: null
};
const opLog = createOpLogRecord(task.id, "CREATE", JSON.stringify(task));
const opLog = createOpLogRecord(
task.id,
"CREATE",
createSyncTaskPayload({
id: task.id,
userId: task.userId,
title: task.title,
contentJson: task.contentJson,
contentText: task.contentText,
priority: task.priority,
status: task.status,
ddlAt: task.ddlAt,
version: task.version,
createdAt: task.createdAt,
updatedAt: task.updatedAt,
deletedAt: task.deletedAt
})
);
await localDb.transaction("rw", localDb.tasks, localDb.opLogs, async () => {
await localDb.tasks.add(task);
@@ -104,6 +149,7 @@ export async function updateLocalTask(
return undefined;
}
const nextVersion = currentTask.version + 1;
const nextTask: LocalTaskRecord = {
...currentTask,
title: input.title !== undefined ? input.title.trim() || "未命名任务" : currentTask.title,
@@ -112,19 +158,21 @@ export async function updateLocalTask(
priority: input.priority ?? currentTask.priority,
status: input.status ?? currentTask.status,
ddlAt: input.ddlAt !== undefined ? input.ddlAt : currentTask.ddlAt,
version: nextVersion,
updatedAt: Date.now()
};
const opLog = createOpLogRecord(
nextTask.id,
"UPDATE",
JSON.stringify({
createSyncTaskPayload({
title: nextTask.title,
contentText: nextTask.contentText,
contentJson: nextTask.contentJson,
contentText: nextTask.contentText,
priority: nextTask.priority,
status: nextTask.status,
ddlAt: nextTask.ddlAt,
version: nextTask.version,
updatedAt: nextTask.updatedAt
})
);
@@ -144,13 +192,23 @@ export async function deleteLocalTask(id: string): Promise<boolean> {
}
const deletedAt = Date.now();
const nextVersion = currentTask.version + 1;
const nextTask: LocalTaskRecord = {
...currentTask,
version: nextVersion,
deletedAt,
updatedAt: deletedAt
};
const opLog = createOpLogRecord(id, "DELETE", JSON.stringify({ deletedAt }));
const opLog = createOpLogRecord(
id,
"DELETE",
JSON.stringify({
deletedAt,
version: nextTask.version,
updatedAt: nextTask.updatedAt
})
);
await localDb.transaction("rw", localDb.tasks, localDb.opLogs, async () => {
await localDb.tasks.put(nextTask);
+51 -9
View File
@@ -41,6 +41,10 @@ function resolveApiBaseUrl(): string {
}
async function parseErrorMessage(response: Response): Promise<string> {
if (response.status === 413) {
return "单次同步内容过大,请精简本次任务内容或等待系统分批重试。";
}
try {
const body = (await response.json()) as { message?: string | string[] };
if (Array.isArray(body.message)) {
@@ -57,10 +61,56 @@ async function parseErrorMessage(response: Response): Promise<string> {
return `请求失败(${response.status}`;
}
type SyncPushOperationRequest = {
opId: string;
entityId: string;
entityType: LocalOpLogRecord["entityType"];
action: LocalOpLogRecord["action"];
payload: string;
clientTs: number;
deviceId: string;
};
function compactOperationPayload(payload: string): string {
try {
const parsed = JSON.parse(payload) as unknown;
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return payload;
}
const nextPayload = { ...(parsed as Record<string, unknown>) };
if (nextPayload.contentJson !== undefined && nextPayload.contentJson !== null) {
delete nextPayload.contentText;
}
return JSON.stringify(nextPayload);
} catch {
return payload;
}
}
export function serializeSyncOperationForRequest(
operation: LocalOpLogRecord
): SyncPushOperationRequest {
return {
opId: operation.opId,
entityId: operation.entityId,
entityType: operation.entityType,
action: operation.action,
payload: compactOperationPayload(operation.payload),
clientTs: operation.clientTs,
deviceId: operation.deviceId
};
}
export async function pushSyncOperations(
userId: string,
operations: LocalOpLogRecord[]
): Promise<SyncPushResult> {
const requestOperations = operations.map((operation) =>
serializeSyncOperationForRequest(operation)
);
const response = await fetch(`${resolveApiBaseUrl()}/sync/push`, {
method: "POST",
headers: {
@@ -68,15 +118,7 @@ export async function pushSyncOperations(
"x-user-id": userId
},
body: JSON.stringify({
operations: operations.map((operation) => ({
opId: operation.opId,
entityId: operation.entityId,
entityType: operation.entityType,
action: operation.action,
payload: operation.payload,
clientTs: operation.clientTs,
deviceId: operation.deviceId
}))
operations: requestOperations
})
});
+261
View File
@@ -0,0 +1,261 @@
import {
localDb,
type LocalSyncInboxRecord,
type LocalTaskPriority,
type LocalTaskRecord,
type LocalTaskStatus
} from "@/services/local-db";
import { listPendingRemoteOperations } from "@/services/local-sync-repo";
const TASK_PRIORITY_VALUES: LocalTaskPriority[] = ["LOW", "MEDIUM", "HIGH", "URGENT"];
const TASK_STATUS_VALUES: LocalTaskStatus[] = ["TODO", "IN_PROGRESS", "DONE", "ARCHIVED"];
type RemoteTaskPayload = {
userId?: unknown;
title?: unknown;
contentJson?: unknown;
contentText?: unknown;
priority?: unknown;
status?: unknown;
ddlAt?: unknown;
version?: unknown;
createdAt?: unknown;
updatedAt?: unknown;
deletedAt?: unknown;
};
function normalizePriority(value: unknown, fallback: LocalTaskPriority): LocalTaskPriority {
if (typeof value === "string" && TASK_PRIORITY_VALUES.includes(value as LocalTaskPriority)) {
return value as LocalTaskPriority;
}
return fallback;
}
function normalizeStatus(value: unknown, fallback: LocalTaskStatus): LocalTaskStatus {
if (typeof value === "string" && TASK_STATUS_VALUES.includes(value as LocalTaskStatus)) {
return value as LocalTaskStatus;
}
return fallback;
}
function normalizeStringOrNull(value: unknown, fallback: string | null): string | null {
if (typeof value === "string") {
return value;
}
if (value === null) {
return null;
}
return fallback;
}
function collectTextFromRichContent(value: unknown, fragments: string[]): void {
if (!value || typeof value !== "object") {
return;
}
const node = value as {
text?: unknown;
content?: unknown;
};
if (typeof node.text === "string" && node.text.trim().length > 0) {
fragments.push(node.text.trim());
}
if (Array.isArray(node.content)) {
for (const child of node.content) {
collectTextFromRichContent(child, fragments);
}
}
}
function extractTextFromContentJson(contentJson: string | null): string | null {
if (!contentJson) {
return null;
}
try {
const parsed = JSON.parse(contentJson) as unknown;
const fragments: string[] = [];
collectTextFromRichContent(parsed, fragments);
return fragments.length > 0 ? fragments.join(" ") : null;
} catch {
return null;
}
}
function normalizeNullableNumber(value: unknown, fallback: number | null): number | null {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (value === null) {
return null;
}
return fallback;
}
function normalizePositiveNumber(value: unknown, fallback: number): number {
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
return value;
}
return fallback;
}
function parseOperationPayload(operation: LocalSyncInboxRecord): RemoteTaskPayload {
if (!operation.payload) {
return {};
}
const parsed = JSON.parse(operation.payload) as unknown;
if (!parsed || typeof parsed !== "object") {
return {};
}
return parsed as RemoteTaskPayload;
}
function createFallbackTask(
operation: LocalSyncInboxRecord,
userId: string,
updatedAt: number,
version: number
): LocalTaskRecord {
return {
id: operation.entityId,
userId,
title: "未命名任务",
contentJson: null,
contentText: null,
priority: "MEDIUM",
status: "TODO",
ddlAt: null,
version,
createdAt: updatedAt,
updatedAt,
deletedAt: null
};
}
function buildIncomingTaskRecord(
operation: LocalSyncInboxRecord,
currentTask: LocalTaskRecord | undefined
): LocalTaskRecord {
const payload = parseOperationPayload(operation);
const fallbackVersion = currentTask?.version ?? 1;
const version = normalizePositiveNumber(payload.version, fallbackVersion);
const updatedAt = normalizePositiveNumber(
payload.updatedAt,
normalizePositiveNumber(payload.deletedAt, operation.clientTs)
);
const fallbackTask =
currentTask ?? createFallbackTask(operation, operation.userId, updatedAt, version);
const contentJson = normalizeStringOrNull(payload.contentJson, fallbackTask.contentJson);
const contentText = normalizeStringOrNull(
payload.contentText,
extractTextFromContentJson(contentJson) ?? fallbackTask.contentText
);
if (operation.action === "DELETE") {
const deletedAt = normalizePositiveNumber(payload.deletedAt, updatedAt);
return {
...fallbackTask,
version,
updatedAt: deletedAt,
deletedAt
};
}
return {
...fallbackTask,
userId: typeof payload.userId === "string" ? payload.userId : fallbackTask.userId,
title:
typeof payload.title === "string" && payload.title.trim().length > 0
? payload.title
: fallbackTask.title,
contentJson,
contentText,
priority: normalizePriority(payload.priority, fallbackTask.priority),
status: normalizeStatus(payload.status, fallbackTask.status),
ddlAt: normalizeNullableNumber(payload.ddlAt, fallbackTask.ddlAt),
version,
createdAt: normalizePositiveNumber(payload.createdAt, fallbackTask.createdAt),
updatedAt,
deletedAt: normalizeNullableNumber(payload.deletedAt, null)
};
}
function getOperationTieBreaker(operation: LocalSyncInboxRecord): number {
if (operation.action === "DELETE") {
return 3;
}
if (operation.action === "UPDATE") {
return 2;
}
return 1;
}
function shouldApplyIncomingTask(
currentTask: LocalTaskRecord | undefined,
incomingTask: LocalTaskRecord,
operation: LocalSyncInboxRecord
): boolean {
if (!currentTask) {
return true;
}
if (incomingTask.updatedAt > currentTask.updatedAt) {
return true;
}
if (incomingTask.updatedAt < currentTask.updatedAt) {
return false;
}
if (incomingTask.version > currentTask.version) {
return true;
}
if (incomingTask.version < currentTask.version) {
return false;
}
return getOperationTieBreaker(operation) >= (currentTask.deletedAt === null ? 1 : 3);
}
export async function applyPendingRemoteOperations(userId: string): Promise<number> {
const pendingOperations = await listPendingRemoteOperations(userId);
if (pendingOperations.length === 0) {
return 0;
}
const appliedAt = Date.now();
await localDb.transaction("rw", localDb.tasks, localDb.syncInbox, async () => {
for (const operation of pendingOperations) {
if (operation.entityType !== "TASK") {
await localDb.syncInbox.update(operation.opId, { appliedAt });
continue;
}
const currentTask = await localDb.tasks.get(operation.entityId);
const incomingTask = buildIncomingTaskRecord(operation, currentTask);
if (shouldApplyIncomingTask(currentTask, incomingTask, operation)) {
await localDb.tasks.put(incomingTask);
}
await localDb.syncInbox.update(operation.opId, { appliedAt });
}
});
return pendingOperations.length;
}
+42 -4
View File
@@ -6,15 +6,45 @@ import {
markSyncOperationsSucceeded,
saveLocalSyncState
} from "@/services/local-sync-repo";
import { pullSyncOperations, pushSyncOperations } from "@/services/sync-api";
import { applyPendingRemoteOperations } from "@/services/sync-merge";
import {
pullSyncOperations,
pushSyncOperations,
serializeSyncOperationForRequest
} from "@/services/sync-api";
import type { LocalOpLogRecord } from "@/services/local-db";
const PUSH_BATCH_LIMIT = 20;
const PUSH_BATCH_MAX_BYTES = 256 * 1024;
const PULL_BATCH_LIMIT = 100;
const MAX_PULL_PAGES_PER_CYCLE = 5;
function estimateOperationBytes(operation: LocalOpLogRecord): number {
return new TextEncoder().encode(JSON.stringify(serializeSyncOperationForRequest(operation)))
.length;
}
function createPushBatch(operations: LocalOpLogRecord[]): LocalOpLogRecord[] {
const batch: LocalOpLogRecord[] = [];
let batchBytes = 0;
for (const operation of operations) {
const operationBytes = estimateOperationBytes(operation);
if (batch.length > 0 && batchBytes + operationBytes > PUSH_BATCH_MAX_BYTES) {
break;
}
batch.push(operation);
batchBytes += operationBytes;
}
return batch;
}
export type SyncCycleResult = {
pushedCount: number;
pulledCount: number;
appliedRemoteCount: number;
lastSyncedAt: number;
hasFailures: boolean;
failureMessage: string | null;
@@ -24,15 +54,17 @@ export async function runSyncWorkerCycle(userId: string): Promise<SyncCycleResul
const lastSyncedAt = Date.now();
let pushedCount = 0;
let pulledCount = 0;
let appliedRemoteCount = 0;
let hasFailures = false;
let failureMessage: string | null = null;
for (;;) {
const pendingOperations = await listPendingSyncOperations(PUSH_BATCH_LIMIT);
if (pendingOperations.length === 0) {
const pendingCandidates = await listPendingSyncOperations(PUSH_BATCH_LIMIT);
if (pendingCandidates.length === 0) {
break;
}
const pendingOperations = createPushBatch(pendingCandidates);
const pushResult = await pushSyncOperations(userId, pendingOperations);
const syncedOperationIds = pushResult.results
.filter((result) => result.status === "accepted" || result.status === "duplicate")
@@ -55,7 +87,10 @@ export async function runSyncWorkerCycle(userId: string): Promise<SyncCycleResul
break;
}
if (pendingOperations.length < PUSH_BATCH_LIMIT) {
if (
pendingCandidates.length < PUSH_BATCH_LIMIT ||
pendingOperations.length < pendingCandidates.length
) {
break;
}
}
@@ -94,9 +129,12 @@ export async function runSyncWorkerCycle(userId: string): Promise<SyncCycleResul
});
}
appliedRemoteCount = await applyPendingRemoteOperations(userId);
return {
pushedCount,
pulledCount,
appliedRemoteCount,
lastSyncedAt,
hasFailures,
failureMessage