diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index 4f6ee3f..8c2a78f 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -3,6 +3,7 @@ import { ConfigModule } from "@nestjs/config"; import { AttachmentModule } from "./attachment/attachment.module"; import { AuthModule } from "./auth/auth.module"; import { PrismaModule } from "./prisma/prisma.module"; +import { SyncModule } from "./sync/sync.module"; import { TaskModule } from "./task/task.module"; @Module({ @@ -14,7 +15,8 @@ import { TaskModule } from "./task/task.module"; PrismaModule, AuthModule, TaskModule, - AttachmentModule + AttachmentModule, + SyncModule ] }) export class AppModule {} diff --git a/apps/api/src/sync/dto/sync-push.dto.ts b/apps/api/src/sync/dto/sync-push.dto.ts new file mode 100644 index 0000000..91b3a06 --- /dev/null +++ b/apps/api/src/sync/dto/sync-push.dto.ts @@ -0,0 +1,62 @@ +import { Type } from "class-transformer"; +import { + ArrayMaxSize, + ArrayMinSize, + IsArray, + IsEnum, + IsInt, + IsOptional, + IsString, + MaxLength, + Min, + ValidateNested +} from "class-validator"; + +export enum SyncEntityTypeDto { + TASK = "TASK" +} + +export enum SyncActionTypeDto { + CREATE = "CREATE", + UPDATE = "UPDATE", + DELETE = "DELETE" +} + +export class SyncPushOperationDto { + @IsString() + @MaxLength(64) + opId!: string; + + @IsString() + @MaxLength(64) + entityId!: string; + + @IsEnum(SyncEntityTypeDto) + entityType!: SyncEntityTypeDto; + + @IsEnum(SyncActionTypeDto) + action!: SyncActionTypeDto; + + @IsOptional() + @IsString() + @MaxLength(50000) + payload?: string; + + @Type(() => Number) + @IsInt() + @Min(0) + clientTs!: number; + + @IsString() + @MaxLength(128) + deviceId!: string; +} + +export class SyncPushDto { + @IsArray() + @ArrayMinSize(1) + @ArrayMaxSize(200) + @ValidateNested({ each: true }) + @Type(() => SyncPushOperationDto) + operations!: SyncPushOperationDto[]; +} diff --git a/apps/api/src/sync/sync.controller.ts b/apps/api/src/sync/sync.controller.ts new file mode 100644 index 0000000..58ec30c --- /dev/null +++ b/apps/api/src/sync/sync.controller.ts @@ -0,0 +1,25 @@ +import { Body, Controller, Headers, Post, UnauthorizedException } from "@nestjs/common"; +import { SyncPushDto } from "./dto/sync-push.dto"; +import { SyncPushResponse, SyncService } from "./sync.service"; + +@Controller("sync") +export class SyncController { + constructor(private readonly syncService: SyncService) {} + + @Post("push") + async pushOperations( + @Headers("x-user-id") userIdHeader: string | string[] | undefined, + @Body() body: SyncPushDto + ): Promise { + return this.syncService.pushOperations(this.resolveUserId(userIdHeader), body); + } + + private resolveUserId(userIdHeader: string | string[] | undefined): string { + const userId = Array.isArray(userIdHeader) ? userIdHeader[0] : userIdHeader; + if (!userId) { + throw new UnauthorizedException("缺少用户上下文"); + } + + return userId; + } +} diff --git a/apps/api/src/sync/sync.module.ts b/apps/api/src/sync/sync.module.ts new file mode 100644 index 0000000..65f1492 --- /dev/null +++ b/apps/api/src/sync/sync.module.ts @@ -0,0 +1,11 @@ +import { Module } from "@nestjs/common"; +import { PrismaModule } from "../prisma/prisma.module"; +import { SyncController } from "./sync.controller"; +import { SyncService } from "./sync.service"; + +@Module({ + imports: [PrismaModule], + controllers: [SyncController], + providers: [SyncService] +}) +export class SyncModule {} diff --git a/apps/api/src/sync/sync.service.ts b/apps/api/src/sync/sync.service.ts new file mode 100644 index 0000000..1dac744 --- /dev/null +++ b/apps/api/src/sync/sync.service.ts @@ -0,0 +1,149 @@ +import { Injectable } from "@nestjs/common"; +import { Prisma } from "../../generated/prisma/client"; +import { PrismaService } from "../prisma/prisma.service"; +import { SyncPushDto, SyncPushOperationDto } from "./dto/sync-push.dto"; + +export type SyncPushItemStatus = "accepted" | "duplicate" | "failed"; + +export type SyncPushItemResult = { + opId: string; + status: SyncPushItemStatus; + serverTs: string | null; + reason: string | null; +}; + +export type SyncPushResponse = { + acceptedCount: number; + duplicateCount: number; + failedCount: number; + results: SyncPushItemResult[]; +}; + +type ExistingOperationRecord = { + opId: string; + serverTs: Date; +}; + +@Injectable() +export class SyncService { + constructor(private readonly prismaService: PrismaService) {} + + async pushOperations(userId: string, body: SyncPushDto): Promise { + const existingOperations = await this.loadExistingOperations(userId, body.operations); + const results: SyncPushItemResult[] = []; + const seenOperationIds = new Set(); + const acceptedOperationServerTs = new Map(); + + for (const operation of body.operations) { + if (seenOperationIds.has(operation.opId)) { + results.push({ + opId: operation.opId, + status: "duplicate", + serverTs: acceptedOperationServerTs.get(operation.opId) ?? null, + reason: "same_batch_duplicate" + }); + continue; + } + + seenOperationIds.add(operation.opId); + + const existingOperation = existingOperations.get(operation.opId); + if (existingOperation) { + results.push({ + opId: operation.opId, + status: "duplicate", + serverTs: existingOperation.serverTs.toISOString(), + reason: "already_synced" + }); + continue; + } + + try { + const createdOperation = await this.prismaService.syncOperation.create({ + data: { + opId: operation.opId, + userId, + deviceId: operation.deviceId, + entityType: operation.entityType, + entityId: operation.entityId, + action: operation.action, + payload: operation.payload, + clientTs: new Date(operation.clientTs) + }, + select: { + opId: true, + serverTs: true + } + }); + + const serverTs = createdOperation.serverTs.toISOString(); + acceptedOperationServerTs.set(createdOperation.opId, serverTs); + results.push({ + opId: createdOperation.opId, + status: "accepted", + serverTs, + reason: null + }); + } catch (error) { + if (this.isDuplicateOpIdError(error)) { + results.push({ + opId: operation.opId, + status: "duplicate", + serverTs: null, + reason: "already_synced" + }); + continue; + } + + results.push({ + opId: operation.opId, + status: "failed", + serverTs: null, + reason: "persist_failed" + }); + } + } + + return { + acceptedCount: results.filter((item) => item.status === "accepted").length, + duplicateCount: results.filter((item) => item.status === "duplicate").length, + failedCount: results.filter((item) => item.status === "failed").length, + results + }; + } + + private async loadExistingOperations( + userId: string, + operations: SyncPushOperationDto[] + ): Promise> { + const opIds = Array.from(new Set(operations.map((operation) => operation.opId))); + + const existingOperations = (await this.prismaService.syncOperation.findMany({ + where: { + userId, + opId: { + in: opIds + } + }, + select: { + opId: true, + serverTs: true + } + })) as ExistingOperationRecord[]; + + return new Map( + existingOperations.map((operation): [string, ExistingOperationRecord] => [ + operation.opId, + operation + ]) + ); + } + + private isDuplicateOpIdError(error: unknown): boolean { + if (!(error instanceof Prisma.PrismaClientKnownRequestError)) { + return false; + } + + return error.code === "P2002"; + } +} diff --git a/apps/api/test/sync-push.spec.ts b/apps/api/test/sync-push.spec.ts new file mode 100644 index 0000000..c14976b --- /dev/null +++ b/apps/api/test/sync-push.spec.ts @@ -0,0 +1,233 @@ +import request from "supertest"; +import { INestApplication, ValidationPipe } from "@nestjs/common"; +import { Test, TestingModule } from "@nestjs/testing"; +import { PrismaService } from "../src/prisma/prisma.service"; +import { SyncController } from "../src/sync/sync.controller"; +import { SyncService } from "../src/sync/sync.service"; + +type SyncOperationRecord = { + id: string; + opId: string; + userId: string; + deviceId: string; + entityType: string; + entityId: string; + action: string; + payload?: string; + clientTs: Date; + serverTs: Date; +}; + +class InMemoryPrismaService { + private syncOperationIdSequence = 1; + private syncOperations: SyncOperationRecord[] = []; + + readonly syncOperation = { + findMany: async (args: { + where: { + userId: string; + opId: { + in: string[]; + }; + }; + select: { + opId: true; + serverTs: true; + }; + }) => { + return this.syncOperations + .filter( + (item) => item.userId === args.where.userId && args.where.opId.in.includes(item.opId) + ) + .map((item) => ({ + opId: item.opId, + serverTs: item.serverTs + })); + }, + + create: async (args: { + data: { + opId: string; + userId: string; + deviceId: string; + entityType: string; + entityId: string; + action: string; + payload?: string; + clientTs: Date; + }; + select: { + opId: true; + serverTs: true; + }; + }) => { + const createdOperation: SyncOperationRecord = { + id: `sync_${this.syncOperationIdSequence++}`, + opId: args.data.opId, + userId: args.data.userId, + deviceId: args.data.deviceId, + entityType: args.data.entityType, + entityId: args.data.entityId, + action: args.data.action, + payload: args.data.payload, + clientTs: args.data.clientTs, + serverTs: new Date() + }; + + this.syncOperations.push(createdOperation); + + return { + opId: createdOperation.opId, + serverTs: createdOperation.serverTs + }; + } + }; + + getOperationCount(): number { + return this.syncOperations.length; + } +} + +describe("SyncController (integration)", () => { + let app: INestApplication; + let prismaService: InMemoryPrismaService; + + beforeAll(async () => { + prismaService = new InMemoryPrismaService(); + + const moduleRef: TestingModule = await Test.createTestingModule({ + controllers: [SyncController], + providers: [SyncService, { provide: PrismaService, useValue: prismaService }] + }).compile(); + + app = moduleRef.createNestApplication(); + app.useGlobalPipes( + new ValidationPipe({ + transform: true, + whitelist: true, + forbidNonWhitelisted: true + }) + ); + await app.init(); + }); + + afterAll(async () => { + await app.close(); + }); + + it("should accept operations once and mark repeated push as duplicate", async () => { + const payload = { + operations: [ + { + opId: "op-create-1", + entityType: "TASK", + entityId: "task-1", + action: "CREATE", + payload: '{"title":"任务一"}', + clientTs: 1712419200000, + deviceId: "device-a" + }, + { + opId: "op-update-1", + entityType: "TASK", + entityId: "task-1", + action: "UPDATE", + payload: '{"title":"任务一-更新"}', + clientTs: 1712419201000, + deviceId: "device-a" + } + ] + }; + + const firstResponse = await request(app.getHttpServer()) + .post("/sync/push") + .set("x-user-id", "user-1") + .send(payload) + .expect(201); + + expect(firstResponse.body.acceptedCount).toBe(2); + expect(firstResponse.body.duplicateCount).toBe(0); + expect(firstResponse.body.failedCount).toBe(0); + expect(firstResponse.body.results).toEqual([ + expect.objectContaining({ + opId: "op-create-1", + status: "accepted" + }), + expect.objectContaining({ + opId: "op-update-1", + status: "accepted" + }) + ]); + expect(prismaService.getOperationCount()).toBe(2); + + const secondResponse = await request(app.getHttpServer()) + .post("/sync/push") + .set("x-user-id", "user-1") + .send(payload) + .expect(201); + + expect(secondResponse.body.acceptedCount).toBe(0); + expect(secondResponse.body.duplicateCount).toBe(2); + expect(secondResponse.body.failedCount).toBe(0); + expect(secondResponse.body.results).toEqual([ + expect.objectContaining({ + opId: "op-create-1", + status: "duplicate", + reason: "already_synced" + }), + expect.objectContaining({ + opId: "op-update-1", + status: "duplicate", + reason: "already_synced" + }) + ]); + expect(prismaService.getOperationCount()).toBe(2); + }); + + it("should mark duplicated op ids in the same batch as duplicate", async () => { + const response = await request(app.getHttpServer()) + .post("/sync/push") + .set("x-user-id", "user-2") + .send({ + operations: [ + { + opId: "op-dup-1", + entityType: "TASK", + entityId: "task-2", + action: "CREATE", + payload: '{"title":"任务二"}', + clientTs: 1712419300000, + deviceId: "device-b" + }, + { + opId: "op-dup-1", + entityType: "TASK", + entityId: "task-2", + action: "UPDATE", + payload: '{"title":"任务二-重复"}', + clientTs: 1712419301000, + deviceId: "device-b" + } + ] + }) + .expect(201); + + expect(response.body.acceptedCount).toBe(1); + expect(response.body.duplicateCount).toBe(1); + expect(response.body.failedCount).toBe(0); + expect(response.body.results[0]).toEqual( + expect.objectContaining({ + opId: "op-dup-1", + status: "accepted" + }) + ); + expect(response.body.results[1]).toEqual( + expect.objectContaining({ + opId: "op-dup-1", + status: "duplicate", + reason: "same_batch_duplicate" + }) + ); + expect(prismaService.getOperationCount()).toBe(3); + }); +});