diff --git a/apps/api/src/sync/dto/sync-pull.dto.ts b/apps/api/src/sync/dto/sync-pull.dto.ts new file mode 100644 index 0000000..c5c99fb --- /dev/null +++ b/apps/api/src/sync/dto/sync-pull.dto.ts @@ -0,0 +1,16 @@ +import { Type } from "class-transformer"; +import { IsInt, IsOptional, IsString, Max, MaxLength, Min } from "class-validator"; + +export class SyncPullQueryDto { + @IsOptional() + @IsString() + @MaxLength(512) + cursor?: string; + + @Type(() => Number) + @IsOptional() + @IsInt() + @Min(1) + @Max(200) + limit?: number; +} diff --git a/apps/api/src/sync/sync.controller.ts b/apps/api/src/sync/sync.controller.ts index 58ec30c..72ccfac 100644 --- a/apps/api/src/sync/sync.controller.ts +++ b/apps/api/src/sync/sync.controller.ts @@ -1,11 +1,20 @@ -import { Body, Controller, Headers, Post, UnauthorizedException } from "@nestjs/common"; +import { Body, Controller, Get, Headers, Post, Query, UnauthorizedException } from "@nestjs/common"; +import { SyncPullQueryDto } from "./dto/sync-pull.dto"; import { SyncPushDto } from "./dto/sync-push.dto"; -import { SyncPushResponse, SyncService } from "./sync.service"; +import { SyncPullResponse, SyncPushResponse, SyncService } from "./sync.service"; @Controller("sync") export class SyncController { constructor(private readonly syncService: SyncService) {} + @Get("pull") + async pullOperations( + @Headers("x-user-id") userIdHeader: string | string[] | undefined, + @Query() query: SyncPullQueryDto + ): Promise { + return this.syncService.pullOperations(this.resolveUserId(userIdHeader), query); + } + @Post("push") async pushOperations( @Headers("x-user-id") userIdHeader: string | string[] | undefined, diff --git a/apps/api/src/sync/sync.service.ts b/apps/api/src/sync/sync.service.ts index 1dac744..9bab5e2 100644 --- a/apps/api/src/sync/sync.service.ts +++ b/apps/api/src/sync/sync.service.ts @@ -1,6 +1,7 @@ -import { Injectable } from "@nestjs/common"; +import { BadRequestException, Injectable } from "@nestjs/common"; import { Prisma } from "../../generated/prisma/client"; import { PrismaService } from "../prisma/prisma.service"; +import { SyncPullQueryDto } from "./dto/sync-pull.dto"; import { SyncPushDto, SyncPushOperationDto } from "./dto/sync-push.dto"; export type SyncPushItemStatus = "accepted" | "duplicate" | "failed"; @@ -24,10 +25,79 @@ type ExistingOperationRecord = { serverTs: Date; }; +type SyncPullCursorState = { + serverTs: string; + opId: string; +}; + +type SyncPullOperationRecord = { + opId: string; + entityId: string; + entityType: string; + action: string; + payload: Prisma.JsonValue | null; + clientTs: Date; + deviceId: string; + serverTs: Date; +}; + +export type SyncPullItem = { + opId: string; + entityId: string; + entityType: string; + action: string; + payload: string | null; + clientTs: number; + deviceId: string; + serverTs: string; +}; + +export type SyncPullResponse = { + items: SyncPullItem[]; + nextCursor: string | null; + hasMore: boolean; +}; + @Injectable() export class SyncService { constructor(private readonly prismaService: PrismaService) {} + async pullOperations(userId: string, query: SyncPullQueryDto): Promise { + const limit = query.limit ?? 100; + const cursor = this.parseCursor(query.cursor); + + const operations = (await this.prismaService.syncOperation.findMany({ + where: this.buildPullWhereInput(userId, cursor), + orderBy: [{ serverTs: "asc" }, { opId: "asc" }], + take: limit + 1, + select: { + opId: true, + entityId: true, + entityType: true, + action: true, + payload: true, + clientTs: true, + deviceId: true, + serverTs: true + } + })) as SyncPullOperationRecord[]; + + const hasMore = operations.length > limit; + const pageItems = hasMore ? operations.slice(0, limit) : operations; + const lastOperation = pageItems.at(-1); + + return { + items: pageItems.map((operation) => this.serializePullItem(operation)), + nextCursor: lastOperation + ? this.encodeCursor({ + serverTs: lastOperation.serverTs.toISOString(), + opId: lastOperation.opId + }) + : (query.cursor ?? null), + hasMore + }; + } + async pushOperations(userId: string, body: SyncPushDto): Promise { const existingOperations = await this.loadExistingOperations(userId, body.operations); const results: SyncPushItemResult[] = []; @@ -139,6 +209,100 @@ export class SyncService { ); } + private buildPullWhereInput( + userId: string, + cursor: SyncPullCursorState | null + ): Prisma.SyncOperationWhereInput { + if (!cursor) { + return { userId }; + } + + const cursorDate = new Date(cursor.serverTs); + + return { + userId, + // 同一毫秒内可能有多条操作,必须使用 opId 作为二级游标来保证稳定分页。 + OR: [ + { + serverTs: { + gt: cursorDate + } + }, + { + serverTs: cursorDate, + opId: { + gt: cursor.opId + } + } + ] + }; + } + + private serializePullItem(operation: SyncPullOperationRecord): SyncPullItem { + return { + opId: operation.opId, + entityId: operation.entityId, + entityType: operation.entityType, + action: operation.action, + payload: this.serializePayload(operation.payload), + clientTs: operation.clientTs.getTime(), + deviceId: operation.deviceId, + serverTs: operation.serverTs.toISOString() + }; + } + + private serializePayload(payload: Prisma.JsonValue | null): string | null { + if (payload === null) { + return null; + } + + if (typeof payload === "string") { + return payload; + } + + return JSON.stringify(payload); + } + + private parseCursor(cursor: string | undefined): SyncPullCursorState | null { + if (!cursor) { + return null; + } + + let decodedCursor: unknown; + try { + decodedCursor = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")); + } catch { + throw new BadRequestException("Invalid sync cursor"); + } + + if (typeof decodedCursor !== "object" || decodedCursor === null) { + throw new BadRequestException("Invalid sync cursor"); + } + + const cursorRecord = decodedCursor as { + serverTs?: unknown; + opId?: unknown; + }; + + if ( + typeof cursorRecord.serverTs !== "string" || + typeof cursorRecord.opId !== "string" || + Number.isNaN(Date.parse(cursorRecord.serverTs)) || + cursorRecord.opId.trim().length === 0 + ) { + throw new BadRequestException("Invalid sync cursor"); + } + + return { + serverTs: cursorRecord.serverTs, + opId: cursorRecord.opId + }; + } + + private encodeCursor(cursor: SyncPullCursorState): string { + return Buffer.from(JSON.stringify(cursor), "utf8").toString("base64url"); + } + private isDuplicateOpIdError(error: unknown): boolean { if (!(error instanceof Prisma.PrismaClientKnownRequestError)) { return false; diff --git a/apps/api/test/sync-push.spec.ts b/apps/api/test/sync-push.spec.ts index c14976b..dfbacba 100644 --- a/apps/api/test/sync-push.spec.ts +++ b/apps/api/test/sync-push.spec.ts @@ -13,54 +13,126 @@ type SyncOperationRecord = { entityType: string; entityId: string; action: string; - payload?: string; + payload: string | null; clientTs: Date; serverTs: Date; }; +type SyncOperationSelect = { + opId?: true; + entityId?: true; + entityType?: true; + action?: true; + payload?: true; + clientTs?: true; + deviceId?: true; + serverTs?: true; +}; + +type SyncOperationFindManyArgs = { + where: { + userId: string; + opId?: { + in: string[]; + }; + OR?: Array< + | { + serverTs: { + gt: Date; + }; + } + | { + serverTs: Date; + opId: { + gt: string; + }; + } + >; + }; + select: SyncOperationSelect; + orderBy?: Array<{ + serverTs?: "asc" | "desc"; + opId?: "asc" | "desc"; + }>; + take?: number; +}; + +type SyncOperationCreateArgs = { + data: { + opId: string; + userId: string; + deviceId: string; + entityType: string; + entityId: string; + action: string; + payload?: string; + clientTs: Date; + }; + select: { + opId: true; + serverTs: true; + }; +}; + class InMemoryPrismaService { private syncOperationIdSequence = 1; private syncOperations: SyncOperationRecord[] = []; readonly syncOperation = { - findMany: async (args: { - where: { - userId: string; - opId: { - in: string[]; - }; - }; - select: { - opId: true; - serverTs: true; - }; - }) => { - return this.syncOperations - .filter( - (item) => item.userId === args.where.userId && args.where.opId.in.includes(item.opId) - ) - .map((item) => ({ - opId: item.opId, - serverTs: item.serverTs - })); + findMany: async (args: SyncOperationFindManyArgs) => { + let items = this.syncOperations.filter((item) => item.userId === args.where.userId); + + if (args.where.opId?.in) { + items = items.filter((item) => args.where.opId?.in.includes(item.opId)); + } + + if (args.where.OR && args.where.OR.length > 0) { + items = items.filter((item) => + args.where.OR?.some((condition) => { + if ("gt" in condition.serverTs) { + return item.serverTs.getTime() > condition.serverTs.gt.getTime(); + } + + if ("opId" in condition) { + return ( + item.serverTs.getTime() === condition.serverTs.getTime() && + item.opId > condition.opId.gt + ); + } + + return false; + }) + ); + } + + if (args.orderBy && args.orderBy.length > 0) { + items = [...items].sort((left, right) => { + for (const orderRule of args.orderBy ?? []) { + if (orderRule.serverTs) { + const diff = left.serverTs.getTime() - right.serverTs.getTime(); + if (diff !== 0) { + return orderRule.serverTs === "asc" ? diff : -diff; + } + } + + if (orderRule.opId) { + const diff = left.opId.localeCompare(right.opId); + if (diff !== 0) { + return orderRule.opId === "asc" ? diff : -diff; + } + } + } + + return 0; + }); + } + + const limitedItems = args.take ? items.slice(0, args.take) : items; + + return limitedItems.map((item) => this.pickSelectedFields(item, args.select)); }, - create: async (args: { - data: { - opId: string; - userId: string; - deviceId: string; - entityType: string; - entityId: string; - action: string; - payload?: string; - clientTs: Date; - }; - select: { - opId: true; - serverTs: true; - }; - }) => { + create: async (args: SyncOperationCreateArgs) => { const createdOperation: SyncOperationRecord = { id: `sync_${this.syncOperationIdSequence++}`, opId: args.data.opId, @@ -69,7 +141,7 @@ class InMemoryPrismaService { entityType: args.data.entityType, entityId: args.data.entityId, action: args.data.action, - payload: args.data.payload, + payload: args.data.payload ?? null, clientTs: args.data.clientTs, serverTs: new Date() }; @@ -86,6 +158,33 @@ class InMemoryPrismaService { getOperationCount(): number { return this.syncOperations.length; } + + seedOperations(records: Array>): void { + for (const record of records) { + this.syncOperations.push({ + ...record, + id: `sync_${this.syncOperationIdSequence++}` + }); + } + } + + private pickSelectedFields( + item: SyncOperationRecord, + select: SyncOperationSelect + ): Partial { + const result: Record = {}; + + for (const key of Object.keys(select) as Array) { + if (!select[key]) { + continue; + } + + const recordKey = key as keyof SyncOperationRecord; + result[recordKey] = item[recordKey]; + } + + return result as Partial; + } } describe("SyncController (integration)", () => { @@ -230,4 +329,91 @@ describe("SyncController (integration)", () => { ); expect(prismaService.getOperationCount()).toBe(3); }); + + it("should pull operations incrementally with a stable cursor", async () => { + prismaService.seedOperations([ + { + opId: "pull-op-1", + userId: "user-pull", + deviceId: "device-c", + entityType: "TASK", + entityId: "task-10", + action: "CREATE", + payload: '{"title":"任务甲"}', + clientTs: new Date("2026-04-06T10:00:00.000Z"), + serverTs: new Date("2026-04-06T10:10:00.000Z") + }, + { + opId: "pull-op-2", + userId: "user-pull", + deviceId: "device-c", + entityType: "TASK", + entityId: "task-10", + action: "UPDATE", + payload: '{"title":"任务甲-更新"}', + clientTs: new Date("2026-04-06T10:01:00.000Z"), + serverTs: new Date("2026-04-06T10:10:00.000Z") + }, + { + opId: "pull-op-3", + userId: "user-pull", + deviceId: "device-c", + entityType: "TASK", + entityId: "task-11", + action: "CREATE", + payload: '{"title":"任务乙"}', + clientTs: new Date("2026-04-06T10:02:00.000Z"), + serverTs: new Date("2026-04-06T10:11:00.000Z") + }, + { + opId: "pull-op-other-user", + userId: "user-other", + deviceId: "device-z", + entityType: "TASK", + entityId: "task-99", + action: "CREATE", + payload: '{"title":"其他用户任务"}', + clientTs: new Date("2026-04-06T10:03:00.000Z"), + serverTs: new Date("2026-04-06T10:12:00.000Z") + } + ]); + + const firstResponse = await request(app.getHttpServer()) + .get("/sync/pull") + .set("x-user-id", "user-pull") + .query({ limit: 2 }) + .expect(200); + + expect(firstResponse.body.items.map((item: { opId: string }) => item.opId)).toEqual([ + "pull-op-1", + "pull-op-2" + ]); + expect(firstResponse.body.hasMore).toBe(true); + expect(firstResponse.body.nextCursor).toEqual(expect.any(String)); + + const secondResponse = await request(app.getHttpServer()) + .get("/sync/pull") + .set("x-user-id", "user-pull") + .query({ + limit: 2, + cursor: firstResponse.body.nextCursor + }) + .expect(200); + + expect(secondResponse.body.items.map((item: { opId: string }) => item.opId)).toEqual([ + "pull-op-3" + ]); + expect(secondResponse.body.hasMore).toBe(false); + expect(secondResponse.body.nextCursor).toEqual(expect.any(String)); + }); + + it("should reject invalid cursor payload", async () => { + await request(app.getHttpServer()) + .get("/sync/pull") + .set("x-user-id", "user-invalid-cursor") + .query({ + cursor: "not-a-valid-cursor" + }) + .expect(400); + }); });