feat(api-sync): implement sync push endpoint with idempotency

This commit is contained in:
2026-04-06 00:53:36 +08:00
parent de1db459c2
commit ecf0d9ff03
6 changed files with 483 additions and 1 deletions
+3 -1
View File
@@ -3,6 +3,7 @@ import { ConfigModule } from "@nestjs/config";
import { AttachmentModule } from "./attachment/attachment.module";
import { AuthModule } from "./auth/auth.module";
import { PrismaModule } from "./prisma/prisma.module";
import { SyncModule } from "./sync/sync.module";
import { TaskModule } from "./task/task.module";
@Module({
@@ -14,7 +15,8 @@ import { TaskModule } from "./task/task.module";
PrismaModule,
AuthModule,
TaskModule,
AttachmentModule
AttachmentModule,
SyncModule
]
})
export class AppModule {}
+62
View File
@@ -0,0 +1,62 @@
import { Type } from "class-transformer";
import {
ArrayMaxSize,
ArrayMinSize,
IsArray,
IsEnum,
IsInt,
IsOptional,
IsString,
MaxLength,
Min,
ValidateNested
} from "class-validator";
export enum SyncEntityTypeDto {
TASK = "TASK"
}
export enum SyncActionTypeDto {
CREATE = "CREATE",
UPDATE = "UPDATE",
DELETE = "DELETE"
}
export class SyncPushOperationDto {
@IsString()
@MaxLength(64)
opId!: string;
@IsString()
@MaxLength(64)
entityId!: string;
@IsEnum(SyncEntityTypeDto)
entityType!: SyncEntityTypeDto;
@IsEnum(SyncActionTypeDto)
action!: SyncActionTypeDto;
@IsOptional()
@IsString()
@MaxLength(50000)
payload?: string;
@Type(() => Number)
@IsInt()
@Min(0)
clientTs!: number;
@IsString()
@MaxLength(128)
deviceId!: string;
}
export class SyncPushDto {
@IsArray()
@ArrayMinSize(1)
@ArrayMaxSize(200)
@ValidateNested({ each: true })
@Type(() => SyncPushOperationDto)
operations!: SyncPushOperationDto[];
}
+25
View File
@@ -0,0 +1,25 @@
import { Body, Controller, Headers, Post, UnauthorizedException } from "@nestjs/common";
import { SyncPushDto } from "./dto/sync-push.dto";
import { SyncPushResponse, SyncService } from "./sync.service";
@Controller("sync")
export class SyncController {
constructor(private readonly syncService: SyncService) {}
@Post("push")
async pushOperations(
@Headers("x-user-id") userIdHeader: string | string[] | undefined,
@Body() body: SyncPushDto
): Promise<SyncPushResponse> {
return this.syncService.pushOperations(this.resolveUserId(userIdHeader), body);
}
private resolveUserId(userIdHeader: string | string[] | undefined): string {
const userId = Array.isArray(userIdHeader) ? userIdHeader[0] : userIdHeader;
if (!userId) {
throw new UnauthorizedException("缺少用户上下文");
}
return userId;
}
}
+11
View File
@@ -0,0 +1,11 @@
import { Module } from "@nestjs/common";
import { PrismaModule } from "../prisma/prisma.module";
import { SyncController } from "./sync.controller";
import { SyncService } from "./sync.service";
@Module({
imports: [PrismaModule],
controllers: [SyncController],
providers: [SyncService]
})
export class SyncModule {}
+149
View File
@@ -0,0 +1,149 @@
import { Injectable } from "@nestjs/common";
import { Prisma } from "../../generated/prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { SyncPushDto, SyncPushOperationDto } from "./dto/sync-push.dto";
export type SyncPushItemStatus = "accepted" | "duplicate" | "failed";
export type SyncPushItemResult = {
opId: string;
status: SyncPushItemStatus;
serverTs: string | null;
reason: string | null;
};
export type SyncPushResponse = {
acceptedCount: number;
duplicateCount: number;
failedCount: number;
results: SyncPushItemResult[];
};
type ExistingOperationRecord = {
opId: string;
serverTs: Date;
};
@Injectable()
export class SyncService {
constructor(private readonly prismaService: PrismaService) {}
async pushOperations(userId: string, body: SyncPushDto): Promise<SyncPushResponse> {
const existingOperations = await this.loadExistingOperations(userId, body.operations);
const results: SyncPushItemResult[] = [];
const seenOperationIds = new Set<string>();
const acceptedOperationServerTs = new Map<string, string>();
for (const operation of body.operations) {
if (seenOperationIds.has(operation.opId)) {
results.push({
opId: operation.opId,
status: "duplicate",
serverTs: acceptedOperationServerTs.get(operation.opId) ?? null,
reason: "same_batch_duplicate"
});
continue;
}
seenOperationIds.add(operation.opId);
const existingOperation = existingOperations.get(operation.opId);
if (existingOperation) {
results.push({
opId: operation.opId,
status: "duplicate",
serverTs: existingOperation.serverTs.toISOString(),
reason: "already_synced"
});
continue;
}
try {
const createdOperation = await this.prismaService.syncOperation.create({
data: {
opId: operation.opId,
userId,
deviceId: operation.deviceId,
entityType: operation.entityType,
entityId: operation.entityId,
action: operation.action,
payload: operation.payload,
clientTs: new Date(operation.clientTs)
},
select: {
opId: true,
serverTs: true
}
});
const serverTs = createdOperation.serverTs.toISOString();
acceptedOperationServerTs.set(createdOperation.opId, serverTs);
results.push({
opId: createdOperation.opId,
status: "accepted",
serverTs,
reason: null
});
} catch (error) {
if (this.isDuplicateOpIdError(error)) {
results.push({
opId: operation.opId,
status: "duplicate",
serverTs: null,
reason: "already_synced"
});
continue;
}
results.push({
opId: operation.opId,
status: "failed",
serverTs: null,
reason: "persist_failed"
});
}
}
return {
acceptedCount: results.filter((item) => item.status === "accepted").length,
duplicateCount: results.filter((item) => item.status === "duplicate").length,
failedCount: results.filter((item) => item.status === "failed").length,
results
};
}
private async loadExistingOperations(
userId: string,
operations: SyncPushOperationDto[]
): Promise<Map<string, ExistingOperationRecord>> {
const opIds = Array.from(new Set(operations.map((operation) => operation.opId)));
const existingOperations = (await this.prismaService.syncOperation.findMany({
where: {
userId,
opId: {
in: opIds
}
},
select: {
opId: true,
serverTs: true
}
})) as ExistingOperationRecord[];
return new Map(
existingOperations.map((operation): [string, ExistingOperationRecord] => [
operation.opId,
operation
])
);
}
private isDuplicateOpIdError(error: unknown): boolean {
if (!(error instanceof Prisma.PrismaClientKnownRequestError)) {
return false;
}
return error.code === "P2002";
}
}
+233
View File
@@ -0,0 +1,233 @@
import request from "supertest";
import { INestApplication, ValidationPipe } from "@nestjs/common";
import { Test, TestingModule } from "@nestjs/testing";
import { PrismaService } from "../src/prisma/prisma.service";
import { SyncController } from "../src/sync/sync.controller";
import { SyncService } from "../src/sync/sync.service";
type SyncOperationRecord = {
id: string;
opId: string;
userId: string;
deviceId: string;
entityType: string;
entityId: string;
action: string;
payload?: string;
clientTs: Date;
serverTs: Date;
};
class InMemoryPrismaService {
private syncOperationIdSequence = 1;
private syncOperations: SyncOperationRecord[] = [];
readonly syncOperation = {
findMany: async (args: {
where: {
userId: string;
opId: {
in: string[];
};
};
select: {
opId: true;
serverTs: true;
};
}) => {
return this.syncOperations
.filter(
(item) => item.userId === args.where.userId && args.where.opId.in.includes(item.opId)
)
.map((item) => ({
opId: item.opId,
serverTs: item.serverTs
}));
},
create: async (args: {
data: {
opId: string;
userId: string;
deviceId: string;
entityType: string;
entityId: string;
action: string;
payload?: string;
clientTs: Date;
};
select: {
opId: true;
serverTs: true;
};
}) => {
const createdOperation: SyncOperationRecord = {
id: `sync_${this.syncOperationIdSequence++}`,
opId: args.data.opId,
userId: args.data.userId,
deviceId: args.data.deviceId,
entityType: args.data.entityType,
entityId: args.data.entityId,
action: args.data.action,
payload: args.data.payload,
clientTs: args.data.clientTs,
serverTs: new Date()
};
this.syncOperations.push(createdOperation);
return {
opId: createdOperation.opId,
serverTs: createdOperation.serverTs
};
}
};
getOperationCount(): number {
return this.syncOperations.length;
}
}
describe("SyncController (integration)", () => {
let app: INestApplication;
let prismaService: InMemoryPrismaService;
beforeAll(async () => {
prismaService = new InMemoryPrismaService();
const moduleRef: TestingModule = await Test.createTestingModule({
controllers: [SyncController],
providers: [SyncService, { provide: PrismaService, useValue: prismaService }]
}).compile();
app = moduleRef.createNestApplication();
app.useGlobalPipes(
new ValidationPipe({
transform: true,
whitelist: true,
forbidNonWhitelisted: true
})
);
await app.init();
});
afterAll(async () => {
await app.close();
});
it("should accept operations once and mark repeated push as duplicate", async () => {
const payload = {
operations: [
{
opId: "op-create-1",
entityType: "TASK",
entityId: "task-1",
action: "CREATE",
payload: '{"title":"任务一"}',
clientTs: 1712419200000,
deviceId: "device-a"
},
{
opId: "op-update-1",
entityType: "TASK",
entityId: "task-1",
action: "UPDATE",
payload: '{"title":"任务一-更新"}',
clientTs: 1712419201000,
deviceId: "device-a"
}
]
};
const firstResponse = await request(app.getHttpServer())
.post("/sync/push")
.set("x-user-id", "user-1")
.send(payload)
.expect(201);
expect(firstResponse.body.acceptedCount).toBe(2);
expect(firstResponse.body.duplicateCount).toBe(0);
expect(firstResponse.body.failedCount).toBe(0);
expect(firstResponse.body.results).toEqual([
expect.objectContaining({
opId: "op-create-1",
status: "accepted"
}),
expect.objectContaining({
opId: "op-update-1",
status: "accepted"
})
]);
expect(prismaService.getOperationCount()).toBe(2);
const secondResponse = await request(app.getHttpServer())
.post("/sync/push")
.set("x-user-id", "user-1")
.send(payload)
.expect(201);
expect(secondResponse.body.acceptedCount).toBe(0);
expect(secondResponse.body.duplicateCount).toBe(2);
expect(secondResponse.body.failedCount).toBe(0);
expect(secondResponse.body.results).toEqual([
expect.objectContaining({
opId: "op-create-1",
status: "duplicate",
reason: "already_synced"
}),
expect.objectContaining({
opId: "op-update-1",
status: "duplicate",
reason: "already_synced"
})
]);
expect(prismaService.getOperationCount()).toBe(2);
});
it("should mark duplicated op ids in the same batch as duplicate", async () => {
const response = await request(app.getHttpServer())
.post("/sync/push")
.set("x-user-id", "user-2")
.send({
operations: [
{
opId: "op-dup-1",
entityType: "TASK",
entityId: "task-2",
action: "CREATE",
payload: '{"title":"任务二"}',
clientTs: 1712419300000,
deviceId: "device-b"
},
{
opId: "op-dup-1",
entityType: "TASK",
entityId: "task-2",
action: "UPDATE",
payload: '{"title":"任务二-重复"}',
clientTs: 1712419301000,
deviceId: "device-b"
}
]
})
.expect(201);
expect(response.body.acceptedCount).toBe(1);
expect(response.body.duplicateCount).toBe(1);
expect(response.body.failedCount).toBe(0);
expect(response.body.results[0]).toEqual(
expect.objectContaining({
opId: "op-dup-1",
status: "accepted"
})
);
expect(response.body.results[1]).toEqual(
expect.objectContaining({
opId: "op-dup-1",
status: "duplicate",
reason: "same_batch_duplicate"
})
);
expect(prismaService.getOperationCount()).toBe(3);
});
});