diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index 4f6ee3f..8c2a78f 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -3,6 +3,7 @@ import { ConfigModule } from "@nestjs/config"; import { AttachmentModule } from "./attachment/attachment.module"; import { AuthModule } from "./auth/auth.module"; import { PrismaModule } from "./prisma/prisma.module"; +import { SyncModule } from "./sync/sync.module"; import { TaskModule } from "./task/task.module"; @Module({ @@ -14,7 +15,8 @@ import { TaskModule } from "./task/task.module"; PrismaModule, AuthModule, TaskModule, - AttachmentModule + AttachmentModule, + SyncModule ] }) export class AppModule {} diff --git a/apps/api/src/main.ts b/apps/api/src/main.ts index 3f891f4..6a4f0ca 100644 --- a/apps/api/src/main.ts +++ b/apps/api/src/main.ts @@ -1,10 +1,18 @@ import "reflect-metadata"; import { ValidationPipe } from "@nestjs/common"; import { NestFactory } from "@nestjs/core"; +import type { NestExpressApplication } from "@nestjs/platform-express"; import { AppModule } from "./app.module"; async function bootstrap(): Promise { - const app = await NestFactory.create(AppModule); + const app = await NestFactory.create(AppModule); + const bodyLimit = process.env.API_BODY_LIMIT ?? "8mb"; + + app.useBodyParser("json", { limit: bodyLimit }); + app.useBodyParser("urlencoded", { + extended: true, + limit: bodyLimit + }); app.enableCors({ origin: true, credentials: true diff --git a/apps/api/src/sync/dto/sync-pull.dto.ts b/apps/api/src/sync/dto/sync-pull.dto.ts new file mode 100644 index 0000000..c5c99fb --- /dev/null +++ b/apps/api/src/sync/dto/sync-pull.dto.ts @@ -0,0 +1,16 @@ +import { Type } from "class-transformer"; +import { IsInt, IsOptional, IsString, Max, MaxLength, Min } from "class-validator"; + +export class SyncPullQueryDto { + @IsOptional() + @IsString() + @MaxLength(512) + cursor?: string; + + @Type(() => Number) + @IsOptional() + @IsInt() + @Min(1) + @Max(200) + limit?: number; +} diff --git a/apps/api/src/sync/dto/sync-push.dto.ts b/apps/api/src/sync/dto/sync-push.dto.ts new file mode 100644 index 0000000..2e43da4 --- /dev/null +++ b/apps/api/src/sync/dto/sync-push.dto.ts @@ -0,0 +1,62 @@ +import { Type } from "class-transformer"; +import { + ArrayMaxSize, + ArrayMinSize, + IsArray, + IsEnum, + IsInt, + IsOptional, + IsString, + MaxLength, + Min, + ValidateNested +} from "class-validator"; + +export enum SyncEntityTypeDto { + TASK = "TASK" +} + +export enum SyncActionTypeDto { + CREATE = "CREATE", + UPDATE = "UPDATE", + DELETE = "DELETE" +} + +export class SyncPushOperationDto { + @IsString() + @MaxLength(64) + opId!: string; + + @IsString() + @MaxLength(64) + entityId!: string; + + @IsEnum(SyncEntityTypeDto) + entityType!: SyncEntityTypeDto; + + @IsEnum(SyncActionTypeDto) + action!: SyncActionTypeDto; + + @IsOptional() + @IsString() + @MaxLength(5000000) + payload?: string; + + @Type(() => Number) + @IsInt() + @Min(0) + clientTs!: number; + + @IsString() + @MaxLength(128) + deviceId!: string; +} + +export class SyncPushDto { + @IsArray() + @ArrayMinSize(1) + @ArrayMaxSize(200) + @ValidateNested({ each: true }) + @Type(() => SyncPushOperationDto) + operations!: SyncPushOperationDto[]; +} diff --git a/apps/api/src/sync/sync.controller.ts b/apps/api/src/sync/sync.controller.ts new file mode 100644 index 0000000..72ccfac --- /dev/null +++ b/apps/api/src/sync/sync.controller.ts @@ -0,0 +1,34 @@ +import { Body, Controller, Get, Headers, Post, Query, UnauthorizedException } from "@nestjs/common"; +import { SyncPullQueryDto } from "./dto/sync-pull.dto"; +import { SyncPushDto } from "./dto/sync-push.dto"; +import { SyncPullResponse, SyncPushResponse, SyncService } from "./sync.service"; + +@Controller("sync") +export class SyncController { + constructor(private readonly syncService: SyncService) {} + + @Get("pull") + async pullOperations( + @Headers("x-user-id") userIdHeader: string | string[] | undefined, + @Query() query: SyncPullQueryDto + ): Promise { + return this.syncService.pullOperations(this.resolveUserId(userIdHeader), query); + } + + @Post("push") + async pushOperations( + @Headers("x-user-id") userIdHeader: string | string[] | undefined, + @Body() body: SyncPushDto + ): Promise { + return this.syncService.pushOperations(this.resolveUserId(userIdHeader), body); + } + + private resolveUserId(userIdHeader: string | string[] | undefined): string { + const userId = Array.isArray(userIdHeader) ? userIdHeader[0] : userIdHeader; + if (!userId) { + throw new UnauthorizedException("缺少用户上下文"); + } + + return userId; + } +} diff --git a/apps/api/src/sync/sync.module.ts b/apps/api/src/sync/sync.module.ts new file mode 100644 index 0000000..65f1492 --- /dev/null +++ b/apps/api/src/sync/sync.module.ts @@ -0,0 +1,11 @@ +import { Module } from "@nestjs/common"; +import { PrismaModule } from "../prisma/prisma.module"; +import { SyncController } from "./sync.controller"; +import { SyncService } from "./sync.service"; + +@Module({ + imports: [PrismaModule], + controllers: [SyncController], + providers: [SyncService] +}) +export class SyncModule {} diff --git a/apps/api/src/sync/sync.service.ts b/apps/api/src/sync/sync.service.ts new file mode 100644 index 0000000..9bab5e2 --- /dev/null +++ b/apps/api/src/sync/sync.service.ts @@ -0,0 +1,313 @@ +import { BadRequestException, Injectable } from "@nestjs/common"; +import { Prisma } from "../../generated/prisma/client"; +import { PrismaService } from "../prisma/prisma.service"; +import { SyncPullQueryDto } from "./dto/sync-pull.dto"; +import { SyncPushDto, SyncPushOperationDto } from "./dto/sync-push.dto"; + +export type SyncPushItemStatus = "accepted" | "duplicate" | "failed"; + +export type SyncPushItemResult = { + opId: string; + status: SyncPushItemStatus; + serverTs: string | null; + reason: string | null; +}; + +export type SyncPushResponse = { + acceptedCount: number; + duplicateCount: number; + failedCount: number; + results: SyncPushItemResult[]; +}; + +type ExistingOperationRecord = { + opId: string; + serverTs: Date; +}; + +type SyncPullCursorState = { + serverTs: string; + opId: string; +}; + +type SyncPullOperationRecord = { + opId: string; + entityId: string; + entityType: string; + action: string; + payload: Prisma.JsonValue | null; + clientTs: Date; + deviceId: string; + serverTs: Date; +}; + +export type SyncPullItem = { + opId: string; + entityId: string; + entityType: string; + action: string; + payload: string | null; + clientTs: number; + deviceId: string; + serverTs: string; +}; + +export type SyncPullResponse = { + items: SyncPullItem[]; + nextCursor: string | null; + hasMore: boolean; +}; + +@Injectable() +export class SyncService { + constructor(private readonly prismaService: PrismaService) {} + + async pullOperations(userId: string, query: SyncPullQueryDto): Promise { + const limit = query.limit ?? 100; + const cursor = this.parseCursor(query.cursor); + + const operations = (await this.prismaService.syncOperation.findMany({ + where: this.buildPullWhereInput(userId, cursor), + orderBy: [{ serverTs: "asc" }, { opId: "asc" }], + take: limit + 1, + select: { + opId: true, + entityId: true, + entityType: true, + action: true, + payload: true, + clientTs: true, + deviceId: true, + serverTs: true + } + })) as SyncPullOperationRecord[]; + + const hasMore = operations.length > limit; + const pageItems = hasMore ? operations.slice(0, limit) : operations; + const lastOperation = pageItems.at(-1); + + return { + items: pageItems.map((operation) => this.serializePullItem(operation)), + nextCursor: lastOperation + ? this.encodeCursor({ + serverTs: lastOperation.serverTs.toISOString(), + opId: lastOperation.opId + }) + : (query.cursor ?? null), + hasMore + }; + } + + async pushOperations(userId: string, body: SyncPushDto): Promise { + const existingOperations = await this.loadExistingOperations(userId, body.operations); + const results: SyncPushItemResult[] = []; + const seenOperationIds = new Set(); + const acceptedOperationServerTs = new Map(); + + for (const operation of body.operations) { + if (seenOperationIds.has(operation.opId)) { + results.push({ + opId: operation.opId, + status: "duplicate", + serverTs: acceptedOperationServerTs.get(operation.opId) ?? null, + reason: "same_batch_duplicate" + }); + continue; + } + + seenOperationIds.add(operation.opId); + + const existingOperation = existingOperations.get(operation.opId); + if (existingOperation) { + results.push({ + opId: operation.opId, + status: "duplicate", + serverTs: existingOperation.serverTs.toISOString(), + reason: "already_synced" + }); + continue; + } + + try { + const createdOperation = await this.prismaService.syncOperation.create({ + data: { + opId: operation.opId, + userId, + deviceId: operation.deviceId, + entityType: operation.entityType, + entityId: operation.entityId, + action: operation.action, + payload: operation.payload, + clientTs: new Date(operation.clientTs) + }, + select: { + opId: true, + serverTs: true + } + }); + + const serverTs = createdOperation.serverTs.toISOString(); + acceptedOperationServerTs.set(createdOperation.opId, serverTs); + results.push({ + opId: createdOperation.opId, + status: "accepted", + serverTs, + reason: null + }); + } catch (error) { + if (this.isDuplicateOpIdError(error)) { + results.push({ + opId: operation.opId, + status: "duplicate", + serverTs: null, + reason: "already_synced" + }); + continue; + } + + results.push({ + opId: operation.opId, + status: "failed", + serverTs: null, + reason: "persist_failed" + }); + } + } + + return { + acceptedCount: results.filter((item) => item.status === "accepted").length, + duplicateCount: results.filter((item) => item.status === "duplicate").length, + failedCount: results.filter((item) => item.status === "failed").length, + results + }; + } + + private async loadExistingOperations( + userId: string, + operations: SyncPushOperationDto[] + ): Promise> { + const opIds = Array.from(new Set(operations.map((operation) => operation.opId))); + + const existingOperations = (await this.prismaService.syncOperation.findMany({ + where: { + userId, + opId: { + in: opIds + } + }, + select: { + opId: true, + serverTs: true + } + })) as ExistingOperationRecord[]; + + return new Map( + existingOperations.map((operation): [string, ExistingOperationRecord] => [ + operation.opId, + operation + ]) + ); + } + + private buildPullWhereInput( + userId: string, + cursor: SyncPullCursorState | null + ): Prisma.SyncOperationWhereInput { + if (!cursor) { + return { userId }; + } + + const cursorDate = new Date(cursor.serverTs); + + return { + userId, + // 同一毫秒内可能有多条操作,必须使用 opId 作为二级游标来保证稳定分页。 + OR: [ + { + serverTs: { + gt: cursorDate + } + }, + { + serverTs: cursorDate, + opId: { + gt: cursor.opId + } + } + ] + }; + } + + private serializePullItem(operation: SyncPullOperationRecord): SyncPullItem { + return { + opId: operation.opId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: this.serializePayload(operation.payload), + clientTs: operation.clientTs.getTime(), + deviceId: operation.deviceId, + serverTs: operation.serverTs.toISOString() + }; + } + + private serializePayload(payload: Prisma.JsonValue | null): string | null { + if (payload === null) { + return null; + } + + if (typeof payload === "string") { + return payload; + } + + return JSON.stringify(payload); + } + + private parseCursor(cursor: string | undefined): SyncPullCursorState | null { + if (!cursor) { + return null; + } + + let decodedCursor: unknown; + try { + decodedCursor = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")); + } catch { + throw new BadRequestException("Invalid sync cursor"); + } + + if (typeof decodedCursor !== "object" || decodedCursor === null) { + throw new BadRequestException("Invalid sync cursor"); + } + + const cursorRecord = decodedCursor as { + serverTs?: unknown; + opId?: unknown; + }; + + if ( + typeof cursorRecord.serverTs !== "string" || + typeof cursorRecord.opId !== "string" || + Number.isNaN(Date.parse(cursorRecord.serverTs)) || + cursorRecord.opId.trim().length === 0 + ) { + throw new BadRequestException("Invalid sync cursor"); + } + + return { + serverTs: cursorRecord.serverTs, + opId: cursorRecord.opId + }; + } + + private encodeCursor(cursor: SyncPullCursorState): string { + return Buffer.from(JSON.stringify(cursor), "utf8").toString("base64url"); + } + + private isDuplicateOpIdError(error: unknown): boolean { + if (!(error instanceof Prisma.PrismaClientKnownRequestError)) { + return false; + } + + return error.code === "P2002"; + } +} diff --git a/apps/api/src/task/task.service.ts b/apps/api/src/task/task.service.ts index 3f62dea..f754003 100644 --- a/apps/api/src/task/task.service.ts +++ b/apps/api/src/task/task.service.ts @@ -75,7 +75,7 @@ export class TaskService { ]); return { - items: items.map((item) => this.serializeTask(item)), + items: items.map((item: TaskEntity) => this.serializeTask(item)), page, pageSize, total @@ -363,7 +363,7 @@ export class TaskService { ); await tx.taskTag.createMany({ - data: tags.map((tag) => ({ + data: tags.map((tag: { id: string }) => ({ taskId, tagId: tag.id })), @@ -382,7 +382,7 @@ export class TaskService { ddl: task.ddl?.toISOString() ?? null, completedAt: task.completedAt?.toISOString() ?? null, version: task.version, - tags: task.taskTags.map((taskTag) => taskTag.tag.name), + tags: task.taskTags.map((taskTag: { tag: { name: string } }) => taskTag.tag.name), createdAt: task.createdAt.toISOString(), updatedAt: task.updatedAt.toISOString() }; diff --git a/apps/api/test/sync-push.spec.ts b/apps/api/test/sync-push.spec.ts new file mode 100644 index 0000000..dfbacba --- /dev/null +++ b/apps/api/test/sync-push.spec.ts @@ -0,0 +1,419 @@ +import request from "supertest"; +import { INestApplication, ValidationPipe } from "@nestjs/common"; +import { Test, TestingModule } from "@nestjs/testing"; +import { PrismaService } from "../src/prisma/prisma.service"; +import { SyncController } from "../src/sync/sync.controller"; +import { SyncService } from "../src/sync/sync.service"; + +type SyncOperationRecord = { + id: string; + opId: string; + userId: string; + deviceId: string; + entityType: string; + entityId: string; + action: string; + payload: string | null; + clientTs: Date; + serverTs: Date; +}; + +type SyncOperationSelect = { + opId?: true; + entityId?: true; + entityType?: true; + action?: true; + payload?: true; + clientTs?: true; + deviceId?: true; + serverTs?: true; +}; + +type SyncOperationFindManyArgs = { + where: { + userId: string; + opId?: { + in: string[]; + }; + OR?: Array< + | { + serverTs: { + gt: Date; + }; + } + | { + serverTs: Date; + opId: { + gt: string; + }; + } + >; + }; + select: SyncOperationSelect; + orderBy?: Array<{ + serverTs?: "asc" | "desc"; + opId?: "asc" | "desc"; + }>; + take?: number; +}; + +type SyncOperationCreateArgs = { + data: { + opId: string; + userId: string; + deviceId: string; + entityType: string; + entityId: string; + action: string; + payload?: string; + clientTs: Date; + }; + select: { + opId: true; + serverTs: true; + }; +}; + +class InMemoryPrismaService { + private syncOperationIdSequence = 1; + private syncOperations: SyncOperationRecord[] = []; + + readonly syncOperation = { + findMany: async (args: SyncOperationFindManyArgs) => { + let items = this.syncOperations.filter((item) => item.userId === args.where.userId); + + if (args.where.opId?.in) { + items = items.filter((item) => args.where.opId?.in.includes(item.opId)); + } + + if (args.where.OR && args.where.OR.length > 0) { + items = items.filter((item) => + args.where.OR?.some((condition) => { + if ("gt" in condition.serverTs) { + return item.serverTs.getTime() > condition.serverTs.gt.getTime(); + } + + if ("opId" in condition) { + return ( + item.serverTs.getTime() === condition.serverTs.getTime() && + item.opId > condition.opId.gt + ); + } + + return false; + }) + ); + } + + if (args.orderBy && args.orderBy.length > 0) { + items = [...items].sort((left, right) => { + for (const orderRule of args.orderBy ?? []) { + if (orderRule.serverTs) { + const diff = left.serverTs.getTime() - right.serverTs.getTime(); + if (diff !== 0) { + return orderRule.serverTs === "asc" ? diff : -diff; + } + } + + if (orderRule.opId) { + const diff = left.opId.localeCompare(right.opId); + if (diff !== 0) { + return orderRule.opId === "asc" ? diff : -diff; + } + } + } + + return 0; + }); + } + + const limitedItems = args.take ? items.slice(0, args.take) : items; + + return limitedItems.map((item) => this.pickSelectedFields(item, args.select)); + }, + + create: async (args: SyncOperationCreateArgs) => { + const createdOperation: SyncOperationRecord = { + id: `sync_${this.syncOperationIdSequence++}`, + opId: args.data.opId, + userId: args.data.userId, + deviceId: args.data.deviceId, + entityType: args.data.entityType, + entityId: args.data.entityId, + action: args.data.action, + payload: args.data.payload ?? null, + clientTs: args.data.clientTs, + serverTs: new Date() + }; + + this.syncOperations.push(createdOperation); + + return { + opId: createdOperation.opId, + serverTs: createdOperation.serverTs + }; + } + }; + + getOperationCount(): number { + return this.syncOperations.length; + } + + seedOperations(records: Array>): void { + for (const record of records) { + this.syncOperations.push({ + ...record, + id: `sync_${this.syncOperationIdSequence++}` + }); + } + } + + private pickSelectedFields( + item: SyncOperationRecord, + select: SyncOperationSelect + ): Partial { + const result: Record = {}; + + for (const key of Object.keys(select) as Array) { + if (!select[key]) { + continue; + } + + const recordKey = key as keyof SyncOperationRecord; + result[recordKey] = item[recordKey]; + } + + return result as Partial; + } +} + +describe("SyncController (integration)", () => { + let app: INestApplication; + let prismaService: InMemoryPrismaService; + + beforeAll(async () => { + prismaService = new InMemoryPrismaService(); + + const moduleRef: TestingModule = await Test.createTestingModule({ + controllers: [SyncController], + providers: [SyncService, { provide: PrismaService, useValue: prismaService }] + }).compile(); + + app = moduleRef.createNestApplication(); + app.useGlobalPipes( + new ValidationPipe({ + transform: true, + whitelist: true, + forbidNonWhitelisted: true + }) + ); + await app.init(); + }); + + afterAll(async () => { + await app.close(); + }); + + it("should accept operations once and mark repeated push as duplicate", async () => { + const payload = { + operations: [ + { + opId: "op-create-1", + entityType: "TASK", + entityId: "task-1", + action: "CREATE", + payload: '{"title":"任务一"}', + clientTs: 1712419200000, + deviceId: "device-a" + }, + { + opId: "op-update-1", + entityType: "TASK", + entityId: "task-1", + action: "UPDATE", + payload: '{"title":"任务一-更新"}', + clientTs: 1712419201000, + deviceId: "device-a" + } + ] + }; + + const firstResponse = await request(app.getHttpServer()) + .post("/sync/push") + .set("x-user-id", "user-1") + .send(payload) + .expect(201); + + expect(firstResponse.body.acceptedCount).toBe(2); + expect(firstResponse.body.duplicateCount).toBe(0); + expect(firstResponse.body.failedCount).toBe(0); + expect(firstResponse.body.results).toEqual([ + expect.objectContaining({ + opId: "op-create-1", + status: "accepted" + }), + expect.objectContaining({ + opId: "op-update-1", + status: "accepted" + }) + ]); + expect(prismaService.getOperationCount()).toBe(2); + + const secondResponse = await request(app.getHttpServer()) + .post("/sync/push") + .set("x-user-id", "user-1") + .send(payload) + .expect(201); + + expect(secondResponse.body.acceptedCount).toBe(0); + expect(secondResponse.body.duplicateCount).toBe(2); + expect(secondResponse.body.failedCount).toBe(0); + expect(secondResponse.body.results).toEqual([ + expect.objectContaining({ + opId: "op-create-1", + status: "duplicate", + reason: "already_synced" + }), + expect.objectContaining({ + opId: "op-update-1", + status: "duplicate", + reason: "already_synced" + }) + ]); + expect(prismaService.getOperationCount()).toBe(2); + }); + + it("should mark duplicated op ids in the same batch as duplicate", async () => { + const response = await request(app.getHttpServer()) + .post("/sync/push") + .set("x-user-id", "user-2") + .send({ + operations: [ + { + opId: "op-dup-1", + entityType: "TASK", + entityId: "task-2", + action: "CREATE", + payload: '{"title":"任务二"}', + clientTs: 1712419300000, + deviceId: "device-b" + }, + { + opId: "op-dup-1", + entityType: "TASK", + entityId: "task-2", + action: "UPDATE", + payload: '{"title":"任务二-重复"}', + clientTs: 1712419301000, + deviceId: "device-b" + } + ] + }) + .expect(201); + + expect(response.body.acceptedCount).toBe(1); + expect(response.body.duplicateCount).toBe(1); + expect(response.body.failedCount).toBe(0); + expect(response.body.results[0]).toEqual( + expect.objectContaining({ + opId: "op-dup-1", + status: "accepted" + }) + ); + expect(response.body.results[1]).toEqual( + expect.objectContaining({ + opId: "op-dup-1", + status: "duplicate", + reason: "same_batch_duplicate" + }) + ); + expect(prismaService.getOperationCount()).toBe(3); + }); + + it("should pull operations incrementally with a stable cursor", async () => { + prismaService.seedOperations([ + { + opId: "pull-op-1", + userId: "user-pull", + deviceId: "device-c", + entityType: "TASK", + entityId: "task-10", + action: "CREATE", + payload: '{"title":"任务甲"}', + clientTs: new Date("2026-04-06T10:00:00.000Z"), + serverTs: new Date("2026-04-06T10:10:00.000Z") + }, + { + opId: "pull-op-2", + userId: "user-pull", + deviceId: "device-c", + entityType: "TASK", + entityId: "task-10", + action: "UPDATE", + payload: '{"title":"任务甲-更新"}', + clientTs: new Date("2026-04-06T10:01:00.000Z"), + serverTs: new Date("2026-04-06T10:10:00.000Z") + }, + { + opId: "pull-op-3", + userId: "user-pull", + deviceId: "device-c", + entityType: "TASK", + entityId: "task-11", + action: "CREATE", + payload: '{"title":"任务乙"}', + clientTs: new Date("2026-04-06T10:02:00.000Z"), + serverTs: new Date("2026-04-06T10:11:00.000Z") + }, + { + opId: "pull-op-other-user", + userId: "user-other", + deviceId: "device-z", + entityType: "TASK", + entityId: "task-99", + action: "CREATE", + payload: '{"title":"其他用户任务"}', + clientTs: new Date("2026-04-06T10:03:00.000Z"), + serverTs: new Date("2026-04-06T10:12:00.000Z") + } + ]); + + const firstResponse = await request(app.getHttpServer()) + .get("/sync/pull") + .set("x-user-id", "user-pull") + .query({ limit: 2 }) + .expect(200); + + expect(firstResponse.body.items.map((item: { opId: string }) => item.opId)).toEqual([ + "pull-op-1", + "pull-op-2" + ]); + expect(firstResponse.body.hasMore).toBe(true); + expect(firstResponse.body.nextCursor).toEqual(expect.any(String)); + + const secondResponse = await request(app.getHttpServer()) + .get("/sync/pull") + .set("x-user-id", "user-pull") + .query({ + limit: 2, + cursor: firstResponse.body.nextCursor + }) + .expect(200); + + expect(secondResponse.body.items.map((item: { opId: string }) => item.opId)).toEqual([ + "pull-op-3" + ]); + expect(secondResponse.body.hasMore).toBe(false); + expect(secondResponse.body.nextCursor).toEqual(expect.any(String)); + }); + + it("should reject invalid cursor payload", async () => { + await request(app.getHttpServer()) + .get("/sync/pull") + .set("x-user-id", "user-invalid-cursor") + .query({ + cursor: "not-a-valid-cursor" + }) + .expect(400); + }); +}); diff --git a/apps/web/src/components/task-rich-editor.tsx b/apps/web/src/components/task-rich-editor.tsx index 578a30d..769f001 100644 --- a/apps/web/src/components/task-rich-editor.tsx +++ b/apps/web/src/components/task-rich-editor.tsx @@ -1,9 +1,9 @@ -import { useEffect, useRef, useState, type ChangeEvent } from "react"; +import { memo, useEffect, useRef, useState, type ChangeEvent } from "react"; import imageCompression from "browser-image-compression"; import type { Editor as TiptapEditor } from "@tiptap/core"; import Link from "@tiptap/extension-link"; import StarterKit from "@tiptap/starter-kit"; -import { EditorContent, type JSONContent, useEditor } from "@tiptap/react"; +import { EditorContent, type JSONContent, useEditor, useEditorState } from "@tiptap/react"; import { ResizableImage } from "@/extensions/resizable-image"; import { ResizableVideo } from "@/extensions/resizable-video"; import { ResizableYoutube } from "@/extensions/resizable-youtube"; @@ -11,6 +11,7 @@ import { cn } from "@/lib/utils"; const MAX_IMAGE_UPLOAD_BYTES = 20 * 1024 * 1024; const MAX_VIDEO_UPLOAD_BYTES = 10 * 1024 * 1024; +const EDITOR_CHANGE_DEBOUNCE_MS = 120; type TaskRichEditorProps = { valueJson: string | null; @@ -25,7 +26,37 @@ type ToolbarButtonProps = { onClick: () => void; }; -function ToolbarButton({ label, disabled = false, active = false, onClick }: ToolbarButtonProps) { +type ToolbarState = { + bold: boolean; + italic: boolean; + heading: boolean; + bulletList: boolean; + link: boolean; +}; + +type EditorToolbarProps = { + editor: TiptapEditor | null; + onInsertImageUrl: () => void; + onOpenImageUpload: () => void; + onInsertVideoUrl: () => void; + onOpenVideoUpload: () => void; + onSetLink: () => void; +}; + +const DEFAULT_TOOLBAR_STATE: ToolbarState = { + bold: false, + italic: false, + heading: false, + bulletList: false, + link: false +}; + +const ToolbarButton = memo(function ToolbarButton({ + label, + disabled = false, + active = false, + onClick +}: ToolbarButtonProps) { return ( + + + + ); +}); + +type TaskListPanelProps = { + tasks: LocalTaskRecord[]; + selectedTaskId: string | null; + quotaSnapshot: StorageQuotaSnapshot | null; + creating: boolean; + onCreateTask: () => void; + onSelectTask: (taskId: string) => void; +}; + +const TaskListPanel = memo(function TaskListPanel({ + tasks, + selectedTaskId, + quotaSnapshot, + creating, + onCreateTask, + onSelectTask +}: TaskListPanelProps) { + return ( +
+
+

任务列表

+ +
+ + {quotaSnapshot ? ( +

= 85 ? "text-destructive" : "text-muted-foreground" + )} + > + 空间占用(估算):{formatStorageSize(quotaSnapshot.usedBytes)} /{" "} + {formatStorageSize(quotaSnapshot.quotaBytes)}({quotaSnapshot.usedPercent.toFixed(1)}%) +

+ ) : null} + + {tasks.length === 0 ? ( +

+ 还没有任务,点击右上角“新建任务”。 +

+ ) : ( +
+ {tasks.map((task) => { + const isActive = task.id === selectedTaskId; + return ( + + ); + })} +
+ )} +
+ ); +}); + +type TaskDetailPanelProps = { + selectedTaskId: string | null; + selectedTask: LocalTaskRecord | undefined; + formState: TaskFormState; + editorKey: string; + editorSeedState: TaskEditorState; + saving: boolean; + deleting: boolean; + onSaveTask: () => void; + onDeleteTask: () => void; + onTitleChange: (value: string) => void; + onStatusChange: (value: LocalTaskStatus) => void; + onPriorityChange: (value: LocalTaskPriority) => void; + onDdlChange: (value: string) => void; + onEditorChange: (payload: { json: string | null; text: string }) => void; +}; + +const TaskDetailPanel = memo(function TaskDetailPanel({ + selectedTaskId, + selectedTask, + formState, + editorKey, + editorSeedState, + saving, + deleting, + onSaveTask, + onDeleteTask, + onTitleChange, + onStatusChange, + onPriorityChange, + onDdlChange, + onEditorChange +}: TaskDetailPanelProps) { + return ( +
+
+

任务详情

+
+ + +
+
+ + {!selectedTaskId || !selectedTask ? ( +

+ 请选择一个任务进行编辑。 +

+ ) : ( +
+ + +
+ + + +
+ + + +
+

任务内容

+
+ +
+
+
+ )} +
+ ); +}); + export function TodoShellPage({ session }: TodoShellPageProps) { const [selectedTaskId, setSelectedTaskId] = useState(null); const [formState, setFormState] = useState(DEFAULT_FORM_STATE); @@ -147,7 +548,13 @@ export function TodoShellPage({ session }: TodoShellPageProps) { const [feedback, setFeedback] = useState(null); const [feedbackVisible, setFeedbackVisible] = useState(false); const [draftReadyTaskId, setDraftReadyTaskId] = useState(null); - const savedTaskSnapshotRef = useRef(serializeFormState(DEFAULT_FORM_STATE)); + const [editorSeedState, setEditorSeedState] = useState(DEFAULT_EDITOR_STATE); + const [editorKey, setEditorKey] = useState("editor-empty"); + const savedTaskSnapshotRef = useRef(serializeFormState(DEFAULT_FORM_STATE, DEFAULT_EDITOR_STATE)); + const formStateRef = useRef(DEFAULT_FORM_STATE); + const editorStateRef = useRef(DEFAULT_EDITOR_STATE); + const draftPersistTimeoutRef = useRef(null); + const { status: syncStatus, triggerSync } = useSyncEngine(session); const userId = session?.user.id ?? ""; @@ -175,6 +582,49 @@ export function TodoShellPage({ session }: TodoShellPageProps) { return getLocalTaskById(selectedTaskId); }, [selectedTaskId]); + useEffect(() => { + formStateRef.current = formState; + }, [formState]); + + const scheduleDraftPersist = useCallback((): void => { + if (!selectedTaskId || draftReadyTaskId !== selectedTaskId || !userId) { + return; + } + + if (draftPersistTimeoutRef.current !== null) { + window.clearTimeout(draftPersistTimeoutRef.current); + } + + const currentTaskId = selectedTaskId; + const currentUserId = userId; + const currentFormState = formStateRef.current; + const currentEditorState = editorStateRef.current; + const currentSnapshot = serializeFormState(currentFormState, currentEditorState); + + draftPersistTimeoutRef.current = window.setTimeout(() => { + async function persistDraft(): Promise { + if (currentSnapshot === savedTaskSnapshotRef.current) { + await deleteLocalTaskDraft(currentTaskId); + return; + } + + await saveLocalTaskDraft({ + taskId: currentTaskId, + userId: currentUserId, + title: currentFormState.title, + contentJson: currentEditorState.contentJson, + contentText: currentEditorState.contentText, + priority: currentFormState.priority, + status: currentFormState.status, + ddlInput: currentFormState.ddlInput + }); + } + + void persistDraft(); + draftPersistTimeoutRef.current = null; + }, DRAFT_PERSIST_DEBOUNCE_MS); + }, [draftReadyTaskId, selectedTaskId, userId]); + useEffect(() => { if (!tasks || tasks.length === 0) { setSelectedTaskId(null); @@ -195,8 +645,12 @@ export function TodoShellPage({ session }: TodoShellPageProps) { useEffect(() => { if (!selectedTaskId) { setFormState(DEFAULT_FORM_STATE); + formStateRef.current = DEFAULT_FORM_STATE; + editorStateRef.current = DEFAULT_EDITOR_STATE; + setEditorSeedState(DEFAULT_EDITOR_STATE); + setEditorKey("editor-empty"); setDraftReadyTaskId(null); - savedTaskSnapshotRef.current = serializeFormState(DEFAULT_FORM_STATE); + savedTaskSnapshotRef.current = serializeFormState(DEFAULT_FORM_STATE, DEFAULT_EDITOR_STATE); return; } @@ -209,14 +663,26 @@ export function TodoShellPage({ session }: TodoShellPageProps) { async function hydrateFormState(): Promise { const persistedTaskState = createFormStateFromTask(currentTask); + const persistedEditorState = createEditorStateFromTask(currentTask); const localDraft = await getLocalTaskDraft(currentTask.id); if (cancelled) { return; } - savedTaskSnapshotRef.current = serializeFormState(persistedTaskState); - setFormState(localDraft ? createFormStateFromDraft(localDraft) : persistedTaskState); + const nextFormState = localDraft ? createFormStateFromDraft(localDraft) : persistedTaskState; + const nextEditorState = localDraft + ? createEditorStateFromDraft(localDraft) + : persistedEditorState; + + savedTaskSnapshotRef.current = serializeFormState(persistedTaskState, persistedEditorState); + formStateRef.current = nextFormState; + editorStateRef.current = nextEditorState; + setFormState(nextFormState); + setEditorSeedState(nextEditorState); + setEditorKey( + `${currentTask.id}:${currentTask.updatedAt}:${localDraft?.updatedAt ?? currentTask.updatedAt}` + ); setDraftReadyTaskId(currentTask.id); } @@ -228,34 +694,16 @@ export function TodoShellPage({ session }: TodoShellPageProps) { }, [selectedTask, selectedTaskId]); useEffect(() => { - if (!selectedTaskId || !selectedTask || draftReadyTaskId !== selectedTaskId || !userId) { - return; - } + scheduleDraftPersist(); + }, [formState, scheduleDraftPersist]); - const currentSnapshot = serializeFormState(formState); - const currentTaskId = selectedTaskId; - const currentUserId = userId; - - async function persistDraft(): Promise { - if (currentSnapshot === savedTaskSnapshotRef.current) { - await deleteLocalTaskDraft(currentTaskId); - return; + useEffect(() => { + return () => { + if (draftPersistTimeoutRef.current !== null) { + window.clearTimeout(draftPersistTimeoutRef.current); } - - await saveLocalTaskDraft({ - taskId: currentTaskId, - userId: currentUserId, - title: formState.title, - contentJson: formState.contentJson, - contentText: formState.contentText, - priority: formState.priority, - status: formState.status, - ddlInput: formState.ddlInput - }); - } - - void persistDraft(); - }, [draftReadyTaskId, formState, selectedTask, selectedTaskId, userId]); + }; + }, []); const showFeedback = useCallback((message: string, tone: FeedbackNotice["tone"]): void => { setFeedback({ message, tone }); @@ -339,11 +787,12 @@ export function TodoShellPage({ session }: TodoShellPageProps) { try { setSaving(true); + const currentEditorState = editorStateRef.current; const updatedTask = await updateLocalTask({ id: selectedTaskId, title: formState.title, - contentText: formState.contentText || null, - contentJson: formState.contentJson, + contentText: currentEditorState.contentText || null, + contentJson: currentEditorState.contentJson, priority: formState.priority, status: formState.status, ddlAt: parseDatetimeLocalValue(formState.ddlInput) @@ -354,7 +803,10 @@ export function TodoShellPage({ session }: TodoShellPageProps) { return; } - savedTaskSnapshotRef.current = serializeFormState(createFormStateFromTask(updatedTask)); + savedTaskSnapshotRef.current = serializeFormState( + createFormStateFromTask(updatedTask), + createEditorStateFromTask(updatedTask) + ); await deleteLocalTaskDraft(selectedTaskId); showFeedback("任务已保存。", "success"); } finally { @@ -382,6 +834,49 @@ export function TodoShellPage({ session }: TodoShellPageProps) { } }, [deleting, selectedTaskId, showFeedback]); + const handleEditorChange = useCallback( + (payload: { json: string | null; text: string }): void => { + editorStateRef.current = { + contentJson: payload.json, + contentText: payload.text + }; + scheduleDraftPersist(); + }, + [scheduleDraftPersist] + ); + + const handleSelectTask = useCallback((taskId: string): void => { + setSelectedTaskId(taskId); + }, []); + + const handleTitleChange = useCallback((value: string): void => { + setFormState((previous) => ({ + ...previous, + title: value + })); + }, []); + + const handleStatusChange = useCallback((value: LocalTaskStatus): void => { + setFormState((previous) => ({ + ...previous, + status: value + })); + }, []); + + const handlePriorityChange = useCallback((value: LocalTaskPriority): void => { + setFormState((previous) => ({ + ...previous, + priority: value + })); + }, []); + + const handleDdlChange = useCallback((value: string): void => { + setFormState((previous) => ({ + ...previous, + ddlInput: value + })); + }, []); + useEffect(() => { function handleKeydown(event: KeyboardEvent): void { const isSaveShortcut = (event.ctrlKey || event.metaKey) && event.key.toLowerCase() === "s"; @@ -417,191 +912,41 @@ export function TodoShellPage({ session }: TodoShellPageProps) { } const taskList = tasks ?? []; + const quotaPanelSnapshot = quotaSnapshot ?? null; return ( <> {renderFeedbackBanner()} -
-
-
-

任务列表

- -
+
+ - {quotaSnapshot ? ( -

= 85 ? "text-destructive" : "text-muted-foreground" - )} - > - 空间占用(估算):{formatStorageSize(quotaSnapshot.usedBytes)} /{" "} - {formatStorageSize(quotaSnapshot.quotaBytes)}({quotaSnapshot.usedPercent.toFixed(1)} - %) -

- ) : null} +
+ - {taskList.length === 0 ? ( -

- 还没有任务,点击右上角“新建任务”。 -

- ) : ( -
- {taskList.map((task) => { - const isActive = task.id === selectedTaskId; - return ( - - ); - })} -
- )} -
- -
-
-

任务详情

-
- - -
-
- - {!selectedTaskId || !selectedTask ? ( -

- 请选择一个任务进行编辑。 -

- ) : ( -
- - -
- - - -
- - - -
-

任务内容

-
- - setFormState((previous) => ({ - ...previous, - contentJson: payload.json, - contentText: payload.text - })) - } - /> -
-
-
- )} -
+ +
); diff --git a/apps/web/src/services/local-db.ts b/apps/web/src/services/local-db.ts index 9e934c4..9fa0d97 100644 --- a/apps/web/src/services/local-db.ts +++ b/apps/web/src/services/local-db.ts @@ -17,6 +17,7 @@ export type LocalTaskRecord = { priority: LocalTaskPriority; status: LocalTaskStatus; ddlAt: number | null; + version: number; createdAt: number; updatedAt: number; deletedAt: number | null; @@ -47,10 +48,33 @@ export type LocalTaskDraftRecord = { updatedAt: number; }; +export type LocalSyncStateRecord = { + userId: string; + cursor: string | null; + lastSyncedAt: number | null; + updatedAt: number; +}; + +export type LocalSyncInboxRecord = { + opId: string; + userId: string; + entityId: string; + entityType: SyncEntityType; + action: SyncActionType; + payload: string | null; + clientTs: number; + deviceId: string; + serverTs: number; + receivedAt: number; + appliedAt: number | null; +}; + class TodoLocalDb extends Dexie { declare tasks: Table; declare opLogs: Table; declare taskDrafts: Table; + declare syncStates: Table; + declare syncInbox: Table; constructor() { super("todolist-web-db"); @@ -66,9 +90,38 @@ class TodoLocalDb extends Dexie { task_drafts: "&taskId,userId,updatedAt" }); + this.version(3).stores({ + tasks: "&id,userId,status,priority,ddlAt,updatedAt,deletedAt", + op_logs: "&opId,entityId,entityType,action,clientTs,syncedAt", + task_drafts: "&taskId,userId,updatedAt", + sync_states: "&userId,updatedAt,lastSyncedAt", + sync_inbox: "&opId,userId,entityId,serverTs,appliedAt" + }); + + this.version(4) + .stores({ + tasks: "&id,userId,status,priority,ddlAt,updatedAt,deletedAt", + op_logs: "&opId,entityId,entityType,action,clientTs,syncedAt", + task_drafts: "&taskId,userId,updatedAt", + sync_states: "&userId,updatedAt,lastSyncedAt", + sync_inbox: "&opId,userId,entityId,serverTs,appliedAt" + }) + .upgrade(async (tx) => { + await tx + .table("tasks") + .toCollection() + .modify((task: Partial) => { + if (typeof task.version !== "number") { + task.version = 1; + } + }); + }); + this.tasks = this.table("tasks"); this.opLogs = this.table("op_logs"); this.taskDrafts = this.table("task_drafts"); + this.syncStates = this.table("sync_states"); + this.syncInbox = this.table("sync_inbox"); } } diff --git a/apps/web/src/services/local-sync-repo.ts b/apps/web/src/services/local-sync-repo.ts new file mode 100644 index 0000000..87d13e9 --- /dev/null +++ b/apps/web/src/services/local-sync-repo.ts @@ -0,0 +1,167 @@ +import { + localDb, + type LocalOpLogRecord, + type LocalSyncInboxRecord, + type LocalSyncStateRecord +} from "@/services/local-db"; +import type { SyncPullItem } from "@/services/sync-api"; + +export const MAX_SYNC_RETRY_COUNT = 5; + +export async function listPendingSyncOperations(limit = 20): Promise { + const records = await localDb.opLogs.orderBy("clientTs").toArray(); + + return records + .filter((record) => record.syncedAt === null && record.retryCount < MAX_SYNC_RETRY_COUNT) + .slice(0, limit); +} + +export async function countPendingSyncOperations(): Promise { + const records = await localDb.opLogs.toArray(); + return records.filter( + (record) => record.syncedAt === null && record.retryCount < MAX_SYNC_RETRY_COUNT + ).length; +} + +export async function countBlockedSyncOperations(): Promise { + const records = await localDb.opLogs.toArray(); + return records.filter( + (record) => record.syncedAt === null && record.retryCount >= MAX_SYNC_RETRY_COUNT + ).length; +} + +export async function markSyncOperationsSucceeded( + opIds: string[], + syncedAt: number +): Promise { + if (opIds.length === 0) { + return; + } + + const records = await localDb.opLogs.bulkGet(opIds); + const nextRecords = records + .filter((record): record is LocalOpLogRecord => record !== undefined) + .map((record) => ({ + ...record, + syncedAt, + errorMessage: null + })); + + if (nextRecords.length > 0) { + await localDb.opLogs.bulkPut(nextRecords); + } +} + +export async function markSyncOperationsFailed( + failures: Array<{ opId: string; errorMessage: string }> +): Promise { + if (failures.length === 0) { + return; + } + + const failureMap = new Map(failures.map((failure) => [failure.opId, failure.errorMessage])); + const records = await localDb.opLogs.bulkGet(failures.map((failure) => failure.opId)); + const nextRecords = records + .filter((record): record is LocalOpLogRecord => record !== undefined) + .map((record) => ({ + ...record, + retryCount: record.retryCount + 1, + errorMessage: failureMap.get(record.opId) ?? "同步失败" + })); + + if (nextRecords.length > 0) { + await localDb.opLogs.bulkPut(nextRecords); + } +} + +export async function getLocalSyncState(userId: string): Promise { + return localDb.syncStates.get(userId); +} + +export async function saveLocalSyncState(input: { + userId: string; + cursor: string | null; + lastSyncedAt: number | null; +}): Promise { + await localDb.syncStates.put({ + userId: input.userId, + cursor: input.cursor, + lastSyncedAt: input.lastSyncedAt, + updatedAt: Date.now() + }); +} + +export async function enqueueRemoteSyncOperations( + userId: string, + operations: SyncPullItem[] +): Promise { + if (operations.length === 0) { + return 0; + } + + const receivedAt = Date.now(); + const records: LocalSyncInboxRecord[] = operations.map((operation) => ({ + opId: operation.opId, + userId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: operation.payload, + clientTs: operation.clientTs, + deviceId: operation.deviceId, + serverTs: new Date(operation.serverTs).getTime(), + receivedAt, + appliedAt: null + })); + + await localDb.syncInbox.bulkPut(records); + return records.length; +} + +export async function listPendingRemoteOperations( + userId: string, + limit = 100 +): Promise { + const records = await localDb.syncInbox.where("userId").equals(userId).toArray(); + + return records + .filter((record) => record.appliedAt === null) + .sort((left, right) => { + if (left.serverTs !== right.serverTs) { + return left.serverTs - right.serverTs; + } + + if (left.clientTs !== right.clientTs) { + return left.clientTs - right.clientTs; + } + + return left.opId.localeCompare(right.opId); + }) + .slice(0, limit); +} + +export async function markRemoteOperationsApplied( + opIds: string[], + appliedAt: number +): Promise { + if (opIds.length === 0) { + return; + } + + const records = await localDb.syncInbox.bulkGet(opIds); + const nextRecords = records + .filter((record): record is LocalSyncInboxRecord => record !== undefined) + .map((record) => ({ + ...record, + appliedAt + })); + + if (nextRecords.length > 0) { + await localDb.syncInbox.bulkPut(nextRecords); + } +} + +export async function countPendingRemoteOperations(userId: string): Promise { + const records = await localDb.syncInbox.where("userId").equals(userId).toArray(); + return records.filter((record) => record.appliedAt === null).length; +} diff --git a/apps/web/src/services/local-task-repo.ts b/apps/web/src/services/local-task-repo.ts index cd8edfd..b611047 100644 --- a/apps/web/src/services/local-task-repo.ts +++ b/apps/web/src/services/local-task-repo.ts @@ -24,6 +24,21 @@ export type UpdateLocalTaskInput = { ddlAt?: number | null; }; +type SyncTaskPayload = { + id?: string; + userId?: string; + title: string; + contentJson: string | null; + contentText?: string | null; + priority: LocalTaskPriority; + status: LocalTaskStatus; + ddlAt: number | null; + version: number; + createdAt?: number; + updatedAt: number; + deletedAt?: number | null; +}; + function resolveDeviceId(): string { const savedDeviceId = window.localStorage.getItem(DEVICE_ID_STORAGE_KEY); if (savedDeviceId) { @@ -54,6 +69,18 @@ function createOpLogRecord( }; } +function createSyncTaskPayload(payload: SyncTaskPayload): string { + const nextPayload: Record = { + ...payload + }; + + if (payload.contentJson !== null) { + delete nextPayload.contentText; + } + + return JSON.stringify(nextPayload); +} + export async function listLocalTasksByUser(userId: string): Promise { const tasks = await localDb.tasks.where("userId").equals(userId).toArray(); return tasks @@ -81,12 +108,30 @@ export async function createLocalTask(input: CreateLocalTaskInput): Promise { await localDb.tasks.add(task); @@ -104,6 +149,7 @@ export async function updateLocalTask( return undefined; } + const nextVersion = currentTask.version + 1; const nextTask: LocalTaskRecord = { ...currentTask, title: input.title !== undefined ? input.title.trim() || "未命名任务" : currentTask.title, @@ -112,19 +158,21 @@ export async function updateLocalTask( priority: input.priority ?? currentTask.priority, status: input.status ?? currentTask.status, ddlAt: input.ddlAt !== undefined ? input.ddlAt : currentTask.ddlAt, + version: nextVersion, updatedAt: Date.now() }; const opLog = createOpLogRecord( nextTask.id, "UPDATE", - JSON.stringify({ + createSyncTaskPayload({ title: nextTask.title, - contentText: nextTask.contentText, contentJson: nextTask.contentJson, + contentText: nextTask.contentText, priority: nextTask.priority, status: nextTask.status, ddlAt: nextTask.ddlAt, + version: nextTask.version, updatedAt: nextTask.updatedAt }) ); @@ -144,13 +192,23 @@ export async function deleteLocalTask(id: string): Promise { } const deletedAt = Date.now(); + const nextVersion = currentTask.version + 1; const nextTask: LocalTaskRecord = { ...currentTask, + version: nextVersion, deletedAt, updatedAt: deletedAt }; - const opLog = createOpLogRecord(id, "DELETE", JSON.stringify({ deletedAt })); + const opLog = createOpLogRecord( + id, + "DELETE", + JSON.stringify({ + deletedAt, + version: nextTask.version, + updatedAt: nextTask.updatedAt + }) + ); await localDb.transaction("rw", localDb.tasks, localDb.opLogs, async () => { await localDb.tasks.put(nextTask); diff --git a/apps/web/src/services/sync-api.ts b/apps/web/src/services/sync-api.ts new file mode 100644 index 0000000..e2c2ed0 --- /dev/null +++ b/apps/web/src/services/sync-api.ts @@ -0,0 +1,159 @@ +import type { LocalOpLogRecord } from "@/services/local-db"; + +export type SyncPushResult = { + acceptedCount: number; + duplicateCount: number; + failedCount: number; + results: Array<{ + opId: string; + status: "accepted" | "duplicate" | "failed"; + serverTs: string | null; + reason: string | null; + }>; +}; + +export type SyncPullItem = { + opId: string; + entityId: string; + entityType: "TASK"; + action: "CREATE" | "UPDATE" | "DELETE"; + payload: string | null; + clientTs: number; + deviceId: string; + serverTs: string; +}; + +export type SyncPullResult = { + items: SyncPullItem[]; + nextCursor: string | null; + hasMore: boolean; +}; + +const DEFAULT_API_BASE_URL = "http://localhost:3000"; + +function resolveApiBaseUrl(): string { + const envBaseUrl = import.meta.env.VITE_API_BASE_URL as string | undefined; + if (!envBaseUrl) { + return DEFAULT_API_BASE_URL; + } + + return envBaseUrl.replace(/\/+$/, ""); +} + +async function parseErrorMessage(response: Response): Promise { + if (response.status === 413) { + return "单次同步内容过大,请精简本次任务内容或等待系统分批重试。"; + } + + try { + const body = (await response.json()) as { message?: string | string[] }; + if (Array.isArray(body.message)) { + return body.message.join(","); + } + + if (typeof body.message === "string" && body.message.trim()) { + return body.message; + } + } catch { + return `请求失败(${response.status})`; + } + + return `请求失败(${response.status})`; +} + +type SyncPushOperationRequest = { + opId: string; + entityId: string; + entityType: LocalOpLogRecord["entityType"]; + action: LocalOpLogRecord["action"]; + payload: string; + clientTs: number; + deviceId: string; +}; + +function compactOperationPayload(payload: string): string { + try { + const parsed = JSON.parse(payload) as unknown; + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + return payload; + } + + const nextPayload = { ...(parsed as Record) }; + if (nextPayload.contentJson !== undefined && nextPayload.contentJson !== null) { + delete nextPayload.contentText; + } + + return JSON.stringify(nextPayload); + } catch { + return payload; + } +} + +export function serializeSyncOperationForRequest( + operation: LocalOpLogRecord +): SyncPushOperationRequest { + return { + opId: operation.opId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: compactOperationPayload(operation.payload), + clientTs: operation.clientTs, + deviceId: operation.deviceId + }; +} + +export async function pushSyncOperations( + userId: string, + operations: LocalOpLogRecord[] +): Promise { + const requestOperations = operations.map((operation) => + serializeSyncOperationForRequest(operation) + ); + + const response = await fetch(`${resolveApiBaseUrl()}/sync/push`, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-user-id": userId + }, + body: JSON.stringify({ + operations: requestOperations + }) + }); + + if (!response.ok) { + throw new Error(await parseErrorMessage(response)); + } + + return (await response.json()) as SyncPushResult; +} + +export async function pullSyncOperations(input: { + userId: string; + cursor: string | null; + limit?: number; +}): Promise { + const requestUrl = new URL(`${resolveApiBaseUrl()}/sync/pull`); + + if (input.cursor) { + requestUrl.searchParams.set("cursor", input.cursor); + } + + if (input.limit !== undefined) { + requestUrl.searchParams.set("limit", String(input.limit)); + } + + const response = await fetch(requestUrl, { + method: "GET", + headers: { + "x-user-id": input.userId + } + }); + + if (!response.ok) { + throw new Error(await parseErrorMessage(response)); + } + + return (await response.json()) as SyncPullResult; +} diff --git a/apps/web/src/services/sync-merge.ts b/apps/web/src/services/sync-merge.ts new file mode 100644 index 0000000..666d919 --- /dev/null +++ b/apps/web/src/services/sync-merge.ts @@ -0,0 +1,261 @@ +import { + localDb, + type LocalSyncInboxRecord, + type LocalTaskPriority, + type LocalTaskRecord, + type LocalTaskStatus +} from "@/services/local-db"; +import { listPendingRemoteOperations } from "@/services/local-sync-repo"; + +const TASK_PRIORITY_VALUES: LocalTaskPriority[] = ["LOW", "MEDIUM", "HIGH", "URGENT"]; +const TASK_STATUS_VALUES: LocalTaskStatus[] = ["TODO", "IN_PROGRESS", "DONE", "ARCHIVED"]; + +type RemoteTaskPayload = { + userId?: unknown; + title?: unknown; + contentJson?: unknown; + contentText?: unknown; + priority?: unknown; + status?: unknown; + ddlAt?: unknown; + version?: unknown; + createdAt?: unknown; + updatedAt?: unknown; + deletedAt?: unknown; +}; + +function normalizePriority(value: unknown, fallback: LocalTaskPriority): LocalTaskPriority { + if (typeof value === "string" && TASK_PRIORITY_VALUES.includes(value as LocalTaskPriority)) { + return value as LocalTaskPriority; + } + + return fallback; +} + +function normalizeStatus(value: unknown, fallback: LocalTaskStatus): LocalTaskStatus { + if (typeof value === "string" && TASK_STATUS_VALUES.includes(value as LocalTaskStatus)) { + return value as LocalTaskStatus; + } + + return fallback; +} + +function normalizeStringOrNull(value: unknown, fallback: string | null): string | null { + if (typeof value === "string") { + return value; + } + + if (value === null) { + return null; + } + + return fallback; +} + +function collectTextFromRichContent(value: unknown, fragments: string[]): void { + if (!value || typeof value !== "object") { + return; + } + + const node = value as { + text?: unknown; + content?: unknown; + }; + + if (typeof node.text === "string" && node.text.trim().length > 0) { + fragments.push(node.text.trim()); + } + + if (Array.isArray(node.content)) { + for (const child of node.content) { + collectTextFromRichContent(child, fragments); + } + } +} + +function extractTextFromContentJson(contentJson: string | null): string | null { + if (!contentJson) { + return null; + } + + try { + const parsed = JSON.parse(contentJson) as unknown; + const fragments: string[] = []; + collectTextFromRichContent(parsed, fragments); + return fragments.length > 0 ? fragments.join(" ") : null; + } catch { + return null; + } +} + +function normalizeNullableNumber(value: unknown, fallback: number | null): number | null { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + + if (value === null) { + return null; + } + + return fallback; +} + +function normalizePositiveNumber(value: unknown, fallback: number): number { + if (typeof value === "number" && Number.isFinite(value) && value > 0) { + return value; + } + + return fallback; +} + +function parseOperationPayload(operation: LocalSyncInboxRecord): RemoteTaskPayload { + if (!operation.payload) { + return {}; + } + + const parsed = JSON.parse(operation.payload) as unknown; + if (!parsed || typeof parsed !== "object") { + return {}; + } + + return parsed as RemoteTaskPayload; +} + +function createFallbackTask( + operation: LocalSyncInboxRecord, + userId: string, + updatedAt: number, + version: number +): LocalTaskRecord { + return { + id: operation.entityId, + userId, + title: "未命名任务", + contentJson: null, + contentText: null, + priority: "MEDIUM", + status: "TODO", + ddlAt: null, + version, + createdAt: updatedAt, + updatedAt, + deletedAt: null + }; +} + +function buildIncomingTaskRecord( + operation: LocalSyncInboxRecord, + currentTask: LocalTaskRecord | undefined +): LocalTaskRecord { + const payload = parseOperationPayload(operation); + const fallbackVersion = currentTask?.version ?? 1; + const version = normalizePositiveNumber(payload.version, fallbackVersion); + const updatedAt = normalizePositiveNumber( + payload.updatedAt, + normalizePositiveNumber(payload.deletedAt, operation.clientTs) + ); + const fallbackTask = + currentTask ?? createFallbackTask(operation, operation.userId, updatedAt, version); + const contentJson = normalizeStringOrNull(payload.contentJson, fallbackTask.contentJson); + const contentText = normalizeStringOrNull( + payload.contentText, + extractTextFromContentJson(contentJson) ?? fallbackTask.contentText + ); + + if (operation.action === "DELETE") { + const deletedAt = normalizePositiveNumber(payload.deletedAt, updatedAt); + return { + ...fallbackTask, + version, + updatedAt: deletedAt, + deletedAt + }; + } + + return { + ...fallbackTask, + userId: typeof payload.userId === "string" ? payload.userId : fallbackTask.userId, + title: + typeof payload.title === "string" && payload.title.trim().length > 0 + ? payload.title + : fallbackTask.title, + contentJson, + contentText, + priority: normalizePriority(payload.priority, fallbackTask.priority), + status: normalizeStatus(payload.status, fallbackTask.status), + ddlAt: normalizeNullableNumber(payload.ddlAt, fallbackTask.ddlAt), + version, + createdAt: normalizePositiveNumber(payload.createdAt, fallbackTask.createdAt), + updatedAt, + deletedAt: normalizeNullableNumber(payload.deletedAt, null) + }; +} + +function getOperationTieBreaker(operation: LocalSyncInboxRecord): number { + if (operation.action === "DELETE") { + return 3; + } + + if (operation.action === "UPDATE") { + return 2; + } + + return 1; +} + +function shouldApplyIncomingTask( + currentTask: LocalTaskRecord | undefined, + incomingTask: LocalTaskRecord, + operation: LocalSyncInboxRecord +): boolean { + if (!currentTask) { + return true; + } + + if (incomingTask.updatedAt > currentTask.updatedAt) { + return true; + } + + if (incomingTask.updatedAt < currentTask.updatedAt) { + return false; + } + + if (incomingTask.version > currentTask.version) { + return true; + } + + if (incomingTask.version < currentTask.version) { + return false; + } + + return getOperationTieBreaker(operation) >= (currentTask.deletedAt === null ? 1 : 3); +} + +export async function applyPendingRemoteOperations(userId: string): Promise { + const pendingOperations = await listPendingRemoteOperations(userId); + if (pendingOperations.length === 0) { + return 0; + } + + const appliedAt = Date.now(); + + await localDb.transaction("rw", localDb.tasks, localDb.syncInbox, async () => { + for (const operation of pendingOperations) { + if (operation.entityType !== "TASK") { + await localDb.syncInbox.update(operation.opId, { appliedAt }); + continue; + } + + const currentTask = await localDb.tasks.get(operation.entityId); + const incomingTask = buildIncomingTaskRecord(operation, currentTask); + + if (shouldApplyIncomingTask(currentTask, incomingTask, operation)) { + await localDb.tasks.put(incomingTask); + } + + await localDb.syncInbox.update(operation.opId, { appliedAt }); + } + }); + + return pendingOperations.length; +} diff --git a/apps/web/src/services/sync-worker.ts b/apps/web/src/services/sync-worker.ts new file mode 100644 index 0000000..e59f9f7 --- /dev/null +++ b/apps/web/src/services/sync-worker.ts @@ -0,0 +1,142 @@ +import { + enqueueRemoteSyncOperations, + getLocalSyncState, + listPendingSyncOperations, + markSyncOperationsFailed, + markSyncOperationsSucceeded, + saveLocalSyncState +} from "@/services/local-sync-repo"; +import { applyPendingRemoteOperations } from "@/services/sync-merge"; +import { + pullSyncOperations, + pushSyncOperations, + serializeSyncOperationForRequest +} from "@/services/sync-api"; +import type { LocalOpLogRecord } from "@/services/local-db"; + +const PUSH_BATCH_LIMIT = 20; +const PUSH_BATCH_MAX_BYTES = 256 * 1024; +const PULL_BATCH_LIMIT = 100; +const MAX_PULL_PAGES_PER_CYCLE = 5; + +function estimateOperationBytes(operation: LocalOpLogRecord): number { + return new TextEncoder().encode(JSON.stringify(serializeSyncOperationForRequest(operation))) + .length; +} + +function createPushBatch(operations: LocalOpLogRecord[]): LocalOpLogRecord[] { + const batch: LocalOpLogRecord[] = []; + let batchBytes = 0; + + for (const operation of operations) { + const operationBytes = estimateOperationBytes(operation); + if (batch.length > 0 && batchBytes + operationBytes > PUSH_BATCH_MAX_BYTES) { + break; + } + + batch.push(operation); + batchBytes += operationBytes; + } + + return batch; +} + +export type SyncCycleResult = { + pushedCount: number; + pulledCount: number; + appliedRemoteCount: number; + lastSyncedAt: number; + hasFailures: boolean; + failureMessage: string | null; +}; + +export async function runSyncWorkerCycle(userId: string): Promise { + const lastSyncedAt = Date.now(); + let pushedCount = 0; + let pulledCount = 0; + let appliedRemoteCount = 0; + let hasFailures = false; + let failureMessage: string | null = null; + + for (;;) { + const pendingCandidates = await listPendingSyncOperations(PUSH_BATCH_LIMIT); + if (pendingCandidates.length === 0) { + break; + } + + const pendingOperations = createPushBatch(pendingCandidates); + const pushResult = await pushSyncOperations(userId, pendingOperations); + const syncedOperationIds = pushResult.results + .filter((result) => result.status === "accepted" || result.status === "duplicate") + .map((result) => result.opId); + const failedOperations = pushResult.results + .filter((result) => result.status === "failed") + .map((result) => ({ + opId: result.opId, + errorMessage: result.reason ?? "同步失败" + })); + + await markSyncOperationsSucceeded(syncedOperationIds, lastSyncedAt); + await markSyncOperationsFailed(failedOperations); + + pushedCount += syncedOperationIds.length; + + if (failedOperations.length > 0) { + hasFailures = true; + failureMessage = failedOperations[0]?.errorMessage ?? "同步失败"; + break; + } + + if ( + pendingCandidates.length < PUSH_BATCH_LIMIT || + pendingOperations.length < pendingCandidates.length + ) { + break; + } + } + + const currentState = await getLocalSyncState(userId); + let nextCursor = currentState?.cursor ?? null; + + for (let page = 0; page < MAX_PULL_PAGES_PER_CYCLE; page += 1) { + const pullResult = await pullSyncOperations({ + userId, + cursor: nextCursor, + limit: PULL_BATCH_LIMIT + }); + + if (pullResult.items.length > 0) { + pulledCount += await enqueueRemoteSyncOperations(userId, pullResult.items); + } + + nextCursor = pullResult.nextCursor; + await saveLocalSyncState({ + userId, + cursor: nextCursor, + lastSyncedAt + }); + + if (!pullResult.hasMore) { + break; + } + } + + if (currentState === undefined && nextCursor === null) { + await saveLocalSyncState({ + userId, + cursor: null, + lastSyncedAt + }); + } + + appliedRemoteCount = await applyPendingRemoteOperations(userId); + + return { + pushedCount, + pulledCount, + appliedRemoteCount, + lastSyncedAt, + hasFailures, + failureMessage + }; +}